00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "asterisk.h"
00028
00029 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 360190 $")
00030
/*
 * DEBUG(a): with DEBUG_SCHEDULER defined, passes `a` to DEBUG_M() when
 * option_debug is non-zero; without it, expands to nothing.
 */
#if defined(DEBUG_SCHEDULER)
#define DEBUG(a) do { if (option_debug) DEBUG_M(a) } while (0)
#else
#define DEBUG(a)
#endif
00039
00040 #include <sys/time.h>
00041
00042 #include "asterisk/sched.h"
00043 #include "asterisk/channel.h"
00044 #include "asterisk/lock.h"
00045 #include "asterisk/utils.h"
00046 #include "asterisk/linkedlists.h"
00047 #include "asterisk/dlinkedlists.h"
00048 #include "asterisk/hashtab.h"
00049 #include "asterisk/heap.h"
00050 #include "asterisk/threadstorage.h"
00051
00052
00053
00054
00055
00056
00057
00058
00059
/*
 * Maximum number of released entries sched_release() keeps on the
 * per-context cache list before it starts freeing them instead.
 * Defining this macro also enables the cache code paths (#ifdef).
 */
#define SCHED_MAX_CACHE 128

/* Per-thread int used by ast_sched_del() to remember the last id it could not find. */
AST_THREADSTORAGE(last_del_id);
00063
00064 struct sched {
00065 AST_LIST_ENTRY(sched) list;
00066 int id;
00067 struct timeval when;
00068 int resched;
00069 int variable;
00070 const void *data;
00071 ast_sched_cb callback;
00072 ssize_t __heap_index;
00073 };
00074
00075 struct sched_thread {
00076 pthread_t thread;
00077 ast_cond_t cond;
00078 unsigned int stop:1;
00079 };
00080
00081 struct ast_sched_context {
00082 ast_mutex_t lock;
00083 unsigned int eventcnt;
00084 unsigned int schedcnt;
00085 unsigned int highwater;
00086 struct ast_hashtab *schedq_ht;
00087 struct ast_heap *sched_heap;
00088 struct sched_thread *sched_thread;
00089
00090 #ifdef SCHED_MAX_CACHE
00091 AST_LIST_HEAD_NOLOCK(, sched) schedc;
00092 unsigned int schedccnt;
00093 #endif
00094 };
00095
00096 static void *sched_run(void *data)
00097 {
00098 struct ast_sched_context *con = data;
00099
00100 while (!con->sched_thread->stop) {
00101 int ms;
00102 struct timespec ts = {
00103 .tv_sec = 0,
00104 };
00105
00106 ast_mutex_lock(&con->lock);
00107
00108 if (con->sched_thread->stop) {
00109 ast_mutex_unlock(&con->lock);
00110 return NULL;
00111 }
00112
00113 ms = ast_sched_wait(con);
00114
00115 if (ms == -1) {
00116 ast_cond_wait(&con->sched_thread->cond, &con->lock);
00117 } else {
00118 struct timeval tv;
00119 tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
00120 ts.tv_sec = tv.tv_sec;
00121 ts.tv_nsec = tv.tv_usec * 1000;
00122 ast_cond_timedwait(&con->sched_thread->cond, &con->lock, &ts);
00123 }
00124
00125 ast_mutex_unlock(&con->lock);
00126
00127 if (con->sched_thread->stop) {
00128 return NULL;
00129 }
00130
00131 ast_sched_runq(con);
00132 }
00133
00134 return NULL;
00135 }
00136
00137 static void sched_thread_destroy(struct ast_sched_context *con)
00138 {
00139 if (!con->sched_thread) {
00140 return;
00141 }
00142
00143 if (con->sched_thread->thread != AST_PTHREADT_NULL) {
00144 ast_mutex_lock(&con->lock);
00145 con->sched_thread->stop = 1;
00146 ast_cond_signal(&con->sched_thread->cond);
00147 ast_mutex_unlock(&con->lock);
00148 pthread_join(con->sched_thread->thread, NULL);
00149 con->sched_thread->thread = AST_PTHREADT_NULL;
00150 }
00151
00152 ast_cond_destroy(&con->sched_thread->cond);
00153
00154 ast_free(con->sched_thread);
00155
00156 con->sched_thread = NULL;
00157 }
00158
00159 int ast_sched_start_thread(struct ast_sched_context *con)
00160 {
00161 struct sched_thread *st;
00162
00163 if (con->sched_thread) {
00164 ast_log(LOG_ERROR, "Thread already started on this scheduler context\n");
00165 return -1;
00166 }
00167
00168 if (!(st = ast_calloc(1, sizeof(*st)))) {
00169 return -1;
00170 }
00171
00172 ast_cond_init(&st->cond, NULL);
00173
00174 st->thread = AST_PTHREADT_NULL;
00175
00176 con->sched_thread = st;
00177
00178 if (ast_pthread_create_background(&st->thread, NULL, sched_run, con)) {
00179 ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
00180 sched_thread_destroy(con);
00181 return -1;
00182 }
00183
00184 return 0;
00185 }
00186
00187 static int sched_cmp(const void *a, const void *b)
00188 {
00189 const struct sched *as = a;
00190 const struct sched *bs = b;
00191 return as->id != bs->id;
00192 }
00193
00194 static unsigned int sched_hash(const void *obj)
00195 {
00196 const struct sched *s = obj;
00197 unsigned int h = s->id;
00198 return h;
00199 }
00200
00201 static int sched_time_cmp(void *a, void *b)
00202 {
00203 return ast_tvcmp(((struct sched *) b)->when, ((struct sched *) a)->when);
00204 }
00205
00206 struct ast_sched_context *ast_sched_context_create(void)
00207 {
00208 struct ast_sched_context *tmp;
00209
00210 if (!(tmp = ast_calloc(1, sizeof(*tmp)))) {
00211 return NULL;
00212 }
00213
00214 ast_mutex_init(&tmp->lock);
00215 tmp->eventcnt = 1;
00216
00217 tmp->schedq_ht = ast_hashtab_create(23, sched_cmp, ast_hashtab_resize_java, ast_hashtab_newsize_java, sched_hash, 1);
00218
00219 if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
00220 offsetof(struct sched, __heap_index)))) {
00221 ast_sched_context_destroy(tmp);
00222 return NULL;
00223 }
00224
00225 return tmp;
00226 }
00227
00228 void ast_sched_context_destroy(struct ast_sched_context *con)
00229 {
00230 struct sched *s;
00231
00232 sched_thread_destroy(con);
00233 con->sched_thread = NULL;
00234
00235 ast_mutex_lock(&con->lock);
00236
00237 #ifdef SCHED_MAX_CACHE
00238 while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
00239 ast_free(s);
00240 }
00241 #endif
00242
00243 if (con->sched_heap) {
00244 while ((s = ast_heap_pop(con->sched_heap))) {
00245 ast_free(s);
00246 }
00247 ast_heap_destroy(con->sched_heap);
00248 con->sched_heap = NULL;
00249 }
00250
00251 ast_hashtab_destroy(con->schedq_ht, NULL);
00252 con->schedq_ht = NULL;
00253
00254 ast_mutex_unlock(&con->lock);
00255 ast_mutex_destroy(&con->lock);
00256
00257 ast_free(con);
00258 }
00259
00260 static struct sched *sched_alloc(struct ast_sched_context *con)
00261 {
00262 struct sched *tmp;
00263
00264
00265
00266
00267
00268 #ifdef SCHED_MAX_CACHE
00269 if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
00270 con->schedccnt--;
00271 else
00272 #endif
00273 tmp = ast_calloc(1, sizeof(*tmp));
00274
00275 return tmp;
00276 }
00277
/*!
 * \brief Give an entry back: cache it while there is room, otherwise free it.
 *
 * \param con scheduler context
 * \param tmp entry that is no longer scheduled
 */
static void sched_release(struct ast_sched_context *con, struct sched *tmp)
{
#ifdef SCHED_MAX_CACHE
	if (con->schedccnt < SCHED_MAX_CACHE) {
		AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
		con->schedccnt++;
		return;
	}
#endif

	ast_free(tmp);
}
00293
00294
00295
00296
00297
00298 int ast_sched_wait(struct ast_sched_context *con)
00299 {
00300 int ms;
00301 struct sched *s;
00302
00303 DEBUG(ast_debug(1, "ast_sched_wait()\n"));
00304
00305 ast_mutex_lock(&con->lock);
00306 if ((s = ast_heap_peek(con->sched_heap, 1))) {
00307 ms = ast_tvdiff_ms(s->when, ast_tvnow());
00308 if (ms < 0) {
00309 ms = 0;
00310 }
00311 } else {
00312 ms = -1;
00313 }
00314 ast_mutex_unlock(&con->lock);
00315
00316 return ms;
00317 }
00318
00319
00320
00321
00322
00323
00324
00325 static void schedule(struct ast_sched_context *con, struct sched *s)
00326 {
00327 ast_heap_push(con->sched_heap, s);
00328
00329 if (!ast_hashtab_insert_safe(con->schedq_ht, s)) {
00330 ast_log(LOG_WARNING,"Schedule Queue entry %d is already in table!\n", s->id);
00331 }
00332
00333 con->schedcnt++;
00334
00335 if (con->schedcnt > con->highwater) {
00336 con->highwater = con->schedcnt;
00337 }
00338 }
00339
00340
00341
00342
00343
00344 static int sched_settime(struct timeval *t, int when)
00345 {
00346 struct timeval now = ast_tvnow();
00347
00348
00349 if (ast_tvzero(*t))
00350 *t = now;
00351 *t = ast_tvadd(*t, ast_samp2tv(when, 1000));
00352 if (ast_tvcmp(*t, now) < 0) {
00353 *t = now;
00354 }
00355 return 0;
00356 }
00357
00358 int ast_sched_replace_variable(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
00359 {
00360
00361 if (old_id > 0) {
00362 AST_SCHED_DEL(con, old_id);
00363 }
00364 return ast_sched_add_variable(con, when, callback, data, variable);
00365 }
00366
00367
00368
00369
00370 int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
00371 {
00372 struct sched *tmp;
00373 int res = -1;
00374
00375 DEBUG(ast_debug(1, "ast_sched_add()\n"));
00376
00377 ast_mutex_lock(&con->lock);
00378 if ((tmp = sched_alloc(con))) {
00379 tmp->id = con->eventcnt++;
00380 tmp->callback = callback;
00381 tmp->data = data;
00382 tmp->resched = when;
00383 tmp->variable = variable;
00384 tmp->when = ast_tv(0, 0);
00385 if (sched_settime(&tmp->when, when)) {
00386 sched_release(con, tmp);
00387 } else {
00388 schedule(con, tmp);
00389 res = tmp->id;
00390 }
00391 }
00392 #ifdef DUMP_SCHEDULER
00393
00394 if (option_debug)
00395 ast_sched_dump(con);
00396 #endif
00397 if (con->sched_thread) {
00398 ast_cond_signal(&con->sched_thread->cond);
00399 }
00400 ast_mutex_unlock(&con->lock);
00401
00402 return res;
00403 }
00404
00405 int ast_sched_replace(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
00406 {
00407 if (old_id > -1) {
00408 AST_SCHED_DEL(con, old_id);
00409 }
00410 return ast_sched_add(con, when, callback, data);
00411 }
00412
00413 int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
00414 {
00415 return ast_sched_add_variable(con, when, callback, data, 0);
00416 }
00417
00418 const void *ast_sched_find_data(struct ast_sched_context *con, int id)
00419 {
00420 struct sched tmp,*res;
00421 tmp.id = id;
00422 res = ast_hashtab_lookup(con->schedq_ht, &tmp);
00423 if (res)
00424 return res->data;
00425 return NULL;
00426 }
00427
00428
00429
00430
00431
00432
00433
00434 #ifndef AST_DEVMODE
00435 int ast_sched_del(struct ast_sched_context *con, int id)
00436 #else
00437 int _ast_sched_del(struct ast_sched_context *con, int id, const char *file, int line, const char *function)
00438 #endif
00439 {
00440 struct sched *s, tmp = {
00441 .id = id,
00442 };
00443 int *last_id = ast_threadstorage_get(&last_del_id, sizeof(int));
00444
00445 DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));
00446
00447 if (id < 0) {
00448 return 0;
00449 }
00450
00451 ast_mutex_lock(&con->lock);
00452 s = ast_hashtab_lookup(con->schedq_ht, &tmp);
00453 if (s) {
00454 if (!ast_heap_remove(con->sched_heap, s)) {
00455 ast_log(LOG_WARNING,"sched entry %d not in the sched heap?\n", s->id);
00456 }
00457
00458 if (!ast_hashtab_remove_this_object(con->schedq_ht, s)) {
00459 ast_log(LOG_WARNING,"Found sched entry %d, then couldn't remove it?\n", s->id);
00460 }
00461
00462 con->schedcnt--;
00463
00464 sched_release(con, s);
00465 }
00466
00467 #ifdef DUMP_SCHEDULER
00468
00469 if (option_debug)
00470 ast_sched_dump(con);
00471 #endif
00472 if (con->sched_thread) {
00473 ast_cond_signal(&con->sched_thread->cond);
00474 }
00475 ast_mutex_unlock(&con->lock);
00476
00477 if (!s && *last_id != id) {
00478 ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
00479 #ifndef AST_DEVMODE
00480 ast_assert(s != NULL);
00481 #else
00482 {
00483 char buf[100];
00484 snprintf(buf, sizeof(buf), "s != NULL, id=%d", id);
00485 _ast_assert(0, buf, file, line, function);
00486 }
00487 #endif
00488 *last_id = id;
00489 return -1;
00490 } else if (!s) {
00491 return -1;
00492 }
00493
00494 return 0;
00495 }
00496
00497 void ast_sched_report(struct ast_sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
00498 {
00499 int i, x;
00500 struct sched *cur;
00501 int countlist[cbnames->numassocs + 1];
00502 size_t heap_size;
00503
00504 memset(countlist, 0, sizeof(countlist));
00505 ast_str_set(buf, 0, " Highwater = %d\n schedcnt = %d\n", con->highwater, con->schedcnt);
00506
00507 ast_mutex_lock(&con->lock);
00508
00509 heap_size = ast_heap_size(con->sched_heap);
00510 for (x = 1; x <= heap_size; x++) {
00511 cur = ast_heap_peek(con->sched_heap, x);
00512
00513 for (i = 0; i < cbnames->numassocs; i++) {
00514 if (cur->callback == cbnames->cblist[i]) {
00515 break;
00516 }
00517 }
00518 if (i < cbnames->numassocs) {
00519 countlist[i]++;
00520 } else {
00521 countlist[cbnames->numassocs]++;
00522 }
00523 }
00524
00525 ast_mutex_unlock(&con->lock);
00526
00527 for (i = 0; i < cbnames->numassocs; i++) {
00528 ast_str_append(buf, 0, " %s : %d\n", cbnames->list[i], countlist[i]);
00529 }
00530
00531 ast_str_append(buf, 0, " <unknown> : %d\n", countlist[cbnames->numassocs]);
00532 }
00533
00534
00535 void ast_sched_dump(struct ast_sched_context *con)
00536 {
00537 struct sched *q;
00538 struct timeval when = ast_tvnow();
00539 int x;
00540 size_t heap_size;
00541 #ifdef SCHED_MAX_CACHE
00542 ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d Cache, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->schedccnt, con->highwater);
00543 #else
00544 ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->highwater);
00545 #endif
00546
00547 ast_debug(1, "=============================================================\n");
00548 ast_debug(1, "|ID Callback Data Time (sec:ms) |\n");
00549 ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
00550 ast_mutex_lock(&con->lock);
00551 heap_size = ast_heap_size(con->sched_heap);
00552 for (x = 1; x <= heap_size; x++) {
00553 struct timeval delta;
00554 q = ast_heap_peek(con->sched_heap, x);
00555 delta = ast_tvsub(q->when, when);
00556 ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n",
00557 q->id,
00558 q->callback,
00559 q->data,
00560 (long)delta.tv_sec,
00561 (long int)delta.tv_usec);
00562 }
00563 ast_mutex_unlock(&con->lock);
00564 ast_debug(1, "=============================================================\n");
00565 }
00566
00567
00568
00569
00570 int ast_sched_runq(struct ast_sched_context *con)
00571 {
00572 struct sched *current;
00573 struct timeval when;
00574 int numevents;
00575 int res;
00576
00577 DEBUG(ast_debug(1, "ast_sched_runq()\n"));
00578
00579 ast_mutex_lock(&con->lock);
00580
00581 when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
00582 for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
00583
00584
00585
00586
00587
00588 if (ast_tvcmp(current->when, when) != -1) {
00589 break;
00590 }
00591
00592 current = ast_heap_pop(con->sched_heap);
00593
00594 if (!ast_hashtab_remove_this_object(con->schedq_ht, current)) {
00595 ast_log(LOG_ERROR,"Sched entry %d was in the schedq list but not in the hashtab???\n", current->id);
00596 }
00597
00598 con->schedcnt--;
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608
00609 ast_mutex_unlock(&con->lock);
00610 res = current->callback(current->data);
00611 ast_mutex_lock(&con->lock);
00612
00613 if (res) {
00614
00615
00616
00617
00618 if (sched_settime(¤t->when, current->variable? res : current->resched)) {
00619 sched_release(con, current);
00620 } else {
00621 schedule(con, current);
00622 }
00623 } else {
00624
00625 sched_release(con, current);
00626 }
00627 }
00628
00629 ast_mutex_unlock(&con->lock);
00630
00631 return numevents;
00632 }
00633
00634 long ast_sched_when(struct ast_sched_context *con,int id)
00635 {
00636 struct sched *s, tmp;
00637 long secs = -1;
00638 DEBUG(ast_debug(1, "ast_sched_when()\n"));
00639
00640 ast_mutex_lock(&con->lock);
00641
00642
00643 tmp.id = id;
00644 s = ast_hashtab_lookup(con->schedq_ht, &tmp);
00645
00646 if (s) {
00647 struct timeval now = ast_tvnow();
00648 secs = s->when.tv_sec - now.tv_sec;
00649 }
00650 ast_mutex_unlock(&con->lock);
00651
00652 return secs;
00653 }