summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2009-05-28 13:44:27 -0500
committerDavid Teigland <teigland@redhat.com>2009-05-28 13:44:27 -0500
commit7bf79e8a629d6d17768a5d48db5a2a1f2cdb8a2f (patch)
treea150ef71f31c54ac8adb70a0bae64ab76680f5cb
parentc8665cdb31d4a57764916d5dca863c645c9a72e0 (diff)
downloaddct-stuff-7bf79e8a629d6d17768a5d48db5a2a1f2cdb8a2f.tar.gz
dct-stuff-7bf79e8a629d6d17768a5d48db5a2a1f2cdb8a2f.tar.xz
dct-stuff-7bf79e8a629d6d17768a5d48db5a2a1f2cdb8a2f.zip
cpgx: fix synced_nodes state transfer
The state of synced_nodes needs to be snapshotted at the point of a joining node's confchg. Also some code munging. Signed-off-by: David Teigland <teigland@redhat.com>
-rw-r--r--cpgx/cpgx.c130
1 files changed, 60 insertions, 70 deletions
diff --git a/cpgx/cpgx.c b/cpgx/cpgx.c
index 40ec074..5639144 100644
--- a/cpgx/cpgx.c
+++ b/cpgx/cpgx.c
@@ -103,6 +103,7 @@ struct node {
uint32_t last_check_eventid; /* the eventid from the check event in the
most recent time message we received
from this node. not used for anything */
+ uint8_t synced_nodes[8];
};
struct save_event {
@@ -123,7 +124,7 @@ static cpg_handle_t dct_cpg_handle;
static int dct_cpg_client;
static int dct_cpg_fd;
-static int daemon_quit;
+static int prog_quit;
static int cluster_down;
static int opt_leave = 1;
static int opt_fail = 1;
@@ -134,7 +135,7 @@ static int opt_print_debug = 1;
static time_t prog_begin;
static int dump_point;
static int dump_wrap;
-static char daemon_debug_buf[256];
+static char debug_buf[256];
static char dump_buf[DUMP_SIZE];
static uint8_t our_nodeid;
@@ -158,34 +159,32 @@ static int leave_done;
static void process_message(struct dct_header *hd, int len);
void send_sync(uint8_t nodeid);
-void daemon_dump_save(void);
-void daemon_dump_write(void);
+void dump_save(void);
+void dump_write(void);
#define log_error(fmt, args...) \
do { \
- snprintf(daemon_debug_buf, 255, "%ld ERROR: " fmt "\n", \
+ snprintf(debug_buf, 255, "%ld ERROR: " fmt "\n", \
time(NULL), ##args); \
- daemon_dump_save(); \
- fprintf(stderr, "%s", daemon_debug_buf); \
+ dump_save(); \
+ fprintf(stderr, "%s", debug_buf); \
got_error = 1; \
} while (0)
#define log_history(fmt, args...) \
do { \
- snprintf(daemon_debug_buf, 255, "%ld H: " fmt "\n", \
- time(NULL), ##args); \
- daemon_dump_save(); \
+ snprintf(debug_buf, 255, "%ld H: " fmt "\n", time(NULL), ##args); \
+ dump_save(); \
if (opt_print_event) \
- fprintf(stdout, "%s", daemon_debug_buf); \
+ fprintf(stdout, "%s", debug_buf); \
} while (0)
#define log_debug(fmt, args...) \
do { \
- snprintf(daemon_debug_buf, 255, "%ld D: " fmt "\n", \
- time(NULL), ##args); \
- daemon_dump_save(); \
+ snprintf(debug_buf, 255, "%ld D: " fmt "\n", time(NULL), ##args); \
+ dump_save(); \
if (opt_print_debug) \
- fprintf(stdout, "%s", daemon_debug_buf); \
+ fprintf(stdout, "%s", debug_buf); \
} while (0)
static void _log_config(struct dct_config *c, uint32_t id, int error)
@@ -200,19 +199,16 @@ static void _log_config(struct dct_config *c, uint32_t id, int error)
memset(l_buf, 0, sizeof(l_buf));
off = 0;
- for (i = 0; i < c->memb_count; i++) {
+ for (i = 0; i < c->memb_count; i++)
off += sprintf(m_buf+off, " %u", c->memb[i]);
- }
off = 0;
- for (i = 0; i < c->join_count; i++) {
+ for (i = 0; i < c->join_count; i++)
off += sprintf(j_buf+off, " %u", c->join[i]);
- }
off = 0;
- for (i = 0; i < c->left_count; i++) {
+ for (i = 0; i < c->left_count; i++)
off += sprintf(l_buf+off, " %u", c->left[i]);
- }
if (error)
log_error("%08u conf %u %u %u memb%s join%s left%s",
@@ -342,7 +338,7 @@ void cluster_dead(int ci)
{
if (!cluster_down)
log_error("cluster is down, exiting");
- daemon_quit = 1;
+ prog_quit = 1;
cluster_down = 1;
}
@@ -501,8 +497,25 @@ void free_nodes_list(void)
void update_nodes_list(struct dct_config *c, uint32_t id)
{
struct node *node;
+ uint8_t synced_nodes[8];
+ uint8_t low = 0;
int i, is_memb, is_join, is_left;
+ i = 0;
+ memset(&synced_nodes, 0, 8);
+
+ list_for_each_entry(node, &nodes, list) {
+ if (!node->is_member)
+ continue;
+ if (node->needs_sync)
+ continue;
+
+ synced_nodes[i++] = node->nodeid;
+
+ if (!low || node->nodeid < low)
+ low = node->nodeid;
+ }
+
list_for_each_entry(node, &nodes, list) {
is_memb = in_memb(c, node->nodeid);
@@ -511,7 +524,7 @@ void update_nodes_list(struct dct_config *c, uint32_t id)
if (is_memb && node->is_member) {
if (is_join || is_left)
- log_error("member list off %d %d %d %d %u",
+ log_error("member list off a %d %d %d %d %u",
is_memb, is_join, is_left,
node->is_member, node->nodeid);
continue;
@@ -519,7 +532,7 @@ void update_nodes_list(struct dct_config *c, uint32_t id)
if (!is_memb && !node->is_member) {
if (is_join || is_left)
- log_error("member list off %d %d %d %d %u",
+ log_error("member list off b %d %d %d %d %u",
is_memb, is_join, is_left,
node->is_member, node->nodeid);
continue;
@@ -529,7 +542,7 @@ void update_nodes_list(struct dct_config *c, uint32_t id)
if (!is_memb & node->is_member) {
if (is_join || !is_left)
- log_error("member list off %d %d %d %d %u",
+ log_error("member list off c %d %d %d %d %u",
is_memb, is_join, is_left,
node->is_member, node->nodeid);
@@ -543,13 +556,15 @@ void update_nodes_list(struct dct_config *c, uint32_t id)
if (is_memb && !node->is_member) {
if (!is_join || is_left)
- log_error("member list off %d %d %d %d %u",
+ log_error("member list off d %d %d %d %d %u",
is_memb, is_join, is_left,
node->is_member, node->nodeid);
node->is_member = 1;
node->needs_sync = 1;
+ node->sync_from = low;
node->join_eventid = id;
+ memcpy(&node->synced_nodes, &synced_nodes, 8);
continue;
}
@@ -568,7 +583,9 @@ void update_nodes_list(struct dct_config *c, uint32_t id)
node = add_node(c->memb[i]);
node->is_member = 1;
node->needs_sync = 1;
+ node->sync_from = low;
node->join_eventid = id;
+ memcpy(&node->synced_nodes, &synced_nodes, 8);
is_join = in_join(c, node->nodeid);
if (!is_join)
@@ -579,33 +596,6 @@ void update_nodes_list(struct dct_config *c, uint32_t id)
print_nodes_list();
}
-/* a scheme where new nodes are synced from the low synced nodeid needs to
- consider that the low nodeid may fail before syncing the new nodes */
-
-void update_nodes_sync_from(void)
-{
- struct node *node;
- uint8_t low = 0;
-
- list_for_each_entry(node, &nodes, list) {
- if (!node->is_member)
- continue;
- if (node->needs_sync)
- continue;
- if (!low || node->nodeid < low)
- low = node->nodeid;
- }
-
- list_for_each_entry(node, &nodes, list) {
- if (!node->is_member)
- continue;
- if (!node->needs_sync)
- continue;
-
- node->sync_from = low;
- }
-}
-
void send_syncs(void)
{
struct node *node;
@@ -765,13 +755,7 @@ void send_sync(uint8_t nodeid)
/* which nodes are synced is part of the replicated state that needs to
be synced to new nodes */
- i = 0;
- list_for_each_entry(node, &nodes, list) {
- if (!node->is_member)
- continue;
- if (!node->needs_sync)
- hd->synced_nodes[i++] = node->nodeid;
- }
+ memcpy(hd->synced_nodes, node->synced_nodes, 8);
for (i = start_eventid; i < end_eventid + 1; i++) {
memcpy(ev, &history[i], sizeof(struct event));
@@ -1098,9 +1082,12 @@ static void confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
messages for each confchg). */
if (sync_wait && c.memb_count == 1) {
- log_error("confchg left alone without sync");
- log_config(&c, 1);
- return;
+ /* just exit and restart without an error */
+ log_debug("confchg left alone without sync");
+ log_config(&c, 0);
+ fflush(stdout);
+ fflush(stderr);
+ exit(2);
}
if (sync_wait) {
@@ -1110,7 +1097,6 @@ static void confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
add_history_confchg(&c);
update_nodes_list(last_config, last_config_eventid);
- update_nodes_sync_from();
}
static void receive_sync(struct dct_header *hd, int len)
@@ -1387,6 +1373,10 @@ int we_should_send(void)
return 0;
}
+/* TODO: when we join, pick a random number of seconds between 1 and 16;
+ when we've been running for this number of seconds, either leave or fail;
+ leave if the number is even, fail if it's odd. */
+
int we_should_leave(void)
{
static unsigned int tries;
@@ -1456,7 +1446,7 @@ void loop(void)
for (;;) {
rv = poll(pollfd, client_maxi + 1, poll_timeout);
if (rv == -1 && errno == EINTR) {
- if (!daemon_quit)
+ if (!prog_quit)
continue;
}
if (rv < 0)
@@ -1487,7 +1477,7 @@ void loop(void)
if (got_error) {
fflush(stdout);
fflush(stderr);
- daemon_dump_write();
+ dump_write();
exit(EXIT_FAILURE);
}
@@ -1653,14 +1643,14 @@ int main(int argc, char **argv)
return 1;
}
-void daemon_dump_save(void)
+void dump_save(void)
{
int len, i;
- len = strlen(daemon_debug_buf);
+ len = strlen(debug_buf);
for (i = 0; i < len; i++) {
- dump_buf[dump_point++] = daemon_debug_buf[i];
+ dump_buf[dump_point++] = debug_buf[i];
if (dump_point == DUMP_SIZE) {
dump_point = 0;
@@ -1669,7 +1659,7 @@ void daemon_dump_save(void)
}
}
-void daemon_dump_write(void)
+void dump_write(void)
{
char begin[64];
char end[64];