00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include "asterisk.h"
00033
00034 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 364444 $");
00035
00036 #include <corosync/cpg.h>
00037 #include <corosync/cfg.h>
00038
00039 #include "asterisk/module.h"
00040 #include "asterisk/logger.h"
00041 #include "asterisk/poll-compat.h"
00042 #include "asterisk/config.h"
00043 #include "asterisk/event.h"
00044 #include "asterisk/cli.h"
00045 #include "asterisk/devicestate.h"
00046
00047 AST_RWLOCK_DEFINE_STATIC(event_types_lock);
00048
00049 static struct {
00050 const char *name;
00051 struct ast_event_sub *sub;
00052 unsigned char publish;
00053 unsigned char subscribe;
00054 } event_types[] = {
00055 [AST_EVENT_MWI] = { .name = "mwi", },
00056 [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state", },
00057 };
00058
00059 static struct {
00060 pthread_t id;
00061 int alert_pipe[2];
00062 unsigned int stop:1;
00063 } dispatch_thread = {
00064 .id = AST_PTHREADT_NULL,
00065 .alert_pipe = { -1, -1 },
00066 };
00067
00068 static cpg_handle_t cpg_handle;
00069 static corosync_cfg_handle_t cfg_handle;
00070
00071 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
00072 static void cfg_state_track_cb(
00073 corosync_cfg_state_notification_buffer_t *notification_buffer,
00074 cs_error_t error);
00075 #endif
00076
00077 static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
00078 corosync_cfg_shutdown_flags_t flags);
00079
00080 static corosync_cfg_callbacks_t cfg_callbacks = {
00081 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
00082 .corosync_cfg_state_track_callback = cfg_state_track_cb,
00083 #endif
00084 .corosync_cfg_shutdown_callback = cfg_shutdown_cb,
00085 };
00086
00087 static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
00088 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len);
00089
00090 static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
00091 const struct cpg_address *member_list, size_t member_list_entries,
00092 const struct cpg_address *left_list, size_t left_list_entries,
00093 const struct cpg_address *joined_list, size_t joined_list_entries);
00094
00095 static cpg_callbacks_t cpg_callbacks = {
00096 .cpg_deliver_fn = cpg_deliver_cb,
00097 .cpg_confchg_fn = cpg_confchg_cb,
00098 };
00099
00100 static void ast_event_cb(const struct ast_event *event, void *data);
00101
00102 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
00103 static void cfg_state_track_cb(
00104 corosync_cfg_state_notification_buffer_t *notification_buffer,
00105 cs_error_t error)
00106 {
00107 }
00108 #endif
00109
00110 static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
00111 corosync_cfg_shutdown_flags_t flags)
00112 {
00113 }
00114
00115 static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
00116 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
00117 {
00118 struct ast_event *event;
00119
00120 if (msg_len < ast_event_minimum_length()) {
00121 ast_debug(1, "Ignoring event that's too small. %u < %u\n",
00122 (unsigned int) msg_len,
00123 (unsigned int) ast_event_minimum_length());
00124 return;
00125 }
00126
00127 if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(msg, AST_EVENT_IE_EID))) {
00128
00129 return;
00130 }
00131
00132 ast_rwlock_rdlock(&event_types_lock);
00133 if (!event_types[ast_event_get_type(msg)].subscribe) {
00134
00135 ast_rwlock_unlock(&event_types_lock);
00136 return;
00137 }
00138 ast_rwlock_unlock(&event_types_lock);
00139
00140 if (!(event = ast_malloc(msg_len))) {
00141 return;
00142 }
00143
00144 memcpy(event, msg, msg_len);
00145
00146 ast_event_queue_and_cache(event);
00147 }
00148
00149 static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
00150 const struct cpg_address *member_list, size_t member_list_entries,
00151 const struct cpg_address *left_list, size_t left_list_entries,
00152 const struct cpg_address *joined_list, size_t joined_list_entries)
00153 {
00154 unsigned int i;
00155
00156
00157
00158
00159 if (!joined_list_entries) {
00160 return;
00161 }
00162
00163 for (i = 0; i < ARRAY_LEN(event_types); i++) {
00164 struct ast_event_sub *event_sub;
00165
00166 ast_rwlock_rdlock(&event_types_lock);
00167 if (!event_types[i].publish) {
00168 ast_rwlock_unlock(&event_types_lock);
00169 continue;
00170 }
00171 ast_rwlock_unlock(&event_types_lock);
00172
00173 event_sub = ast_event_subscribe_new(i, ast_event_cb, NULL);
00174 ast_event_sub_append_ie_raw(event_sub, AST_EVENT_IE_EID,
00175 &ast_eid_default, sizeof(ast_eid_default));
00176 ast_event_dump_cache(event_sub);
00177 ast_event_sub_destroy(event_sub);
00178 }
00179 }
00180
00181 static void *dispatch_thread_handler(void *data)
00182 {
00183 cs_error_t cs_err;
00184 struct pollfd pfd[3] = {
00185 { .events = POLLIN, },
00186 { .events = POLLIN, },
00187 { .events = POLLIN, },
00188 };
00189
00190 if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
00191 ast_log(LOG_ERROR, "Failed to get CPG fd. This module is now broken.\n");
00192 return NULL;
00193 }
00194
00195 if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
00196 ast_log(LOG_ERROR, "Failed to get CFG fd. This module is now broken.\n");
00197 return NULL;
00198 }
00199
00200 pfd[2].fd = dispatch_thread.alert_pipe[0];
00201
00202 while (!dispatch_thread.stop) {
00203 int res;
00204
00205 cs_err = CS_OK;
00206
00207 pfd[0].revents = 0;
00208 pfd[1].revents = 0;
00209 pfd[2].revents = 0;
00210
00211 res = ast_poll(pfd, ARRAY_LEN(pfd), -1);
00212 if (res == -1 && errno != EINTR && errno != EAGAIN) {
00213 ast_log(LOG_ERROR, "poll() error: %s (%d)\n", strerror(errno), errno);
00214 continue;
00215 }
00216
00217 if (pfd[0].revents & POLLIN) {
00218 if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
00219 ast_log(LOG_WARNING, "Failed CPG dispatch: %d\n", cs_err);
00220 }
00221 }
00222
00223 if (pfd[1].revents & POLLIN) {
00224 if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
00225 ast_log(LOG_WARNING, "Failed CFG dispatch: %d\n", cs_err);
00226 }
00227 }
00228
00229 if (cs_err == CS_ERR_LIBRARY || cs_err == CS_ERR_BAD_HANDLE) {
00230 struct cpg_name name;
00231
00232
00233
00234 ast_log(LOG_NOTICE, "Attempting to recover from corosync failure.\n");
00235
00236 if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
00237 ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
00238 sleep(5);
00239 continue;
00240 }
00241
00242 if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks) != CS_OK)) {
00243 ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
00244 sleep(5);
00245 continue;
00246 }
00247
00248 if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
00249 ast_log(LOG_ERROR, "Failed to get CPG fd.\n");
00250 sleep(5);
00251 continue;
00252 }
00253
00254 if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
00255 ast_log(LOG_ERROR, "Failed to get CFG fd.\n");
00256 sleep(5);
00257 continue;
00258 }
00259
00260 ast_copy_string(name.value, "asterisk", sizeof(name.value));
00261 name.length = strlen(name.value);
00262 if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
00263 ast_log(LOG_ERROR, "Failed to join cpg (%d)\n", (int) cs_err);
00264 sleep(5);
00265 continue;
00266 }
00267
00268 ast_log(LOG_NOTICE, "Corosync recovery complete.\n");
00269 }
00270 }
00271
00272 return NULL;
00273 }
00274
00275 static void ast_event_cb(const struct ast_event *event, void *data)
00276 {
00277 cs_error_t cs_err;
00278 struct iovec iov = {
00279 .iov_base = (void *) event,
00280 .iov_len = ast_event_get_size(event),
00281 };
00282
00283 if (ast_eid_cmp(&ast_eid_default,
00284 ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
00285
00286 return;
00287 }
00288
00289
00290
00291
00292 if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
00293 ast_log(LOG_WARNING, "CPG mcast failed (%d)\n", cs_err);
00294 }
00295 }
00296
00297 static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00298 {
00299 cs_error_t cs_err;
00300 cpg_iteration_handle_t cpg_iter;
00301 struct cpg_iteration_description_t cpg_desc;
00302 unsigned int i;
00303
00304 switch (cmd) {
00305 case CLI_INIT:
00306 e->command = "corosync show members";
00307 e->usage =
00308 "Usage: corosync show members\n"
00309 " Show corosync cluster members\n";
00310 return NULL;
00311
00312 case CLI_GENERATE:
00313 return NULL;
00314 }
00315
00316 if (a->argc != e->args) {
00317 return CLI_SHOWUSAGE;
00318 }
00319
00320 cs_err = cpg_iteration_initialize(cpg_handle, CPG_ITERATION_ALL, NULL, &cpg_iter);
00321
00322 if (cs_err != CS_OK) {
00323 ast_cli(a->fd, "Failed to initialize CPG iterator.\n");
00324 return CLI_FAILURE;
00325 }
00326
00327 ast_cli(a->fd, "\n"
00328 "=============================================================\n"
00329 "=== Cluster members =========================================\n"
00330 "=============================================================\n"
00331 "===\n");
00332
00333 for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
00334 cs_err == CS_OK;
00335 cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
00336 corosync_cfg_node_address_t addrs[8];
00337 int num_addrs = 0;
00338 unsigned int j;
00339
00340 cs_err = corosync_cfg_get_node_addrs(cfg_handle, cpg_desc.nodeid,
00341 ARRAY_LEN(addrs), &num_addrs, addrs);
00342 if (cs_err != CS_OK) {
00343 ast_log(LOG_WARNING, "Failed to get node addresses\n");
00344 continue;
00345 }
00346
00347 ast_cli(a->fd, "=== Node %d\n", i);
00348 ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
00349
00350 for (j = 0; j < num_addrs; j++) {
00351 struct sockaddr *sa = (struct sockaddr *) addrs[j].address;
00352 size_t sa_len = (size_t) addrs[j].address_length;
00353 char buf[128];
00354
00355 getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
00356
00357 ast_cli(a->fd, "=== --> Address %d: %s\n", j + 1, buf);
00358 }
00359
00360 }
00361
00362 ast_cli(a->fd, "===\n"
00363 "=============================================================\n"
00364 "\n");
00365
00366 cpg_iteration_finalize(cpg_iter);
00367
00368 return CLI_SUCCESS;
00369 }
00370
00371 static char *corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00372 {
00373 unsigned int i;
00374
00375 switch (cmd) {
00376 case CLI_INIT:
00377 e->command = "corosync show config";
00378 e->usage =
00379 "Usage: corosync show config\n"
00380 " Show configuration loaded from res_corosync.conf\n";
00381 return NULL;
00382
00383 case CLI_GENERATE:
00384 return NULL;
00385 }
00386
00387 if (a->argc != e->args) {
00388 return CLI_SHOWUSAGE;
00389 }
00390
00391 ast_cli(a->fd, "\n"
00392 "=============================================================\n"
00393 "=== res_corosync config =====================================\n"
00394 "=============================================================\n"
00395 "===\n");
00396
00397 ast_rwlock_rdlock(&event_types_lock);
00398 for (i = 0; i < ARRAY_LEN(event_types); i++) {
00399 if (event_types[i].publish) {
00400 ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
00401 event_types[i].name);
00402 }
00403 if (event_types[i].subscribe) {
00404 ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
00405 event_types[i].name);
00406 }
00407 }
00408 ast_rwlock_unlock(&event_types_lock);
00409
00410 ast_cli(a->fd, "===\n"
00411 "=============================================================\n"
00412 "\n");
00413
00414 return CLI_SUCCESS;
00415 }
00416
00417 static struct ast_cli_entry corosync_cli[] = {
00418 AST_CLI_DEFINE(corosync_show_config, "Show configuration"),
00419 AST_CLI_DEFINE(corosync_show_members, "Show cluster members"),
00420 };
00421
00422 enum {
00423 PUBLISH,
00424 SUBSCRIBE,
00425 };
00426
00427 static int set_event(const char *event_type, int pubsub)
00428 {
00429 unsigned int i;
00430
00431 for (i = 0; i < ARRAY_LEN(event_types); i++) {
00432 if (!event_types[i].name || strcasecmp(event_type, event_types[i].name)) {
00433 continue;
00434 }
00435
00436 switch (pubsub) {
00437 case PUBLISH:
00438 event_types[i].publish = 1;
00439 break;
00440 case SUBSCRIBE:
00441 event_types[i].subscribe = 1;
00442 break;
00443 }
00444
00445 break;
00446 }
00447
00448 return (i == ARRAY_LEN(event_types)) ? -1 : 0;
00449 }
00450
00451 static int load_general_config(struct ast_config *cfg)
00452 {
00453 struct ast_variable *v;
00454 int res = 0;
00455 unsigned int i;
00456
00457 ast_rwlock_wrlock(&event_types_lock);
00458
00459 for (i = 0; i < ARRAY_LEN(event_types); i++) {
00460 event_types[i].publish = 0;
00461 event_types[i].subscribe = 0;
00462 }
00463
00464 for (v = ast_variable_browse(cfg, "general"); v && !res; v = v->next) {
00465 if (!strcasecmp(v->name, "publish_event")) {
00466 res = set_event(v->value, PUBLISH);
00467 } else if (!strcasecmp(v->name, "subscribe_event")) {
00468 res = set_event(v->value, SUBSCRIBE);
00469 } else {
00470 ast_log(LOG_WARNING, "Unknown option '%s'\n", v->name);
00471 }
00472 }
00473
00474 for (i = 0; i < ARRAY_LEN(event_types); i++) {
00475 if (event_types[i].publish && !event_types[i].sub) {
00476 event_types[i].sub = ast_event_subscribe(i,
00477 ast_event_cb, "Corosync", NULL,
00478 AST_EVENT_IE_END);
00479 } else if (!event_types[i].publish && event_types[i].sub) {
00480 event_types[i].sub = ast_event_unsubscribe(event_types[i].sub);
00481 }
00482 }
00483
00484 ast_rwlock_unlock(&event_types_lock);
00485
00486 return res;
00487 }
00488
00489 static int load_config(unsigned int reload)
00490 {
00491 static const char filename[] = "res_corosync.conf";
00492 struct ast_config *cfg;
00493 const char *cat = NULL;
00494 struct ast_flags config_flags = { 0 };
00495 int res = 0;
00496
00497 cfg = ast_config_load(filename, config_flags);
00498
00499 if (cfg == CONFIG_STATUS_FILEMISSING || cfg == CONFIG_STATUS_FILEINVALID) {
00500 return -1;
00501 }
00502
00503 while ((cat = ast_category_browse(cfg, cat))) {
00504 if (!strcasecmp(cat, "general")) {
00505 res = load_general_config(cfg);
00506 } else {
00507 ast_log(LOG_WARNING, "Unknown configuration section '%s'\n", cat);
00508 }
00509 }
00510
00511 ast_config_destroy(cfg);
00512
00513 return res;
00514 }
00515
00516 static void cleanup_module(void)
00517 {
00518 cs_error_t cs_err;
00519 unsigned int i;
00520
00521 for (i = 0; i < ARRAY_LEN(event_types); i++) {
00522 if (event_types[i].sub) {
00523 event_types[i].sub = ast_event_unsubscribe(event_types[i].sub);
00524 }
00525 event_types[i].publish = 0;
00526 event_types[i].subscribe = 0;
00527 }
00528
00529 if (dispatch_thread.id != AST_PTHREADT_NULL) {
00530 char meepmeep = 'x';
00531 dispatch_thread.stop = 1;
00532 if (ast_carefulwrite(dispatch_thread.alert_pipe[1], &meepmeep, 1,
00533 5000) == -1) {
00534 ast_log(LOG_ERROR, "Failed to write to pipe: %s (%d)\n",
00535 strerror(errno), errno);
00536 }
00537 pthread_join(dispatch_thread.id, NULL);
00538 }
00539
00540 if (dispatch_thread.alert_pipe[0] != -1) {
00541 close(dispatch_thread.alert_pipe[0]);
00542 dispatch_thread.alert_pipe[0] = -1;
00543 }
00544
00545 if (dispatch_thread.alert_pipe[1] != -1) {
00546 close(dispatch_thread.alert_pipe[1]);
00547 dispatch_thread.alert_pipe[1] = -1;
00548 }
00549
00550 if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
00551 ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
00552 }
00553 cpg_handle = 0;
00554
00555 if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
00556 ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
00557 }
00558 cfg_handle = 0;
00559 }
00560
00561 static int load_module(void)
00562 {
00563 cs_error_t cs_err;
00564 enum ast_module_load_result res = AST_MODULE_LOAD_FAILURE;
00565 struct cpg_name name;
00566
00567 if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
00568 ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
00569 return AST_MODULE_LOAD_DECLINE;
00570 }
00571
00572 if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
00573 ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
00574 goto failed;
00575 }
00576
00577 ast_copy_string(name.value, "asterisk", sizeof(name.value));
00578 name.length = strlen(name.value);
00579
00580 if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
00581 ast_log(LOG_ERROR, "Failed to join (%d)\n", (int) cs_err);
00582 goto failed;
00583 }
00584
00585 if (pipe(dispatch_thread.alert_pipe) == -1) {
00586 ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n",
00587 strerror(errno), errno);
00588 goto failed;
00589 }
00590
00591 if (ast_pthread_create_background(&dispatch_thread.id, NULL,
00592 dispatch_thread_handler, NULL)) {
00593 ast_log(LOG_ERROR, "Error starting CPG dispatch thread.\n");
00594 goto failed;
00595 }
00596
00597 if (load_config(0)) {
00598
00599 res = AST_MODULE_LOAD_DECLINE;
00600 goto failed;
00601 }
00602
00603 ast_cli_register_multiple(corosync_cli, ARRAY_LEN(corosync_cli));
00604
00605 ast_enable_distributed_devstate();
00606
00607 return AST_MODULE_LOAD_SUCCESS;
00608
00609 failed:
00610 cleanup_module();
00611
00612 return res;
00613 }
00614
00615 static int unload_module(void)
00616 {
00617 ast_cli_unregister_multiple(corosync_cli, ARRAY_LEN(corosync_cli));
00618
00619 cleanup_module();
00620
00621 return 0;
00622 }
00623
00624 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Corosync");