source: trunk/module-stat.c@8452

Last change on this file since 8452 was 8452, checked in by gf, 9 years ago

Do not forget to close open file on error.

Patch by DaytonaHack.

  • Property svn:eol-style set to LF
File size: 45.4 KB
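
The change referenced above follows the usual rule of releasing an already-opened FILE handle on every early-return path. For reference, the error path in load_stat_from_file() below now closes the statistics file before bailing out when the line buffer cannot be allocated (excerpt from the listing, not a complete function):

    file = fopen(fname, "r");
    if (!file) {
        cs_log("loadbalancer: can't read from file %s", fname);
        return;
    }
    if (!cs_malloc(&line, LINESIZE)) {
        fclose(file);   /* do not leak the open file on error */
        return;
    }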
1#include "globals.h"
2
3#ifdef WITH_LB
4#include "cscrypt/md5.h"
5#include "module-cacheex.h"
6#include "module-cccam.h"
7#include "oscam-chk.h"
8#include "oscam-client.h"
9#include "oscam-ecm.h"
10#include "oscam-files.h"
11#include "oscam-lock.h"
12#include "oscam-string.h"
13#include "oscam-time.h"
14
15#define UNDEF_AVG_TIME 80000
16#define MAX_ECM_SEND_CACHE 16
17
18#define LB_REOPEN_MODE_STANDARD 0
19#define LB_REOPEN_MODE_FAST 1
20
21#define LB_NONE 0
22#define LB_FASTEST_READER_FIRST 1
23#define LB_OLDEST_READER_FIRST 2
24#define LB_LOWEST_USAGELEVEL 3
25#define LB_LOG_ONLY 10
26
27#define DEFAULT_LOCK_TIMEOUT 1000
28
29static int32_t stat_load_save;
30static time_t last_housekeeping = 0;
31
32void init_stat(void)
33{
34 stat_load_save = -100;
35
36 //checking config
37 if (cfg.lb_nbest_readers < 2)
38 cfg.lb_nbest_readers = DEFAULT_NBEST;
39 if (cfg.lb_nfb_readers < 2)
40 cfg.lb_nfb_readers = DEFAULT_NFB;
41 if (cfg.lb_min_ecmcount < 2)
42 cfg.lb_min_ecmcount = DEFAULT_MIN_ECM_COUNT;
43 if (cfg.lb_max_ecmcount < 3)
44 cfg.lb_max_ecmcount = DEFAULT_MAX_ECM_COUNT;
45 if (cfg.lb_reopen_seconds < 10)
46 cfg.lb_reopen_seconds = DEFAULT_REOPEN_SECONDS;
47 if (cfg.lb_retrylimit <= 0)
48 cfg.lb_retrylimit = DEFAULT_RETRYLIMIT;
49 if (cfg.lb_stat_cleanup <= 0)
50 cfg.lb_stat_cleanup = DEFAULT_LB_STAT_CLEANUP;
51}
52
53#define LINESIZE 1024
54
55static uint32_t get_prid(uint16_t caid, uint32_t prid)
56{
57 int32_t i;
58 for (i=0;i<CS_MAXCAIDTAB;i++) {
59 uint16_t tcaid = cfg.lb_noproviderforcaid.caid[i];
60 if (!tcaid) break;
61 if ((tcaid == caid) || (tcaid < 0x0100 && (caid >> 8) == tcaid)) {
62 prid = 0;
63 break;
64 }
65
66 }
67 return prid;
68}
69
70static uint32_t get_subid(ECM_REQUEST *er)
71{
72 if (!er->ecmlen)
73 return 0;
74
75 uint32_t id = 0;
76 switch (er->caid>>8)
77 {
78 case 0x01: id = b2i(2, er->ecm+7); break;
79 case 0x06: id = b2i(2, er->ecm+6); break;
80 case 0x09: id = b2i(2, er->ecm+11); break;
81 case 0x4A: // DRE-Crypt, Bulcrypt, others?
82 if (er->caid != 0x4AEE) // Bulcrypt
83 id = er->ecm[7];
84 break;
85 }
86 return id;
87}
88
89
90static void get_stat_query(ECM_REQUEST *er, STAT_QUERY *q)
91{
92 memset(q, 0, sizeof(STAT_QUERY));
93
94 q->caid = er->caid;
95 q->prid = get_prid(er->caid, er->prid);
96 q->srvid = er->srvid;
97 q->chid = get_subid(er);
98 q->ecmlen = er->ecmlen;
99}
100
101void load_stat_from_file(void)
102{
103 stat_load_save = 0;
104 char buf[256];
105 char *line;
106 char *fname;
107 FILE *file;
108
109 if (!cfg.lb_savepath) {
110 get_tmp_dir_filename(buf, sizeof(buf), "stat");
111 fname = buf;
112 }
113 else
114 fname = cfg.lb_savepath;
115
116 file = fopen(fname, "r");
117
118 if (!file) {
119 cs_log("loadbalancer: can't read from file %s", fname);
120 return;
121 }
122
123 if (!cs_malloc(&line, LINESIZE)) {
124 fclose(file);
125 return;
126 }
127
128 setvbuf(file, NULL, _IOFBF, 128*1024);
129
130 cs_debug_mask(D_LB, "loadbalancer: load statistics from %s", fname);
131
132 struct timeb ts, te;
133 cs_ftime(&ts);
134
135 struct s_reader *rdr = NULL;
136 READER_STAT *s;
137
138 int32_t i=1;
139 int32_t valid=0;
140 int32_t count=0;
141 int32_t type=0;
142 char *ptr, *saveptr1 = NULL;
143 char *split[12];
144
145 while (fgets(line, LINESIZE, file))
146 {
147 if (!line[0] || line[0] == '#' || line[0] == ';')
148 continue;
149
150 if (!cs_malloc(&s, sizeof(READER_STAT)))
151 continue;
152
153 //get type by evaluating first line:
154 if (type==0) {
155 if (strstr(line, " rc ")) type = 2;
156 else type = 1;
157 }
158
159 if (type==1) { //New format - faster parsing:
160 for (i = 0, ptr = strtok_r(line, ",", &saveptr1); ptr && i<12 ; ptr = strtok_r(NULL, ",", &saveptr1), i++)
161 split[i] = ptr;
162 valid = (i==11);
163 if (valid) {
164 strncpy(buf, split[0], sizeof(buf)-1);
165 s->rc = atoi(split[1]);
166 s->caid = a2i(split[2], 4);
167 s->prid = a2i(split[3], 6);
168 s->srvid = a2i(split[4], 4);
169 s->chid = a2i(split[5], 4);
170 s->time_avg = atoi(split[6]);
171 s->ecm_count = atoi(split[7]);
172 s->last_received = atol(split[8]);
173 s->fail_factor = atoi(split[9]);
174 s->ecmlen = a2i(split[10], 2);
175 }
176 } else { //Old format - keep for compatibility:
177 i = sscanf(line, "%255s rc %04d caid %04hX prid %06X srvid %04hX time avg %dms ecms %d last %ld fail %d len %02hX\n",
178 buf, &s->rc, &s->caid, &s->prid, &s->srvid,
179 &s->time_avg, &s->ecm_count, &s->last_received, &s->fail_factor, &s->ecmlen);
180 valid = i>5;
181 }
182
183 if (valid && s->ecmlen > 0) {
184 if (rdr == NULL || strcmp(buf, rdr->label) != 0) {
185 LL_ITER itr = ll_iter_create(configured_readers);
186 while ((rdr=ll_iter_next(&itr))) {
187 if (strcmp(rdr->label, buf) == 0) {
188 break;
189 }
190 }
191 }
192
193 if (rdr != NULL && strcmp(buf, rdr->label) == 0) {
194 if (!rdr->lb_stat) {
195 rdr->lb_stat = ll_create("lb_stat");
196 cs_lock_create(&rdr->lb_stat_lock, DEFAULT_LOCK_TIMEOUT, rdr->label);
197 }
198
199 ll_append(rdr->lb_stat, s);
200 count++;
201 }
202 else
203 {
204 cs_log("loadbalancer: statistics could not be loaded for %s", buf);
205 free(s);
206 }
207 }
208 else
209 {
210 cs_debug_mask(D_LB, "loadbalancer: statistics ERROR: %s rc=%d i=%d", buf, s->rc, i);
211 free(s);
212 }
213 }
214 fclose(file);
215 free(line);
216
217 cs_ftime(&te);
218#ifdef WITH_DEBUG
219 int32_t load_time = 1000*(te.time-ts.time)+te.millitm-ts.millitm;
220
221 cs_debug_mask(D_LB, "loadbalancer: statistics loaded %d records in %dms", count, load_time);
222#endif
223}
224
225/**
226 * get statistic values for reader rdr and caid/prid/srvid/ecmlen
227 **/
228static READER_STAT *get_stat_lock(struct s_reader *rdr, STAT_QUERY *q, int8_t lock)
229{
230 if (!rdr->lb_stat) {
231 rdr->lb_stat = ll_create("lb_stat");
232 cs_lock_create(&rdr->lb_stat_lock, DEFAULT_LOCK_TIMEOUT, rdr->label);
233 }
234
235 if (lock) cs_readlock(&rdr->lb_stat_lock);
236
237 LL_ITER it = ll_iter_create(rdr->lb_stat);
238 READER_STAT *s;
239 int32_t i = 0;
240 while ((s = ll_iter_next(&it))) {
241 i++;
242 if (s->caid==q->caid && s->prid==q->prid && s->srvid==q->srvid && s->chid==q->chid) {
243 if (s->ecmlen == q->ecmlen)
244 break;
245 if (!s->ecmlen) {
246 s->ecmlen = q->ecmlen;
247 break;
248 }
249 if (!q->ecmlen) //Query without ecmlen from dvbapi
250 break;
251 }
252 }
253 if (lock) cs_readunlock(&rdr->lb_stat_lock);
254
255 //Move stat to list start for faster access:
256// if (i > 10 && s) {
257// if (lock) cs_writelock(&rdr->lb_stat_lock);
258// ll_iter_move_first(&it);
259// if (lock) cs_writeunlock(&rdr->lb_stat_lock);
260// } Corsair removed, could cause crashes!
261
262 return s;
263}
264
265/**
266 * get statistic values for reader rdr and caid/prid/srvid/ecmlen
267 **/
268static READER_STAT *get_stat(struct s_reader *rdr, STAT_QUERY *q)
269{
270 return get_stat_lock(rdr, q, 1);
271}
272
273/**
274 * Calculates average time
275 */
276static void calc_stat(READER_STAT *s)
277{
278 int32_t i, c=0, t = 0;
279 for (i = 0; i < LB_MAX_STAT_TIME; i++) {
280 if (s->time_stat[i] > 0) {
281 t += (int32_t)s->time_stat[i];
282 c++;
283 }
284 }
285 if (!c)
286 s->time_avg = UNDEF_AVG_TIME;
287 else
288 s->time_avg = t / c;
289}
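//Note: time_avg is the arithmetic mean of the non-zero entries of time_stat[],
//a small ring buffer that add_stat() fills with the last LB_MAX_STAT_TIME
//response times; with no samples it falls back to UNDEF_AVG_TIME.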
290
291/**
292 * Saves statistics for all readers to cfg.lb_savepath (by default the "stat" file in the tmp dir)
293 */
294static void save_stat_to_file_thread(void)
295{
296 stat_load_save = 0;
297 char buf[256];
298
299 set_thread_name(__func__);
300
301 char *fname;
302 if (!cfg.lb_savepath) {
303 get_tmp_dir_filename(buf, sizeof(buf), "stat");
304 fname = buf;
305 }
306 else
307 fname = cfg.lb_savepath;
308
309 FILE *file = fopen(fname, "w");
310
311 if (!file) {
312 cs_log("can't write to file %s", fname);
313 return;
314 }
315
316 setvbuf(file, NULL, _IOFBF, 128*1024);
317
318 struct timeb ts, te;
319 cs_ftime(&ts);
320
321 time_t cleanup_time = time(NULL) - (cfg.lb_stat_cleanup*60*60);
322
323 int32_t count=0;
324 struct s_reader *rdr;
325 LL_ITER itr = ll_iter_create(configured_readers);
326 while ((rdr=ll_iter_next(&itr))) {
327
328 if (rdr->lb_stat) {
329 cs_writelock(&rdr->lb_stat_lock);
330 LL_ITER it = ll_iter_create(rdr->lb_stat);
331 READER_STAT *s;
332 while ((s = ll_iter_next(&it))) {
333
334 if (s->last_received < cleanup_time || !s->ecmlen) { //cleanup old stats
335 ll_iter_remove_data(&it);
336 continue;
337 }
338
339 //Old version, too slow to parse:
340 //fprintf(file, "%s rc %d caid %04hX prid %06X srvid %04hX time avg %dms ecms %d last %ld fail %d len %02hX\n",
341 // rdr->label, s->rc, s->caid, s->prid,
342 // s->srvid, s->time_avg, s->ecm_count, s->last_received, s->fail_factor, s->ecmlen);
343
344 //New version:
345 fprintf(file, "%s,%d,%04hX,%06X,%04hX,%04hX,%d,%d,%ld,%d,%02hX\n",
346 rdr->label, s->rc, s->caid, s->prid,
347 s->srvid, (uint16_t)s->chid, s->time_avg, s->ecm_count, s->last_received, s->fail_factor, s->ecmlen);
348
349 count++;
350// if (count % 500 == 0) { //Saving stats is using too much cpu and causes high file load. so we need a break
351// cs_readunlock(&rdr->lb_stat_lock);
352// cs_sleepms(100);
353// cs_readlock(&rdr->lb_stat_lock);
354// }
355 }
356 cs_writeunlock(&rdr->lb_stat_lock);
357 }
358 }
359
360 fclose(file);
361
362 cs_ftime(&te);
363 int32_t load_time = 1000*(te.time-ts.time)+te.millitm-ts.millitm;
364
365
366 cs_log("loadbalancer: statistic saved %d records to %s in %dms", count, fname, load_time);
367}
368
369void save_stat_to_file(int32_t thread)
370{
371 stat_load_save = 0;
372 if (thread)
373 start_thread((void*)&save_stat_to_file_thread, "save lb stats");
374 else
375 save_stat_to_file_thread();
376}
377
378/**
379 * fail_factor is multiplied with the reopen time. This function increases the fail_factor
380 **/
381static void inc_fail(READER_STAT *s)
382{
383 if (s->fail_factor <= 0)
384 s->fail_factor = 1;
385 else
386 s->fail_factor *= 2;
387}
388
389static READER_STAT *get_add_stat(struct s_reader *rdr, STAT_QUERY *q)
390{
391 if (!rdr->lb_stat) {
392 rdr->lb_stat = ll_create("lb_stat");
393 cs_lock_create(&rdr->lb_stat_lock, DEFAULT_LOCK_TIMEOUT, rdr->label);
394 }
395
396 cs_writelock(&rdr->lb_stat_lock);
397
398 READER_STAT *s = get_stat_lock(rdr, q, 0);
399 if (!s) {
400 if (cs_malloc(&s,sizeof(READER_STAT))) {
401 s->caid = q->caid;
402 s->prid = q->prid;
403 s->srvid = q->srvid;
404 s->chid = q->chid;
405 s->ecmlen = q->ecmlen;
406 s->time_avg = UNDEF_AVG_TIME; //dummy placeholder
407 s->rc = E_NOTFOUND;
408 s->last_received = time(NULL);
409 ll_append(rdr->lb_stat, s);
410 }
411 }
412
413 if (s && s->ecm_count < 0) // s may be NULL if the allocation above failed
414 s->ecm_count=0;
415
416 cs_writeunlock(&rdr->lb_stat_lock);
417
418 return s;
419}
420
421static void housekeeping_stat(int32_t force);
422
423/**
424 * Adds caid/prid/srvid/ecmlen to stat-list for reader ridx with time/rc
425 */
426static void add_stat(struct s_reader *rdr, ECM_REQUEST *er, int32_t ecm_time, int32_t rc)
427{
428 if (!rdr || !er || !cfg.lb_mode ||!er->ecmlen || !er->client)
429 return;
430
431 struct s_client *cl = rdr->client;
432 if (!cl)
433 return;
434
435 READER_STAT *s;
436
437 //inc ecm_count if found, drop to 0 if not found:
438 // rc codes:
439 // 0 = found +
440 // 1 = cache1 #
441 // 2 = cache2 #
442 // 3 = cacheex #
443 // 4 = not found -
444 // 5 = timeout -2
445 // 6 = sleeping #
446 // 7 = fake #
447 // 8 = invalid #
448 // 9 = corrupt #
449 // 10= no card #
450 // 11= expdate #
451 // 12= disabled #
452 // 13= stopped #
453 // 100= unhandled #
454 // + = adds statistic values
455 // # = ignored because of duplicate values, temporary failures or softblocks
456 // - = causes loadbalancer to block this reader for this caid/prov/sid
457 // -2 = causes loadbalancer to block if it happens too often
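// Example of how the table above maps onto the code below: rc 0 (found) feeds
// time_stat[]/time_avg and resets fail_factor; rc 4 (not found) increments the
// fail_factor and, unless the entry already has more than lb_min_ecmcount hits,
// stores the failing rc so the reader is skipped until get_reopen_seconds() elapses.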
458
459
460 if (rc == E_NOTFOUND && (uint32_t)ecm_time >= cfg.ctimeout) //Map "not found" to "timeout" if ecm_time>client time out
461 rc = E_TIMEOUT;
462
463 if ((uint32_t)ecm_time >= 3*cfg.ctimeout) //ignore too old ecms
464 return;
465
466 STAT_QUERY q;
467 get_stat_query(er, &q);
468
469 time_t now = time(NULL);
470
471 if (rc == E_FOUND) { //found
472 s = get_add_stat(rdr, &q);
473 s->rc = E_FOUND;
474 s->ecm_count++;
475 s->last_received = now;
476 s->fail_factor = 0;
477
478 //FASTEST READER:
479 s->time_idx++;
480 if (s->time_idx >= LB_MAX_STAT_TIME)
481 s->time_idx = 0;
482 s->time_stat[s->time_idx] = ecm_time;
483 calc_stat(s);
484
485 //OLDEST READER now set by get best reader!
486
487
488 //USAGELEVEL:
489 /* Assign a value to rdr->lb_usagelevel_ecmcount,
490 because no determined value was assigned before. */
491 if (rdr->lb_usagelevel_ecmcount < 0)
492 rdr->lb_usagelevel_ecmcount = 0;
493
494 rdr->lb_usagelevel_ecmcount++; /* ecm is found so counter should increase */
495 if ((rdr->lb_usagelevel_ecmcount % cfg.lb_min_ecmcount) == 0) //update every MIN_ECM_COUNT usagelevel:
496 {
497 time_t t = (now - rdr->lb_usagelevel_time);
498 rdr->lb_usagelevel = 1000 / (t<1?1:t);
499 /* Reset of usagelevel time and counter */
500 rdr->lb_usagelevel_time = now;
501 rdr->lb_usagelevel_ecmcount = 0;
502 }
503
504 }
505 else if (rc < E_NOTFOUND ) { //cache1+2+3
506 //no increase of statistics here, cachetime is not real time
507 s = get_stat(rdr, &q);
508 if (s != NULL)
509 s->last_received = now;
510 return;
511 }
512 else if (rc == E_NOTFOUND||rc == E_INVALID) { //not found / invalid
513 //special rcEx codes mean temporary problems, so do not block!
514 //CCcam card can't decode, 0x28=NOK1, 0x29=NOK2
515 //CCcam loop detection = E2_CCCAM_LOOP
516 if (er->rcEx >= LB_NONBLOCK_E2_FIRST) {
517 s = get_stat(rdr, &q);
518 if (s != NULL)
519 s->last_received = now; //to avoid timeouts
520 return;
521 }
522
523 s = get_add_stat(rdr, &q);
524 if (s->rc == E_NOTFOUND) { //we already have "not found", so we increase the time. In some cases (with services/ident set) the failing reader is selected again:
525 if (ecm_time < 100)
526 ecm_time = 100;
527 s->time_avg += ecm_time;
528 }
529
530 if (s->ecm_count > cfg.lb_min_ecmcount) //many previous hits? Do not block, give the reader another chance
531 s->ecm_count = 0;
532 else
533 s->rc = rc;
534
535 inc_fail(s);
536 s->last_received = now;
537
538 //reduce ecm_count step by step
539 if (!cfg.lb_reopen_mode)
540 s->ecm_count /= 10;
541 }
542 else if (rc == E_TIMEOUT) { //timeout
543 s = get_add_stat(rdr, &q);
544
545 //catch suddenly occurring timeouts and block reader:
546// if ((int)(now-s->last_received) < (int)(5*cfg.ctimeout) &&
547// s->rc == E_FOUND && s->ecm_count == 0) {
548// s->rc = E_TIMEOUT;
549// //inc_fail(s); //do not inc fail factor in this case
550// }
551 //reader is connected longer than 5s && has not more than 5 pending ecms:
552// else if ((cl->login+(int)(2*cfg.ctimeout/1000)) < now && cl->pending < 5 &&
553// s->rc == E_FOUND && s->ecm_count == 0) {
554// s->rc = E_TIMEOUT;
555// inc_fail(s);
556// }
557// else
558 if (!s->ecm_count)
559 s->rc = E_TIMEOUT;
560 else if (s->rc == E_FOUND && now > s->last_received+1) {
561 //search for alternate readers. If we have one, block this reader:
562 int8_t n = 0;
563 struct s_ecm_answer *ea;
564 for (ea = er->matching_rdr; ea; ea = ea->next) {
565 if (ea->reader != rdr && ea->rc < E_NOTFOUND){
566 n = 1;
567 break;
568 }
569 }
570 if (n > 0) //We have alternative readers, so we can block this one:
571 s->rc = E_TIMEOUT;
572 else { //No other reader found. Inc fail factor and retry lb_min_ecmcount times:
573 inc_fail(s);
574 if (s->fail_factor > cfg.lb_min_ecmcount) {
575 s->fail_factor = 0;
576 s->rc = E_TIMEOUT;
577 }
578 }
579 }
580
581 s->last_received = now;
582
583 //add timeout to s:
584 if (ecm_time<=0 || ecm_time > (int)cfg.ctimeout)
585 ecm_time = cfg.ctimeout;
586 s->time_idx++;
587 if (s->time_idx >= LB_MAX_STAT_TIME)
588 s->time_idx = 0;
589 s->time_stat[s->time_idx] = ecm_time;
590 calc_stat(s);
591 }
592 else
593 {
594#ifdef WITH_DEBUG
595 if (rc >= E_FOUND && (D_LB & cs_dblevel)) {
596 char buf[ECM_FMT_LEN];
597 format_ecm(er, buf, ECM_FMT_LEN);
598 cs_debug_mask(D_LB, "loadbalancer: not handled stat for reader %s: rc %d %s time %dms",
599 rdr->label, rc, buf, ecm_time);
600 }
601#endif
602 return;
603 }
604
605 housekeeping_stat(0);
606
607#ifdef WITH_DEBUG
608 if (D_LB & cs_dblevel) {
609 char buf[ECM_FMT_LEN];
610 format_ecm(er, buf, ECM_FMT_LEN);
611 cs_debug_mask(D_LB, "loadbalancer: adding stat for reader %s: rc %d %s time %dms fail %d",
612 rdr->label, rc, buf, ecm_time, s->fail_factor);
613 }
614#endif
615
616 if (cfg.lb_save) {
617 stat_load_save++;
618 if (stat_load_save > cfg.lb_save)
619 save_stat_to_file(1);
620 }
621}
622
623static void reset_stat(STAT_QUERY *q)
624{
625 //cs_debug_mask(D_LB, "loadbalance: resetting ecm count");
626 struct s_reader *rdr;
627 cs_readlock(&readerlist_lock);
628 for (rdr=first_active_reader; rdr ; rdr=rdr->next) {
629 if (rdr->lb_stat && rdr->client) {
630 READER_STAT *s = get_stat(rdr, q);
631 if (s) {
632 if (s->ecm_count > 0)
633 s->ecm_count = 1; //not zero, so we know it's decodeable
634 s->rc = E_FOUND;
635 s->fail_factor = 0;
636 }
637 }
638 }
639 cs_readunlock(&readerlist_lock);
640}
641
642int32_t clean_stat_by_rc(struct s_reader *rdr, int8_t rc, int8_t inverse)
643{
644 int32_t count = 0;
645 if (rdr && rdr->lb_stat) {
646 cs_writelock(&rdr->lb_stat_lock);
647 READER_STAT *s;
648 LL_ITER itr = ll_iter_create(rdr->lb_stat);
649 while ((s = ll_iter_next(&itr))) {
650 if ((!inverse && s->rc == rc) || (inverse && s->rc != rc)) {
651 ll_iter_remove_data(&itr);
652 count++;
653 }
654 }
655 cs_writeunlock(&rdr->lb_stat_lock);
656 }
657 return count;
658}
659
660int32_t clean_all_stats_by_rc(int8_t rc, int8_t inverse)
661{
662 int32_t count = 0;
663 LL_ITER itr = ll_iter_create(configured_readers);
664 struct s_reader *rdr;
665 while ((rdr = ll_iter_next(&itr))) {
666 count += clean_stat_by_rc(rdr, rc, inverse);
667 }
668 save_stat_to_file(0);
669 return count;
670}
671
672int32_t clean_stat_by_id(struct s_reader *rdr, uint16_t caid, uint32_t prid, uint16_t srvid, uint16_t chid, uint16_t ecmlen)
673{
674 int32_t count = 0;
675 if (rdr && rdr->lb_stat) {
676
677 cs_writelock(&rdr->lb_stat_lock);
678 READER_STAT *s;
679 LL_ITER itr = ll_iter_create(rdr->lb_stat);
680 while ((s = ll_iter_next(&itr))) {
681 if (s->caid == caid &&
682 s->prid == prid &&
683 s->srvid == srvid &&
684 s->chid == chid &&
685 s->ecmlen == ecmlen) {
686 ll_iter_remove_data(&itr);
687 count++;
688 break; // because the entry should be unique we can leave the loop here
689 }
690 }
691 cs_writeunlock(&rdr->lb_stat_lock);
692 }
693 return count;
694}
695
696
697static int32_t has_ident(FTAB *ftab, ECM_REQUEST *er) {
698
699 if (!ftab || !ftab->filts)
700 return 0;
701
702 int32_t j, k;
703
704 for (j = 0; j < ftab->nfilts; j++) {
705 if (ftab->filts[j].caid) {
706 if (ftab->filts[j].caid==er->caid) { //caid matches!
707 int32_t nprids = ftab->filts[j].nprids;
708 if (!nprids) // No Provider ->Ok
709 return 1;
710
711 for (k = 0; k < nprids; k++) {
712 uint32_t prid = ftab->filts[j].prids[k];
713 if (prid == er->prid) { //Provider matches
714 return 1;
715 }
716 }
717 }
718 }
719 }
720 return 0; //No match!
721}
722
723static int32_t get_retrylimit(ECM_REQUEST *er) {
724 int32_t i;
725 for (i = 0; i < cfg.lb_retrylimittab.n; i++) {
726 if (cfg.lb_retrylimittab.caid[i] == er->caid || cfg.lb_retrylimittab.caid[i] == er->caid>>8)
727 return cfg.lb_retrylimittab.value[i];
728 }
729 return cfg.lb_retrylimit;
730}
731
732static int32_t get_nbest_readers(ECM_REQUEST *er) {
733 int32_t i;
734 for (i = 0; i < cfg.lb_nbest_readers_tab.n; i++) {
735 if (cfg.lb_nbest_readers_tab.caid[i] == er->caid || cfg.lb_nbest_readers_tab.caid[i] == er->caid>>8)
736 return cfg.lb_nbest_readers_tab.value[i];
737 }
738 return cfg.lb_nbest_readers;
739}
740
741static time_t get_reopen_seconds(READER_STAT *s)
742{
743 int32_t max = (INT_MAX / cfg.lb_reopen_seconds);
744 if (max > 9999) max = 9999;
745 if (s->fail_factor > max)
746 s->fail_factor = max;
747 if (!s->fail_factor)
748 return cfg.lb_reopen_seconds;
749 return (time_t)s->fail_factor * (time_t)cfg.lb_reopen_seconds;
750}
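//Worked example: inc_fail() doubles fail_factor on every consecutive failure
//(1, 2, 4, ...), so with lb_reopen_seconds = 900 and fail_factor = 4 the entry
//is not retried for 3600 seconds; the clamping above keeps the product below
//INT_MAX and at most 9999 * lb_reopen_seconds.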
751
752static void convert_to_beta_int(ECM_REQUEST *er, uint16_t caid_to)
753{
754 unsigned char md5tmp[MD5_DIGEST_LENGTH];
755 convert_to_beta(er->client, er, caid_to);
756 // update ecmd5 for storing the ECM in cache
757 memcpy(er->ecmd5, MD5(er->ecm+13, er->ecmlen-13, md5tmp), CS_ECMSTORESIZE);
758 cacheex_update_hash(er);
759 er->btun = 2; //marked as auto-betatunnel converted. Also for fixing recursive lock in get_cw
760}
761
762
763static void convert_to_nagra_int(ECM_REQUEST *er, uint16_t caid_to)
764{
765 unsigned char md5tmp[MD5_DIGEST_LENGTH];
766 convert_to_nagra(er->client, er, caid_to);
767 // update ecmd5 for storing the ECM in cache
768 memcpy(er->ecmd5, MD5(er->ecm+3, er->ecmlen-3, md5tmp), CS_ECMSTORESIZE);
769 cacheex_update_hash(er);
770 er->btun = 2; //marked as auto-betatunnel converted. Also for fixing recursive lock in get_cw
771}
772
773uint16_t get_betatunnel_caid_to(uint16_t caid){
774 int32_t lbbm = cfg.lb_auto_betatunnel_mode;
775 if (lbbm <=3) {
776 if (caid == 0x1801) return 0x1722;
777 if (caid == 0x1833) return 0x1702;
778 if (caid == 0x1834) return 0x1722;
779 if (caid == 0x1835) return 0x1722;
780 }
781 if (lbbm >=1) {
782 if (caid == 0x1702) return 0x1833;
783 }
784 if (lbbm == 1 || lbbm == 4 ) {
785 if (caid == 0x1722) return 0x1801;
786 } else if (lbbm == 2 || lbbm == 5 ) {
787 if (caid == 0x1722) return 0x1834;
788 } else if (lbbm == 3 || lbbm == 6 ) {
789 if (caid == 0x1722) return 0x1835;
790 }
791 return 0;
792}
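//Example: with lb_auto_betatunnel_mode = 0 only the Nagra->Beta direction is
//mapped (e.g. 0x1801 -> 0x1722); mode 1 additionally maps the reverse direction
//(0x1702 -> 0x1833, 0x1722 -> 0x1801).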
793
794void check_lb_auto_betatunnel_mode(ECM_REQUEST *er) {
795 int32_t lbbm = cfg.lb_auto_betatunnel_mode;
796 if ( lbbm == 1 || lbbm == 4) {
797 er->caid = 0x1801;
798 } else if ( lbbm == 2 || lbbm == 5) {
799 er->caid = 0x1834;
800 } else if ( lbbm == 3 || lbbm == 6) {
801 er->caid = 0x1835;
802 }
803 //no other way to autodetect whether it is 1801, 1834 or 1835
804}
805
806uint16_t get_rdr_caid(struct s_reader *rdr) {
807 if (is_network_reader(rdr)) {
808 return 0; //reader caid is not real caid
809 } else {
810 return rdr->caid;
811 }
812}
813
814uint16_t is_betatunnel_caid(uint16_t caid){
815 if (caid == 0x1702 || caid == 0x1722 || caid == 0x1801 || caid == 0x1833 || caid == 0x1834 || caid == 0x1835) return 1;
816 return 0;
817}
818
819/**
820 * Gets best reader for caid/prid/srvid/ecmlen.
821 * Best reader is evaluated by lowest avg time but only if ecm_count > cfg.lb_min_ecmcount (5)
822 * Also the reader is asked whether it is "available"
823 * selected readers are flagged via ea->status (READER_ACTIVE/READER_FALLBACK); nothing is returned
824 */
825void stat_get_best_reader(ECM_REQUEST *er)
826{
827 if (!cfg.lb_mode || cfg.lb_mode==LB_LOG_ONLY)
828 return;
829
830 if (!er->reader_avail)
831 return;
832
833 struct s_reader *rdr;
834 struct s_ecm_answer *ea;
835
836 //preferred card forwarding (CCcam client):
837 if (cccam_forward_origin_card(er))
838 return;
839
840 STAT_QUERY q;
841 get_stat_query(er, &q);
842
843 //auto-betatunnel: The trick is: "let the loadbalancer decide"!
844 if (cfg.lb_auto_betatunnel && er->caid >> 8 == 0x18 && er->ecmlen) { //nagra
845 uint16_t caid_to = get_betatunnel_caid_to(er->caid);
846 if (caid_to) {
847 int8_t needs_stats_nagra = 1, needs_stats_beta = 1;
848
849 //Clone query parameters for beta:
850 STAT_QUERY qbeta = q;
851 qbeta.caid = caid_to;
852 qbeta.prid = 0;
853 qbeta.ecmlen = er->ecm[2] + 3 + 10;
854
855 int32_t time_nagra = 0;
856 int32_t time_beta = 0;
857 int32_t weight;
858 int32_t ntime;
859
860 READER_STAT *stat_nagra = NULL;
861 READER_STAT *stat_beta = NULL;
862
863 //What is faster? nagra or beta?
864 int8_t isn;
865 int8_t isb;
866 int8_t overall_valid = 0;
867 int8_t overall_nvalid = 0;
868 for(ea = er->matching_rdr; ea; ea = ea->next) {
869 isn = 0;
870 isb = 0;
871 rdr = ea->reader;
872 weight = rdr->lb_weight;
873 if (weight <= 0) weight = 1;
874
875
876 //Check if betatunnel is allowed on this reader:
877 int8_t valid = chk_ctab(caid_to, &rdr->ctab) //Check caid
878 && chk_rfilter2(caid_to, 0, rdr) //Ident
879 && chk_srvid_by_caid_prov_rdr(rdr, caid_to, 0) //Services
880 && (!get_rdr_caid(rdr) || chk_caid_rdr(rdr,caid_to)); //rdr-caid
881 if (valid) {
882 stat_beta = get_stat(rdr, &qbeta);
883 overall_valid = 1;
884 }
885 //else
886 //stat_beta = NULL;
887
888 //Check if nagra is allowed on this reader:
889 int8_t nvalid = chk_ctab(er->caid, &rdr->ctab)//Check caid
890 && chk_rfilter2(er->caid, 0, rdr) //Ident
891 && chk_srvid_by_caid_prov_rdr(rdr, er->caid, 0) //Services
892 && (!get_rdr_caid(rdr) || chk_caid_rdr(rdr,er->caid)); //rdr-caid
893 if (nvalid) {
894 stat_nagra = get_stat(rdr, &q);
895 overall_nvalid = 1;
896 }
897
898 //calculate nagra data:
899 if (stat_nagra && stat_nagra->rc == E_FOUND) {
900 ntime = stat_nagra->time_avg*100/weight;
901 if (!time_nagra || ntime < time_nagra)
902 time_nagra = ntime;
903 }
904
905 //calculate beta data:
906 if (stat_beta && stat_beta->rc == E_FOUND) {
907 ntime = stat_beta->time_avg*100/weight;
908 if (!time_beta || ntime < time_beta)
909 time_beta = ntime;
910 }
911
912 //Incomplete reader evaluation, we need more stats!
913 if (stat_nagra){
914 needs_stats_nagra = 0;
915 isn = 1;
916 }
917 if (stat_beta){
918 needs_stats_beta = 0;
919 isb = 1;
920 }
921 cs_debug_mask(D_LB, "loadbalancer-betatunnel valid %d, stat_nagra %d, stat_beta %d, (%04X,%04X)", valid, isn, isb ,get_rdr_caid(rdr),caid_to);
922 }
923
924 if (!overall_valid)//we have no valid betatunnel reader, so we don't need stats (converted)
925 needs_stats_beta = 0;
926
927 if (!overall_nvalid) //we have no valid reader, so we don't need stats (unconverted)
928 needs_stats_nagra = 0;
929
930 if (cfg.lb_auto_betatunnel_prefer_beta && time_beta){
931 time_beta = time_beta * cfg.lb_auto_betatunnel_prefer_beta/100;
932 if (time_beta <= 0)
933 time_beta = 1;
934 }
935
936 if (needs_stats_nagra || needs_stats_beta) {
937 cs_debug_mask(D_LB, "loadbalancer-betatunnel %04X:%04X (%d/%d) needs more statistics...", er->caid, caid_to,
938 needs_stats_nagra, needs_stats_beta);
939 if (needs_stats_beta) { //try beta first
940
941 convert_to_beta_int(er, caid_to);
942 get_stat_query(er, &q);
943 }
944 }
945 else if (time_beta && (!time_nagra || time_beta <= time_nagra)) {
946 cs_debug_mask(D_LB, "loadbalancer-betatunnel %04X:%04X selected beta: n%dms > b%dms", er->caid, caid_to, time_nagra, time_beta);
947 convert_to_beta_int(er, caid_to);
948 get_stat_query(er, &q);
949 }
950 else {
951 cs_debug_mask(D_LB, "loadbalancer-betatunnel %04X:%04X selected nagra: n%dms < b%dms", er->caid, caid_to, time_nagra, time_beta);
952 }
953 // else nagra is faster or no beta, so continue unmodified
954 }
955 } else
956
957
958 if (cfg.lb_auto_betatunnel && (er->caid == 0x1702 || er->caid == 0x1722) && er->ocaid == 0x0000 && er->ecmlen) { //beta
959 uint16_t caid_to = get_betatunnel_caid_to(er->caid);
960 if (caid_to) {
961 int8_t needs_stats_nagra = 1, needs_stats_beta = 1;
962
963 //Clone query parameters for nagra:
964 STAT_QUERY qnagra = q;
965 qnagra.caid = caid_to;
966 qnagra.prid = 0;
967 qnagra.ecmlen = er->ecm[2] - 7;
968
969 int32_t time_nagra = 0;
970 int32_t time_beta = 0;
971 int32_t weight;
972 int32_t avg_time;
973
974 READER_STAT *stat_nagra = NULL;
975 READER_STAT *stat_beta = NULL;
976 //What is faster? nagra or beta?
977 int8_t isb;
978 int8_t isn;
979 int8_t overall_valid = 0;
980 int8_t overall_bvalid = 0;
981 for(ea = er->matching_rdr; ea; ea = ea->next) {
982 isb = 0;
983 isn = 0;
984 rdr = ea->reader;
985 weight = rdr->lb_weight;
986 if (weight <= 0) weight = 1;
987
988
989
990 //Check if reverse betatunnel is allowed on this reader:
991 int8_t valid = chk_ctab(caid_to, &rdr->ctab)//, rdr->typ) //Check caid
992 && chk_rfilter2(caid_to, 0, rdr) //Ident
993 && chk_srvid_by_caid_prov_rdr(rdr, caid_to, 0) //Services
994 && (!get_rdr_caid(rdr) || chk_caid_rdr(rdr,caid_to)); //rdr-caid
995 if (valid) {
996 stat_nagra = get_stat(rdr, &qnagra);
997 overall_valid = 1;
998 }
999 //else
1000 //stat_nagra = NULL;
1001
1002 //Check if beta is allowed on this reader:
1003 int8_t bvalid = chk_ctab(er->caid, &rdr->ctab)//, rdr->typ) //Check caid
1004 && chk_rfilter2(er->caid, 0, rdr) //Ident
1005 && chk_srvid_by_caid_prov_rdr(rdr, er->caid, 0) //Services
1006 && (!get_rdr_caid(rdr) || chk_caid_rdr(rdr,er->caid)); //rdr-caid
1007 if (bvalid) {
1008 stat_beta = get_stat(rdr, &q);
1009 overall_bvalid = 1;
1010 }
1011
1012 //calculate nagra data:
1013 if (stat_nagra && stat_nagra->rc == E_FOUND) {
1014 avg_time = stat_nagra->time_avg*100/weight;
1015 if (!time_nagra || avg_time < time_nagra)
1016 time_nagra = avg_time;
1017 }
1018
1019 //calculate beta data:
1020 if (stat_beta && stat_beta->rc == E_FOUND) {
1021 avg_time = stat_beta->time_avg*100/weight;
1022 if (!time_beta || avg_time < time_beta)
1023 time_beta = avg_time;
1024 }
1025
1026 //Incomplete reader evaluation, we need more stats!
1027 if (stat_beta){
1028 needs_stats_beta = 0;
1029 isb = 1;
1030 }
1031 if (stat_nagra){
1032 needs_stats_nagra = 0;
1033 isn = 1;
1034 }
1035 cs_debug_mask(D_LB, "loadbalancer-betatunnel valid %d, stat_beta %d, stat_nagra %d, (%04X,%04X)", valid, isb, isn ,get_rdr_caid(rdr),caid_to);
1036 }
1037
1038 if (!overall_valid)//we have no valid reverse betatunnel reader, so we don't need stats (converted)
1039 needs_stats_nagra = 0;
1040
1041 if (!overall_bvalid) //we have no valid reader, so we don't need stats (unconverted)
1042 needs_stats_beta = 0;
1043
1044 if (cfg.lb_auto_betatunnel_prefer_beta && time_beta) {
1045 time_beta = time_beta * cfg.lb_auto_betatunnel_prefer_beta/100;
1046 if (time_beta < 0)
1047 time_beta = 0;
1048 }
1049
1050 //if we need stats, we send 2 ecm requests: 18xx and 17xx:
1051 if (needs_stats_nagra || needs_stats_beta) {
1052 cs_debug_mask(D_LB, "loadbalancer-betatunnel %04X:%04X (%d/%d) needs more statistics...", er->caid, caid_to,
1053 needs_stats_beta, needs_stats_nagra);
1054 if (needs_stats_nagra){// try nagra first
1055
1056 convert_to_nagra_int(er, caid_to);
1057 get_stat_query(er, &q);
1058
1059 }
1060 }
1061 else if (time_nagra && (!time_beta || time_nagra <= time_beta)) {
1062 cs_debug_mask(D_LB, "loadbalancer-betatunnel %04X:%04X selected nagra: b%dms > n%dms", er->caid, caid_to, time_beta, time_nagra);
1063 convert_to_nagra_int(er, caid_to);
1064 get_stat_query(er, &q);
1065 }
1066 else {
1067 cs_debug_mask(D_LB, "loadbalancer-betatunnel %04X:%04X selected beta: b%dms < n%dms", er->caid, caid_to, time_beta, time_nagra);
1068 }
1069
1070 }
1071 }
1072
1073 if (cfg.lb_auto_betatunnel && is_betatunnel_caid(er->caid)) {
1074 //check again whether the caid is valid for the reader,
1075 //with both caids on local readers or with a proxy
1076 //(both caids are set up on the reader to make the tunnel caid visible in the share (ccc))
1077 //make sure we don't send a beta ecm to a nagra reader (or the reverse)
1078 struct s_ecm_answer *prv = NULL;
1079 for(ea = er->matching_rdr; ea; ea = ea->next) {
1080 rdr = ea->reader;
1081 if (is_network_reader(rdr)) { //reader caid is not real caid
1082 prv = ea;
1083 continue; // proxy can convert or reject
1084 }
1085 cs_debug_mask(D_LB, "check again caid %04X on reader %s", er->caid, rdr->label);
1086 if ( !get_rdr_caid(ea->reader) || chk_caid_rdr(ea->reader,er->caid)) {
1087 prv = ea;
1088 } else {
1089 if (!rdr->fallback) er->reader_avail--;
1090 cs_debug_mask(D_LB, "caid %04X not found in caidlist, reader %s removed from request reader list", er->caid, rdr->label);
1091 if (prv){
1092 prv->next = ea->next;
1093 } else
1094 er->matching_rdr = ea->next;
1095 }
1096 }
1097 if (!er->reader_avail)
1098 return;
1099 }
1100
1101 struct timeb check_time;
1102 cs_ftime(&check_time);
1103 time_t current_time = time(NULL);
1104 int32_t current = -1;
1105 READER_STAT *s = NULL;
1106 int32_t retrylimit = get_retrylimit(er);
1107 int32_t reader_count = 0;
1108 int32_t new_stats = 0;
1109 int32_t nlocal_readers = 0;
1110 int32_t nbest_readers = get_nbest_readers(er);
1111 int32_t nfb_readers = cfg.lb_nfb_readers;
1112 int32_t nreaders = cfg.lb_max_readers;
1113 if (!nreaders)
1114 nreaders = -1;
1115 else if (nreaders <= cfg.lb_nbest_readers)
1116 nreaders = cfg.lb_nbest_readers+1;
1117 int32_t nmaxreopen = nreaders-nbest_readers;
1118 if (nmaxreopen < 1)
1119 {
1120 if (nreaders > 0) nmaxreopen = 1;
1121 else nmaxreopen = er->reader_avail;
1122 }
1123
1124#ifdef WITH_DEBUG
1125 if (cs_dblevel & D_LB) {
1126 //loadbalancer debug output:
1127 int32_t nr = 0;
1128 char buf[512];
1129 int n, l=512;
1130 char *rptr = buf;
1131 *rptr = 0;
1132
1133 for(ea = er->matching_rdr; ea; ea = ea->next) {
1134 nr++;
1135
1136 if (nr>5) continue;
1137
1138 if (!(ea->status & READER_FALLBACK))
1139 n = snprintf(rptr, l, "%s%s%s ", ea->reader->label, (ea->status&READER_CACHEEX)?"*":"", (ea->status&READER_LOCAL)?"L":"");
1140 else
1141 n = snprintf(rptr, l, "[%s%s%s] ", ea->reader->label, (ea->status&READER_CACHEEX)?"*":"", (ea->status&READER_LOCAL)?"L":"");
1142 rptr+=n;
1143 l-=n;
1144 }
1145
1146 if (nr>5)
1147 snprintf(rptr, l, "...(%d more)", nr - 5);
1148
1149 char ecmbuf[ECM_FMT_LEN];
1150 format_ecm(er, ecmbuf, ECM_FMT_LEN);
1151
1152 cs_debug_mask(D_LB, "loadbalancer: client %s for %s: n=%d valid readers: %s",
1153 username(er->client), ecmbuf, nr, buf);
1154 }
1155#endif
1156
1157 for(ea = er->matching_rdr; ea; ea = ea->next) {
1158 ea->status &= ~(READER_ACTIVE|READER_FALLBACK);
1159 ea->value = 0;
1160 }
1161
1162 for(ea = er->matching_rdr; ea; ea = ea->next) {
1163 rdr = ea->reader;
1164#ifdef CS_CACHEEX
1165 int8_t cacheex = rdr->cacheex.mode;
1166 if (cacheex == 1) {
1167 ea->status |= READER_ACTIVE; //no statistics, this reader is a cacheex reader and so always active
1168 continue;
1169 }
1170#endif
1171 struct s_client *cl = rdr->client;
1172 reader_count++;
1173
1174 int32_t weight = rdr->lb_weight <= 0?100:rdr->lb_weight;
1175
1176 s = get_stat(rdr, &q);
1177 if (!s) {
1178 if (nmaxreopen>0) {
1179 cs_debug_mask(D_LB, "loadbalancer: starting statistics for reader %s", rdr->label);
1180 ea->status |= READER_ACTIVE; //no statistics, this reader is active (now) but we need statistics first!
1181 new_stats = 1;
1182 //nreaders--;
1183 if (!cfg.lb_reopen_mode)
1184 nmaxreopen--;
1185 }
1186 continue;
1187 }
1188
1189 if (nmaxreopen>0 && (s->ecm_count < 0||(s->ecm_count > cfg.lb_max_ecmcount && s->time_avg > retrylimit))) {
1190 cs_debug_mask(D_LB, "loadbalancer: max ecms (%d) reached by reader %s, resetting statistics", cfg.lb_max_ecmcount, rdr->label);
1191 reset_stat(&q);
1192 ea->status |= READER_ACTIVE; //max ecm reached, get new statistics
1193 nreaders--;
1194 nmaxreopen--;
1195 continue;
1196 }
1197
1198// if (nreopen_readers && s->rc != E_FOUND && s->last_received+get_reopen_seconds(s) < current_time) {
1199// cs_debug_mask(D_LB, "loadbalancer: reopen reader %s", rdr->label);
1200// reset_stat(er->caid, prid, er->srvid, er->chid, er->ecmlen);
1201// ea->status |= READER_ACTIVE; //max ecm reached, get new statistics
1202// nreopen_readers--;
1203// continue;
1204// }
1205
1206 int32_t hassrvid;
1207 if(cl)
1208 hassrvid = has_srvid(cl, er) || has_ident(&rdr->ftab, er);
1209 else
1210 hassrvid = 0;
1211
1212 if (nmaxreopen>0 && s->rc == E_FOUND && s->ecm_count < cfg.lb_min_ecmcount) {
1213 cs_debug_mask(D_LB, "loadbalancer: reader %s needs more statistics", rdr->label);
1214 ea->status |= READER_ACTIVE; //need more statistics!
1215 new_stats = 1;
1216 nreaders--;
1217 nmaxreopen--;
1218 continue;
1219 }
1220
1221 //Reader can decode this service (rc==0) and has lb_min_ecmcount ecms:
1222 if (s->rc == E_FOUND || hassrvid) {
1223 if (cfg.preferlocalcards && (ea->status & READER_LOCAL))
1224 nlocal_readers++; //Prefer local readers!
1225
1226 switch (cfg.lb_mode) {
1227 default:
1228 case LB_NONE:
1229 case LB_LOG_ONLY:
1230 //cs_debug_mask(D_LB, "loadbalance disabled");
1231 ea->status |= READER_ACTIVE;
1232 if (rdr->fallback)
1233 ea->status |= READER_FALLBACK;
1234 continue;
1235
1236 case LB_FASTEST_READER_FIRST:
1237 current = s->time_avg * 100 / weight;
1238 break;
1239
1240 case LB_OLDEST_READER_FIRST:
1241 if (!rdr->lb_last.time)
1242 rdr->lb_last = check_time;
1243
1244 //current is negative here! the older, the bigger the difference
1245 current = 1000 * (rdr->lb_last.time - check_time.time) + (rdr->lb_last.millitm - check_time.millitm) - 10;
1246 // current /= weight; /* The others are divided by weight only OLDEST not??*/
1247 if (!current)
1248 current = -1;
1249 break;
1250
1251 case LB_LOWEST_USAGELEVEL:
1252 current = rdr->lb_usagelevel * 100 / weight;
1253 break;
1254 }
1255 cs_debug_mask(D_LB, "rdr %s lbvalue = %d", rdr->label, abs(current));
1256
1257#if defined(WEBIF) || defined(LCDSUPPORT)
1258 rdr->lbvalue = abs(current);
1259#endif
1260
1261 if (cfg.lb_mode != LB_OLDEST_READER_FIRST) { //Adjust selection to reader load:
1262 if (rdr->ph.c_available && !rdr->ph.c_available(rdr, AVAIL_CHECK_LOADBALANCE, er)) {
1263 current=current*2;
1264 }
1265
1266 if (cl && cl->pending)
1267 current=current*cl->pending;
1268
1269 if (s->rc >= E_NOTFOUND) { //when reader has service this is possible
1270 current=current*(s->fail_factor+2); //Mark as slow
1271 }
1272 if (current < 1)
1273 current=1;
1274 }
1275
1276 ea->value = current;
1277 ea->time = s->time_avg;
1278 }
1279 }
1280
1281 if (nlocal_readers > nbest_readers) { //if we have local readers, we prefer them!
1282 nlocal_readers = nbest_readers;
1283 nbest_readers = 0;
1284 }
1285 else
1286 nbest_readers = nbest_readers - nlocal_readers;
1287
1288 struct s_reader *best_rdr = NULL;
1289 struct s_reader *best_rdri = NULL;
1290 int32_t best_time = 0;
1291 int32_t result_count = 0;
1292
1293 int32_t n=0;
1294 while (nreaders) {
1295 struct s_ecm_answer *best = NULL;
1296
1297 for(ea = er->matching_rdr; ea; ea = ea->next) {
1298 if (nlocal_readers && !(ea->status & READER_LOCAL))
1299 continue;
1300
1301 if (ea->value && (!best || ea->value < best->value))
1302 best=ea;
1303 }
1304 if (!best)
1305 break;
1306
1307 n++;
1308 best_rdri = best->reader;
1309 if (!best_rdr) {
1310 best_rdr = best_rdri;
1311 best_time = best->time;
1312 }
1313 best->value = 0;
1314
1315 if (nlocal_readers) {//primary readers, local
1316 nlocal_readers--;
1317 nreaders--;
1318 best->status |= READER_ACTIVE;
1319 }
1320 else if (nbest_readers) {//primary readers, other
1321 nbest_readers--;
1322 nreaders--;
1323 best->status |= READER_ACTIVE;
1324 }
1325 else if (nfb_readers) { //fallbacks:
1326 nfb_readers--;
1327 best->status |= (READER_ACTIVE|READER_FALLBACK);
1328 }
1329 else
1330 break;
1331 result_count++;
1332 }
1333
1334 if (!new_stats && result_count < reader_count) {
1335 if (!n) //no best reader found? reopen if we have ecm_count>0
1336 {
1337 cs_debug_mask(D_LB, "loadbalancer: NO MATCHING READER FOUND, reopen last valid:");
1338 for(ea = er->matching_rdr; ea; ea = ea->next) {
1339 if (!(ea->status&READER_ACTIVE)) {
1340 rdr = ea->reader;
1341 s = get_stat(rdr, &q);
1342 if (s && s->rc != E_FOUND && s->last_received+get_reopen_seconds(s) < current_time) {
1343 if (nreaders) {
1344 ea->status |= READER_ACTIVE;
1345 nreaders--;
1346 cs_debug_mask(D_LB, "loadbalancer: reopened reader %s", rdr->label);
1347 }
1348 n++;
1349 }
1350 }
1351 }
1352 cs_debug_mask(D_LB, "loadbalancer: reopened %d readers", n);
1353 }
1354
1355 //algorithm: reopen other readers only if responsetime > retrylimit:
1356 int32_t reopen = !best_rdr || (best_time && (best_time > retrylimit));
1357 if (reopen) {
1358#ifdef WITH_DEBUG
1359 if (best_rdr)
1360 cs_debug_mask(D_LB, "loadbalancer: reader %s reached retrylimit (%dms), reopening other readers", best_rdr->label, best_time);
1361 else
1362 cs_debug_mask(D_LB, "loadbalancer: no best reader found, reopening other readers");
1363#endif
1364 for(ea = er->matching_rdr; ea && nreaders; ea = ea->next) {
1365 if (!(ea->status&READER_ACTIVE)) {
1366 rdr = ea->reader;
1367 s = get_stat(rdr, &q);
1368
1369 if (s && s->rc != E_FOUND) { //retrylimit reached:
1370 if (!cfg.lb_reopen_mode) cs_debug_mask(D_LB, "loadbalancer: reader %s need %ld seconds to reopen", ea->reader->label, (s->last_received+get_reopen_seconds(s))-current_time);
1371 if (cfg.lb_reopen_mode || s->last_received+get_reopen_seconds(s) < current_time) { //Retrying reader every (900/conf) seconds
1372 if (cfg.lb_reopen_mode) {
1373 cs_debug_mask(D_LB, "loadbalancer: reader %s reopen fast", rdr->label);
1374 } else {
1375 cs_debug_mask(D_LB, "loadbalancer: reader %s reopen after %ld sec.", rdr->label, get_reopen_seconds(s));
1376 }
1377 s->last_received = current_time;
1378 ea->status |= READER_ACTIVE;
1379 nreaders--;
1380 cs_debug_mask(D_LB, "loadbalancer: retrying reader %s (fail %d)", rdr->label, s->fail_factor);
1381 }
1382 }
1383 }
1384 }
1385 }
1386 }
1387
1388#ifdef WITH_DEBUG
1389 if (cs_dblevel & D_LB) {
1390 //loadbalancer debug output:
1391 int32_t nr = 0;
1392 char buf[512];
1393 int32_t l=512;
1394 char *rptr = buf;
1395 *rptr = 0;
1396
1397 for(ea = er->matching_rdr; ea; ea = ea->next) {
1398 if (!(ea->status & READER_ACTIVE))
1399 continue;
1400
1401 nr++;
1402
1403 if (nr>5) continue;
1404
1405 if (!(ea->status & READER_FALLBACK))
1406 n = snprintf(rptr, l, "%s%s%s ", ea->reader->label, (ea->status&READER_CACHEEX)?"*":"", (ea->status&READER_LOCAL)?"L":"");
1407 else
1408 n = snprintf(rptr, l, "[%s%s%s] ", ea->reader->label, (ea->status&READER_CACHEEX)?"*":"", (ea->status&READER_LOCAL)?"L":"");
1409 rptr+=n;
1410 l-=n;
1411 }
1412
1413 if (nr>5)
1414 snprintf(rptr, l, "...(%d more)", nr - 5);
1415
1416 char ecmbuf[ECM_FMT_LEN];
1417 format_ecm(er, ecmbuf, ECM_FMT_LEN);
1418
1419 cs_debug_mask(D_LB, "loadbalancer: client %s for %s: n=%d selected readers: %s",
1420 username(er->client), ecmbuf, nr, buf);
1421 }
1422#endif
1423 return;
1424}
1425
1426/**
1427 * clears statistics of reader rdr.
1428 **/
1429void clear_reader_stat(struct s_reader *rdr)
1430{
1431 if (!rdr->lb_stat)
1432 return;
1433
1434 ll_clear_data(rdr->lb_stat);
1435}
1436
1437void clear_all_stat(void)
1438{
1439 struct s_reader *rdr;
1440 LL_ITER itr = ll_iter_create(configured_readers);
1441 while ((rdr = ll_iter_next(&itr))) {
1442 clear_reader_stat(rdr);
1443 }
1444}
1445
1446static void housekeeping_stat_thread(void)
1447{
1448 time_t cleanup_time = time(NULL) - (cfg.lb_stat_cleanup*60*60);
1449 int32_t cleaned = 0;
1450 struct s_reader *rdr;
1451 set_thread_name(__func__);
1452 LL_ITER itr = ll_iter_create(configured_readers);
1453 cs_readlock(&readerlist_lock); //this avoids cleaning a reader during writing
1454 while ((rdr = ll_iter_next(&itr))) {
1455 if (rdr->lb_stat) {
1456 cs_writelock(&rdr->lb_stat_lock);
1457 LL_ITER it = ll_iter_create(rdr->lb_stat);
1458 READER_STAT *s;
1459 while ((s=ll_iter_next(&it))) {
1460
1461 if (s->last_received < cleanup_time) {
1462 ll_iter_remove_data(&it);
1463 cleaned++;
1464 }
1465 }
1466 cs_writeunlock(&rdr->lb_stat_lock);
1467 }
1468 }
1469 cs_readunlock(&readerlist_lock);
1470 cs_debug_mask(D_LB, "loadbalancer cleanup: removed %d entries", cleaned);
1471}
1472
1473static void housekeeping_stat(int32_t force)
1474{
1475 time_t now = time(NULL);
1476 if (!force && last_housekeeping + 60*60 > now) //only clean once in an hour
1477 return;
1478
1479 last_housekeeping = now;
1480 start_thread((void*)&housekeeping_stat_thread, "housekeeping lb stats");
1481}
1482
1483static int compare_stat(READER_STAT **ps1, READER_STAT **ps2) {
1484 READER_STAT *s1 = (*ps1), *s2 = (*ps2);
1485 int res = s1->rc - s2->rc;
1486 if (res) return res;
1487 res = s1->caid - s2->caid;
1488 if (res) return res;
1489 res = s1->prid - s2->prid;
1490 if (res) return res;
1491 res = s1->srvid - s2->srvid;
1492 if (res) return res;
1493 res = s1->chid - s2->chid;
1494 if (res) return res;
1495 res = s1->ecmlen - s2->ecmlen;
1496 if (res) return res;
1497 res = s1->last_received - s2->last_received;
1498 return res;
1499}
1500
1501static int compare_stat_r(READER_STAT **ps1, READER_STAT **ps2) {
1502 return -compare_stat(ps1, ps2);
1503}
1504
1505READER_STAT **get_sorted_stat_copy(struct s_reader *rdr, int32_t reverse, int32_t *size)
1506{
1507 if (reverse)
1508 return (READER_STAT **)ll_sort(rdr->lb_stat, compare_stat_r, size);
1509 else
1510 return (READER_STAT **)ll_sort(rdr->lb_stat, compare_stat, size);
1511}
1512
1513static int8_t stat_in_ecmlen(struct s_reader *rdr, READER_STAT *s)
1514{
1515 struct s_ecmWhitelist *tmp;
1516 struct s_ecmWhitelistIdent *tmpIdent;
1517 struct s_ecmWhitelistLen *tmpLen;
1518 for (tmp = rdr->ecmWhitelist; tmp; tmp = tmp->next) {
1519 if (tmp->caid == 0 || (tmp->caid == s->caid)) {
1520 for (tmpIdent = tmp->idents; tmpIdent; tmpIdent = tmpIdent->next) {
1521 if (tmpIdent->ident == 0 || tmpIdent->ident == s->prid) {
1522 for (tmpLen = tmpIdent->lengths; tmpLen; tmpLen = tmpLen->next) {
1523 if (tmpLen->len == s->ecmlen) {
1524 return 1;
1525 }
1526 }
1527 }
1528 }
1529 }
1530 }
1531 return 0;
1532}
1533
1534static int8_t add_to_ecmlen(struct s_reader *rdr, READER_STAT *s)
1535{
1536 struct s_ecmWhitelist *tmp = NULL;
1537 struct s_ecmWhitelistIdent *tmpIdent = NULL;
1538 struct s_ecmWhitelistLen *tmpLen = NULL;
1539
1540 for (tmp = rdr->ecmWhitelist; tmp; tmp = tmp->next) {
1541 if (tmp->caid == s->caid) {
1542 for (tmpIdent = tmp->idents; tmpIdent; tmpIdent = tmpIdent->next) {
1543 if (tmpIdent->ident == s->prid) {
1544 for (tmpLen = tmpIdent->lengths; tmpLen; tmpLen = tmpLen->next) {
1545 if (tmpLen->len == s->ecmlen) {
1546 return 1;
1547 }
1548 }
1549 break;
1550 }
1551 }
1552 break;
1553 }
1554 }
1555
1556 if (!tmp) {
1557 if (cs_malloc(&tmp, sizeof(struct s_ecmWhitelist))) {
1558 tmp->caid = s->caid;
1559 tmp->next = rdr->ecmWhitelist;
1560 rdr->ecmWhitelist = tmp;
1561 }
1562 }
1563
1564 if (!tmpIdent && tmp) {
1565 if (cs_malloc(&tmpIdent, sizeof(struct s_ecmWhitelistIdent))) {
1566 tmpIdent->ident = s->prid;
1567 tmpIdent->next = tmp->idents;
1568 tmp->idents = tmpIdent;
1569 }
1570 }
1571
1572 if (!tmpLen && tmpIdent) {
1573 if (cs_malloc(&tmpLen, sizeof(struct s_ecmWhitelistLen))) {
1574 tmpLen->len = s->ecmlen;
1575 tmpLen->next = tmpIdent->lengths;
1576 tmpIdent->lengths = tmpLen;
1577 }
1578 }
1579
1580 return 0;
1581}
1582
1583void update_ecmlen_from_stat(struct s_reader *rdr)
1584{
1585 if (!rdr || !rdr->lb_stat) // note: test the pointer itself, not its address
1586 return;
1587
1588 cs_readlock(&rdr->lb_stat_lock);
1589 LL_ITER it = ll_iter_create(rdr->lb_stat);
1590 READER_STAT *s;
1591 while ((s = ll_iter_next(&it))) {
1592 if (s->rc ==E_FOUND) {
1593 if (!stat_in_ecmlen(rdr, s))
1594 add_to_ecmlen(rdr, s);
1595 }
1596 }
1597 cs_readunlock(&rdr->lb_stat_lock);
1598}
1599
1600int32_t lb_valid_btun(ECM_REQUEST *er, uint16_t caidto)
1601{
1602 STAT_QUERY q;
1603 READER_STAT *s;
1604 struct s_reader *rdr;
1605
1606 get_stat_query(er, &q);
1607 q.caid = caidto;
1608
1609 cs_readlock(&readerlist_lock);
1610 for (rdr=first_active_reader; rdr ; rdr=rdr->next) {
1611 if (rdr->lb_stat && rdr->client) {
1612 s = get_stat(rdr, &q);
1613 if (s && s->rc == E_FOUND) {
1614 cs_readunlock(&readerlist_lock);
1615 return 1;
1616 }
1617 }
1618 }
1619 cs_readunlock(&readerlist_lock);
1620 return 0;
1621}
1622
1623/**
1624 * mark as last reader after checking for cache requests:
1625 **/
1626void lb_mark_last_reader(ECM_REQUEST *er)
1627{
1628 //OLDEST_READER: set lb_last
1629 struct s_ecm_answer *ea;
1630 for (ea=er->matching_rdr; ea; ea=ea->next) {
1631 if ((ea->status&(READER_ACTIVE|READER_FALLBACK)) == READER_ACTIVE)
1632 cs_ftime(&ea->reader->lb_last);
1633 }
1634}
1635
1636/**
1637 * Automatic timeout feature depending on statistic values
1638 **/
1639uint32_t lb_auto_timeout(ECM_REQUEST *er, uint32_t ctimeout) {
1640 STAT_QUERY q;
1641 READER_STAT *s = NULL;
1642
1643 struct s_reader *rdr = NULL;
1644 struct s_ecm_answer *ea;
1645
1646 for (ea = er->matching_rdr; ea; ea = ea->next) {
1647 if ((ea->status&(READER_ACTIVE|READER_FALLBACK)) == READER_ACTIVE) {
1648 rdr = ea->reader;
1649 get_stat_query(er, &q);
1650 s = get_stat(rdr, &q);
1651 if (s) break;
1652 }
1653 }
1654 if (!s) return ctimeout;
1655
1656 uint32_t t;
1657 if (s->rc == E_TIMEOUT)
1658 t = ctimeout/2; //timeout known, early timeout!
1659 else {
1660 if (s->ecm_count < cfg.lb_min_ecmcount) return ctimeout;
1661
1662 t = s->time_avg*(100+cfg.lb_auto_timeout_p)/100;
1663 if ((int32_t)(t-s->time_avg) < cfg.lb_auto_timeout_t) t = s->time_avg+cfg.lb_auto_timeout_t;
1664 }
1665 if (t > ctimeout) t = ctimeout;
1666#ifdef WITH_DEBUG
1667 if (D_TRACE & cs_dblevel) {
1668 char buf[ECM_FMT_LEN];
1669 format_ecm(er, buf, ECM_FMT_LEN);
1670 cs_debug_mask(D_TRACE, "auto-timeout for %s %s set rdr %s to %d", username(er->client), buf, rdr->label, t);
1671 }
1672#endif
1673 return t;
1674}
1675
1676void send_reader_stat(struct s_reader *rdr, ECM_REQUEST *er, struct s_ecm_answer *ea, int8_t rc)
1677{
1678 if (!rdr || rc >= E_99 || cacheex_reader(rdr))
1679 return;
1680 if (er->ecmcacheptr) //ignore cache answer
1681 return;
1682
1683 struct timeb tpe;
1684 cs_ftime(&tpe);
1685#ifndef CS_CACHEEX
1686 int32_t ntime = comp_timeb(&tpe,&er->tps);
1687#else
1688 int32_t ntime = comp_timeb(&tpe, &er->cacheex_wait);
1689#endif
1690 if (ntime < 1)
1691 ntime = 1;
1692
1693 if (ea && (ea->status & READER_FALLBACK) && ntime > (int32_t)cfg.ftimeout)
1694 ntime = ntime - cfg.ftimeout;
1695 add_stat(rdr, er, ntime, rc);
1696}
1697
1698void stat_finish(void) {
1699 if (cfg.lb_mode && cfg.lb_save) {
1700 save_stat_to_file(0);
1701 if (cfg.lb_savepath)
1702 cs_log("stats saved to file %s", cfg.lb_savepath);
1703 cfg.lb_save = 0; //this is for avoiding duplicate saves
1704 }
1705}
1706
1707#endif