1 | package com.bowman.cardserv;
|
---|
2 |
|
---|
3 | import com.bowman.cardserv.crypto.DESUtil;
|
---|
4 | import com.bowman.cardserv.interfaces.*;
|
---|
5 | import com.bowman.cardserv.session.CacheDummySession;
|
---|
6 | import com.bowman.cardserv.util.*;
|
---|
7 | import com.bowman.cardserv.web.FileFetcher;
|
---|
8 |
|
---|
9 | import java.io.*;
|
---|
10 | import java.net.*;
|
---|
11 | import java.util.*;
|
---|
12 |
|
---|
13 | /**
|
---|
14 | * Created by IntelliJ IDEA.
|
---|
15 | * User: bowman
|
---|
16 | * Date: 2011-07-08
|
---|
17 | * Time: 04:01
|
---|
18 | */
|
---|
19 | public class HttpCacheForwarder implements GHttpConstants, CacheForwarder {
|
---|
20 |
|
---|
21 | private static final int RETRY_WAIT = 2000;
|
---|
22 | private static final int RECORD_SIZE = 28, MAX_QSIZE = 3000, INSTANCE_MAXAGE = 60 * 20 * 1000;
|
---|
23 | private static final int CONNECT_TIMEOUT = 10 * 1000;
|
---|
24 | private static final String CRLF = "" + (char)0x0D + (char)0x0A;
|
---|
25 |
|
---|
26 | private FeederThread[] threads = new FeederThread[2];
|
---|
27 | private TimedAverageList latency = new TimedAverageList(ServiceCacheEntry.WINDOW_SIZE);
|
---|
28 | private TimedAverageList msgSize = new TimedAverageList(ServiceCacheEntry.WINDOW_SIZE);
|
---|
29 | private Map remoteInstances = Collections.synchronizedMap(new HashMap());
|
---|
30 | private final List singleQ = Collections.synchronizedList(new ArrayList());
|
---|
31 |
|
---|
32 | private final String name;
|
---|
33 | private final CacheCoveragePlugin parent;
|
---|
34 |
|
---|
35 | private boolean ssl;
|
---|
36 | private String host, prefix, passwd;
|
---|
37 | private int port;
|
---|
38 | private boolean connected, redundant;
|
---|
39 | private int counter, reconnects, errors, ecmForwards, delayAlerts, filtered;
|
---|
40 | private int maxDelay;
|
---|
41 | private Set profiles, caids;
|
---|
42 |
|
---|
43 | private long bytesOut, bytesIn;
|
---|
44 | private TimedAverageList sentAvg = new TimedAverageList(10), recvAvg = new TimedAverageList(10);
|
---|
45 |
|
---|
46 | public HttpCacheForwarder(CacheCoveragePlugin parent, String name) {
|
---|
47 | this.parent = parent;
|
---|
48 | this.name = name;
|
---|
49 | }
|
---|
50 |
|
---|
51 | public void configUpdated(ProxyXmlConfig xml) throws ConfigException {
|
---|
52 | String urlStr = xml.getStringValue("url");
|
---|
53 | try {
|
---|
54 | URL url = new URL(urlStr);
|
---|
55 | if(!url.getProtocol().startsWith("http")) throw new ConfigException(xml.getFullName(), "url", "Non-http url: " + urlStr);
|
---|
56 | ssl = "https".equals(url.getProtocol());
|
---|
57 | host = url.getHost();
|
---|
58 | port = url.getPort();
|
---|
59 | if(port == -1) port = ssl?443:80;
|
---|
60 | prefix = url.getPath();
|
---|
61 | if(prefix == null) prefix = "";
|
---|
62 | if(!prefix.endsWith("/")) prefix += "/";
|
---|
63 | } catch(MalformedURLException e) {
|
---|
64 | throw new ConfigException(xml.getFullName(), "url", "Malformed url: " + urlStr);
|
---|
65 | }
|
---|
66 | if("true".equalsIgnoreCase(xml.getStringValue("enabled", "true"))) {
|
---|
67 | passwd = xml.getStringValue("password");
|
---|
68 | redundant = "true".equalsIgnoreCase(xml.getStringValue("redundant-forwarding", "false"));
|
---|
69 | maxDelay = xml.getTimeValue("max-delay", 200, "ms");
|
---|
70 | String profileStr = xml.getStringValue("profiles", "").trim().toLowerCase();
|
---|
71 | if(profileStr.length() > 0) profiles = new HashSet(Arrays.asList(profileStr.split(" ")));
|
---|
72 | else profiles = null;
|
---|
73 | String caidStr = xml.getStringValue("caids", "").trim();
|
---|
74 | if(caidStr.length() > 0) caids = ProxyXmlConfig.getIntTokens("caids", caidStr);
|
---|
75 | else caids = null;
|
---|
76 | for(int i = 0; i < threads.length; i++) {
|
---|
77 | threads[i] = new FeederThread(i);
|
---|
78 | threads[i].start();
|
---|
79 | }
|
---|
80 | } else close();
|
---|
81 | }
|
---|
82 |
|
---|
83 | public String getName() {
|
---|
84 | return name;
|
---|
85 | }
|
---|
86 |
|
---|
87 | public boolean isConnected() {
|
---|
88 | return connected;
|
---|
89 | }
|
---|
90 |
|
---|
91 | public Properties getProperties() {
|
---|
92 | Properties p = new Properties();
|
---|
93 | p.setProperty("max-delay", String.valueOf(maxDelay));
|
---|
94 | p.setProperty("avg-latency", String.valueOf(latency.getAverage(true)));
|
---|
95 | p.setProperty("peak-latency", String.valueOf(getPeakLatency()));
|
---|
96 | p.setProperty("avg-msize", String.valueOf(msgSize.getAverage(true)));
|
---|
97 | p.setProperty("peak-msize", String.valueOf(msgSize.getMaxValue()));
|
---|
98 | p.setProperty("cur-qsize", String.valueOf(getSendQSize()));
|
---|
99 | p.setProperty("msg-count", String.valueOf(counter));
|
---|
100 | p.setProperty("filtered", String.valueOf(filtered));
|
---|
101 | p.setProperty("avg-sent-rate", String.valueOf(sentAvg.getTotal(true) / 10));
|
---|
102 | p.setProperty("avg-recv-rate", String.valueOf(recvAvg.getTotal(true) / 10));
|
---|
103 | p.setProperty("ecms", String.valueOf(ecmForwards));
|
---|
104 | p.setProperty("delay-alerts", String.valueOf(delayAlerts));
|
---|
105 | p.setProperty("reconnects", String.valueOf(reconnects));
|
---|
106 | p.setProperty("errors", String.valueOf(errors));
|
---|
107 | return p;
|
---|
108 | }
|
---|
109 |
|
---|
110 | private int getPeakLatency() {
|
---|
111 | if(redundant) return Math.min(threads[0].peakLatency, threads[1].peakLatency);
|
---|
112 | else return Math.max(threads[0].peakLatency, threads[1].peakLatency);
|
---|
113 | }
|
---|
114 |
|
---|
115 | private int getSendQSize() {
|
---|
116 | if(redundant) return threads[0].localQ.size() + threads[1].localQ.size();
|
---|
117 | else return singleQ.size();
|
---|
118 | }
|
---|
119 |
|
---|
120 | public Map getRemoteInstances() {
|
---|
121 | String instance; Properties p; long timeStamp, now = System.currentTimeMillis();
|
---|
122 | for(Iterator iter = remoteInstances.keySet().iterator(); iter.hasNext(); ) {
|
---|
123 | instance = (String)iter.next();
|
---|
124 | p = (Properties)remoteInstances.get(instance);
|
---|
125 | timeStamp = Long.parseLong(p.getProperty("tstamp"));
|
---|
126 | if(now - timeStamp > INSTANCE_MAXAGE) iter.remove();
|
---|
127 | }
|
---|
128 | return remoteInstances;
|
---|
129 | }
|
---|
130 |
|
---|
131 | private void sentBytes(int count) {
|
---|
132 | bytesOut += count;
|
---|
133 | sentAvg.addRecord(count);
|
---|
134 | }
|
---|
135 |
|
---|
136 | private void recvBytes(int count) {
|
---|
137 | bytesIn += count;
|
---|
138 | recvAvg.addRecord(count);
|
---|
139 | }
|
---|
140 |
|
---|
141 | public void close() {
|
---|
142 | if(threads != null) {
|
---|
143 | for(int i = 0; i < threads.length; i++)
|
---|
144 | if(threads[i] != null) threads[i].interrupt();
|
---|
145 | threads = null;
|
---|
146 | }
|
---|
147 | connected = false;
|
---|
148 | }
|
---|
149 |
|
---|
150 | public void forwardRequest(CamdNetMessage req) {
|
---|
151 | return; // not applicable
|
---|
152 | }
|
---|
153 |
|
---|
154 | public void forwardReply(CamdNetMessage req, CamdNetMessage reply) {
|
---|
155 | if(threads != null && connected) {
|
---|
156 | if(reply.getDataLength() == 16) {
|
---|
157 | if(profiles != null)
|
---|
158 | if(req.getProfileName() == null || !profiles.contains(req.getProfileName().toLowerCase())) {
|
---|
159 | filtered++;
|
---|
160 | return;
|
---|
161 | }
|
---|
162 | if(caids != null)
|
---|
163 | if(!caids.contains(new Integer(req.getCaId()))) {
|
---|
164 | filtered++;
|
---|
165 | return;
|
---|
166 | }
|
---|
167 | if(redundant) forwardRedundant(req, reply);
|
---|
168 | else {
|
---|
169 | synchronized(singleQ) {
|
---|
170 | if(singleQ.size() > MAX_QSIZE) {
|
---|
171 | singleQ.clear();
|
---|
172 | parent.logger.warning("HttpCacheForwarder[" + name + "] discarding sendQ, no working connections?");
|
---|
173 | }
|
---|
174 | singleQ.add(new RequestReplyPair(req, reply));
|
---|
175 | singleQ.notifyAll();
|
---|
176 | }
|
---|
177 | }
|
---|
178 | }
|
---|
179 | }
|
---|
180 | }
|
---|
181 |
|
---|
182 | private void forwardRedundant(CamdNetMessage req, CamdNetMessage reply) {
|
---|
183 | for(int i = 0; i < threads.length; i++) { // individual queues for each sender thread, duplicate traffic but lower latency
|
---|
184 | synchronized(threads[i].localQ) {
|
---|
185 | if(threads[i].localQ.size() > MAX_QSIZE) {
|
---|
186 | threads[i].localQ.clear();
|
---|
187 | parent.logger.warning("HttpCacheForwarder[" + name + "] discarding sendQ, no working connections?");
|
---|
188 | }
|
---|
189 | threads[i].localQ.add(new RequestReplyPair(req, reply));
|
---|
190 | threads[i].localQ.notifyAll();
|
---|
191 | }
|
---|
192 | }
|
---|
193 | }
|
---|
194 |
|
---|
195 | private static class RequestReplyPair {
|
---|
196 | CamdNetMessage request, reply;
|
---|
197 |
|
---|
198 | private RequestReplyPair(CamdNetMessage request, CamdNetMessage reply) {
|
---|
199 | this.request = request;
|
---|
200 | this.reply = reply;
|
---|
201 | }
|
---|
202 | }
|
---|
203 |
|
---|
204 | private static class HttpReply {
|
---|
205 | int code, hdrSize;
|
---|
206 | String msg;
|
---|
207 | Properties headers = new Properties(), cookies = new Properties();
|
---|
208 | char[] body;
|
---|
209 | boolean headerEnd;
|
---|
210 |
|
---|
211 | private HttpReply(String line) {
|
---|
212 | hdrSize += line.length() + 2;
|
---|
213 | String[] s = line.split(" ");
|
---|
214 | code = Integer.parseInt(s[1]);
|
---|
215 | msg = s[2];
|
---|
216 | }
|
---|
217 |
|
---|
218 | void addHeader(String line) {
|
---|
219 | hdrSize += line.length() + 2;
|
---|
220 | if("".equals(line)) headerEnd = true;
|
---|
221 | else {
|
---|
222 | String[] s = line.split(": ");
|
---|
223 | headers.setProperty(s[0], s[1]);
|
---|
224 | if(s[0].equals("Set-Cookie")) {
|
---|
225 | s = s[1].split("=");
|
---|
226 | cookies.setProperty(s[0], s[1]);
|
---|
227 | }
|
---|
228 | }
|
---|
229 | }
|
---|
230 |
|
---|
231 | String getCookie(String name) {
|
---|
232 | return cookies.getProperty(name);
|
---|
233 | }
|
---|
234 |
|
---|
235 | String getHeader(String name) {
|
---|
236 | return headers.getProperty(name);
|
---|
237 | }
|
---|
238 |
|
---|
239 | int getContentLength() {
|
---|
240 | return Integer.parseInt(headers.getProperty("Content-Length", "-1"));
|
---|
241 | }
|
---|
242 |
|
---|
243 | String getContentType() {
|
---|
244 | return headers.getProperty("Content-Type");
|
---|
245 | }
|
---|
246 |
|
---|
247 | String getContentAsString() {
|
---|
248 | return new String(body);
|
---|
249 | }
|
---|
250 |
|
---|
251 | SimpleTlvBlob getContentAsTlv() {
|
---|
252 | byte[] buf = null;
|
---|
253 | try {
|
---|
254 | buf = getContentAsString().getBytes("ISO-8859-1");
|
---|
255 | return new SimpleTlvBlob(buf);
|
---|
256 | } catch(UnsupportedEncodingException e) {
|
---|
257 | e.printStackTrace();
|
---|
258 | return null;
|
---|
259 | } catch(Exception e) {
|
---|
260 | e.printStackTrace();
|
---|
261 | System.out.println(headers);
|
---|
262 | System.out.println(DESUtil.bytesToString(buf));
|
---|
263 | System.out.println();
|
---|
264 | return null;
|
---|
265 | }
|
---|
266 | }
|
---|
267 |
|
---|
268 | int getSize() {
|
---|
269 | return hdrSize + body.length;
|
---|
270 | }
|
---|
271 | }
|
---|
272 |
|
---|
273 | private class FeederThread extends Thread {
|
---|
274 |
|
---|
275 | private final List localQ = Collections.synchronizedList(new ArrayList());
|
---|
276 |
|
---|
277 | private Socket conn;
|
---|
278 | private DataOutputStream dos;
|
---|
279 | private BufferedReader br;
|
---|
280 |
|
---|
281 | private String sessionId;
|
---|
282 | protected int peakLatency;
|
---|
283 |
|
---|
284 | public FeederThread(int i) {
|
---|
285 | super("GHttpFeederThread-" + i);
|
---|
286 | }
|
---|
287 |
|
---|
288 | private void initConn() throws IOException {
|
---|
289 | conn = ssl?FileFetcher.socketFactory.createSocket():new Socket();
|
---|
290 | conn.connect(new InetSocketAddress(host, port), CONNECT_TIMEOUT);
|
---|
291 | if(conn == null) return;
|
---|
292 | conn.setSoTimeout(CONNECT_TIMEOUT / 2);
|
---|
293 | dos = new DataOutputStream(new BufferedOutputStream(conn.getOutputStream()));
|
---|
294 | br = new BufferedReader(new InputStreamReader(conn.getInputStream(), "ISO-8859-1"));
|
---|
295 | connected = true;
|
---|
296 | }
|
---|
297 |
|
---|
298 | private void httpPost(List sendQ) throws IOException {
|
---|
299 | counter++;
|
---|
300 | long start = System.currentTimeMillis();
|
---|
301 | int size = dos.size();
|
---|
302 |
|
---|
303 | String path = "api/f/" + (sessionId == null?"":sessionId);
|
---|
304 | dos.writeBytes("POST " + prefix + path + " HTTP/1.1" + CRLF);
|
---|
305 | dos.writeBytes("Host: " + host + CRLF);
|
---|
306 | dos.writeBytes("Content-Length: " + (RECORD_SIZE * sendQ.size()) + CRLF);
|
---|
307 | if(sessionId == null) {
|
---|
308 | String auth = Integer.toHexString(ProxyConfig.getInstance().getProxyOriginId()) + ":" + passwd;
|
---|
309 | auth = new String(com.bowman.util.Base64Encoder.encode(auth.getBytes("ISO-8859-1")));
|
---|
310 | dos.writeBytes("Authorization: Basic " + auth + CRLF);
|
---|
311 | }
|
---|
312 | dos.writeBytes(CRLF);
|
---|
313 | msgSize.addRecord(sendQ.size());
|
---|
314 |
|
---|
315 | RequestReplyPair pair;
|
---|
316 | for(Iterator iter = sendQ.iterator(); iter.hasNext(); ) {
|
---|
317 | pair = (RequestReplyPair)iter.next();
|
---|
318 |
|
---|
319 | ClusteredCache.writeCacheReq(dos, pair.request, false);
|
---|
320 | ClusteredCache.writeCacheRpl(dos, pair.reply, false);
|
---|
321 | }
|
---|
322 | dos.flush();
|
---|
323 | sentBytes(dos.size() - size);
|
---|
324 |
|
---|
325 | String resp = br.readLine();
|
---|
326 | if(resp == null) {
|
---|
327 | parent.logger.warning("HttpCacheForwarder[" + name + "] connection closed after post.");
|
---|
328 | handleError(sendQ, true);
|
---|
329 | return;
|
---|
330 | }
|
---|
331 |
|
---|
332 | HttpReply reply = new HttpReply(resp);
|
---|
333 |
|
---|
334 | do { // read remaining header
|
---|
335 | reply.addHeader(br.readLine());
|
---|
336 | } while(!reply.headerEnd);
|
---|
337 |
|
---|
338 | // read body
|
---|
339 | if("chunked".equalsIgnoreCase(reply.getHeader("Transfer-Encoding"))) {
|
---|
340 | StringBuffer sb = new StringBuffer(); String line;
|
---|
341 | do {
|
---|
342 | line = br.readLine();
|
---|
343 | sb.append(line).append(CRLF);
|
---|
344 | } while(!"".equals(line));
|
---|
345 | reply.body = sb.toString().toCharArray();
|
---|
346 | } else {
|
---|
347 | reply.body = new char[reply.getContentLength()];
|
---|
348 | if(br.read(reply.body) != reply.body.length) {
|
---|
349 | throw new IOException("Assertation failed"); // todo
|
---|
350 | }
|
---|
351 | }
|
---|
352 |
|
---|
353 | recvBytes(reply.getSize());
|
---|
354 |
|
---|
355 | String sessionCookie = reply.getCookie("GSSID");
|
---|
356 | if(sessionCookie != null) sessionId = sessionCookie;
|
---|
357 |
|
---|
358 | switch(reply.code) {
|
---|
359 | case 200:
|
---|
360 | case 204:
|
---|
361 | sendQ.clear();
|
---|
362 | int time = (int)(System.currentTimeMillis() - start);
|
---|
363 | if(time > peakLatency) peakLatency = time;
|
---|
364 | latency.addRecord(time);
|
---|
365 | if(reply.body.length > 0) handleReply(reply.getContentAsTlv());
|
---|
366 | break;
|
---|
367 | case 401:
|
---|
368 | parent.logger.warning("HttpCacheForwarder[" + name + "] session expired.");
|
---|
369 | sessionId = null;
|
---|
370 | break;
|
---|
371 | case 400:
|
---|
372 | parent.logger.warning("HttpCacheForwarder[" + name + "] received bad request error, disabling: " +
|
---|
373 | reply.getContentAsString());
|
---|
374 | close();
|
---|
375 | return;
|
---|
376 | case 403:
|
---|
377 | parent.logger.warning("HttpCacheForwarder[" + name + "] invalid password, disabling.");
|
---|
378 | close();
|
---|
379 | return;
|
---|
380 | case 503:
|
---|
381 | // possibly over quota
|
---|
382 | if(reply.getContentAsString().indexOf("quota") != -1)
|
---|
383 | parent.logger.warning("HttpCacheForwarder[" + name + "] backend temporarily over quota.");
|
---|
384 | else {
|
---|
385 | parent.logger.warning("HttpCacheForwarder[" + name + "] backend temporarily unavailable: " + resp);
|
---|
386 | System.out.println(reply.getContentAsString());
|
---|
387 | }
|
---|
388 | try {
|
---|
389 | Thread.sleep(5000);
|
---|
390 | } catch(InterruptedException ignored) {}
|
---|
391 | return;
|
---|
392 | default: // all other 5xx errors
|
---|
393 | parent.logger.warning("HttpCacheForwarder[" + name + "] received error reply: " + resp);
|
---|
394 | System.out.println(reply.getContentAsString());
|
---|
395 | handleError(sendQ, true);
|
---|
396 | break;
|
---|
397 | }
|
---|
398 | }
|
---|
399 |
|
---|
400 | void handleReply(SimpleTlvBlob tb) {
|
---|
401 | if(tb == null) return;
|
---|
402 | int key; String instance = null;
|
---|
403 | for(Iterator iter = tb.keySet().iterator(); iter.hasNext(); ) {
|
---|
404 | key = ((Integer)iter.next()).intValue();
|
---|
405 | switch(key) {
|
---|
406 | case T_INSTANCE_ID:
|
---|
407 | instance = new String(tb.getSingle(key)); // always first
|
---|
408 | break;
|
---|
409 | case T_STAT_UPDATE:
|
---|
410 | Properties p = new Properties();
|
---|
411 | p.setProperty("tstamp", String.valueOf(System.currentTimeMillis()));
|
---|
412 | int[] stats = tb.getIntArray(key);
|
---|
413 | for(int i = 0; i < stats.length; i++) p.setProperty(STAT_KEYS[i], String.valueOf(stats[i]));
|
---|
414 | if(remoteInstances.put(instance, p) == null) {
|
---|
415 | parent.logger.fine("HttpCacheForwarder[" + name + "] encountered new remote instance: " + instance +
|
---|
416 | " (" + getRemoteInstances().size() + ")");
|
---|
417 | }
|
---|
418 | break;
|
---|
419 | case T_ECM_REQ:
|
---|
420 | List ecms = tb.get(key);
|
---|
421 | parent.logger.fine("HttpCacheForwarder[" + name + "] received " + ecms.size() + " ecm requests...");
|
---|
422 | CamdNetMessage ecmReq;
|
---|
423 | for(Iterator i = ecms.iterator(); i.hasNext(); ) {
|
---|
424 | try {
|
---|
425 | ecmReq = CamdNetMessage.parseGHttpReq(new DataInputStream(new ByteArrayInputStream((byte[])i.next())),
|
---|
426 | conn.getInetAddress().getHostAddress(), true);
|
---|
427 | if(parent.tester != null) parent.tester.testMessage(ecmReq);
|
---|
428 | ecmForwards++;
|
---|
429 | } catch(Exception e) {
|
---|
430 | parent.logger.throwing(e);
|
---|
431 | }
|
---|
432 | }
|
---|
433 | break;
|
---|
434 | case T_CACHE_MISS:
|
---|
435 | if(parent.cache instanceof ClusteredCache) {
|
---|
436 | List delayed = tb.get(key);
|
---|
437 | parent.logger.fine("HttpCacheForwarder[" + name + "] received " + delayed.size() + " cache requests for resend...");
|
---|
438 | CamdNetMessage req;
|
---|
439 | for(Iterator i = delayed.iterator(); i.hasNext(); ) {
|
---|
440 | try {
|
---|
441 | req = CamdNetMessage.parseCacheReq(new DataInputStream(new ByteArrayInputStream((byte[])i.next())), false);
|
---|
442 | ((ClusteredCache)parent.cache).delayAlert(-1, req, false, -1);
|
---|
443 | delayAlerts++;
|
---|
444 | } catch(IOException e) {
|
---|
445 | parent.logger.throwing(e);
|
---|
446 | }
|
---|
447 | }
|
---|
448 | }
|
---|
449 | break;
|
---|
450 | default:
|
---|
451 | parent.logger.fine("HttpCacheForwarder[" + name + "] received unknown TLV field in reply: " + key);
|
---|
452 | break;
|
---|
453 | }
|
---|
454 |
|
---|
455 | }
|
---|
456 | }
|
---|
457 |
|
---|
458 | public void run() {
|
---|
459 | List myQ = new ArrayList();
|
---|
460 |
|
---|
461 | while(threads != null) {
|
---|
462 | try {
|
---|
463 | if(conn == null) initConn();
|
---|
464 | if(redundant) {
|
---|
465 | synchronized(localQ) { // fetch from local
|
---|
466 | while(localQ.isEmpty()) {
|
---|
467 | localQ.wait();
|
---|
468 | }
|
---|
469 | if(!localQ.isEmpty()) {
|
---|
470 | myQ.addAll(localQ);
|
---|
471 | localQ.clear();
|
---|
472 | }
|
---|
473 | }
|
---|
474 | } else {
|
---|
475 | synchronized(singleQ) { // fetch from shared
|
---|
476 | while(singleQ.isEmpty()) {
|
---|
477 | singleQ.wait();
|
---|
478 | }
|
---|
479 | if(!singleQ.isEmpty()) {
|
---|
480 | myQ.addAll(singleQ);
|
---|
481 | singleQ.clear();
|
---|
482 | }
|
---|
483 | }
|
---|
484 | }
|
---|
485 | if(!myQ.isEmpty()) {
|
---|
486 | try {
|
---|
487 | httpPost(myQ);
|
---|
488 | int curSize = redundant?localQ.size():singleQ.size();
|
---|
489 | if(maxDelay > 0 && curSize < 100) Thread.sleep(maxDelay);
|
---|
490 | } catch (SocketException se) {
|
---|
491 | // socket was closed gracefully, reconnect immediately
|
---|
492 | parent.logger.throwing(se);
|
---|
493 | initConn();
|
---|
494 | reconnects++;
|
---|
495 | }
|
---|
496 | }
|
---|
497 | } catch(SocketException e) { // probably connect failure?
|
---|
498 | parent.logger.info("HttpCacheForwarder[" + name + "] disconnected");
|
---|
499 | parent.logger.throwing(e);
|
---|
500 | handleError(myQ, false);
|
---|
501 | } catch(IOException e) { // abnormal disconnect
|
---|
502 | parent.logger.warning("HttpCacheForwarder[" + name + "] disconnected (sendQ size: " + myQ.size() + "): " + e);
|
---|
503 | parent.logger.throwing(e);
|
---|
504 | e.printStackTrace();
|
---|
505 | handleError(myQ, false);
|
---|
506 | } catch(InterruptedException e) {
|
---|
507 | return;
|
---|
508 | }
|
---|
509 | }
|
---|
510 | }
|
---|
511 |
|
---|
512 | private void handleError(List sendQ, boolean disconnect) {
|
---|
513 | if(!sendQ.isEmpty()) {
|
---|
514 | if(redundant) localQ.addAll(sendQ);
|
---|
515 | else singleQ.addAll(sendQ);
|
---|
516 | sendQ.clear();
|
---|
517 | }
|
---|
518 | errors++;
|
---|
519 | if(disconnect) disconnect();
|
---|
520 | else conn = null;
|
---|
521 | try {
|
---|
522 | Thread.sleep(RETRY_WAIT);
|
---|
523 | } catch(InterruptedException ie) {
|
---|
524 | ie.printStackTrace();
|
---|
525 | }
|
---|
526 | }
|
---|
527 |
|
---|
528 | public void disconnect() {
|
---|
529 | if(conn != null) {
|
---|
530 | try {
|
---|
531 | conn.close();
|
---|
532 | } catch(IOException e) {
|
---|
533 | // ignore
|
---|
534 | }
|
---|
535 | conn = null;
|
---|
536 | }
|
---|
537 | }
|
---|
538 |
|
---|
539 | public void interrupt() {
|
---|
540 | disconnect();
|
---|
541 | super.interrupt();
|
---|
542 | }
|
---|
543 | }
|
---|
544 |
|
---|
545 | }
|
---|