Sun May 20 06:33:57 2012

Asterisk developer's documentation


pbx_spool.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 1999 - 2010, Digium, Inc.
00005  *
00006  * Mark Spencer <markster@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*! \file
00020  *
00021  * \brief Full-featured outgoing call spool support
00022  * 
00023  */
00024 
00025 /*** MODULEINFO
00026    <support_level>core</support_level>
00027  ***/
00028 
00029 #include "asterisk.h"
00030 
00031 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 355058 $")
00032 
00033 #include <sys/stat.h>
00034 #include <time.h>
00035 #include <utime.h>
00036 #include <dirent.h>
00037 #ifdef HAVE_INOTIFY
00038 #include <sys/inotify.h>
00039 #elif defined(HAVE_KQUEUE)
00040 #include <sys/types.h>
00041 #include <sys/time.h>
00042 #include <sys/event.h>
00043 #include <fcntl.h>
00044 #endif
00045 
00046 #include "asterisk/paths.h"   /* use ast_config_AST_SPOOL_DIR */
00047 #include "asterisk/lock.h"
00048 #include "asterisk/file.h"
00049 #include "asterisk/logger.h"
00050 #include "asterisk/channel.h"
00051 #include "asterisk/callerid.h"
00052 #include "asterisk/pbx.h"
00053 #include "asterisk/module.h"
00054 #include "asterisk/utils.h"
00055 #include "asterisk/options.h"
00056 
00057 /*
00058  * pbx_spool is similar in spirit to qcall, but with substantially enhanced functionality...
00059  * The spool file contains a header 
00060  */
00061 
00062 enum {
00063    /*! Always delete the call file after a call succeeds or the
00064     * maximum number of retries is exceeded, even if the
00065     * modification time of the call file is in the future.
00066     */
00067    SPOOL_FLAG_ALWAYS_DELETE = (1 << 0),
00068    /* Don't unlink the call file after processing, move in qdonedir */
00069    SPOOL_FLAG_ARCHIVE = (1 << 1),
00070 };
00071 
00072 static char qdir[255];
00073 static char qdonedir[255];
00074 
00075 struct outgoing {
00076    int retries;                              /*!< Current number of retries */
00077    int maxretries;                           /*!< Maximum number of retries permitted */
00078    int retrytime;                            /*!< How long to wait between retries (in seconds) */
00079    int waittime;                             /*!< How long to wait for an answer */
00080    long callingpid;                          /*!< PID which is currently calling */
00081    struct ast_format_cap *capabilities;                 /*!< Formats (codecs) for this call */
00082    AST_DECLARE_STRING_FIELDS (
00083       AST_STRING_FIELD(fn);                 /*!< File name of call file */
00084       AST_STRING_FIELD(tech);               /*!< Which channel technology to use for outgoing call */
00085       AST_STRING_FIELD(dest);               /*!< Which device/line to use for outgoing call */
00086       AST_STRING_FIELD(app);                /*!< If application: Application name */
00087       AST_STRING_FIELD(data);               /*!< If application: Application data */
00088       AST_STRING_FIELD(exten);              /*!< If extension/context/priority: Extension in dialplan */
00089       AST_STRING_FIELD(context);            /*!< If extension/context/priority: Dialplan context */
00090       AST_STRING_FIELD(cid_num);            /*!< CallerID Information: Number/extension */
00091       AST_STRING_FIELD(cid_name);           /*!< CallerID Information: Name */
00092       AST_STRING_FIELD(account);            /*!< account code */
00093    );
00094    int priority;                             /*!< If extension/context/priority: Dialplan priority */
00095    struct ast_variable *vars;                /*!< Variables and Functions */
00096    int maxlen;                               /*!< Maximum length of call */
00097    struct ast_flags options;                 /*!< options */
00098 };
00099 
00100 #if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE)
00101 static void queue_file(const char *filename, time_t when);
00102 #endif
00103 
00104 static int init_outgoing(struct outgoing *o)
00105 {
00106    struct ast_format tmpfmt;
00107    o->priority = 1;
00108    o->retrytime = 300;
00109    o->waittime = 45;
00110 
00111    if (!(o->capabilities = ast_format_cap_alloc_nolock())) {
00112       return -1;
00113    }
00114    ast_format_cap_add(o->capabilities, ast_format_set(&tmpfmt, AST_FORMAT_SLINEAR, 0));
00115 
00116    ast_set_flag(&o->options, SPOOL_FLAG_ALWAYS_DELETE);
00117    if (ast_string_field_init(o, 128)) {
00118       return -1;
00119    }
00120    return 0;
00121 }
00122 
00123 static void free_outgoing(struct outgoing *o)
00124 {
00125    if (o->vars) {
00126       ast_variables_destroy(o->vars);
00127    }
00128    ast_string_field_free_memory(o);
00129    o->capabilities = ast_format_cap_destroy(o->capabilities);
00130    ast_free(o);
00131 }
00132 
00133 static int apply_outgoing(struct outgoing *o, const char *fn, FILE *f)
00134 {
00135    char buf[256];
00136    char *c, *c2;
00137    int lineno = 0;
00138    struct ast_variable *var, *last = o->vars;
00139 
00140    while (last && last->next) {
00141       last = last->next;
00142    }
00143 
00144    while(fgets(buf, sizeof(buf), f)) {
00145       lineno++;
00146       /* Trim comments */
00147       c = buf;
00148       while ((c = strchr(c, '#'))) {
00149          if ((c == buf) || (*(c-1) == ' ') || (*(c-1) == '\t'))
00150             *c = '\0';
00151          else
00152             c++;
00153       }
00154 
00155       c = buf;
00156       while ((c = strchr(c, ';'))) {
00157          if ((c > buf) && (c[-1] == '\\')) {
00158             memmove(c - 1, c, strlen(c) + 1);
00159             c++;
00160          } else {
00161             *c = '\0';
00162             break;
00163          }
00164       }
00165 
00166       /* Trim trailing white space */
00167       while(!ast_strlen_zero(buf) && buf[strlen(buf) - 1] < 33)
00168          buf[strlen(buf) - 1] = '\0';
00169       if (!ast_strlen_zero(buf)) {
00170          c = strchr(buf, ':');
00171          if (c) {
00172             *c = '\0';
00173             c++;
00174             while ((*c) && (*c < 33))
00175                c++;
00176 #if 0
00177             printf("'%s' is '%s' at line %d\n", buf, c, lineno);
00178 #endif
00179             if (!strcasecmp(buf, "channel")) {
00180                if ((c2 = strchr(c, '/'))) {
00181                   *c2 = '\0';
00182                   c2++;
00183                   ast_string_field_set(o, tech, c);
00184                   ast_string_field_set(o, dest, c2);
00185                } else {
00186                   ast_log(LOG_NOTICE, "Channel should be in form Tech/Dest at line %d of %s\n", lineno, fn);
00187                }
00188             } else if (!strcasecmp(buf, "callerid")) {
00189                char cid_name[80] = {0}, cid_num[80] = {0};
00190                ast_callerid_split(c, cid_name, sizeof(cid_name), cid_num, sizeof(cid_num));
00191                ast_string_field_set(o, cid_num, cid_num);
00192                ast_string_field_set(o, cid_name, cid_name);
00193             } else if (!strcasecmp(buf, "application")) {
00194                ast_string_field_set(o, app, c);
00195             } else if (!strcasecmp(buf, "data")) {
00196                ast_string_field_set(o, data, c);
00197             } else if (!strcasecmp(buf, "maxretries")) {
00198                if (sscanf(c, "%30d", &o->maxretries) != 1) {
00199                   ast_log(LOG_WARNING, "Invalid max retries at line %d of %s\n", lineno, fn);
00200                   o->maxretries = 0;
00201                }
00202             } else if (!strcasecmp(buf, "codecs")) {
00203                ast_parse_allow_disallow(NULL, o->capabilities, c, 1);
00204             } else if (!strcasecmp(buf, "context")) {
00205                ast_string_field_set(o, context, c);
00206             } else if (!strcasecmp(buf, "extension")) {
00207                ast_string_field_set(o, exten, c);
00208             } else if (!strcasecmp(buf, "priority")) {
00209                if ((sscanf(c, "%30d", &o->priority) != 1) || (o->priority < 1)) {
00210                   ast_log(LOG_WARNING, "Invalid priority at line %d of %s\n", lineno, fn);
00211                   o->priority = 1;
00212                }
00213             } else if (!strcasecmp(buf, "retrytime")) {
00214                if ((sscanf(c, "%30d", &o->retrytime) != 1) || (o->retrytime < 1)) {
00215                   ast_log(LOG_WARNING, "Invalid retrytime at line %d of %s\n", lineno, fn);
00216                   o->retrytime = 300;
00217                }
00218             } else if (!strcasecmp(buf, "waittime")) {
00219                if ((sscanf(c, "%30d", &o->waittime) != 1) || (o->waittime < 1)) {
00220                   ast_log(LOG_WARNING, "Invalid waittime at line %d of %s\n", lineno, fn);
00221                   o->waittime = 45;
00222                }
00223             } else if (!strcasecmp(buf, "retry")) {
00224                o->retries++;
00225             } else if (!strcasecmp(buf, "startretry")) {
00226                if (sscanf(c, "%30ld", &o->callingpid) != 1) {
00227                   ast_log(LOG_WARNING, "Unable to retrieve calling PID!\n");
00228                   o->callingpid = 0;
00229                }
00230             } else if (!strcasecmp(buf, "endretry") || !strcasecmp(buf, "abortretry")) {
00231                o->callingpid = 0;
00232                o->retries++;
00233             } else if (!strcasecmp(buf, "delayedretry")) {
00234             } else if (!strcasecmp(buf, "setvar") || !strcasecmp(buf, "set")) {
00235                c2 = c;
00236                strsep(&c2, "=");
00237                if (c2) {
00238                   var = ast_variable_new(c, c2, fn);
00239                   if (var) {
00240                      /* Always insert at the end, because some people want to treat the spool file as a script */
00241                      if (last) {
00242                         last->next = var;
00243                      } else {
00244                         o->vars = var;
00245                      }
00246                      last = var;
00247                   }
00248                } else
00249                   ast_log(LOG_WARNING, "Malformed \"%s\" argument.  Should be \"%s: variable=value\"\n", buf, buf);
00250             } else if (!strcasecmp(buf, "account")) {
00251                ast_string_field_set(o, account, c);
00252             } else if (!strcasecmp(buf, "alwaysdelete")) {
00253                ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ALWAYS_DELETE);
00254             } else if (!strcasecmp(buf, "archive")) {
00255                ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ARCHIVE);
00256             } else {
00257                ast_log(LOG_WARNING, "Unknown keyword '%s' at line %d of %s\n", buf, lineno, fn);
00258             }
00259          } else
00260             ast_log(LOG_NOTICE, "Syntax error at line %d of %s\n", lineno, fn);
00261       }
00262    }
00263    ast_string_field_set(o, fn, fn);
00264    if (ast_strlen_zero(o->tech) || ast_strlen_zero(o->dest) || (ast_strlen_zero(o->app) && ast_strlen_zero(o->exten))) {
00265       ast_log(LOG_WARNING, "At least one of app or extension must be specified, along with tech and dest in file %s\n", fn);
00266       return -1;
00267    }
00268    return 0;
00269 }
00270 
00271 static void safe_append(struct outgoing *o, time_t now, char *s)
00272 {
00273    FILE *f;
00274    struct utimbuf tbuf = { .actime = now, .modtime = now + o->retrytime };
00275 
00276    ast_debug(1, "Outgoing %s/%s: %s\n", o->tech, o->dest, s);
00277 
00278    if ((f = fopen(o->fn, "a"))) {
00279       fprintf(f, "\n%s: %ld %d (%ld)\n", s, (long)ast_mainpid, o->retries, (long) now);
00280       fclose(f);
00281    }
00282 
00283    /* Update the file time */
00284    if (utime(o->fn, &tbuf)) {
00285       ast_log(LOG_WARNING, "Unable to set utime on %s: %s\n", o->fn, strerror(errno));
00286    }
00287 }
00288 
00289 /*!
00290  * \brief Remove a call file from the outgoing queue optionally moving it in the archive dir
00291  *
00292  * \param o the pointer to outgoing struct
00293  * \param status the exit status of the call. Can be "Completed", "Failed" or "Expired"
00294  */
00295 static int remove_from_queue(struct outgoing *o, const char *status)
00296 {
00297    FILE *f;
00298    char newfn[256];
00299    const char *bname;
00300 
00301    if (!ast_test_flag(&o->options, SPOOL_FLAG_ALWAYS_DELETE)) {
00302       struct stat current_file_status;
00303 
00304       if (!stat(o->fn, &current_file_status)) {
00305          if (time(NULL) < current_file_status.st_mtime) {
00306             return 0;
00307          }
00308       }
00309    }
00310 
00311    if (!ast_test_flag(&o->options, SPOOL_FLAG_ARCHIVE)) {
00312       unlink(o->fn);
00313       return 0;
00314    }
00315 
00316    if (ast_mkdir(qdonedir, 0777)) {
00317       ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool archiving disabled\n", qdonedir);
00318       unlink(o->fn);
00319       return -1;
00320    }
00321 
00322    if (!(bname = strrchr(o->fn, '/'))) {
00323       bname = o->fn;
00324    } else {
00325       bname++;
00326    }
00327 
00328    snprintf(newfn, sizeof(newfn), "%s/%s", qdonedir, bname);
00329    /* a existing call file the archive dir is overwritten */
00330    unlink(newfn);
00331    if (rename(o->fn, newfn) != 0) {
00332       unlink(o->fn);
00333       return -1;
00334    }
00335 
00336    /* Only append to the file AFTER we move it out of the watched directory,
00337     * otherwise the fclose() causes another event for inotify(7) */
00338    if ((f = fopen(newfn, "a"))) {
00339       fprintf(f, "Status: %s\n", status);
00340       fclose(f);
00341    }
00342 
00343    return 0;
00344 }
00345 
00346 static void *attempt_thread(void *data)
00347 {
00348    struct outgoing *o = data;
00349    int res, reason;
00350    if (!ast_strlen_zero(o->app)) {
00351       ast_verb(3, "Attempting call on %s/%s for application %s(%s) (Retry %d)\n", o->tech, o->dest, o->app, o->data, o->retries);
00352       res = ast_pbx_outgoing_app(o->tech, o->capabilities, o->dest, o->waittime * 1000,
00353          o->app, o->data, &reason, 2 /* wait to finish */, o->cid_num, o->cid_name,
00354          o->vars, o->account, NULL);
00355       o->vars = NULL;
00356    } else {
00357       ast_verb(3, "Attempting call on %s/%s for %s@%s:%d (Retry %d)\n", o->tech, o->dest, o->exten, o->context,o->priority, o->retries);
00358       res = ast_pbx_outgoing_exten(o->tech, o->capabilities, o->dest,
00359          o->waittime * 1000, o->context, o->exten, o->priority, &reason,
00360          2 /* wait to finish */, o->cid_num, o->cid_name, o->vars, o->account, NULL);
00361       o->vars = NULL;
00362    }
00363    if (res) {
00364       ast_log(LOG_NOTICE, "Call failed to go through, reason (%d) %s\n", reason, ast_channel_reason2str(reason));
00365       if (o->retries >= o->maxretries + 1) {
00366          /* Max retries exceeded */
00367          ast_log(LOG_NOTICE, "Queued call to %s/%s expired without completion after %d attempt%s\n", o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? "s" : "");
00368          remove_from_queue(o, "Expired");
00369       } else {
00370          /* Notate that the call is still active */
00371          safe_append(o, time(NULL), "EndRetry");
00372 #if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE)
00373          queue_file(o->fn, time(NULL) + o->retrytime);
00374 #endif
00375       }
00376    } else {
00377       ast_log(LOG_NOTICE, "Call completed to %s/%s\n", o->tech, o->dest);
00378       remove_from_queue(o, "Completed");
00379    }
00380    free_outgoing(o);
00381    return NULL;
00382 }
00383 
00384 static void launch_service(struct outgoing *o)
00385 {
00386    pthread_t t;
00387    int ret;
00388 
00389    if ((ret = ast_pthread_create_detached(&t, NULL, attempt_thread, o))) {
00390       ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret);
00391       free_outgoing(o);
00392    }
00393 }
00394 
00395 /* Called from scan_thread or queue_file */
00396 static int scan_service(const char *fn, time_t now)
00397 {
00398    struct outgoing *o = NULL;
00399    FILE *f;
00400    int res = 0;
00401 
00402    if (!(o = ast_calloc(1, sizeof(*o)))) {
00403       ast_log(LOG_WARNING, "Out of memory ;(\n");
00404       return -1;
00405    }
00406 
00407    if (init_outgoing(o)) {
00408       /* No need to call free_outgoing here since we know the failure
00409        * was to allocate string fields and no variables have been allocated
00410        * yet.
00411        */
00412       ast_free(o);
00413       return -1;
00414    }
00415 
00416    /* Attempt to open the file */
00417    if (!(f = fopen(fn, "r"))) {
00418       remove_from_queue(o, "Failed");
00419       free_outgoing(o);
00420 #if !defined(HAVE_INOTIFY) && !defined(HAVE_KQUEUE)
00421       ast_log(LOG_WARNING, "Unable to open %s: %s, deleting\n", fn, strerror(errno));
00422 #endif
00423       return -1;
00424    }
00425 
00426    /* Read in and verify the contents */
00427    if (apply_outgoing(o, fn, f)) {
00428       remove_from_queue(o, "Failed");
00429       free_outgoing(o);
00430       ast_log(LOG_WARNING, "Invalid file contents in %s, deleting\n", fn);
00431       fclose(f);
00432       return -1;
00433    }
00434 
00435 #if 0
00436    printf("Filename: %s, Retries: %d, max: %d\n", fn, o->retries, o->maxretries);
00437 #endif
00438    fclose(f);
00439    if (o->retries <= o->maxretries) {
00440       now += o->retrytime;
00441       if (o->callingpid && (o->callingpid == ast_mainpid)) {
00442          safe_append(o, time(NULL), "DelayedRetry");
00443          ast_debug(1, "Delaying retry since we're currently running '%s'\n", o->fn);
00444          free_outgoing(o);
00445       } else {
00446          /* Increment retries */
00447          o->retries++;
00448          /* If someone else was calling, they're presumably gone now
00449             so abort their retry and continue as we were... */
00450          if (o->callingpid)
00451             safe_append(o, time(NULL), "AbortRetry");
00452 
00453          safe_append(o, now, "StartRetry");
00454          launch_service(o);
00455       }
00456       res = now;
00457    } else {
00458       ast_log(LOG_NOTICE, "Queued call to %s/%s expired without completion after %d attempt%s\n", o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? "s" : "");
00459       remove_from_queue(o, "Expired");
00460       free_outgoing(o);
00461    }
00462 
00463    return res;
00464 }
00465 
00466 #if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE)
00467 struct direntry {
00468    AST_LIST_ENTRY(direntry) list;
00469    time_t mtime;
00470    char name[0];
00471 };
00472 
00473 static AST_LIST_HEAD_STATIC(dirlist, direntry);
00474 
00475 #if defined(HAVE_INOTIFY)
00476 /* Only one thread is accessing this list, so no lock is necessary */
00477 static AST_LIST_HEAD_NOLOCK_STATIC(createlist, direntry);
00478 static AST_LIST_HEAD_NOLOCK_STATIC(openlist, direntry);
00479 #endif
00480 
00481 static void queue_file(const char *filename, time_t when)
00482 {
00483    struct stat st;
00484    struct direntry *cur, *new;
00485    int res;
00486    time_t now = time(NULL);
00487 
00488    if (filename[0] != '/') {
00489       char *fn = alloca(strlen(qdir) + strlen(filename) + 2);
00490       sprintf(fn, "%s/%s", qdir, filename); /* SAFE */
00491       filename = fn;
00492    }
00493 
00494    if (when == 0) {
00495       if (stat(filename, &st)) {
00496          ast_log(LOG_WARNING, "Unable to stat %s: %s\n", filename, strerror(errno));
00497          return;
00498       }
00499 
00500       if (!S_ISREG(st.st_mode)) {
00501          return;
00502       }
00503 
00504       when = st.st_mtime;
00505    }
00506 
00507    /* Need to check the existing list in order to avoid duplicates. */
00508    AST_LIST_LOCK(&dirlist);
00509    AST_LIST_TRAVERSE(&dirlist, cur, list) {
00510       if (cur->mtime == when && !strcmp(filename, cur->name)) {
00511          AST_LIST_UNLOCK(&dirlist);
00512          return;
00513       }
00514    }
00515 
00516    if ((res = when) > now || (res = scan_service(filename, now)) > 0) {
00517       if (!(new = ast_calloc(1, sizeof(*new) + strlen(filename) + 1))) {
00518          AST_LIST_UNLOCK(&dirlist);
00519          return;
00520       }
00521       new->mtime = res;
00522       strcpy(new->name, filename);
00523       /* List is ordered by mtime */
00524       if (AST_LIST_EMPTY(&dirlist)) {
00525          AST_LIST_INSERT_HEAD(&dirlist, new, list);
00526       } else {
00527          int found = 0;
00528          AST_LIST_TRAVERSE_SAFE_BEGIN(&dirlist, cur, list) {
00529             if (cur->mtime > new->mtime) {
00530                AST_LIST_INSERT_BEFORE_CURRENT(new, list);
00531                found = 1;
00532                break;
00533             }
00534          }
00535          AST_LIST_TRAVERSE_SAFE_END
00536          if (!found) {
00537             AST_LIST_INSERT_TAIL(&dirlist, new, list);
00538          }
00539       }
00540    }
00541    AST_LIST_UNLOCK(&dirlist);
00542 }
00543 
00544 #ifdef HAVE_INOTIFY
00545 static void queue_file_create(const char *filename)
00546 {
00547    struct direntry *cur;
00548 
00549    AST_LIST_TRAVERSE(&createlist, cur, list) {
00550       if (!strcmp(cur->name, filename)) {
00551          return;
00552       }
00553    }
00554 
00555    if (!(cur = ast_calloc(1, sizeof(*cur) + strlen(filename) + 1))) {
00556       return;
00557    }
00558    strcpy(cur->name, filename);
00559    /* We'll handle this file unless an IN_OPEN event occurs within 2 seconds */
00560    cur->mtime = time(NULL) + 2;
00561    AST_LIST_INSERT_TAIL(&createlist, cur, list);
00562 }
00563 
00564 static void queue_file_open(const char *filename)
00565 {
00566    struct direntry *cur;
00567 
00568    AST_LIST_TRAVERSE_SAFE_BEGIN(&createlist, cur, list) {
00569       if (!strcmp(cur->name, filename)) {
00570          AST_LIST_REMOVE_CURRENT(list);
00571          AST_LIST_INSERT_TAIL(&openlist, cur, list);
00572          break;
00573       }
00574    }
00575    AST_LIST_TRAVERSE_SAFE_END
00576 }
00577 
00578 static void queue_created_files(void)
00579 {
00580    struct direntry *cur;
00581    time_t now = time(NULL);
00582 
00583    AST_LIST_TRAVERSE_SAFE_BEGIN(&createlist, cur, list) {
00584       if (cur->mtime > now) {
00585          break;
00586       }
00587 
00588       AST_LIST_REMOVE_CURRENT(list);
00589       queue_file(cur->name, 0);
00590       ast_free(cur);
00591    }
00592    AST_LIST_TRAVERSE_SAFE_END
00593 }
00594 
00595 static void queue_file_write(const char *filename)
00596 {
00597    struct direntry *cur;
00598    /* Only queue entries where an IN_CREATE preceded the IN_CLOSE_WRITE */
00599    AST_LIST_TRAVERSE_SAFE_BEGIN(&openlist, cur, list) {
00600       if (!strcmp(cur->name, filename)) {
00601          AST_LIST_REMOVE_CURRENT(list);
00602          ast_free(cur);
00603          queue_file(filename, 0);
00604          break;
00605       }
00606    }
00607    AST_LIST_TRAVERSE_SAFE_END
00608 }
00609 #endif
00610 
00611 static void *scan_thread(void *unused)
00612 {
00613    DIR *dir;
00614    struct dirent *de;
00615    time_t now;
00616    struct timespec ts = { .tv_sec = 1 };
00617 #ifdef HAVE_INOTIFY
00618    ssize_t res;
00619    int inotify_fd = inotify_init();
00620    struct inotify_event *iev;
00621    char buf[8192] __attribute__((aligned (sizeof(int))));
00622    struct pollfd pfd = { .fd = inotify_fd, .events = POLLIN };
00623 #else
00624    struct timespec nowait = { 0, 1 };
00625    int inotify_fd = kqueue();
00626    struct kevent kev;
00627 #endif
00628    struct direntry *cur;
00629 
00630    while (!ast_fully_booted) {
00631       nanosleep(&ts, NULL);
00632    }
00633 
00634    if (inotify_fd < 0) {
00635       ast_log(LOG_ERROR, "Unable to initialize "
00636 #ifdef HAVE_INOTIFY
00637          "inotify(7)"
00638 #else
00639          "kqueue(2)"
00640 #endif
00641          "\n");
00642       return NULL;
00643    }
00644 
00645 #ifdef HAVE_INOTIFY
00646    inotify_add_watch(inotify_fd, qdir, IN_CREATE | IN_OPEN | IN_CLOSE_WRITE | IN_MOVED_TO);
00647 #endif
00648 
00649    /* First, run through the directory and clear existing entries */
00650    if (!(dir = opendir(qdir))) {
00651       ast_log(LOG_ERROR, "Unable to open directory %s: %s\n", qdir, strerror(errno));
00652       return NULL;
00653    }
00654 
00655 #ifndef HAVE_INOTIFY
00656    EV_SET(&kev, dirfd(dir), EVFILT_VNODE, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_WRITE, 0, NULL);
00657    if (kevent(inotify_fd, &kev, 1, NULL, 0, &nowait) < 0 && errno != 0) {
00658       ast_log(LOG_ERROR, "Unable to watch directory %s: %s\n", qdir, strerror(errno));
00659    }
00660 #endif
00661    now = time(NULL);
00662    while ((de = readdir(dir))) {
00663       queue_file(de->d_name, 0);
00664    }
00665 
00666 #ifdef HAVE_INOTIFY
00667    /* Directory needs to remain open for kqueue(2) */
00668    closedir(dir);
00669 #endif
00670 
00671    /* Wait for either a) next timestamp to occur, or b) a change to happen */
00672    for (;/* ever */;) {
00673       time_t next = AST_LIST_EMPTY(&dirlist) ? INT_MAX : AST_LIST_FIRST(&dirlist)->mtime;
00674 
00675       time(&now);
00676       if (next > now) {
00677 #ifdef HAVE_INOTIFY
00678          int stage = 0;
00679          /* Convert from seconds to milliseconds, unless there's nothing
00680           * in the queue already, in which case, we wait forever. */
00681          int waittime = next == INT_MAX ? -1 : (next - now) * 1000;
00682          if (!AST_LIST_EMPTY(&createlist)) {
00683             waittime = 1000;
00684          }
00685          /* When a file arrives, add it to the queue, in mtime order. */
00686          if ((res = poll(&pfd, 1, waittime)) > 0 && (stage = 1) &&
00687             (res = read(inotify_fd, &buf, sizeof(buf))) >= sizeof(*iev)) {
00688             ssize_t len = 0;
00689             /* File(s) added to directory, add them to my list */
00690             for (iev = (void *) buf; res >= sizeof(*iev); iev = (struct inotify_event *) (((char *) iev) + len)) {
00691                /* For an IN_MOVED_TO event, simply process the file. However, if
00692                 * we get an IN_CREATE event it *might* be an open(O_CREAT) or it
00693                 * might be a hardlink (like smsq does, since rename() might
00694                 * overwrite an existing file). So we have to see if we get a
00695                 * subsequent IN_OPEN event on the same file. If we do, keep it
00696                 * on the openlist and wait for the corresponding IN_CLOSE_WRITE.
00697                 * If we *don't* see an IN_OPEN event, then it was a hard link so
00698                 * it can be processed immediately.
00699                 *
00700                 * Unfortunately, although open(O_CREAT) is an atomic file system
00701                 * operation, the inotify subsystem doesn't give it to us in a
00702                 * single event with both IN_CREATE|IN_OPEN set. It's two separate
00703                 * events, and the kernel doesn't even give them to us at the same
00704                 * time. We can read() from inotify_fd after the IN_CREATE event,
00705                 * and get *nothing* from it. The IN_OPEN arrives only later! So
00706                 * we have a very short timeout of 2 seconds. */
00707                if (iev->mask & IN_CREATE) {
00708                   queue_file_create(iev->name);
00709                } else if (iev->mask & IN_OPEN) {
00710                   queue_file_open(iev->name);
00711                } else if (iev->mask & IN_CLOSE_WRITE) {
00712                   queue_file_write(iev->name);
00713                } else if (iev->mask & IN_MOVED_TO) {
00714                   queue_file(iev->name, 0);
00715                } else {
00716                   ast_log(LOG_ERROR, "Unexpected event %d for file '%s'\n", (int) iev->mask, iev->name);
00717                }
00718 
00719                len = sizeof(*iev) + iev->len;
00720                res -= len;
00721             }
00722          } else if (res < 0 && errno != EINTR && errno != EAGAIN) {
00723             ast_debug(1, "Got an error back from %s(2): %s\n", stage ? "read" : "poll", strerror(errno));
00724          }
00725          time(&now);
00726       }
00727       queue_created_files();
00728 #else
00729          struct timespec ts2 = { next - now, 0 };
00730          if (kevent(inotify_fd, NULL, 0, &kev, 1, &ts2) <= 0) {
00731             /* Interrupt or timeout, restart calculations */
00732             continue;
00733          } else {
00734             /* Directory changed, rescan */
00735             rewinddir(dir);
00736             while ((de = readdir(dir))) {
00737                queue_file(de->d_name, 0);
00738             }
00739          }
00740          time(&now);
00741       }
00742 #endif
00743 
00744       /* Empty the list of all entries ready to be processed */
00745       AST_LIST_LOCK(&dirlist);
00746       while (!AST_LIST_EMPTY(&dirlist) && AST_LIST_FIRST(&dirlist)->mtime <= now) {
00747          cur = AST_LIST_REMOVE_HEAD(&dirlist, list);
00748          queue_file(cur->name, cur->mtime);
00749          ast_free(cur);
00750       }
00751       AST_LIST_UNLOCK(&dirlist);
00752    }
00753    return NULL;
00754 }
00755 
00756 #else
00757 static void *scan_thread(void *unused)
00758 {
00759    struct stat st;
00760    DIR *dir;
00761    struct dirent *de;
00762    char fn[256];
00763    int res;
00764    int force_poll = 1;
00765    time_t last = 0;
00766    time_t next = 0;
00767    time_t now;
00768    struct timespec ts = { .tv_sec = 1 };
00769 
00770    while (!ast_fully_booted) {
00771       nanosleep(&ts, NULL);
00772    }
00773 
00774    for (;;) {
00775       /* Wait a sec */
00776       nanosleep(&ts, NULL);
00777       time(&now);
00778 
00779       if (stat(qdir, &st)) {
00780          ast_log(LOG_WARNING, "Unable to stat %s\n", qdir);
00781          continue;
00782       }
00783 
00784       /* Make sure it is time for us to execute our check */
00785       if (!force_poll && st.st_mtime == last && (!next || now < next)) {
00786          /*
00787           * The directory timestamp did not change and any delayed
00788           * call-file is not ready to be executed.
00789           */
00790          continue;
00791       }
00792 
00793 #if 0
00794       printf("atime: %ld, mtime: %ld, ctime: %ld\n", st.st_atime, st.st_mtime, st.st_ctime);
00795       printf("Ooh, something changed / timeout\n");
00796 #endif
00797 
00798       if (!(dir = opendir(qdir))) {
00799          ast_log(LOG_WARNING, "Unable to open directory %s: %s\n", qdir, strerror(errno));
00800          continue;
00801       }
00802 
00803       /*
00804        * Since the dir timestamp is available at one second
00805        * resolution, we cannot know if it was updated within the same
00806        * second after we scanned it.  Therefore, we will force another
00807        * scan if the dir was just modified.
00808        */
00809       force_poll = (st.st_mtime == now);
00810 
00811       next = 0;
00812       last = st.st_mtime;
00813       while ((de = readdir(dir))) {
00814          snprintf(fn, sizeof(fn), "%s/%s", qdir, de->d_name);
00815          if (stat(fn, &st)) {
00816             ast_log(LOG_WARNING, "Unable to stat %s: %s\n", fn, strerror(errno));
00817             continue;
00818          }
00819          if (!S_ISREG(st.st_mode)) {
00820             /* Not a regular file. */
00821             continue;
00822          }
00823          if (st.st_mtime <= now) {
00824             res = scan_service(fn, now);
00825             if (res > 0) {
00826                /* The call-file is delayed or to be retried later. */
00827                if (!next || res < next) {
00828                   /* This delayed call file expires earlier. */
00829                   next = res;
00830                }
00831             } else if (res) {
00832                ast_log(LOG_WARNING, "Failed to scan service '%s'\n", fn);
00833             } else if (!next) {
00834                /* Expired entry: must recheck on the next go-around */
00835                next = st.st_mtime;
00836             }
00837          } else {
00838             /* The file's timestamp is in the future. */
00839             if (!next || st.st_mtime < next) {
00840                /* This call-file's timestamp expires earlier. */
00841                next = st.st_mtime;
00842             }
00843          }
00844       }
00845       closedir(dir);
00846    }
00847    return NULL;
00848 }
00849 #endif
00850 
00851 static int unload_module(void)
00852 {
00853    return -1;
00854 }
00855 
00856 static int load_module(void)
00857 {
00858    pthread_t thread;
00859    int ret;
00860    snprintf(qdir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing");
00861    if (ast_mkdir(qdir, 0777)) {
00862       ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool disabled\n", qdir);
00863       return AST_MODULE_LOAD_DECLINE;
00864    }
00865    snprintf(qdonedir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing_done");
00866 
00867    if ((ret = ast_pthread_create_detached_background(&thread, NULL, scan_thread, NULL))) {
00868       ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret);
00869       return AST_MODULE_LOAD_FAILURE;
00870    }
00871 
00872    return AST_MODULE_LOAD_SUCCESS;
00873 }
00874 
00875 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Outgoing Spool Support");

Generated on Sun May 20 06:33:57 2012 for Asterisk - The Open Source Telephony Project by  doxygen 1.5.6