diff options
| author | David Teigland <teigland@redhat.com> | 2009-05-28 13:44:27 -0500 |
|---|---|---|
| committer | David Teigland <teigland@redhat.com> | 2009-05-28 13:44:27 -0500 |
| commit | 7bf79e8a629d6d17768a5d48db5a2a1f2cdb8a2f (patch) | |
| tree | a150ef71f31c54ac8adb70a0bae64ab76680f5cb | |
| parent | c8665cdb31d4a57764916d5dca863c645c9a72e0 (diff) | |
| download | dct-stuff-7bf79e8a629d6d17768a5d48db5a2a1f2cdb8a2f.tar.gz dct-stuff-7bf79e8a629d6d17768a5d48db5a2a1f2cdb8a2f.tar.xz dct-stuff-7bf79e8a629d6d17768a5d48db5a2a1f2cdb8a2f.zip | |
cpgx: fix synced_nodes state transfer
The state of synced_nodes needs to be snapshotted at the point of a joining
node's confchg.
Also some code munging.
Signed-off-by: David Teigland <teigland@redhat.com>
| -rw-r--r-- | cpgx/cpgx.c | 130 |
1 files changed, 60 insertions, 70 deletions
diff --git a/cpgx/cpgx.c b/cpgx/cpgx.c index 40ec074..5639144 100644 --- a/cpgx/cpgx.c +++ b/cpgx/cpgx.c @@ -103,6 +103,7 @@ struct node { uint32_t last_check_eventid; /* the eventid from the check event in the most recent time message we received from this node. not used for anything */ + uint8_t synced_nodes[8]; }; struct save_event { @@ -123,7 +124,7 @@ static cpg_handle_t dct_cpg_handle; static int dct_cpg_client; static int dct_cpg_fd; -static int daemon_quit; +static int prog_quit; static int cluster_down; static int opt_leave = 1; static int opt_fail = 1; @@ -134,7 +135,7 @@ static int opt_print_debug = 1; static time_t prog_begin; static int dump_point; static int dump_wrap; -static char daemon_debug_buf[256]; +static char debug_buf[256]; static char dump_buf[DUMP_SIZE]; static uint8_t our_nodeid; @@ -158,34 +159,32 @@ static int leave_done; static void process_message(struct dct_header *hd, int len); void send_sync(uint8_t nodeid); -void daemon_dump_save(void); -void daemon_dump_write(void); +void dump_save(void); +void dump_write(void); #define log_error(fmt, args...) \ do { \ - snprintf(daemon_debug_buf, 255, "%ld ERROR: " fmt "\n", \ + snprintf(debug_buf, 255, "%ld ERROR: " fmt "\n", \ time(NULL), ##args); \ - daemon_dump_save(); \ - fprintf(stderr, "%s", daemon_debug_buf); \ + dump_save(); \ + fprintf(stderr, "%s", debug_buf); \ got_error = 1; \ } while (0) #define log_history(fmt, args...) \ do { \ - snprintf(daemon_debug_buf, 255, "%ld H: " fmt "\n", \ - time(NULL), ##args); \ - daemon_dump_save(); \ + snprintf(debug_buf, 255, "%ld H: " fmt "\n", time(NULL), ##args); \ + dump_save(); \ if (opt_print_event) \ - fprintf(stdout, "%s", daemon_debug_buf); \ + fprintf(stdout, "%s", debug_buf); \ } while (0) #define log_debug(fmt, args...) 
\ do { \ - snprintf(daemon_debug_buf, 255, "%ld D: " fmt "\n", \ - time(NULL), ##args); \ - daemon_dump_save(); \ + snprintf(debug_buf, 255, "%ld D: " fmt "\n", time(NULL), ##args); \ + dump_save(); \ if (opt_print_debug) \ - fprintf(stdout, "%s", daemon_debug_buf); \ + fprintf(stdout, "%s", debug_buf); \ } while (0) static void _log_config(struct dct_config *c, uint32_t id, int error) @@ -200,19 +199,16 @@ static void _log_config(struct dct_config *c, uint32_t id, int error) memset(l_buf, 0, sizeof(l_buf)); off = 0; - for (i = 0; i < c->memb_count; i++) { + for (i = 0; i < c->memb_count; i++) off += sprintf(m_buf+off, " %u", c->memb[i]); - } off = 0; - for (i = 0; i < c->join_count; i++) { + for (i = 0; i < c->join_count; i++) off += sprintf(j_buf+off, " %u", c->join[i]); - } off = 0; - for (i = 0; i < c->left_count; i++) { + for (i = 0; i < c->left_count; i++) off += sprintf(l_buf+off, " %u", c->left[i]); - } if (error) log_error("%08u conf %u %u %u memb%s join%s left%s", @@ -342,7 +338,7 @@ void cluster_dead(int ci) { if (!cluster_down) log_error("cluster is down, exiting"); - daemon_quit = 1; + prog_quit = 1; cluster_down = 1; } @@ -501,8 +497,25 @@ void free_nodes_list(void) void update_nodes_list(struct dct_config *c, uint32_t id) { struct node *node; + uint8_t synced_nodes[8]; + uint8_t low = 0; int i, is_memb, is_join, is_left; + i = 0; + memset(&synced_nodes, 0, 8); + + list_for_each_entry(node, &nodes, list) { + if (!node->is_member) + continue; + if (node->needs_sync) + continue; + + synced_nodes[i++] = node->nodeid; + + if (!low || node->nodeid < low) + low = node->nodeid; + } + list_for_each_entry(node, &nodes, list) { is_memb = in_memb(c, node->nodeid); @@ -511,7 +524,7 @@ void update_nodes_list(struct dct_config *c, uint32_t id) if (is_memb && node->is_member) { if (is_join || is_left) - log_error("member list off %d %d %d %d %u", + log_error("member list off a %d %d %d %d %u", is_memb, is_join, is_left, node->is_member, node->nodeid); continue; @@ 
-519,7 +532,7 @@ void update_nodes_list(struct dct_config *c, uint32_t id) if (!is_memb && !node->is_member) { if (is_join || is_left) - log_error("member list off %d %d %d %d %u", + log_error("member list off b %d %d %d %d %u", is_memb, is_join, is_left, node->is_member, node->nodeid); continue; @@ -529,7 +542,7 @@ void update_nodes_list(struct dct_config *c, uint32_t id) if (!is_memb & node->is_member) { if (is_join || !is_left) - log_error("member list off %d %d %d %d %u", + log_error("member list off c %d %d %d %d %u", is_memb, is_join, is_left, node->is_member, node->nodeid); @@ -543,13 +556,15 @@ void update_nodes_list(struct dct_config *c, uint32_t id) if (is_memb && !node->is_member) { if (!is_join || is_left) - log_error("member list off %d %d %d %d %u", + log_error("member list off d %d %d %d %d %u", is_memb, is_join, is_left, node->is_member, node->nodeid); node->is_member = 1; node->needs_sync = 1; + node->sync_from = low; node->join_eventid = id; + memcpy(&node->synced_nodes, &synced_nodes, 8); continue; } @@ -568,7 +583,9 @@ void update_nodes_list(struct dct_config *c, uint32_t id) node = add_node(c->memb[i]); node->is_member = 1; node->needs_sync = 1; + node->sync_from = low; node->join_eventid = id; + memcpy(&node->synced_nodes, &synced_nodes, 8); is_join = in_join(c, node->nodeid); if (!is_join) @@ -579,33 +596,6 @@ void update_nodes_list(struct dct_config *c, uint32_t id) print_nodes_list(); } -/* a scheme where new nodes are synced from the low synced nodeid needs to - consider that the low nodeid may fail before syncing the new nodes */ - -void update_nodes_sync_from(void) -{ - struct node *node; - uint8_t low = 0; - - list_for_each_entry(node, &nodes, list) { - if (!node->is_member) - continue; - if (node->needs_sync) - continue; - if (!low || node->nodeid < low) - low = node->nodeid; - } - - list_for_each_entry(node, &nodes, list) { - if (!node->is_member) - continue; - if (!node->needs_sync) - continue; - - node->sync_from = low; - } -} - 
void send_syncs(void) { struct node *node; @@ -765,13 +755,7 @@ void send_sync(uint8_t nodeid) /* which nodes are synced is part of the replicated state that needs to be synced to new nodes */ - i = 0; - list_for_each_entry(node, &nodes, list) { - if (!node->is_member) - continue; - if (!node->needs_sync) - hd->synced_nodes[i++] = node->nodeid; - } + memcpy(hd->synced_nodes, node->synced_nodes, 8); for (i = start_eventid; i < end_eventid + 1; i++) { memcpy(ev, &history[i], sizeof(struct event)); @@ -1098,9 +1082,12 @@ static void confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, messages for each confchg). */ if (sync_wait && c.memb_count == 1) { - log_error("confchg left alone without sync"); - log_config(&c, 1); - return; + /* just exit and restart without an error */ + log_debug("confchg left alone without sync"); + log_config(&c, 0); + fflush(stdout); + fflush(stderr); + exit(2); } if (sync_wait) { @@ -1110,7 +1097,6 @@ static void confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, add_history_confchg(&c); update_nodes_list(last_config, last_config_eventid); - update_nodes_sync_from(); } static void receive_sync(struct dct_header *hd, int len) @@ -1387,6 +1373,10 @@ int we_should_send(void) return 0; } +/* TODO: when we join, pick a random number of seconds between 1 and 16; + when we've been running for this number of seconds, either leave or fail; + leave if the number is even, fail if it's odd. 
*/ + int we_should_leave(void) { static unsigned int tries; @@ -1456,7 +1446,7 @@ void loop(void) for (;;) { rv = poll(pollfd, client_maxi + 1, poll_timeout); if (rv == -1 && errno == EINTR) { - if (!daemon_quit) + if (!prog_quit) continue; } if (rv < 0) @@ -1487,7 +1477,7 @@ void loop(void) if (got_error) { fflush(stdout); fflush(stderr); - daemon_dump_write(); + dump_write(); exit(EXIT_FAILURE); } @@ -1653,14 +1643,14 @@ int main(int argc, char **argv) return 1; } -void daemon_dump_save(void) +void dump_save(void) { int len, i; - len = strlen(daemon_debug_buf); + len = strlen(debug_buf); for (i = 0; i < len; i++) { - dump_buf[dump_point++] = daemon_debug_buf[i]; + dump_buf[dump_point++] = debug_buf[i]; if (dump_point == DUMP_SIZE) { dump_point = 0; @@ -1669,7 +1659,7 @@ void daemon_dump_save(void) } } -void daemon_dump_write(void) +void dump_write(void) { char begin[64]; char end[64]; |
