diff options
| author | David Teigland <teigland@redhat.com> | 2009-05-26 16:13:46 -0500 |
|---|---|---|
| committer | David Teigland <teigland@redhat.com> | 2009-05-26 16:13:46 -0500 |
| commit | 626ac2fc3f29df3344f4da20e3a9016b7cdb3c8a (patch) | |
| tree | ff3bc700dce069ebbaa00284856944499ef7f1c3 | |
| parent | f247b0074950844f7cb1d94e5a8355b2238a95d7 (diff) | |
| download | dct-stuff-626ac2fc3f29df3344f4da20e3a9016b7cdb3c8a.tar.gz dct-stuff-626ac2fc3f29df3344f4da20e3a9016b7cdb3c8a.tar.xz dct-stuff-626ac2fc3f29df3344f4da20e3a9016b7cdb3c8a.zip | |
cpgx: more work
. move code around
. save debug log, write it out on error
. configurable sync_max
. don't save sync history (just header) to others while waiting for our sync
. error if no dispatches for 10 sec
. rework confchg leave/join/sync wait/done state checks
. check for case where all others leave without sending us sync
. check all events after time message
. don't send syncs if we're leaving or left
Signed-off-by: David Teigland <teigland@redhat.com>
| -rw-r--r-- | cpgx/cpgx.c | 398 |
1 files changed, 233 insertions, 165 deletions
diff --git a/cpgx/cpgx.c b/cpgx/cpgx.c index aeeca1c..179638d 100644 --- a/cpgx/cpgx.c +++ b/cpgx/cpgx.c @@ -16,48 +16,11 @@ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ -/* Limitations: max 8 nodes, nodeids between 1 and 255 */ - -/* - Initial outline: - - recv time message - - assert from a member - if sync_done - - history[event_num++] = contents - - assert leave_done == 0 - - assert join_done == 1 - else - - save message - - recv sync message - if !sync_done and to us - - copy contents into history, set event_num - - recv saved time messages - - sync_done = 1 - if !sync_done and not to us - - ignore - if sync_done - - set needs_sync = 0 for node sent to - - confchg - - history[event_num++] = members/joined/left - - if we joined: join_done = 1, sync_done = 0 - - if we left: leave_done = 1 - - for each joined node, set needs_sync = 1 - - if any nodes need sync, and we are synced and we are low_nodeid, - send sync message to each specific node with history from the event_num - when they were removed up to the event_num where they were added - - send time message - - send buf with our time and last event_num - - New history begins at zero every time a new group is formed (all members go +/* New history begins at zero every time a new group is formed (all members go away). Otherwise, when formning a new group we'd have to wait for all members to join and gather the latest history from all so we know where to begin. This also means that a node clears its history before every join, - since history may have been restarted while it was away. -*/ + since history may have been restarted while it was away. */ #include <stdio.h> #include <stdlib.h> @@ -76,50 +39,26 @@ #include <corosync/cpg.h> #include "list.h" +#define CLIENT_NALLOC 2 +#define MAX_NODES 8 /* not easily changed */ +#define DUMP_SIZE (1024 * 1024) +#define HISTORY_EVENTS (1024 * 1024) +#define DEFAULT_SYNC_MAX 1000 /* sync up to this many events */ +#define DUMP_WRITE_PATH "/var/log/cluster/cpgx_debug.txt" + +#define EV_CONFCHG 1 +#define EV_MSGTIME 2 +#define EV_MSGSYNC 3 + struct client { int fd; void *workfn; void *deadfn; }; -#define CLIENT_NALLOC 2 - -static int client_maxi; -static int client_size = 0; -static struct client *client = NULL; -static struct pollfd *pollfd = NULL; - -struct cpg_name dct_cpg_name; -cpg_handle_t dct_cpg_handle; -int dct_cpg_client; -int dct_cpg_fd; - -int daemon_quit; -int cluster_down; -int opt_leave = 1; -int opt_fail = 1; -int got_error = 0; -int continue_after_error = 0; -int opt_print_event = 1; -int opt_print_debug = 1; - -int join_wait; -int join_done; -int sync_wait; -int sync_done; -int leave_wait; -int leave_done; - -uint8_t our_nodeid; -uint32_t eventid; -uint32_t last_config_eventid; -uint32_t dispatch_count; - -#define MAX_NODES 8 - -#define EV_CONFCHG 1 -#define EV_MSGTIME 2 -#define EV_MSGSYNC 3 +/* time msg is followed by N event structs from history for run-time checks + sync msg is followed by M event structs from history to sync new node, + the final one is the confchg event that to_nodeid joined in */ struct dct_header { uint8_t type; /* EV_MSGTIME, EV_MSGSYNC */ @@ -133,10 +72,6 @@ struct dct_header { uint32_t event_count; /* this many events after header */ }; /* 28 bytes */ -/* time msg is followed by N event structs from history for run-time checks */ -/* sync msg is followed by M event structs from history to sync new node, - the final one is the confchg event that to_nodeid joined in */ - struct dct_config { uint8_t type; /* EV_CONFCHG */ uint8_t memb_count; @@ -156,10 +91,6 @@ struct event { }; }; /* 32 bytes */ -int history_len; -struct event *history; -struct dct_config *last_config; - struct node { struct list_head list; uint8_t nodeid; @@ -174,8 +105,6 @@ struct node { from this node. not used for anything */ }; -struct list_head nodes; - struct save_event { struct list_head list; int type; @@ -184,27 +113,79 @@ struct save_event { char buf[0]; }; -struct list_head saved_events; +static int client_maxi; +static int client_size = 0; +static struct client *client = NULL; +static struct pollfd *pollfd = NULL; + +static struct cpg_name dct_cpg_name; +static cpg_handle_t dct_cpg_handle; +static int dct_cpg_client; +static int dct_cpg_fd; + +static int daemon_quit; +static int cluster_down; +static int opt_leave = 1; +static int opt_fail = 1; +static int got_error = 0; +static int continue_after_error = 0; +static int opt_print_event = 1; +static int opt_print_debug = 1; +static time_t prog_begin; +static int dump_point; +static int dump_wrap; +static char daemon_debug_buf[256]; +static char dump_buf[DUMP_SIZE]; + +static uint8_t our_nodeid; +static uint32_t eventid; +static uint32_t last_config_eventid; +static uint32_t sync_max = DEFAULT_SYNC_MAX; +static uint32_t dispatch_count; +static struct timeval last_dispatch; +static int history_len; +static struct event *history; +static struct dct_config *last_config; +static struct list_head nodes; +static struct list_head saved_events; + +static int join_wait; +static int join_done; +static int sync_wait; +static int sync_done; +static int leave_wait; +static int leave_done; static void process_message(struct dct_header *hd, int len); void send_sync(uint8_t nodeid); +void daemon_dump_save(void); +void daemon_dump_write(void); #define log_error(fmt, args...) \ do { \ - fprintf(stderr, "ERROR: " fmt "\n", ##args); \ + snprintf(daemon_debug_buf, 255, "%ld ERROR: " fmt "\n", \ + time(NULL), ##args); \ + daemon_dump_save(); \ + fprintf(stderr, "%s", daemon_debug_buf); \ got_error = 1; \ } while (0) #define log_history(fmt, args...) \ do { \ + snprintf(daemon_debug_buf, 255, "%ld H: " fmt "\n", \ + time(NULL), ##args); \ + daemon_dump_save(); \ if (opt_print_event) \ - fprintf(stdout, "H: " fmt "\n", ##args); \ + fprintf(stdout, "%s", daemon_debug_buf); \ } while (0) #define log_debug(fmt, args...) \ do { \ + snprintf(daemon_debug_buf, 255, "%ld D: " fmt "\n", \ + time(NULL), ##args); \ + daemon_dump_save(); \ if (opt_print_debug) \ - fprintf(stdout, "D: " fmt "\n", ##args); \ + fprintf(stdout, "%s", daemon_debug_buf); \ } while (0) static void _log_config(struct dct_config *c, uint32_t id, int error) @@ -357,11 +338,6 @@ static int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci)) goto again; } -static void sigterm_handler(int sig) -{ - daemon_quit = 1; -} - void cluster_dead(int ci) { if (!cluster_down) @@ -732,8 +708,6 @@ void send_time(void) /* TODO: ability to send full history, cpg max message sizes limit us */ -#define SYNC_MAX 1000 - void send_sync(uint8_t nodeid) { char *buf; @@ -757,8 +731,8 @@ void send_sync(uint8_t nodeid) event_count = node->join_eventid + 1; - if (event_count > SYNC_MAX) { - count = SYNC_MAX; + if (event_count > sync_max) { + count = sync_max; end_eventid = node->join_eventid; start_eventid = end_eventid - count + 1; } else { @@ -834,6 +808,13 @@ void read_events(struct dct_header *hd, int len, int check_only) int count = hd->event_count; int i; + /* we don't save the entire sync message sent to another node on + save_events while waiting for our own sync message; we just save + the header */ + + if (len == sizeof(struct dct_header)) + return; + if (len != sizeof(struct dct_header) + count * sizeof(struct event)) { log_error("read_events bad len %d count %d", len, count); return; @@ -1006,10 +987,11 @@ static void confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, { struct dct_config c; struct node *node; - int we_left, we_join; + int we_join = 0; int i; dispatch_count++; + gettimeofday(&last_dispatch, NULL); memset(&c, 0, sizeof(struct dct_config)); @@ -1027,51 +1009,77 @@ static void confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, c.join[i] = (uint8_t)join_list[i].nodeid; if (leave_done) { - /* the we_left confchg should be the very last event we see */ - log_error("confchg_cb after leave_done"); + /* our left confchg should be the very last event we see */ + log_error("confchg after leave_done"); log_config(&c, 1); return; } - we_left = leave_wait && in_left(&c, our_nodeid); - we_join = join_wait && in_memb(&c, our_nodeid); + if (!leave_wait && in_left(&c, our_nodeid)) { + log_error("confchg in_left not leave_wait"); + log_config(&c, 1); + return; + } - if (we_left) { + if (leave_wait && in_left(&c, our_nodeid)) { leave_wait = 0; leave_done = 1; cpg_finalize(dct_cpg_handle); client_dead(dct_cpg_client); + add_history_confchg(&c); + update_nodes_list(last_config, last_config_eventid); + return; } - if (!we_left && !in_memb(&c, our_nodeid)) { - log_error("confchg_cb without our_nodeid %u", our_nodeid); + if (!in_memb(&c, our_nodeid)) { + log_error("confchg without our_nodeid %u", our_nodeid); log_config(&c, 1); return; } - if (we_join) { + if (join_wait && in_memb(&c, our_nodeid)) { join_wait = 0; join_done = 1; sync_wait = 1; sync_done = 0; + we_join = 1; } - /* shortcut to bootstrap things, doesn't work if more than one node - join in the first confchg. If it happens, we'll see the initial - members all sitting in sync_wait for an existing member (there is - none) to send a sync message. To do this properly we'd probably - need to have everyone exchange state messages for each confchg so we - could detect when multiple new nodes join in the first confchg of a - newly formed cpg. Or, rely on the join list to be the same as the - member list? */ + /* Shortcut to bootstrap things. Doesn't work if more than one node + join in the first confchg. If it happens, all the nodes joining + together will be sitting waiting for a sync message. + To do this properly we'd need to have everyone exchange state + messages for each confchg so we can detect when multiple new nodes + join in the first confchg of a newly formed cpg. Or, rely on the + join list to be the same as the member list? */ if (we_join && c.memb_count == 1) { sync_wait = 0; sync_done = 1; + add_history_confchg(&c); + if (!list_empty(&nodes)) { + log_error("bootstrap nodes not empty"); + print_nodes_list(); + } node = add_node(our_nodeid); node->is_member = 1; node->needs_sync = 0; - add_history_confchg(&c); + return; + } + + /* Case we can't handle with bootstrap shortcut. Confchg for node + join while all synced members have called leave, so won't send a + sync message to the joined node. The node waiting for a sync then + sees all existing nodes leave, having received no sync message. We + could process saved_events, declaring first one (our join) to be + eventid 0. That still doesn't work if another node joins before all + the confchg's for the leaving node (which might be solvable, but + gets very complicated; better at that point to exchange state + messages for each confchg). */ + + if (sync_wait && c.memb_count == 1) { + log_error("confchg left alone without sync"); + log_config(&c, 1); return; } @@ -1089,8 +1097,10 @@ static void receive_sync(struct dct_header *hd, int len) { if (sync_wait) { if (hd->to_nodeid != our_nodeid) { - /* save events to add to history after we're synced */ - save_message(hd, len, EV_MSGSYNC); + /* save events to add to history after we're synced; + don't save/check the event history for other nodes + while waiting for our own sync */ + save_message(hd, sizeof(struct dct_header), EV_MSGSYNC); } else { sync_wait = 0; sync_done = 1; @@ -1105,7 +1115,7 @@ static void receive_sync(struct dct_header *hd, int len) if (hd->to_nodeid == our_nodeid) log_debug("receive_sync from %d redundant", hd->nodeid); - /* check we agree with past events being synced to new node */ + /* check the sync message to verify we agree with it */ read_events(hd, len, 1); add_history_message(hd, len); @@ -1117,6 +1127,8 @@ static void receive_time(struct dct_header *hd, int len) { struct event *ev_buf; struct node *node; + uint32_t end_eventid = 0; + int i; if (sync_wait) { /* save events to add to history after we're synced */ @@ -1133,9 +1145,6 @@ static void receive_time(struct dct_header *hd, int len) return; } - /* check our scheme of syncing nodes, where nodes don't send time - messages until they're synced */ - node = get_node(hd->nodeid); if (!node) { log_error("receive_time no node %u", hd->nodeid); @@ -1151,17 +1160,21 @@ static void receive_time(struct dct_header *hd, int len) add_history_message(hd, len); - /* an event from past history (which everyone should know about) is + /* events from past history (which everyone should know about) are included in time messages as a way to check that nodes are staying in sync as we go */ ev_buf = (struct event *)((char *)hd + sizeof(struct dct_header)); - check_event(ev_buf); - - node->last_check_eventid = ev_buf->eventid; + for (i = 0; i < hd->event_count; i++) { + check_event(ev_buf); + end_eventid = ev_buf->eventid; + ev_buf++; + } + node->last_check_eventid = end_eventid; - /* this check currently fails with corosync */ + /* this check currently fails with corosync, + TODO: add option to enable this check */ #if 0 /* Track whether messages sent in configuration C1 are delivered in C1 instead of a subsequent C2. Recent reading led me to believe that @@ -1194,9 +1207,10 @@ static void deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name, struct dct_header *hd = data; dispatch_count++; + gettimeofday(&last_dispatch, NULL); if (len < sizeof(struct dct_header)) { - log_error("deliver_cb short message %d", len); + log_error("deliver short message %d", len); log_header(hd, 1); return; } @@ -1209,14 +1223,14 @@ static void deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name, if (join_wait) { /* the we_joined confchg should be the first event we see */ - log_error("deliver_cb before joined"); + log_error("deliver before joined"); log_header(hd, 1); return; } if (leave_done) { /* the we_left confchg should be the last event we see */ - log_error("deliver_cb after left"); + log_error("deliver after left"); log_header(hd, 1); return; } @@ -1242,7 +1256,8 @@ static void process_cpg(int ci) /* can't send from dispatch, so just flag the nodes that need syncing during the dispatch and send the sync messages now */ - send_syncs(); + if (join_done && sync_done && !leave_wait && !leave_done) + send_syncs(); } int do_join(void) @@ -1394,10 +1409,18 @@ void loop(void) void (*workfn) (int ci); void (*deadfn) (int ci); int poll_timeout = 5; /* ms */ + unsigned long ms; + struct timeval now; int rv, i; srandom(time(NULL)); + memset(history, 0, history_len); + free_nodes_list(); + + dispatch_count = 0; + gettimeofday(&last_dispatch, NULL); + sync_wait = 0; sync_done = 0; leave_wait = 0; @@ -1408,22 +1431,16 @@ void loop(void) eventid = 0; - memset(history, 0, history_len); - free_nodes_list(); - do_join(); for (;;) { rv = poll(pollfd, client_maxi + 1, poll_timeout); if (rv == -1 && errno == EINTR) { - if (daemon_quit) - goto out; - continue; + if (!daemon_quit) + continue; } - if (rv < 0) { + if (rv < 0) log_error("poll errno %d", errno); - goto out; - } /* * read events from callbacks @@ -1442,9 +1459,15 @@ void loop(void) } } + gettimeofday(&now, NULL); + ms = time_diff_ms(&last_dispatch, &now); + if (ms > 10000) + log_error("no cpg dispatch in %ul ms", ms); + if (got_error) { fflush(stdout); fflush(stderr); + daemon_dump_write(); exit(EXIT_FAILURE); } @@ -1452,16 +1475,6 @@ void loop(void) * do things that create events (send messages, leave, fail) */ - if (!join_done && dispatch_count) { - /* a confchg for our join should be the first event we - see, so this shouldn't happen. without the - dispatch_count condition this error triggers - sometimes, indicating zero dispatch_count */ - log_error("event before join done dispatches %u", - dispatch_count); - continue; - } - if (!sync_done) { /* don't do things until we're synced */ continue; @@ -1500,24 +1513,27 @@ void loop(void) void print_usage(void) { - printf("Output:\n"); - printf("ERROR: <error string> (stderr)\n"); - printf("H: <event string> (stdout)\n"); - printf("D: <debug string> (stdout)\n"); - printf("\n"); printf("Options:\n"); printf(" -H [0|1] event history output [off|on] default 1\n"); printf(" -D [0|1] debug output [off|on] default 1\n"); printf(" -f [0|1] fail included in test [off|on] default 1\n"); printf(" -l [0|1] leave included in test [off|on] default 1\n"); + printf(" -s <num> sync up to num events, default %d\n", + DEFAULT_SYNC_MAX); printf(" -c continue after error\n"); - - printf("to prevent history from periodically restarting from 0,\n" - "keep one node from either leaving or failing with -f0 -l0\n"); + printf("\n"); + printf("Output:\n"); + printf(" <time> ERROR: <error string> (stderr)\n"); + printf(" <time> H: <event string> (stdout)\n"); + printf(" <time> D: <debug string> (stdout)\n"); + printf("\n"); + printf("Notes:\n"); + printf(" - to prevent history from periodically restarting from 0,\n" + " keep one node from leaving or failing with -f0 -l0\n"); + printf(" - 8 nodes max, nodeids beteen 1 and 255\n"); + printf(" - debug dump on error: %s\n", DUMP_WRITE_PATH); } -#define HISTORY_EVENTS (1024 * 1024) - int main(int argc, char **argv) { pid_t pid; @@ -1526,7 +1542,7 @@ int main(int argc, char **argv) int optchar; while (cont) { - optchar = getopt(argc, argv, "H:D:cf:l:h"); + optchar = getopt(argc, argv, "H:D:f:l:s:ch"); switch (optchar) { case 'H': @@ -1545,6 +1561,10 @@ int main(int argc, char **argv) opt_leave = atoi(optarg); break; + case 's': + sync_max = atoi(optarg); + break; + case 'c': continue_after_error = 1; break; @@ -1574,12 +1594,17 @@ int main(int argc, char **argv) while (1) { pid = fork(); - if (!pid) { /* - * repeat join/work/leave until exit + * repeat join/work/leave until exit. loop() will + * write dump_buf before exiting on an error */ + memset(dump_buf, 0, sizeof(dump_buf)); + dump_point = 0; + dump_wrap = 0; + prog_begin = time(NULL); + while (1) { loop(); sleep(rand_int(0, 3)); @@ -1608,3 +1633,46 @@ int main(int argc, char **argv) return 1; } +void daemon_dump_save(void) +{ + int len, i; + + len = strlen(daemon_debug_buf); + + for (i = 0; i < len; i++) { + dump_buf[dump_point++] = daemon_debug_buf[i]; + + if (dump_point == DUMP_SIZE) { + dump_point = 0; + dump_wrap = 1; + } + } +} + +void daemon_dump_write(void) +{ + char begin[64]; + char end[64]; + time_t now; + FILE *fp; + + fp = fopen(DUMP_WRITE_PATH, "a"); + if (!fp) + return; + + now = time(NULL); + strftime(begin, sizeof(begin), "%b %d %T", localtime(&prog_begin)); + strftime(end, sizeof(end), "%b %d %T", localtime(&now)); + + fprintf(fp, "cpgx %s - %s\n", begin, end); + + if (dump_wrap) + fprintf(fp, "%s", dump_buf + dump_point); + + dump_buf[dump_point] = '\0'; + fprintf(fp, "%s", dump_buf); + + fflush(fp); + fclose(fp); +} + |
