Changeset 243


Ignore:
Timestamp:
07/11/13 19:08:33 (10 years ago)
Author:
bowman
Message:

Completed ghttp test-backend (now supports /f/ operation) and tweaked CacheCoveragePlugin http forwarder to match.

Location:
trunk
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • trunk/plugins/CacheCoveragePlugin/src/com/bowman/cardserv/HttpCacheForwarder.java

    r238 r243  
    88
    99import java.io.*;
     10import java.lang.System;
    1011import java.net.*;
    1112import java.util.*;
     
    3738  private int port;
    3839  private boolean connected, redundant;
    39   private int counter, reconnects, errors, ecmForwards, delayAlerts, filtered;
     40  private int counter, reconnects, timeouts, errors, ecmForwards, delayAlerts, filtered;
    4041  private int maxDelay;
    4142  private Set profiles, caids;
     
    105106    p.setProperty("reconnects", String.valueOf(reconnects));
    106107    p.setProperty("errors", String.valueOf(errors));
     108    p.setProperty("timeouts", String.valueOf(timeouts));
    107109    return p;
    108110  }
     
    223225        headers.setProperty(s[0], s[1]);
    224226        if(s[0].equals("Set-Cookie")) {
    225           s = s[1].split("=");
     227          if(s[1].indexOf(";") != -1) {
     228            s = s[1].split(";"); // ignore all except 1st
     229            s = s[0].split("=");
     230          } else s = s[1].split("=");
    226231          cookies.setProperty(s[0], s[1]);
    227232        }
     
    345350        reply.body = sb.toString().toCharArray();
    346351      } else {
    347         reply.body = new char[reply.getContentLength()];
    348         if(br.read(reply.body) != reply.body.length) {
    349           throw new IOException("Assertation failed"); // todo
    350         }
     352        if(reply.getContentLength() > 0) {
     353          reply.body = new char[reply.getContentLength()];
     354          if(br.read(reply.body) != reply.body.length) {
     355            throw new IOException("Assertation failed"); // todo
     356          }
     357        } else reply.body = new char[0];
    351358      }
    352359
     
    488495              int curSize = redundant?localQ.size():singleQ.size();
    489496              if(maxDelay > 0 && curSize < 100) Thread.sleep(maxDelay);
    490             } catch (SocketException se) {
     497            } catch(SocketException se) {
    491498              // socket was closed gracefully, reconnect immediately
    492499              parent.logger.throwing(se);
     
    496503          }
    497504        } catch(SocketException e) { // probably connect failure?
    498           parent.logger.info("HttpCacheForwarder[" + name + "] disconnected");
     505          parent.logger.info("HttpCacheForwarder[" + name + "] failed to connect: " + e);
    499506          parent.logger.throwing(e);
     507          handleError(myQ, false);
     508        } catch(SocketTimeoutException e) { // timeout
     509          parent.logger.warning("HttpCacheForwarder[" + name + "] timeout (sendQ size: " + myQ.size() + "): " + e);
     510          parent.logger.throwing(e);
     511          timeouts++;
    500512          handleError(myQ, false);
    501513        } catch(IOException e) { // abnormal disconnect
  • trunk/src/com/bowman/cardserv/CardServProxy.java

    r241 r243  
    401401
    402402        if(modified.isEmpty() && !(profile.isCacheOnly() || config.isCatchAll())) {
    403           logger.fine("Denying message with no connector candidates from '" + session + "': " + msg.hashCodeStr());
    404           denyMessage(session, msg);
    405           return;
     403          if(!cacheHandler.containsCaid(msg.getCaId())) {
     404            logger.fine("Denying message with no connector candidates from '" + session + "': " + msg.hashCodeStr());
     405            denyMessage(session, msg);
     406            return;
     407          }
    406408        }
    407409
  • trunk/src/com/bowman/cardserv/DefaultCache.java

    r235 r243  
    2424
    2525  private int timeouts, instantHits, waitHits, remoteHits, pendingPeak, contested;
     26
     27  private Set caids = new HashSet();
    2628
    2729  public DefaultCache() {
     
    162164  }
    163165
     166  public synchronized void processReplies(Map replies) {
     167    boolean lockFound = false;
     168    CamdNetMessage request, reply;
     169    for(Iterator iter = replies.keySet().iterator(); iter.hasNext(); ) {
     170      request = (CamdNetMessage)iter.next();
     171      reply = (CamdNetMessage)replies.get(request);
     172      // if(monitor != null) monitor.onReply(request, reply);
     173      if(listener != null) listener.onReply(request, reply);
     174      if(pendingEcms.containsKey(request)) {
     175        removeRequest(request);
     176        lockFound = true;
     177      }
     178      addReply(request, reply);
     179    }
     180    if(lockFound) notifyAll();
     181  }
     182
    164183  public CamdNetMessage peekReply(CamdNetMessage request) {
    165184    return (CamdNetMessage)ecmMap.get(request);
     
    186205  }
    187206
     207  public boolean containsCaid(int caid) {
     208    return caids.contains(new Integer(caid));
     209  }
     210
    188211  protected synchronized void addRequest(int successFactor, CamdNetMessage request, boolean alwaysWait) {
    189212    CamdNetMessage oldRequest = (CamdNetMessage)pendingEcms.put(request, request);
    190213    if(pendingEcms.size() > pendingPeak) pendingPeak = pendingEcms.size();
    191214    if(oldRequest == null) {
     215      caids.add(new Integer(request.getCaId()));
    192216      if(monitor != null) monitor.onRequest(successFactor, request);
    193217      if(listener != null) listener.onRequest(successFactor, request);
  • trunk/src/com/bowman/cardserv/interfaces/CacheHandler.java

    r191 r243  
    33import com.bowman.cardserv.CamdNetMessage;
    44
    5 import java.util.Properties;
     5import java.util.*;
    66
    77/**
     
    1818  CamdNetMessage peekReply(CamdNetMessage request);
    1919  long getMaxCacheWait(long maxCwWait);
     20  boolean containsCaid(int caid);
     21  void processReplies(Map replies);
    2022
    2123  Properties getUsageStats();
  • trunk/src/com/bowman/cardserv/web/GHttpBackend.java

    r238 r243  
    4040  private Map sessionsByUser = new HashMap();
    4141  private Map contexts = Collections.synchronizedMap(new LinkedHashMap());
     42  private Set blackList = new HashSet();
    4243
    4344  private StatusCommand contextsCmd;
     
    144145      }
    145146
     147      blackList.clear();
     148      String bl = FileFetcher.getProperty("cache.bl");
     149      if(bl != null) blackList.addAll(Arrays.asList(bl.split(" ")));
     150
    146151    } else {
    147152      if(httpd != null) {
     
    191196
    192197  private CamdNetMessage waitForReply(CamdNetMessage request) {
     198    // long start = System.currentTimeMillis();
    193199    CamdNetMessage reply = cache.peekReply(request);
    194200    if(reply != null) return reply;
    195     else reply = cache.processRequest(-1, request, true, request.getMaxWait() * 3); // todo
     201    else reply = cache.processRequest(-1, request, true, request.getMaxWait() * 2); // todo
     202    // System.out.println((System.currentTimeMillis() - start) + " - " + reply);
    196203    return reply;
    197204  }
     
    211218  }
    212219
    213   private GHttpSession getSession(String id, String ip) {
     220  private GHttpSession getSession(String id, String ip) throws GHttpAuthException {
    214221    GHttpSession gs = (GHttpSession)sessions.get(id);
    215222    if(gs != null && gs.isExpired()) {
    216223      sessionsByUser.remove(gs.getUser());
    217224      sessions.remove(id);
    218       gs = null;
     225      throw new GHttpAuthException(getErrorResponse(401, "Authorization required"));
    219226    }
    220227    if(gs != null && ip.equals(gs.getRemoteAddress())) return gs;
     
    222229  }
    223230
    224   private GHttpSession findSession(String user, String ip) {
     231  private GHttpSession findSession(String user, String ip) throws GHttpAuthException {
    225232    GHttpSession gs = (GHttpSession)sessionsByUser.get(user);
    226233    if(gs != null && gs.isExpired()) {
    227234      sessionsByUser.remove(user);
    228235      sessions.remove(gs.getGhttpSessionId());
    229       gs = null;
     236      throw new GHttpAuthException(getErrorResponse(401, "Authorization required"));
    230237    }
    231238    if(gs != null && ip.equals(gs.getRemoteAddress())) return gs;
     
    234241
    235242  private synchronized GHttpSession doGhttpAuth(HttpRequest req, String[] s) throws GHttpAuthException {
     243    return doGhttpAuth(req, s, accessPasswd);
     244  }
     245
     246  private synchronized GHttpSession doGhttpAuth(HttpRequest req, String[] s, String passwd) throws GHttpAuthException {
    236247    GHttpSession gs = null;
    237     if(s[3].length() >= 6) { // session id included
     248    if(s.length > 3 && s[3].length() >= 6) { // session id included
    238249      gs = getSession(s[3], req.getRemoteAddress());
    239250    }
    240251    if(gs == null) { // no session id in query string (or invalid/expired session), check for auth
    241       String user = parent.checkBasicAuth(req, accessPasswd);
    242       if(user == null) throw new GHttpAuthException(getErrorResponse(401, "Authorization required"));
     252      String user = parent.checkBasicAuth(req, passwd);
     253      if(user == null) throw new GHttpAuthException(getErrorResponse(403, "Forbidden"));
    243254      if(!user.startsWith(WebBackend.anonPrefix)) {
    244255        HttpResponse error = parent.doIpCheck(req, user);
     
    298309         return doPmtGetOrPost(req);
    299310      } else if(q.startsWith(API_FEEDER_POST)) {
    300         // todo
     311        return doCachePost(req);
    301312      }
    302313      return getErrorResponse(503, "Not implemented"); // todo
     314    }
     315  }
     316
     317  private HttpResponse doCachePost(HttpRequest req) {
     318    if(feederPasswd == null) return getErrorResponse(403, "Disabled");
     319    if(cache == null) cache = config.getCacheHandler();
     320    try {
     321      String[] s = req.getQueryString().split("/");
     322      GHttpSession gs = doGhttpAuth(req, s, feederPasswd);
     323      if(gs == null) return getErrorResponse(403, "Forbidden");
     324      byte[] content = req.getContent();
     325      String addr = req.getRemoteAddress();
     326      if(content.length >= RECORD_SIZE) {
     327        Map replies = new HashMap();
     328        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content));
     329        while(dis.available() >= RECORD_SIZE) {
     330          CamdNetMessage request = CamdNetMessage.parseCacheReq(dis, false);
     331          CamdNetMessage reply = CamdNetMessage.parseCacheRpl(dis, request, false);
     332          if(!blackList.contains(addr)) {
     333            request.setOriginAddress(addr);
     334            reply.setOriginAddress(addr);
     335          }
     336          replies.put(request, reply);
     337        }
     338        cache.processReplies(replies);
     339        if(dis.available() != 0) parent.logger.warning("Trailing bytes in feed post: " + dis.available());
     340        HttpResponse res = new HttpResponse(204, "No Content"); // todo: include stats
     341        if(s.length <= 3) res.setCookie("GSSID", gs.getGhttpSessionId());
     342        return res;
     343      } else throw new IOException("Invalid content length (" + content.length + ")");
     344    } catch (GHttpAuthException e) {
     345      return getErrorResponse(403, "Forbidden");
     346    } catch (Exception e) {
     347      parent.logger.throwing("Bad ghttp feeder request: " + req.getQueryString(), e);
     348      return HttpResponse.getErrorResponse(400, req.getQueryString());
    303349    }
    304350  }
     
    317363        if(ProxyConfig.getInstance().getProfileById(ecmReq.getNetworkId(), ecmReq.getCaId()) == null)
    318364          return getErrorResponse(503, "Unknown system: " + CaProfile.getKeyStr(ecmReq.getNetworkId(), ecmReq.getCaId()));
     365        else if(!cache.containsCaid(ecmReq.getCaId()))
     366          return getErrorResponse(503, "Unknown caid: " + CaProfile.getKeyStr(ecmReq.getNetworkId(), ecmReq.getCaId()));
    319367        gs.fireCamdMessage(ecmReq, false);
    320368        reply = gs.waitForReply(ecmReq);
    321369      } else instant = true;
    322370
    323       if(reply == null) cache.peekReply(ecmReq);
     371      if(reply == null) reply = cache.peekReply(ecmReq);
    324372      if(reply == null) return getErrorResponse(503, "Ecm timeout");
    325373      else {
Note: See TracChangeset for help on using the changeset viewer.