Sun May 20 06:33:58 2012

Asterisk developer's documentation


res_corosync.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 2007, Digium, Inc.
00005  * Copyright (C) 2012, Russell Bryant
00006  *
00007  * Russell Bryant <russell@russellbryant.net>
00008  *
00009  * See http://www.asterisk.org for more information about
00010  * the Asterisk project. Please do not directly contact
00011  * any of the maintainers of this project for assistance;
00012  * the project provides a web site, mailing lists and IRC
00013  * channels for your use.
00014  *
00015  * This program is free software, distributed under the terms of
00016  * the GNU General Public License Version 2. See the LICENSE file
00017  * at the top of the source tree.
00018  */
00019 
00020 /*!
00021  * \file
00022  * \author Russell Bryant <russell@russellbryant.net>
00023  *
00024  * This module is based on and replaces the previous res_ais module.
00025  */
00026 
00027 /*** MODULEINFO
00028    <depend>corosync</depend>
00029    <support_level>extended</support_level>
00030  ***/
00031 
00032 #include "asterisk.h"
00033 
00034 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 364444 $");
00035 
00036 #include <corosync/cpg.h>
00037 #include <corosync/cfg.h>
00038 
00039 #include "asterisk/module.h"
00040 #include "asterisk/logger.h"
00041 #include "asterisk/poll-compat.h"
00042 #include "asterisk/config.h"
00043 #include "asterisk/event.h"
00044 #include "asterisk/cli.h"
00045 #include "asterisk/devicestate.h"
00046 
00047 AST_RWLOCK_DEFINE_STATIC(event_types_lock);
00048 
00049 static struct {
00050    const char *name;
00051    struct ast_event_sub *sub;
00052    unsigned char publish;
00053    unsigned char subscribe;
00054 } event_types[] = {
00055    [AST_EVENT_MWI] = { .name = "mwi", },
00056    [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state", },
00057 };
00058 
/*! \brief State of the thread that dispatches corosync callbacks. */
static struct {
   /*! Thread id; AST_PTHREADT_NULL until a thread has been created */
   pthread_t id;
   /*! Pipe used to wake the thread out of poll(): [0] is polled, [1] is written */
   int alert_pipe[2];
   /*! Set to request that the thread leave its main loop */
   unsigned int stop:1;
} dispatch_thread = {
   .alert_pipe = { -1, -1 },
   .id = AST_PTHREADT_NULL,
};
00067 
00068 static cpg_handle_t cpg_handle;
00069 static corosync_cfg_handle_t cfg_handle;
00070 
00071 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
00072 static void cfg_state_track_cb(
00073       corosync_cfg_state_notification_buffer_t *notification_buffer,
00074       cs_error_t error);
00075 #endif /* HAVE_COROSYNC_CFG_STATE_TRACK */
00076 
00077 static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
00078       corosync_cfg_shutdown_flags_t flags);
00079 
00080 static corosync_cfg_callbacks_t cfg_callbacks = {
00081 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
00082    .corosync_cfg_state_track_callback = cfg_state_track_cb,
00083 #endif /* HAVE_COROSYNC_CFG_STATE_TRACK */
00084    .corosync_cfg_shutdown_callback = cfg_shutdown_cb,
00085 };
00086 
00087 static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
00088       uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len);
00089 
00090 static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
00091       const struct cpg_address *member_list, size_t member_list_entries,
00092       const struct cpg_address *left_list, size_t left_list_entries,
00093       const struct cpg_address *joined_list, size_t joined_list_entries);
00094 
00095 static cpg_callbacks_t cpg_callbacks = {
00096    .cpg_deliver_fn = cpg_deliver_cb,
00097    .cpg_confchg_fn = cpg_confchg_cb,
00098 };
00099 
00100 static void ast_event_cb(const struct ast_event *event, void *data);
00101 
#ifdef HAVE_COROSYNC_CFG_STATE_TRACK
/*!
 * \brief CFG state tracking callback registered in cfg_callbacks.
 *
 * Takes no action; both arguments are ignored.
 */
static void cfg_state_track_cb(
      corosync_cfg_state_notification_buffer_t *notification_buffer,
      cs_error_t error)
{
   /* Nothing to do. */
}
#endif /* HAVE_COROSYNC_CFG_STATE_TRACK */
00109 
00110 static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
00111       corosync_cfg_shutdown_flags_t flags)
00112 {
00113 }
00114 
00115 static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
00116       uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
00117 {
00118    struct ast_event *event;
00119 
00120    if (msg_len < ast_event_minimum_length()) {
00121       ast_debug(1, "Ignoring event that's too small. %u < %u\n",
00122          (unsigned int) msg_len,
00123          (unsigned int) ast_event_minimum_length());
00124       return;
00125    }
00126 
00127    if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(msg, AST_EVENT_IE_EID))) {
00128       /* Don't feed events back in that originated locally. */
00129       return;
00130    }
00131 
00132    ast_rwlock_rdlock(&event_types_lock);
00133    if (!event_types[ast_event_get_type(msg)].subscribe) {
00134       /* We are not configured to subscribe to these events. */
00135       ast_rwlock_unlock(&event_types_lock);
00136       return;
00137    }
00138    ast_rwlock_unlock(&event_types_lock);
00139 
00140    if (!(event = ast_malloc(msg_len))) {
00141       return;
00142    }
00143 
00144    memcpy(event, msg, msg_len);
00145 
00146    ast_event_queue_and_cache(event);
00147 }
00148 
00149 static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
00150       const struct cpg_address *member_list, size_t member_list_entries,
00151       const struct cpg_address *left_list, size_t left_list_entries,
00152       const struct cpg_address *joined_list, size_t joined_list_entries)
00153 {
00154    unsigned int i;
00155 
00156    /* If any new nodes have joined, dump our cache of events we are publishing
00157     * that originated from this server. */
00158 
00159    if (!joined_list_entries) {
00160       return;
00161    }
00162 
00163    for (i = 0; i < ARRAY_LEN(event_types); i++) {
00164       struct ast_event_sub *event_sub;
00165 
00166       ast_rwlock_rdlock(&event_types_lock);
00167       if (!event_types[i].publish) {
00168          ast_rwlock_unlock(&event_types_lock);
00169          continue;
00170       }
00171       ast_rwlock_unlock(&event_types_lock);
00172 
00173       event_sub = ast_event_subscribe_new(i, ast_event_cb, NULL);
00174       ast_event_sub_append_ie_raw(event_sub, AST_EVENT_IE_EID,
00175                &ast_eid_default, sizeof(ast_eid_default));
00176       ast_event_dump_cache(event_sub);
00177       ast_event_sub_destroy(event_sub);
00178    }
00179 }
00180 
00181 static void *dispatch_thread_handler(void *data)
00182 {
00183    cs_error_t cs_err;
00184    struct pollfd pfd[3] = {
00185       { .events = POLLIN, },
00186       { .events = POLLIN, },
00187       { .events = POLLIN, },
00188    };
00189 
00190    if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
00191       ast_log(LOG_ERROR, "Failed to get CPG fd.  This module is now broken.\n");
00192       return NULL;
00193    }
00194 
00195    if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
00196       ast_log(LOG_ERROR, "Failed to get CFG fd.  This module is now broken.\n");
00197       return NULL;
00198    }
00199 
00200    pfd[2].fd = dispatch_thread.alert_pipe[0];
00201 
00202    while (!dispatch_thread.stop) {
00203       int res;
00204 
00205       cs_err = CS_OK;
00206 
00207       pfd[0].revents = 0;
00208       pfd[1].revents = 0;
00209       pfd[2].revents = 0;
00210 
00211       res = ast_poll(pfd, ARRAY_LEN(pfd), -1);
00212       if (res == -1 && errno != EINTR && errno != EAGAIN) {
00213          ast_log(LOG_ERROR, "poll() error: %s (%d)\n", strerror(errno), errno);
00214          continue;
00215       }
00216 
00217       if (pfd[0].revents & POLLIN) {
00218          if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
00219             ast_log(LOG_WARNING, "Failed CPG dispatch: %d\n", cs_err);
00220          }
00221       }
00222 
00223       if (pfd[1].revents & POLLIN) {
00224          if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
00225             ast_log(LOG_WARNING, "Failed CFG dispatch: %d\n", cs_err);
00226          }
00227       }
00228 
00229       if (cs_err == CS_ERR_LIBRARY || cs_err == CS_ERR_BAD_HANDLE) {
00230          struct cpg_name name;
00231 
00232          /* If corosync gets restarted out from under Asterisk, try to recover. */
00233 
00234          ast_log(LOG_NOTICE, "Attempting to recover from corosync failure.\n");
00235 
00236          if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
00237             ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
00238             sleep(5);
00239             continue;
00240          }
00241 
00242          if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks) != CS_OK)) {
00243             ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
00244             sleep(5);
00245             continue;
00246          }
00247 
00248          if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
00249             ast_log(LOG_ERROR, "Failed to get CPG fd.\n");
00250             sleep(5);
00251             continue;
00252          }
00253 
00254          if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
00255             ast_log(LOG_ERROR, "Failed to get CFG fd.\n");
00256             sleep(5);
00257             continue;
00258          }
00259 
00260          ast_copy_string(name.value, "asterisk", sizeof(name.value));
00261          name.length = strlen(name.value);
00262          if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
00263             ast_log(LOG_ERROR, "Failed to join cpg (%d)\n", (int) cs_err);
00264             sleep(5);
00265             continue;
00266          }
00267 
00268          ast_log(LOG_NOTICE, "Corosync recovery complete.\n");
00269       }
00270    }
00271 
00272    return NULL;
00273 }
00274 
00275 static void ast_event_cb(const struct ast_event *event, void *data)
00276 {
00277    cs_error_t cs_err;
00278    struct iovec iov = {
00279       .iov_base = (void *) event,
00280       .iov_len = ast_event_get_size(event),
00281    };
00282 
00283    if (ast_eid_cmp(&ast_eid_default,
00284          ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
00285       /* If the event didn't originate from this server, don't send it back out. */
00286       return;
00287    }
00288 
00289    /* The ast_event subscription will only exist if we are configured to publish
00290     * these events, so just send away. */
00291 
00292    if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
00293       ast_log(LOG_WARNING, "CPG mcast failed (%d)\n", cs_err);
00294    }
00295 }
00296 
00297 static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00298 {
00299    cs_error_t cs_err;
00300    cpg_iteration_handle_t cpg_iter;
00301    struct cpg_iteration_description_t cpg_desc;
00302    unsigned int i;
00303 
00304    switch (cmd) {
00305    case CLI_INIT:
00306       e->command = "corosync show members";
00307       e->usage =
00308          "Usage: corosync show members\n"
00309          "       Show corosync cluster members\n";
00310       return NULL;
00311 
00312    case CLI_GENERATE:
00313       return NULL;   /* no completion */
00314    }
00315 
00316    if (a->argc != e->args) {
00317       return CLI_SHOWUSAGE;
00318    }
00319 
00320    cs_err = cpg_iteration_initialize(cpg_handle, CPG_ITERATION_ALL, NULL, &cpg_iter);
00321 
00322    if (cs_err != CS_OK) {
00323       ast_cli(a->fd, "Failed to initialize CPG iterator.\n");
00324       return CLI_FAILURE;
00325    }
00326 
00327    ast_cli(a->fd, "\n"
00328                "=============================================================\n"
00329                "=== Cluster members =========================================\n"
00330                "=============================================================\n"
00331                "===\n");
00332 
00333    for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
00334          cs_err == CS_OK;
00335          cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
00336       corosync_cfg_node_address_t addrs[8];
00337       int num_addrs = 0;
00338       unsigned int j;
00339 
00340       cs_err = corosync_cfg_get_node_addrs(cfg_handle, cpg_desc.nodeid,
00341             ARRAY_LEN(addrs), &num_addrs, addrs);
00342       if (cs_err != CS_OK) {
00343          ast_log(LOG_WARNING, "Failed to get node addresses\n");
00344          continue;
00345       }
00346 
00347       ast_cli(a->fd, "=== Node %d\n", i);
00348       ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
00349 
00350       for (j = 0; j < num_addrs; j++) {
00351          struct sockaddr *sa = (struct sockaddr *) addrs[j].address;
00352          size_t sa_len = (size_t) addrs[j].address_length;
00353          char buf[128];
00354 
00355          getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
00356 
00357          ast_cli(a->fd, "=== --> Address %d: %s\n", j + 1, buf);
00358       }
00359 
00360    }
00361 
00362    ast_cli(a->fd, "===\n"
00363                   "=============================================================\n"
00364                   "\n");
00365 
00366    cpg_iteration_finalize(cpg_iter);
00367 
00368    return CLI_SUCCESS;
00369 }
00370 
00371 static char *corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00372 {
00373    unsigned int i;
00374 
00375    switch (cmd) {
00376    case CLI_INIT:
00377       e->command = "corosync show config";
00378       e->usage =
00379          "Usage: corosync show config\n"
00380          "       Show configuration loaded from res_corosync.conf\n";
00381       return NULL;
00382 
00383    case CLI_GENERATE:
00384       return NULL;   /* no completion */
00385    }
00386 
00387    if (a->argc != e->args) {
00388       return CLI_SHOWUSAGE;
00389    }
00390 
00391    ast_cli(a->fd, "\n"
00392                "=============================================================\n"
00393                "=== res_corosync config =====================================\n"
00394                "=============================================================\n"
00395                "===\n");
00396 
00397    ast_rwlock_rdlock(&event_types_lock);
00398    for (i = 0; i < ARRAY_LEN(event_types); i++) {
00399       if (event_types[i].publish) {
00400          ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
00401                event_types[i].name);
00402       }
00403       if (event_types[i].subscribe) {
00404          ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
00405                event_types[i].name);
00406       }
00407    }
00408    ast_rwlock_unlock(&event_types_lock);
00409 
00410    ast_cli(a->fd, "===\n"
00411                   "=============================================================\n"
00412                   "\n");
00413 
00414    return CLI_SUCCESS;
00415 }
00416 
00417 static struct ast_cli_entry corosync_cli[] = {
00418    AST_CLI_DEFINE(corosync_show_config, "Show configuration"),
00419    AST_CLI_DEFINE(corosync_show_members, "Show cluster members"),
00420 };
00421 
/*! \brief Selects which flag set_event() turns on for an event type. */
enum {
   /*! Turn on the publish flag */
   PUBLISH = 0,
   /*! Turn on the subscribe flag */
   SUBSCRIBE = 1,
};
00426 
00427 static int set_event(const char *event_type, int pubsub)
00428 {
00429    unsigned int i;
00430 
00431    for (i = 0; i < ARRAY_LEN(event_types); i++) {
00432       if (!event_types[i].name || strcasecmp(event_type, event_types[i].name)) {
00433          continue;
00434       }
00435 
00436       switch (pubsub) {
00437       case PUBLISH:
00438          event_types[i].publish = 1;
00439          break;
00440       case SUBSCRIBE:
00441          event_types[i].subscribe = 1;
00442          break;
00443       }
00444 
00445       break;
00446    }
00447 
00448    return (i == ARRAY_LEN(event_types)) ? -1 : 0;
00449 }
00450 
00451 static int load_general_config(struct ast_config *cfg)
00452 {
00453    struct ast_variable *v;
00454    int res = 0;
00455    unsigned int i;
00456 
00457    ast_rwlock_wrlock(&event_types_lock);
00458 
00459    for (i = 0; i < ARRAY_LEN(event_types); i++) {
00460       event_types[i].publish = 0;
00461       event_types[i].subscribe = 0;
00462    }
00463 
00464    for (v = ast_variable_browse(cfg, "general"); v && !res; v = v->next) {
00465       if (!strcasecmp(v->name, "publish_event")) {
00466          res = set_event(v->value, PUBLISH);
00467       } else if (!strcasecmp(v->name, "subscribe_event")) {
00468          res = set_event(v->value, SUBSCRIBE);
00469       } else {
00470          ast_log(LOG_WARNING, "Unknown option '%s'\n", v->name);
00471       }
00472    }
00473 
00474    for (i = 0; i < ARRAY_LEN(event_types); i++) {
00475       if (event_types[i].publish && !event_types[i].sub) {
00476          event_types[i].sub = ast_event_subscribe(i,
00477                   ast_event_cb, "Corosync", NULL,
00478                   AST_EVENT_IE_END);
00479       } else if (!event_types[i].publish && event_types[i].sub) {
00480          event_types[i].sub = ast_event_unsubscribe(event_types[i].sub);
00481       }
00482    }
00483 
00484    ast_rwlock_unlock(&event_types_lock);
00485 
00486    return res;
00487 }
00488 
00489 static int load_config(unsigned int reload)
00490 {
00491    static const char filename[] = "res_corosync.conf";
00492    struct ast_config *cfg;
00493    const char *cat = NULL;
00494    struct ast_flags config_flags = { 0 };
00495    int res = 0;
00496 
00497    cfg = ast_config_load(filename, config_flags);
00498 
00499    if (cfg == CONFIG_STATUS_FILEMISSING || cfg == CONFIG_STATUS_FILEINVALID) {
00500       return -1;
00501    }
00502 
00503    while ((cat = ast_category_browse(cfg, cat))) {
00504       if (!strcasecmp(cat, "general")) {
00505          res = load_general_config(cfg);
00506       } else {
00507          ast_log(LOG_WARNING, "Unknown configuration section '%s'\n", cat);
00508       }
00509    }
00510 
00511    ast_config_destroy(cfg);
00512 
00513    return res;
00514 }
00515 
00516 static void cleanup_module(void)
00517 {
00518    cs_error_t cs_err;
00519    unsigned int i;
00520 
00521    for (i = 0; i < ARRAY_LEN(event_types); i++) {
00522       if (event_types[i].sub) {
00523          event_types[i].sub = ast_event_unsubscribe(event_types[i].sub);
00524       }
00525       event_types[i].publish = 0;
00526       event_types[i].subscribe = 0;
00527    }
00528 
00529    if (dispatch_thread.id != AST_PTHREADT_NULL) {
00530       char meepmeep = 'x';
00531       dispatch_thread.stop = 1;
00532       if (ast_carefulwrite(dispatch_thread.alert_pipe[1], &meepmeep, 1,
00533                5000) == -1) {
00534          ast_log(LOG_ERROR, "Failed to write to pipe: %s (%d)\n",
00535                strerror(errno), errno);
00536       }
00537       pthread_join(dispatch_thread.id, NULL);
00538    }
00539 
00540    if (dispatch_thread.alert_pipe[0] != -1) {
00541       close(dispatch_thread.alert_pipe[0]);
00542       dispatch_thread.alert_pipe[0] = -1;
00543    }
00544 
00545    if (dispatch_thread.alert_pipe[1] != -1) {
00546       close(dispatch_thread.alert_pipe[1]);
00547       dispatch_thread.alert_pipe[1] = -1;
00548    }
00549 
00550    if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
00551       ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
00552    }
00553    cpg_handle = 0;
00554 
00555    if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
00556       ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
00557    }
00558    cfg_handle = 0;
00559 }
00560 
00561 static int load_module(void)
00562 {
00563    cs_error_t cs_err;
00564    enum ast_module_load_result res = AST_MODULE_LOAD_FAILURE;
00565    struct cpg_name name;
00566 
00567    if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
00568       ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
00569       return AST_MODULE_LOAD_DECLINE;
00570    }
00571 
00572    if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
00573       ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
00574       goto failed;
00575    }
00576 
00577    ast_copy_string(name.value, "asterisk", sizeof(name.value));
00578    name.length = strlen(name.value);
00579 
00580    if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
00581       ast_log(LOG_ERROR, "Failed to join (%d)\n", (int) cs_err);
00582       goto failed;
00583    }
00584 
00585    if (pipe(dispatch_thread.alert_pipe) == -1) {
00586       ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n",
00587             strerror(errno), errno);
00588       goto failed;
00589    }
00590 
00591    if (ast_pthread_create_background(&dispatch_thread.id, NULL,
00592          dispatch_thread_handler, NULL)) {
00593       ast_log(LOG_ERROR, "Error starting CPG dispatch thread.\n");
00594       goto failed;
00595    }
00596 
00597    if (load_config(0)) {
00598       /* simply not configured is not a fatal error */
00599       res = AST_MODULE_LOAD_DECLINE;
00600       goto failed;
00601    }
00602 
00603    ast_cli_register_multiple(corosync_cli, ARRAY_LEN(corosync_cli));
00604 
00605    ast_enable_distributed_devstate();
00606 
00607    return AST_MODULE_LOAD_SUCCESS;
00608 
00609 failed:
00610    cleanup_module();
00611 
00612    return res;
00613 }
00614 
00615 static int unload_module(void)
00616 {
00617    ast_cli_unregister_multiple(corosync_cli, ARRAY_LEN(corosync_cli));
00618 
00619    cleanup_module();
00620 
00621    return 0;
00622 }
00623 
00624 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Corosync");

Generated on Sun May 20 06:33:58 2012 for Asterisk - The Open Source Telephony Project by  doxygen 1.5.6