/*
 * Copyright (c) 2009 David Teigland
 * All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License V2
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it would be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

/*
   New history begins at zero every time a new group is formed (all
   members go away). Otherwise, when forming a new group we'd have to
   wait for all members to join and gather the latest history from all
   so we know where to begin. This also means that a node clears its
   history before every join, since history may have been restarted
   while it was away.
*/

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "list.h"

#ifdef WHITETANK
#include
static char *exec_name = "aisexec";
#else
#include
static char *exec_name = "corosync";
#endif

#define MAX_NODES 64 /* options 8, 16, 32, 64 */

/* EVENT_BUF_LEN pads struct event (4-byte eventid + union) to a power
   of two for each supported MAX_NODES value */
#if MAX_NODES == 8
#define EVENT_BUF_LEN 28 /* + 4 = 32 */
#endif
#if MAX_NODES == 16
#define EVENT_BUF_LEN 60 /* + 4 = 64 */
#endif
#if MAX_NODES == 32
#define EVENT_BUF_LEN 124 /* + 4 = 128 */
#endif
#if MAX_NODES == 64
#define EVENT_BUF_LEN 252 /* + 4 = 256 */
#endif

#define CLIENT_NALLOC 2 /* client/pollfd arrays grow by this many slots */
#define DUMP_SIZE (1024 * 1024)
#define HISTORY_EVENTS (1024 * 4) /* slots in the events[] history ring */
#define DEFAULT_SYNC_MAX 1000 /* sync up to this many events */
#define DEFAULT_TIMEOUT_SEC 60
#define DEFAULT_PORT 5405
#define DEFAULT_RESTART_SEC 10
#define DUMP_WRITE_PATH "/var/log/cluster/cpgx_debug.txt"
#define LEAVE_TIME_MAX 10 /* leave or exit up to this long after join */

/* event types, stored in the first byte of dct_header and dct_config */
#define EV_CONFCHG 1
#define EV_MSGTIME 2
#define EV_MSGSYNC 3

/* one polled file descriptor; workfn/deadfn hold void (*)(int ci)
   callbacks (see client_add) */
struct client {
	int fd;
	void *workfn;
	void *deadfn;
};

/* time msg is followed by N event structs from history for run-time checks
   sync msg is followed by M event structs from history to sync new node,
   the final one is the confchg event that to_nodeid joined in */

struct dct_header {
	uint8_t type; /* EV_MSGTIME, EV_MSGSYNC */
	uint8_t nodeid; /* sender */
	uint8_t to_nodeid; /* for MSGSYNC */
	uint8_t pad;
	uint8_t synced_nodes[MAX_NODES];/* for MSGSYNC, nodes in sync */
	uint32_t tv_sec;
	uint32_t tv_usec;
	uint32_t last_config; /* last config eventid we've seen */
	uint32_t event_count; /* this many events after header */
};
/* 4 + MAX_NODES + 16: 28 bytes, 36 bytes, 52 bytes, 84 bytes */

struct dct_config {
	uint8_t type; /* EV_CONFCHG */
	uint8_t memb_count;
	uint8_t join_count;
	uint8_t left_count;
	uint8_t memb[MAX_NODES];
	uint8_t join[MAX_NODES];
	uint8_t left[MAX_NODES];
};
/* 4 + 3 * MAX_NODES: 28 bytes, 52 bytes, 100 bytes, 196 bytes */

/* one history entry; header and config both begin with a type byte, so
   the type can be read through either union member */
struct event {
	uint32_t eventid;
	union {
		struct dct_header header;
		struct dct_config config;
		char buf[EVENT_BUF_LEN];
	};
};
/* 32 bytes, 64 bytes, 128 bytes, 256 bytes */

struct node {
	struct list_head list;
	uint8_t nodeid;
	uint8_t sync_from; /* node that will send the sync to this node */
	int is_member;
	int needs_sync; /* when this node joins we set this to 1, and
			   clear it when we see sync message for it */
	int sent_sync; /* we sent a sync */
	uint32_t join_eventid;
	uint32_t last_check_eventid; /* the eventid from the check event in
					the most recent time message we
					received from this node. not used
					for anything */
	uint8_t synced_nodes[MAX_NODES];
};

/* confchg or message queued on saved_events while we wait for our sync;
   buf holds a struct dct_config or a message starting with a
   struct dct_header */
struct save_event {
	struct list_head list;
	int type;
	int we_join;
	int len;
	char buf[0];
};

static int client_maxi;
static int client_size = 0;
static struct client *client = NULL;
static struct pollfd *pollfd = NULL;
static struct cpg_name our_cpg_name;
static cpg_handle_t our_cpg_handle;
static int our_cpg_client;
static int our_cpg_fd;
static char our_name[32];
static char iptables_a[128];
static char iptables_d[128];
static char exec_addr[64];
static int exec_port = DEFAULT_PORT;
static int exec_join = 0;
static int prog_quit;
static int cluster_down;
static int opt_leave = 1;
static int opt_exit = 1;
static int opt_die = 0;
static int iterations_sec = 0;
static int timeout_sec = DEFAULT_TIMEOUT_SEC;
static int restart_sec = DEFAULT_RESTART_SEC;
static int run_iptables = 0;
static int continue_after_error = 0;
static int opt_print_event = 1;
static int opt_print_debug = 1;
static int got_error = 0;
static time_t parent_begin;
static time_t child_begin;
static time_t join_time;
static time_t leave_time;
static time_t last_dispatch;
static uint32_t dispatch_count;
static int dump_point;
static int dump_wrap;
static char debug_buf[256];
static char dump_buf[DUMP_SIZE];
static uint8_t our_nodeid;
static uint32_t eventid; /* id the next history event will get */
static uint32_t last_config_eventid;
static uint32_t sync_max = DEFAULT_SYNC_MAX;
static int events_len;
static struct event *events; /* history ring, indexed via history() */
static struct dct_config
last_config; static struct list_head nodes; static struct list_head saved_events; static int join_wait; static int join_done; static int sync_wait; static int sync_done; static int leave_wait; static int leave_done; static void process_message(struct dct_header *hd, int len); void send_sync(uint8_t nodeid); void dump_save(void); void dump_write(void); /* TODO: if there's a gap of 2 or more seconds between consecutive output lines, insert a "GAP" line */ #define log_error(fmt, args...) \ do { \ snprintf(debug_buf, 255, "%ld ERROR: " fmt "\n", \ time(NULL), ##args); \ dump_save(); \ fprintf(stderr, "%s", debug_buf); \ got_error = 1; \ } while (0) /* TODO: -H 0 (off), 1 (on), T (time), S (sync), C (conf), TSC, etc */ #define log_history(fmt, args...) \ do { \ snprintf(debug_buf, 255, "%ld H: " fmt "\n", time(NULL), ##args); \ dump_save(); \ if (opt_print_event) \ fprintf(stdout, "%s", debug_buf); \ } while (0) #define log_debug(fmt, args...) \ do { \ snprintf(debug_buf, 255, "%ld D: " fmt "\n", time(NULL), ##args); \ dump_save(); \ if (opt_print_debug) \ fprintf(stdout, "%s", debug_buf); \ } while (0) static void _log_config(struct dct_config *c, uint32_t id, int error) { char m_buf[256]; char j_buf[256]; char l_buf[256]; int i, off; memset(m_buf, 0, sizeof(m_buf)); memset(j_buf, 0, sizeof(j_buf)); memset(l_buf, 0, sizeof(l_buf)); off = 0; for (i = 0; i < c->memb_count; i++) off += sprintf(m_buf+off, " %u", c->memb[i]); off = 0; for (i = 0; i < c->join_count; i++) off += sprintf(j_buf+off, " %u", c->join[i]); off = 0; for (i = 0; i < c->left_count; i++) off += sprintf(l_buf+off, " %u", c->left[i]); if (error == 1) log_error("%08u conf %u %u %u memb%s join%s left%s", id, c->memb_count, c->join_count, c->left_count, m_buf, j_buf, l_buf); else if (error == 2) log_debug("%08u conf %u %u %u memb%s join%s left%s", id, c->memb_count, c->join_count, c->left_count, m_buf, j_buf, l_buf); else log_history("%08u conf %u %u %u memb%s join%s left%s", id, c->memb_count, 
c->join_count, c->left_count, m_buf, j_buf, l_buf); } #define log_config(_c, _er) _log_config(_c, 0, _er) static void _log_header(struct dct_header *h, uint32_t id, int error) { if (error && h->type == EV_MSGTIME) log_error("%08u time %u tv %u.%06u config %u", id, h->nodeid, h->tv_sec, h->tv_usec, h->last_config); else if (error && h->type == EV_MSGSYNC) log_error("%08u sync %u to %u tv %u.%06u config %u count %u", id, h->nodeid, h->to_nodeid, h->tv_sec, h->tv_usec, h->last_config, h->event_count); else if (!error && h->type == EV_MSGTIME) log_history("%08u time %u tv %u.%06u config %u", id, h->nodeid, h->tv_sec, h->tv_usec, h->last_config); else if (!error && h->type == EV_MSGSYNC) log_history("%08u sync %u to %u tv %u.%06u config %u count %u", id, h->nodeid, h->to_nodeid, h->tv_sec, h->tv_usec, h->last_config, h->event_count); else log_error("%08u unknown message type %d", id, h->type); } #define log_header(_h, _er) _log_header(_h, 0, _er) static void log_event(struct event *ev, int error) { struct dct_config *c = &ev->config; if (c->type == EV_CONFCHG) _log_config(&ev->config, ev->eventid, error); else _log_header(&ev->header, ev->eventid, error); } static int rand_int(int a, int b) { return a + (int) (((float)(b - a + 1)) * random() / (RAND_MAX+1.0)); } static unsigned long time_diff_ms(struct timeval *begin, struct timeval *end) { struct timeval result; timersub(end, begin, &result); return (result.tv_sec * 1000) + (result.tv_usec / 1000); } static void client_alloc(void) { int i; if (!client) { client = malloc(CLIENT_NALLOC * sizeof(struct client)); pollfd = malloc(CLIENT_NALLOC * sizeof(struct pollfd)); } else { client = realloc(client, (client_size + CLIENT_NALLOC) * sizeof(struct client)); pollfd = realloc(pollfd, (client_size + CLIENT_NALLOC) * sizeof(struct pollfd)); if (!pollfd) log_error("can't alloc for pollfd"); } if (!client || !pollfd) log_error("can't alloc for client array"); for (i = client_size; i < client_size + CLIENT_NALLOC; i++) { 
client[i].workfn = NULL; client[i].deadfn = NULL; client[i].fd = -1; pollfd[i].fd = -1; pollfd[i].revents = 0; } client_size += CLIENT_NALLOC; } static void client_dead(int ci) { close(client[ci].fd); client[ci].workfn = NULL; client[ci].fd = -1; pollfd[ci].fd = -1; } static int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci)) { int i; if (!client) client_alloc(); again: for (i = 0; i < client_size; i++) { if (client[i].fd == -1) { client[i].workfn = workfn; if (deadfn) client[i].deadfn = deadfn; else client[i].deadfn = client_dead; client[i].fd = fd; pollfd[i].fd = fd; pollfd[i].events = POLLIN; if (i > client_maxi) client_maxi = i; return i; } } client_alloc(); goto again; } void cluster_dead(int ci) { if (!cluster_down) log_error("cluster is down, exiting"); prog_quit = 1; cluster_down = 1; } static inline struct event *history(uint32_t id) { return &events[id % HISTORY_EVENTS]; } struct node *get_node(uint8_t nodeid) { struct node *node; list_for_each_entry(node, &nodes, list) { if (node->nodeid == nodeid) return node; } return NULL; } struct node *add_node(uint8_t nodeid) { struct node *node; node = malloc(sizeof(struct node)); if (!node) return NULL; memset(node, 0, sizeof(struct node)); node->nodeid = nodeid; list_add_tail(&node->list, &nodes); return node; } void set_node_synced(uint8_t nodeid) { struct node *node; node = get_node(nodeid); node->needs_sync = 0; log_debug("nodeid %d needs_sync 0", nodeid); } int in_memb(struct dct_config *c, uint8_t nodeid) { int i; for (i = 0; i < c->memb_count; i++) { if (c->memb[i] == nodeid) return 1; } return 0; } int in_join(struct dct_config *c, uint8_t nodeid) { int i; for (i = 0; i < c->join_count; i++) { if (c->join[i] == nodeid) return 1; } return 0; } int in_left(struct dct_config *c, uint8_t nodeid) { int i; for (i = 0; i < c->left_count; i++) { if (c->left[i] == nodeid) return 1; } return 0; } void print_nodes_list(void) { struct node *node; list_for_each_entry(node, &nodes, list) { 
log_debug("nodeid %u is_member %d needs_sync %d join %08u check %08u", node->nodeid, node->is_member, node->needs_sync, node->join_eventid, node->last_check_eventid); } } void init_join_eventid(struct node *node) { struct event *ev; struct dct_config *c; int i = eventid - 1; while (1) { ev = history(i); c = &ev->config; if ((c->type == EV_CONFCHG) && in_join(c, node->nodeid)) { node->join_eventid = ev->eventid; return; } if (!i) break; i--; } } /* hd is the header of our sync message, c is the config for our join event */ void init_nodes_list(struct dct_header *hd, struct dct_config *c) { struct node *node; uint8_t nodeid; int i; if (!list_empty(&nodes)) { log_error("init_nodes_list nodes not empty"); return; } for (i = 0; i < c->memb_count; i++) { node = add_node(c->memb[i]); node->is_member = 1; node->needs_sync = 1; } for (i = 0; i < MAX_NODES; i++) { nodeid = hd->synced_nodes[i]; if (!nodeid) break; node = get_node(nodeid); if (!node) log_error("init_nodes_list synced_nodes %u", nodeid); else node->needs_sync = 0; } /* I don't think there's any case where we'll need to know the join_eventid of an earlier joining node to send it a sync */ #if 0 list_for_each_entry(node, &nodes, list) init_join_eventid(node); #endif log_debug("init_nodes_list"); print_nodes_list(); } void free_nodes_list(void) { struct node *node, *safe; list_for_each_entry_safe(node, safe, &nodes, list) { list_del(&node->list); free(node); } } void update_nodes_list(struct dct_config *c, uint32_t id) { struct node *node; uint8_t synced_nodes[MAX_NODES]; uint8_t low = 0; int i, is_memb, is_join, is_left; list_for_each_entry(node, &nodes, list) { is_memb = in_memb(c, node->nodeid); is_join = in_join(c, node->nodeid); is_left = in_left(c, node->nodeid); if (is_memb && node->is_member) { if (is_join || is_left) log_error("member list off a %d %d %d %d %u", is_memb, is_join, is_left, node->is_member, node->nodeid); continue; } if (!is_memb && !node->is_member) { if (is_join || is_left) 
log_error("member list off b %d %d %d %d %u", is_memb, is_join, is_left, node->is_member, node->nodeid); continue; } /* node has failed/left */ if (!is_memb && node->is_member) { if (is_join || !is_left) log_error("member list off c %d %d %d %d %u", is_memb, is_join, is_left, node->is_member, node->nodeid); node->is_member = 0; node->sync_from = 0; node->sent_sync = 0; continue; } /* old node has joined again */ if (is_memb && !node->is_member) { if (!is_join || is_left) log_error("member list off d %d %d %d %d %u", is_memb, is_join, is_left, node->is_member, node->nodeid); node->is_member = 1; node->needs_sync = 1; node->join_eventid = id; continue; } log_error("update_nodes_list shouldn't get here %d %d %d %d %u", is_memb, is_join, is_left, node->is_member, node->nodeid); } /* figure out who should send syncs and what synced_nodes are */ i = 0; memset(&synced_nodes, 0, MAX_NODES); list_for_each_entry(node, &nodes, list) { if (!node->is_member) continue; if (node->needs_sync) continue; synced_nodes[i++] = node->nodeid; if (!low || node->nodeid < low) low = node->nodeid; } if (!low) log_error("update_nodes_list no synced nodeid"); /* nodes with need_sync and a dead sync_from node */ list_for_each_entry(node, &nodes, list) { if (node->is_member && node->needs_sync && node->sync_from && !in_memb(c, node->sync_from)) { log_debug("node %u sync_from old %u new %u", node->nodeid, node->sync_from, low); node->sync_from = low; } } /* old nodes that have joined again */ list_for_each_entry(node, &nodes, list) { if (node->is_member && node->needs_sync && node->join_eventid == id) { node->sync_from = low; memcpy(&node->synced_nodes, &synced_nodes, MAX_NODES); } } /* add new node we've not seen before */ for (i = 0; i < c->memb_count; i++) { node = get_node(c->memb[i]); if (node) continue; node = add_node(c->memb[i]); node->is_member = 1; node->needs_sync = 1; node->join_eventid = id; node->sync_from = low; memcpy(&node->synced_nodes, &synced_nodes, MAX_NODES); is_join = 
in_join(c, node->nodeid); if (!is_join) log_error("join list off %d", node->nodeid); } log_debug("update_nodes_list"); print_nodes_list(); } /* Checking sync events by already-synced nodes and the limited history we can send mean that we need to send sync messages to nodes in the order that they joined: - node 4 joins in event 5007 and node 5 joins in event 5008 - we send sync to node 5 with events 3009-5008, and then send sync to node 4 with events 3008-5007 - node 5 will check sync events sent to node 4 and find that it does not have event 3008 (If we could always send the full history then it wouldn't be a problem.) Another problem is if a the later-joining node becomes synced, and all other synced nodes fail, the later-joining now-synced node doesn't know which event the earlier joining, unsynced node joined at. */ void send_syncs(void) { struct node *node, *to_node; uint32_t low_eventid; restart: low_eventid = 0; to_node = NULL; list_for_each_entry(node, &nodes, list) { if (!node->is_member) continue; if (!node->needs_sync) continue; if (node->sync_from != our_nodeid) continue; if (node->sent_sync) continue; if (!low_eventid || node->join_eventid < low_eventid) { low_eventid = node->join_eventid; to_node = node; } } if (low_eventid) { send_sync(to_node->nodeid); to_node->sent_sync = 1; goto restart; } } static int _send_message(cpg_handle_t h, char *buf, int len, int type) { struct iovec iov; cpg_error_t error; int retries = 0; iov.iov_base = buf; iov.iov_len = len; retry: error = cpg_mcast_joined(h, CPG_TYPE_AGREED, &iov, 1); if (error == CPG_ERR_TRY_AGAIN) { retries++; usleep(1000); if (!(retries % 100)) log_debug("cpg_mcast_joined retry %d %d", retries, type); goto retry; } if (error != CPG_OK) { log_error("cpg_mcast_joined error %d handle %llx %d", error, (unsigned long long)h, type); return -1; } return 0; } #define CHECK_COUNT 8 void send_time(void) { char *buf; struct timeval tv; struct dct_header *hd; struct event *ev; uint32_t start_eventid, 
end_eventid; /* inclusive */ int i, count, event_count, len; event_count = eventid; if (event_count < CHECK_COUNT) { count = event_count; end_eventid = eventid - 1; start_eventid = 0; } else { count = CHECK_COUNT; end_eventid = eventid - 1; start_eventid = end_eventid - count + 1; } len = sizeof(struct dct_header) + count * sizeof(struct event); buf = malloc(len); if (!buf) { log_error("send_time no mem %d", len); return; } memset(buf, 0, len); hd = (struct dct_header *)buf; ev = (struct event *)(buf + sizeof(struct dct_header)); gettimeofday(&tv, NULL); hd->type = EV_MSGTIME; hd->nodeid = our_nodeid; hd->tv_sec = tv.tv_sec; hd->tv_usec = tv.tv_usec; hd->last_config = last_config_eventid; hd->event_count = count; for (i = start_eventid; i < end_eventid + 1; i++) { memcpy(ev, history(i), sizeof(struct event)); ev++; } _send_message(our_cpg_handle, buf, len, EV_MSGTIME); free(buf); } void send_sync(uint8_t nodeid) { char *buf; struct timeval tv; struct dct_header *hd; struct event *ev; struct node *node; uint32_t start_eventid, end_eventid; /* inclusive */ int i, count, event_count, len; node = get_node(nodeid); if (!node) { log_error("send_sync no node %u", nodeid); return; } if (!node->join_eventid) { log_error("send_sync nodeid %d zero join_eventid", nodeid); return; } event_count = node->join_eventid + 1; if (event_count > sync_max) { count = sync_max; end_eventid = node->join_eventid; start_eventid = end_eventid - count + 1; } else { count = event_count; end_eventid = node->join_eventid; start_eventid = 0; } len = sizeof(struct dct_header) + count * sizeof(struct event); buf = malloc(len); if (!buf) { log_error("send_sync no mem %d", len); return; } memset(buf, 0, len); hd = (struct dct_header *)buf; ev = (struct event *)(buf + sizeof(struct dct_header)); gettimeofday(&tv, NULL); hd->type = EV_MSGSYNC; hd->nodeid = our_nodeid; hd->to_nodeid = nodeid; hd->tv_sec = tv.tv_sec; hd->tv_usec = tv.tv_usec; hd->last_config = last_config_eventid; hd->event_count = count; 
/* which nodes are synced is part of the replicated state that needs to be synced to new nodes */ memcpy(hd->synced_nodes, node->synced_nodes, MAX_NODES); for (i = start_eventid; i < end_eventid + 1; i++) { memcpy(ev, history(i), sizeof(struct event)); ev++; } log_debug("send_sync to %u len %d count %d events %u-%u", nodeid, len, count, start_eventid, end_eventid); _send_message(our_cpg_handle, buf, len, EV_MSGSYNC); free(buf); } void check_event(struct event *ev_buf) { struct dct_config *c = &ev_buf->config; int len = 0; switch (c->type) { case EV_CONFCHG: len = sizeof(struct dct_config); break; case EV_MSGTIME: case EV_MSGSYNC: len = sizeof(struct dct_header); break; default: log_error("check_event unknown type %d", c->type); return; } if (memcmp(ev_buf, history(ev_buf->eventid), len)) { log_error("check_event %u", ev_buf->eventid); log_event(ev_buf, 1); log_event(history(ev_buf->eventid), 1); } } /* last event read should be the one in which we joined, after that, we process saved events, after that we add the sync message these events are being read from */ void read_events(struct dct_header *hd, int len, int check_only) { struct event *ev_buf, *ev_his; struct dct_config *c1, *c2; struct save_event *se; uint32_t start_eventid, end_eventid; /* inclusive */ int count = hd->event_count; int i; /* we don't save the entire sync message sent to another node on save_events while waiting for our own sync message; we just save the header */ if (len == sizeof(struct dct_header)) return; if (len != sizeof(struct dct_header) + count * sizeof(struct event)) { log_error("read_events bad len %d count %d", len, count); return; } ev_buf = (struct event *)((char *)hd + sizeof(struct dct_header)); start_eventid = ev_buf->eventid; for (i = 0; i < count; i++) { ev_his = history(ev_buf->eventid); if (check_only || ev_his->eventid) check_event(ev_buf); else memcpy(ev_his, ev_buf, sizeof(struct event)); end_eventid = ev_buf->eventid; ev_buf++; } log_debug("read_events %u to %u len %d 
count %d events %u-%u %s", hd->nodeid, hd->to_nodeid, len, count, start_eventid, end_eventid, check_only ? "check" : "copy"); if (check_only) return; /* verify last event in sync series is our join. it should match the first saved event which should be our join config. we remove this first saved event which overlaps what we got from sync */ ev_his = history(end_eventid); c1 = &ev_his->config; if (c1->type != EV_CONFCHG) { log_error("read_events history event not confchg"); log_event(ev_his, 1); } se = list_first_entry(&saved_events, struct save_event, list); c2 = (struct dct_config *)se->buf; if (se->type != EV_CONFCHG) { log_error("read_events saved event not confchg"); log_event((struct event *)se->buf, 1); } if (!se->we_join) { log_error("read_events first entry not our join"); log_config(c2, 1); } if (memcmp(c1, c2, sizeof(struct dct_config))) { log_error("read_events no config match"); log_config(c1, 1); log_config(c2, 1); } eventid = end_eventid + 1; last_config_eventid = end_eventid; memcpy(&last_config, c1, sizeof(struct dct_config)); list_del(&se->list); free(se); init_nodes_list(hd, c1); } void add_history_confchg(struct dct_config *c) { struct event *ev; ev = history(eventid); ev->eventid = eventid++; memcpy(&ev->config, c, sizeof(struct dct_config)); _log_config(c, ev->eventid, 0); last_config_eventid = ev->eventid; memcpy(&last_config, &ev->config, sizeof(struct dct_config)); } void add_history_message(struct dct_header *h, int len) { struct event *ev; ev = history(eventid); ev->eventid = eventid++; memcpy(&ev->header, h, sizeof(struct dct_header)); _log_header(h, ev->eventid, 0); } /* process events that occured between our join event (which is processed in receive_sync and removed from saved_events) and our sync event */ void read_saved_events(void) { struct save_event *se, *safe; int count_c = 0, count_m = 0; log_debug("read_saved_events"); list_for_each_entry_safe(se, safe, &saved_events, list) { if (se->type == EV_CONFCHG) { 
add_history_confchg((struct dct_config *)&se->buf); update_nodes_list(&last_config, last_config_eventid); count_c++; } else { process_message((struct dct_header *)&se->buf, se->len); count_m++; } list_del(&se->list); free(se); } log_debug("read_saved_events confchg %d message %d", count_c, count_m); } void save_message(struct dct_header *hd, int len, int type) { struct save_event *se; se = malloc(sizeof(struct save_event) + len); if (!se) { log_error("save_message no mem %d", len); return; } memset(se, 0, sizeof(struct save_event) + len); se->type = type; se->len = len; memcpy(&se->buf, hd, len); list_add_tail(&se->list, &saved_events); /* log_debug("save_message"); */ } void save_confchg(struct dct_config *c, int we_join) { struct save_event *se; int len; len = sizeof(struct save_event) + sizeof(struct dct_config); se = malloc(len); if (!se) { log_error("save_confchg no mem %d", len); return; } memset(se, 0, len); se->type = EV_CONFCHG; se->we_join = we_join; memcpy(&se->buf, c, sizeof(struct dct_config)); list_add_tail(&se->list, &saved_events); log_debug("save_confchg"); log_config(c, 2); } static int nodeid_compare(const void *va, const void *vb) { const uint8_t *a = va; const uint8_t *b = vb; return *a - *b; } #ifdef WHITETANK static void confchg_cb(cpg_handle_t handle, struct cpg_name *group_name, struct cpg_address *memb_list, int memb_list_entries, struct cpg_address *left_list, int left_list_entries, struct cpg_address *join_list, int join_list_entries) #else static void confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *memb_list, size_t memb_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *join_list, size_t join_list_entries) #endif { uint8_t memb_sort[MAX_NODES]; uint8_t left_sort[MAX_NODES]; uint8_t join_sort[MAX_NODES]; struct dct_config c; struct node *node; int we_join = 0; int i; dispatch_count++; last_dispatch = time(NULL); memset(&c, 0, sizeof(struct 
dct_config)); c.type = EV_CONFCHG; /* FIXME: the join/left lists are not globally consistent, so should just ignore them entirely */ c.memb_count = memb_list_entries; c.left_count = left_list_entries; c.join_count = join_list_entries; for (i = 0; i < memb_list_entries; i++) memb_sort[i] = (uint8_t)memb_list[i].nodeid; for (i = 0; i < left_list_entries; i++) left_sort[i] = (uint8_t)left_list[i].nodeid; for (i = 0; i < join_list_entries; i++) join_sort[i] = (uint8_t)join_list[i].nodeid; qsort(memb_sort, memb_list_entries, sizeof(uint8_t), nodeid_compare); qsort(left_sort, left_list_entries, sizeof(uint8_t), nodeid_compare); qsort(join_sort, join_list_entries, sizeof(uint8_t), nodeid_compare); for (i = 0; i < memb_list_entries; i++) c.memb[i] = memb_sort[i]; for (i = 0; i < left_list_entries; i++) c.left[i] = left_sort[i]; for (i = 0; i < join_list_entries; i++) c.join[i] = join_sort[i]; if (leave_done) { /* our left confchg should be the very last event we see */ log_error("confchg after leave_done"); log_config(&c, 1); return; } if (!leave_wait && in_left(&c, our_nodeid)) { log_error("confchg in_left not leave_wait"); log_config(&c, 1); return; } if (leave_wait && in_left(&c, our_nodeid)) { leave_wait = 0; leave_done = 1; cpg_finalize(our_cpg_handle); client_dead(our_cpg_client); add_history_confchg(&c); return; } if (!in_memb(&c, our_nodeid)) { log_error("confchg without our_nodeid %u", our_nodeid); log_config(&c, 1); return; } if (join_wait && in_memb(&c, our_nodeid)) { join_wait = 0; join_done = 1; sync_wait = 1; sync_done = 0; we_join = 1; join_time = time(NULL); leave_time = join_time + rand_int(1, LEAVE_TIME_MAX); } /* Shortcut to bootstrap things. Doesn't work if more than one node join in the first confchg. If it happens, all the nodes joining together will be sitting waiting for a sync message. 
To do this properly we'd need to have everyone exchange state messages for each confchg so we can detect when multiple new nodes join in the first confchg of a newly formed cpg. Or, rely on the join list to be the same as the member list? */ if (we_join && c.memb_count == 1) { sync_wait = 0; sync_done = 1; add_history_confchg(&c); if (!list_empty(&nodes)) { log_error("bootstrap nodes not empty"); print_nodes_list(); } node = add_node(our_nodeid); node->is_member = 1; node->needs_sync = 0; return; } /* Case we can't handle with bootstrap shortcut. Confchg for node join while all synced members have called leave, so won't send a sync message to the joined node. The node waiting for a sync then sees all existing nodes leave, having received no sync message. We could process saved_events, declaring first one (our join) to be eventid 0. That still doesn't work if another node joins before all the confchg's for the leaving node (which might be solvable, but gets very complicated; better at that point to exchange state messages for each confchg). 
*/ if (sync_wait && c.memb_count == 1) { /* just exit and restart without an error */ log_debug("confchg left alone without sync"); log_config(&c, 0); fflush(stdout); fflush(stderr); exit(2); } if (sync_wait) { save_confchg(&c, we_join); return; } add_history_confchg(&c); update_nodes_list(&last_config, last_config_eventid); } static void receive_sync(struct dct_header *hd, int len) { if (sync_wait) { if (hd->to_nodeid != our_nodeid) { /* save events to add to history after we're synced; don't save/check the event history for other nodes while waiting for our own sync */ save_message(hd, sizeof(struct dct_header), EV_MSGSYNC); } else { sync_wait = 0; sync_done = 1; read_events(hd, len, 0); read_saved_events(); add_history_message(hd, len); set_node_synced(our_nodeid); } } else { if (hd->to_nodeid == our_nodeid) log_debug("receive_sync from %d redundant", hd->nodeid); /* check the sync message to verify we agree with it */ read_events(hd, len, 1); add_history_message(hd, len); set_node_synced(hd->to_nodeid); } } static void receive_time(struct dct_header *hd, int len) { struct event *ev_buf; struct node *node; uint32_t end_eventid = 0; int i; if (sync_wait) { /* save events to add to history after we're synced */ save_message(hd, len, EV_MSGTIME); return; } /* check if the sender is a member of the last configuration */ if (!in_memb(&last_config, hd->nodeid)) { log_error("receive_time from non member"); log_header(hd, 1); log_config(&last_config, 1); return; } node = get_node(hd->nodeid); if (!node) { log_error("receive_time no node %u", hd->nodeid); log_header(hd, 1); return; } if (node->needs_sync) { log_error("receive_time from %u needs_sync", hd->nodeid); log_header(hd, 1); return; } add_history_message(hd, len); /* events from past history (which everyone should know about) are included in time messages as a way to check that nodes are staying in sync as we go */ ev_buf = (struct event *)((char *)hd + sizeof(struct dct_header)); for (i = 0; i < hd->event_count; 
i++) { check_event(ev_buf); end_eventid = ev_buf->eventid; ev_buf++; } node->last_check_eventid = end_eventid; /* this check currently fails with corosync */ #if 0 /* Track whether messages sent in configuration C1 are delivered in C1 instead of a subsequent C2. Recent reading led me to believe that this was one of the VS guarantees, but I'm not certain. */ if (hd->last_config != last_config_eventid) { log_error("receive_time in other config"); log_header(hd, 1); log_config(&last_config, 1); } #endif } static void process_message(struct dct_header *hd, int len) { switch (hd->type) { case EV_MSGTIME: receive_time(hd, len); break; case EV_MSGSYNC: receive_sync(hd, len); break; }; } #ifdef WHITETANK static void deliver_cb(cpg_handle_t handle, struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *data, int len) #else static void deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *data, size_t len) #endif { struct dct_header *hd = data; dispatch_count++; last_dispatch = time(NULL); if (len < sizeof(struct dct_header)) { log_error("deliver short message %u", (unsigned int)len); log_header(hd, 1); return; } if (hd->nodeid != (uint8_t)nodeid) { log_error("bad msg nodeid %u %u", hd->nodeid, nodeid); log_header(hd, 1); return; } if (join_wait) { /* the we_joined confchg should be the first event we see */ log_error("deliver before joined"); log_header(hd, 1); return; } if (leave_done) { /* the we_left confchg should be the last event we see */ log_error("deliver after left"); log_header(hd, 1); return; } process_message(hd, (int)len); } static cpg_callbacks_t cpg_callbacks = { .cpg_deliver_fn = deliver_cb, .cpg_confchg_fn = confchg_cb, }; static void process_cpg(int ci) { cpg_error_t error; error = cpg_dispatch(our_cpg_handle, CPG_DISPATCH_ALL); if (error != CPG_OK) { log_error("cpg_dispatch error %d", error); return; } /* can't send from dispatch, so just flag the nodes that need syncing during the dispatch and 
send the sync messages now */ if (join_done && sync_done && !leave_wait && !leave_done) send_syncs(); } int do_join(void) { cpg_error_t error; cpg_handle_t h; int i = 0, fd, ci; uint32_t nodeid; if (!our_name[0]) strcpy(our_name, "cpgx"); sprintf(our_cpg_name.value, our_name); our_cpg_name.length = strlen(our_name) + 1; error = cpg_initialize(&h, &cpg_callbacks); if (error != CPG_OK) { log_error("cpg_initialize error %d", error); log_error("is corosync running?"); goto fail_out; } error = cpg_local_get(h, &nodeid); if (error != CPG_OK) { log_error("cpg_local_get error %d", error); goto fail_fin; } if (nodeid < 1 || nodeid > 255) { log_error("nodeids must be between 1 and 255"); goto fail_fin; } our_nodeid = (uint8_t)nodeid; cpg_fd_get(h, &fd); ci = client_add(fd, process_cpg, cluster_dead); our_cpg_handle = h; our_cpg_client = ci; our_cpg_fd = fd; log_debug("do join our_nodeid %u", our_nodeid); retry: error = cpg_join(h, &our_cpg_name); if (error == CPG_ERR_TRY_AGAIN) { sleep(1); if (!(++i % 10)) log_debug("cpg_join error retrying"); goto retry; } if (error != CPG_OK) { log_error("cpg_join error %d", error); cpg_finalize(h); goto fail; } return 0; fail: client_dead(ci); fail_fin: cpg_finalize(h); fail_out: exit(1); } int do_leave(void) { cpg_error_t error; int i = 0; retry: error = cpg_leave(our_cpg_handle, &our_cpg_name); if (error == CPG_ERR_TRY_AGAIN) { sleep(1); if (!(++i % 10)) log_debug("cpg_leave error retrying"); goto retry; } if (error != CPG_OK) log_error("cpg_leave error %d", error); return 0; } static struct timeval last_send; static int wait_send; /* random interval from 5 to 50 ms between every send */ int we_should_send(void) { struct timeval now; unsigned long ms; gettimeofday(&now, NULL); ms = time_diff_ms(&last_send, &now); if (ms >= wait_send) { last_send = now; wait_send = rand_int(5, 50); return 1; } return 0; } /* TODO: when a node fails, causing activity to stall, the should leave/exit times are not adjusted, so all nodes usually end up 
leaving/exiting during the period after a node failure. */ /* when we join we pick a random number of seconds to run before either leaving or exiting; half the time leave, half exit */ int we_should_leave(void) { time_t now = time(NULL); int half = leave_time % 2; if (!opt_leave) return 0; if (!opt_exit) half = 1; if (now >= leave_time && half) { log_debug("do leave %lu", leave_time - join_time); return 1; } return 0; } int we_should_exit(void) { time_t now = time(NULL); if (!opt_exit) return 0; if (now >= leave_time) { log_debug("do exit %lu", leave_time - join_time); return 1; } return 0; } int we_should_die(void) { static unsigned int tries; int rv; if (!opt_die) return 0; tries++; rv = rand_int(1, 10000); if (rv == 111) { log_debug("do die %u", tries); return 1; } return 0; } int iterations_done(void) { if (!iterations_sec) return 0; if (time(NULL) - parent_begin > iterations_sec) return 1; return 0; } void restart_cluster(void) { if (exec_addr[0]) { log_debug("%s", iptables_a); system(iptables_a); } log_debug("killing %s %s", exec_name, exec_addr); syslog(LOG_WARNING, "%ld killing %s %s", time(NULL), exec_name, exec_addr); if (exec_name[0] == 'a') system("killall -9 aisexec"); else system("killall -9 corosync"); /* others should see us fail before we rejoin, not sure 10s will be enough for some people */ sleep(restart_sec); if (exec_addr[0]) { log_debug("%s", iptables_d); system(iptables_d); } log_debug("starting %s %s", exec_name, exec_addr); syslog(LOG_WARNING, "%ld starting %s %s", time(NULL), exec_name, exec_addr); if (!exec_join) system("cman_tool join -w"); else system(exec_name); #if 0 /* FIXME */ sleep(5); system("cman_tool nodes"); #endif } void loop(void) { void (*workfn) (int ci); void (*deadfn) (int ci); int poll_timeout = 5; /* ms */ int rv, i; srandom(time(NULL)); memset(events, 0, events_len); free_nodes_list(); dispatch_count = 0; last_dispatch = time(NULL); sync_wait = 0; sync_done = 0; leave_wait = 0; leave_done = 0; join_wait = 1; join_done 
= 0; eventid = 0; do_join(); for (;;) { rv = poll(pollfd, client_maxi + 1, poll_timeout); if (rv == -1 && errno == EINTR) { if (!prog_quit) continue; } if (rv < 0) log_error("poll errno %d", errno); /* * read events from callbacks */ for (i = 0; i <= client_maxi; i++) { if (client[i].fd < 0) continue; if (pollfd[i].revents & POLLIN) { workfn = client[i].workfn; workfn(i); } if (pollfd[i].revents & (POLLERR | POLLHUP | POLLNVAL)){ deadfn = client[i].deadfn; deadfn(i); } } if (timeout_sec && (time(NULL) - last_dispatch > timeout_sec)) { log_error("no cpg dispatch in %d sec", timeout_sec); print_nodes_list(); } if (got_error) { dump_write(); fflush(stdout); fflush(stderr); exit(EXIT_FAILURE); } if (iterations_done()) exit(EXIT_SUCCESS); /* * do things that create events (send messages, leave, exit) */ if (!sync_done) { /* don't do things until we're synced */ continue; } if (leave_done) break; if (leave_wait) { /* don't send messages while waiting for our leave to complete, but we must keep reading events until we've left, which is when a confchg for our leave arrives. 
*/ continue; } if (we_should_leave()) { leave_wait = 1; leave_done = 0; do_leave(); continue; } if (we_should_send()) send_time(); if (we_should_exit()) { fflush(stdout); fflush(stderr); exit(2); } if (we_should_die()) { fflush(stdout); fflush(stderr); restart_cluster(); exit(2); } } } void print_usage(void) { printf("Options:\n"); printf(" -H [0|1] event history output [off|on], default 1\n"); printf(" -D [0|1] debug output [off|on], default 1\n"); printf(" -l [0|1] leave included in test [off|on], default 1\n"); printf(" (program leaves cpg cleanly then rejoins)\n"); printf(" -e [0|1] exit included in test [off|on], default 1\n"); printf(" (program exits without leaving cpg then rejoins)\n"); printf(" -d [0|1] die included in test [off|on], default 0\n"); printf(" (program kills and restarts %s)\n", exec_name); printf(" -n name of the cpg, default \"cpgx\"\n"); printf(" -s sync up to num events, default %d\n", DEFAULT_SYNC_MAX); printf(" -t timeout after no dispatch for this many seconds, default %d\n", DEFAULT_TIMEOUT_SEC); printf(" (0 to wait forever)\n"); printf(" -i run for this many seconds, default 0 (forever)\n"); printf(" -c continue after error\n"); printf(" -V print version\n"); printf(" die options, used with -d1:\n"); printf(" -j restart cluster by running \"%s\", not cman_tool\n", exec_name); printf(" -w wait this many seconds between kill and restart, default %d\n", DEFAULT_RESTART_SEC); printf(" -a IP address used for %s communication\n", exec_name); printf(" iptables blocks before kill, unblocks before restart\n"); printf(" -p udp port used for %s communication, default %d\n", exec_name, DEFAULT_PORT); printf(" -I [A|D] run iptables Append or Delete to block/unblock %s comms\n", exec_name); printf(" (utility, not used by test, use with -a, optionally -p)\n"); printf("\n"); printf("Output:\n"); printf("