summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2009-05-26 16:13:46 -0500
committerDavid Teigland <teigland@redhat.com>2009-05-26 16:13:46 -0500
commit626ac2fc3f29df3344f4da20e3a9016b7cdb3c8a (patch)
treeff3bc700dce069ebbaa00284856944499ef7f1c3
parentf247b0074950844f7cb1d94e5a8355b2238a95d7 (diff)
downloaddct-stuff-626ac2fc3f29df3344f4da20e3a9016b7cdb3c8a.tar.gz
dct-stuff-626ac2fc3f29df3344f4da20e3a9016b7cdb3c8a.tar.xz
dct-stuff-626ac2fc3f29df3344f4da20e3a9016b7cdb3c8a.zip
cpgx: more work
. move code around
. save debug log, write it out on error
. configurable sync_max
. don't save sync history (just header) to others while waiting for our sync
. error if no dispatches for 10 sec
. rework confchg leave/join/sync wait/done state checks
. check for case where all others leave without sending us sync
. check all events after time message
. don't send syncs if we're leaving or left

Signed-off-by: David Teigland <teigland@redhat.com>
-rw-r--r--cpgx/cpgx.c398
1 files changed, 233 insertions, 165 deletions
diff --git a/cpgx/cpgx.c b/cpgx/cpgx.c
index aeeca1c..179638d 100644
--- a/cpgx/cpgx.c
+++ b/cpgx/cpgx.c
@@ -16,48 +16,11 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
-/* Limitations: max 8 nodes, nodeids between 1 and 255 */
-
-/*
- Initial outline:
-
- recv time message
- - assert from a member
- if sync_done
- - history[event_num++] = contents
- - assert leave_done == 0
- - assert join_done == 1
- else
- - save message
-
- recv sync message
- if !sync_done and to us
- - copy contents into history, set event_num
- - recv saved time messages
- - sync_done = 1
- if !sync_done and not to us
- - ignore
- if sync_done
- - set needs_sync = 0 for node sent to
-
- confchg
- - history[event_num++] = members/joined/left
- - if we joined: join_done = 1, sync_done = 0
- - if we left: leave_done = 1
- - for each joined node, set needs_sync = 1
- - if any nodes need sync, and we are synced and we are low_nodeid,
- send sync message to each specific node with history from the event_num
- when they were removed up to the event_num where they were added
-
- send time message
- - send buf with our time and last event_num
-
- New history begins at zero every time a new group is formed (all members go
+/* New history begins at zero every time a new group is formed (all members go
away). Otherwise, when forming a new group we'd have to wait for all
members to join and gather the latest history from all so we know where to
begin. This also means that a node clears its history before every join,
- since history may have been restarted while it was away.
-*/
+ since history may have been restarted while it was away. */
#include <stdio.h>
#include <stdlib.h>
@@ -76,50 +39,26 @@
#include <corosync/cpg.h>
#include "list.h"
+#define CLIENT_NALLOC 2
+#define MAX_NODES 8 /* not easily changed */
+#define DUMP_SIZE (1024 * 1024)
+#define HISTORY_EVENTS (1024 * 1024)
+#define DEFAULT_SYNC_MAX 1000 /* sync up to this many events */
+#define DUMP_WRITE_PATH "/var/log/cluster/cpgx_debug.txt"
+
+#define EV_CONFCHG 1
+#define EV_MSGTIME 2
+#define EV_MSGSYNC 3
+
struct client {
int fd;
void *workfn;
void *deadfn;
};
-#define CLIENT_NALLOC 2
-
-static int client_maxi;
-static int client_size = 0;
-static struct client *client = NULL;
-static struct pollfd *pollfd = NULL;
-
-struct cpg_name dct_cpg_name;
-cpg_handle_t dct_cpg_handle;
-int dct_cpg_client;
-int dct_cpg_fd;
-
-int daemon_quit;
-int cluster_down;
-int opt_leave = 1;
-int opt_fail = 1;
-int got_error = 0;
-int continue_after_error = 0;
-int opt_print_event = 1;
-int opt_print_debug = 1;
-
-int join_wait;
-int join_done;
-int sync_wait;
-int sync_done;
-int leave_wait;
-int leave_done;
-
-uint8_t our_nodeid;
-uint32_t eventid;
-uint32_t last_config_eventid;
-uint32_t dispatch_count;
-
-#define MAX_NODES 8
-
-#define EV_CONFCHG 1
-#define EV_MSGTIME 2
-#define EV_MSGSYNC 3
+/* time msg is followed by N event structs from history for run-time checks
+ sync msg is followed by M event structs from history to sync new node,
+ the final one is the confchg event that to_nodeid joined in */
struct dct_header {
uint8_t type; /* EV_MSGTIME, EV_MSGSYNC */
@@ -133,10 +72,6 @@ struct dct_header {
uint32_t event_count; /* this many events after header */
}; /* 28 bytes */
-/* time msg is followed by N event structs from history for run-time checks */
-/* sync msg is followed by M event structs from history to sync new node,
- the final one is the confchg event that to_nodeid joined in */
-
struct dct_config {
uint8_t type; /* EV_CONFCHG */
uint8_t memb_count;
@@ -156,10 +91,6 @@ struct event {
};
}; /* 32 bytes */
-int history_len;
-struct event *history;
-struct dct_config *last_config;
-
struct node {
struct list_head list;
uint8_t nodeid;
@@ -174,8 +105,6 @@ struct node {
from this node. not used for anything */
};
-struct list_head nodes;
-
struct save_event {
struct list_head list;
int type;
@@ -184,27 +113,79 @@ struct save_event {
char buf[0];
};
-struct list_head saved_events;
+static int client_maxi;
+static int client_size = 0;
+static struct client *client = NULL;
+static struct pollfd *pollfd = NULL;
+
+static struct cpg_name dct_cpg_name;
+static cpg_handle_t dct_cpg_handle;
+static int dct_cpg_client;
+static int dct_cpg_fd;
+
+static int daemon_quit;
+static int cluster_down;
+static int opt_leave = 1;
+static int opt_fail = 1;
+static int got_error = 0;
+static int continue_after_error = 0;
+static int opt_print_event = 1;
+static int opt_print_debug = 1;
+static time_t prog_begin;
+static int dump_point;
+static int dump_wrap;
+static char daemon_debug_buf[256];
+static char dump_buf[DUMP_SIZE];
+
+static uint8_t our_nodeid;
+static uint32_t eventid;
+static uint32_t last_config_eventid;
+static uint32_t sync_max = DEFAULT_SYNC_MAX;
+static uint32_t dispatch_count;
+static struct timeval last_dispatch;
+static int history_len;
+static struct event *history;
+static struct dct_config *last_config;
+static struct list_head nodes;
+static struct list_head saved_events;
+
+static int join_wait;
+static int join_done;
+static int sync_wait;
+static int sync_done;
+static int leave_wait;
+static int leave_done;
static void process_message(struct dct_header *hd, int len);
void send_sync(uint8_t nodeid);
+void daemon_dump_save(void);
+void daemon_dump_write(void);
/* Format "<time> ERROR: <message>\n" into daemon_debug_buf (snprintf is
   given a size of 255), append it to the dump buffer with
   daemon_dump_save(), print it on stderr and set got_error. */
#define log_error(fmt, args...) \
do { \
- fprintf(stderr, "ERROR: " fmt "\n", ##args); \
+ snprintf(daemon_debug_buf, 255, "%ld ERROR: " fmt "\n", \
+ time(NULL), ##args); \
+ daemon_dump_save(); \
+ fprintf(stderr, "%s", daemon_debug_buf); \
got_error = 1; \
} while (0)
/* Format "<time> H: <message>\n" into daemon_debug_buf and append it to the
   dump buffer; it is printed on stdout only when opt_print_event is set. */
#define log_history(fmt, args...) \
do { \
+ snprintf(daemon_debug_buf, 255, "%ld H: " fmt "\n", \
+ time(NULL), ##args); \
+ daemon_dump_save(); \
if (opt_print_event) \
- fprintf(stdout, "H: " fmt "\n", ##args); \
+ fprintf(stdout, "%s", daemon_debug_buf); \
} while (0)
/* Format "<time> D: <message>\n" into daemon_debug_buf and append it to the
   dump buffer; it is printed on stdout only when opt_print_debug is set. */
#define log_debug(fmt, args...) \
do { \
+ snprintf(daemon_debug_buf, 255, "%ld D: " fmt "\n", \
+ time(NULL), ##args); \
+ daemon_dump_save(); \
if (opt_print_debug) \
- fprintf(stdout, "D: " fmt "\n", ##args); \
+ fprintf(stdout, "%s", daemon_debug_buf); \
} while (0)
static void _log_config(struct dct_config *c, uint32_t id, int error)
@@ -357,11 +338,6 @@ static int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci))
goto again;
}
-static void sigterm_handler(int sig)
-{
- daemon_quit = 1;
-}
-
void cluster_dead(int ci)
{
if (!cluster_down)
@@ -732,8 +708,6 @@ void send_time(void)
/* TODO: ability to send full history, cpg max message sizes limit us */
-#define SYNC_MAX 1000
-
void send_sync(uint8_t nodeid)
{
char *buf;
@@ -757,8 +731,8 @@ void send_sync(uint8_t nodeid)
event_count = node->join_eventid + 1;
- if (event_count > SYNC_MAX) {
- count = SYNC_MAX;
+ if (event_count > sync_max) {
+ count = sync_max;
end_eventid = node->join_eventid;
start_eventid = end_eventid - count + 1;
} else {
@@ -834,6 +808,13 @@ void read_events(struct dct_header *hd, int len, int check_only)
int count = hd->event_count;
int i;
+ /* we don't save the entire sync message sent to another node on
+ save_events while waiting for our own sync message; we just save
+ the header */
+
+ if (len == sizeof(struct dct_header))
+ return;
+
if (len != sizeof(struct dct_header) + count * sizeof(struct event)) {
log_error("read_events bad len %d count %d", len, count);
return;
@@ -1006,10 +987,11 @@ static void confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
{
struct dct_config c;
struct node *node;
- int we_left, we_join;
+ int we_join = 0;
int i;
dispatch_count++;
+ gettimeofday(&last_dispatch, NULL);
memset(&c, 0, sizeof(struct dct_config));
@@ -1027,51 +1009,77 @@ static void confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
c.join[i] = (uint8_t)join_list[i].nodeid;
if (leave_done) {
- /* the we_left confchg should be the very last event we see */
- log_error("confchg_cb after leave_done");
+ /* our left confchg should be the very last event we see */
+ log_error("confchg after leave_done");
log_config(&c, 1);
return;
}
- we_left = leave_wait && in_left(&c, our_nodeid);
- we_join = join_wait && in_memb(&c, our_nodeid);
+ if (!leave_wait && in_left(&c, our_nodeid)) {
+ log_error("confchg in_left not leave_wait");
+ log_config(&c, 1);
+ return;
+ }
- if (we_left) {
+ if (leave_wait && in_left(&c, our_nodeid)) {
leave_wait = 0;
leave_done = 1;
cpg_finalize(dct_cpg_handle);
client_dead(dct_cpg_client);
+ add_history_confchg(&c);
+ update_nodes_list(last_config, last_config_eventid);
+ return;
}
- if (!we_left && !in_memb(&c, our_nodeid)) {
- log_error("confchg_cb without our_nodeid %u", our_nodeid);
+ if (!in_memb(&c, our_nodeid)) {
+ log_error("confchg without our_nodeid %u", our_nodeid);
log_config(&c, 1);
return;
}
- if (we_join) {
+ if (join_wait && in_memb(&c, our_nodeid)) {
join_wait = 0;
join_done = 1;
sync_wait = 1;
sync_done = 0;
+ we_join = 1;
}
- /* shortcut to bootstrap things, doesn't work if more than one node
- join in the first confchg. If it happens, we'll see the initial
- members all sitting in sync_wait for an existing member (there is
- none) to send a sync message. To do this properly we'd probably
- need to have everyone exchange state messages for each confchg so we
- could detect when multiple new nodes join in the first confchg of a
- newly formed cpg. Or, rely on the join list to be the same as the
- member list? */
+ /* Shortcut to bootstrap things. Doesn't work if more than one node
+ join in the first confchg. If it happens, all the nodes joining
+ together will be sitting waiting for a sync message.
+ To do this properly we'd need to have everyone exchange state
+ messages for each confchg so we can detect when multiple new nodes
+ join in the first confchg of a newly formed cpg. Or, rely on the
+ join list to be the same as the member list? */
if (we_join && c.memb_count == 1) {
sync_wait = 0;
sync_done = 1;
+ add_history_confchg(&c);
+ if (!list_empty(&nodes)) {
+ log_error("bootstrap nodes not empty");
+ print_nodes_list();
+ }
node = add_node(our_nodeid);
node->is_member = 1;
node->needs_sync = 0;
- add_history_confchg(&c);
+ return;
+ }
+
+ /* Case we can't handle with bootstrap shortcut. Confchg for node
+ join while all synced members have called leave, so won't send a
+ sync message to the joined node. The node waiting for a sync then
+ sees all existing nodes leave, having received no sync message. We
+ could process saved_events, declaring first one (our join) to be
+ eventid 0. That still doesn't work if another node joins before all
+ the confchg's for the leaving node (which might be solvable, but
+ gets very complicated; better at that point to exchange state
+ messages for each confchg). */
+
+ if (sync_wait && c.memb_count == 1) {
+ log_error("confchg left alone without sync");
+ log_config(&c, 1);
return;
}
@@ -1089,8 +1097,10 @@ static void receive_sync(struct dct_header *hd, int len)
{
if (sync_wait) {
if (hd->to_nodeid != our_nodeid) {
- /* save events to add to history after we're synced */
- save_message(hd, len, EV_MSGSYNC);
+ /* save events to add to history after we're synced;
+ don't save/check the event history for other nodes
+ while waiting for our own sync */
+ save_message(hd, sizeof(struct dct_header), EV_MSGSYNC);
} else {
sync_wait = 0;
sync_done = 1;
@@ -1105,7 +1115,7 @@ static void receive_sync(struct dct_header *hd, int len)
if (hd->to_nodeid == our_nodeid)
log_debug("receive_sync from %d redundant", hd->nodeid);
- /* check we agree with past events being synced to new node */
+ /* check the sync message to verify we agree with it */
read_events(hd, len, 1);
add_history_message(hd, len);
@@ -1117,6 +1127,8 @@ static void receive_time(struct dct_header *hd, int len)
{
struct event *ev_buf;
struct node *node;
+ uint32_t end_eventid = 0;
+ int i;
if (sync_wait) {
/* save events to add to history after we're synced */
@@ -1133,9 +1145,6 @@ static void receive_time(struct dct_header *hd, int len)
return;
}
- /* check our scheme of syncing nodes, where nodes don't send time
- messages until they're synced */
-
node = get_node(hd->nodeid);
if (!node) {
log_error("receive_time no node %u", hd->nodeid);
@@ -1151,17 +1160,21 @@ static void receive_time(struct dct_header *hd, int len)
add_history_message(hd, len);
- /* an event from past history (which everyone should know about) is
+ /* events from past history (which everyone should know about) are
included in time messages as a way to check that nodes are staying
in sync as we go */
ev_buf = (struct event *)((char *)hd + sizeof(struct dct_header));
- check_event(ev_buf);
-
- node->last_check_eventid = ev_buf->eventid;
+ for (i = 0; i < hd->event_count; i++) {
+ check_event(ev_buf);
+ end_eventid = ev_buf->eventid;
+ ev_buf++;
+ }
+ node->last_check_eventid = end_eventid;
- /* this check currently fails with corosync */
+ /* this check currently fails with corosync,
+ TODO: add option to enable this check */
#if 0
/* Track whether messages sent in configuration C1 are delivered in C1
instead of a subsequent C2. Recent reading led me to believe that
@@ -1194,9 +1207,10 @@ static void deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
struct dct_header *hd = data;
dispatch_count++;
+ gettimeofday(&last_dispatch, NULL);
if (len < sizeof(struct dct_header)) {
- log_error("deliver_cb short message %d", len);
+ log_error("deliver short message %d", len);
log_header(hd, 1);
return;
}
@@ -1209,14 +1223,14 @@ static void deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
if (join_wait) {
/* the we_joined confchg should be the first event we see */
- log_error("deliver_cb before joined");
+ log_error("deliver before joined");
log_header(hd, 1);
return;
}
if (leave_done) {
/* the we_left confchg should be the last event we see */
- log_error("deliver_cb after left");
+ log_error("deliver after left");
log_header(hd, 1);
return;
}
@@ -1242,7 +1256,8 @@ static void process_cpg(int ci)
/* can't send from dispatch, so just flag the nodes that need
syncing during the dispatch and send the sync messages now */
- send_syncs();
+ if (join_done && sync_done && !leave_wait && !leave_done)
+ send_syncs();
}
int do_join(void)
@@ -1394,10 +1409,18 @@ void loop(void)
void (*workfn) (int ci);
void (*deadfn) (int ci);
int poll_timeout = 5; /* ms */
+ unsigned long ms;
+ struct timeval now;
int rv, i;
srandom(time(NULL));
+ memset(history, 0, history_len);
+ free_nodes_list();
+
+ dispatch_count = 0;
+ gettimeofday(&last_dispatch, NULL);
+
sync_wait = 0;
sync_done = 0;
leave_wait = 0;
@@ -1408,22 +1431,16 @@ void loop(void)
eventid = 0;
- memset(history, 0, history_len);
- free_nodes_list();
-
do_join();
for (;;) {
rv = poll(pollfd, client_maxi + 1, poll_timeout);
if (rv == -1 && errno == EINTR) {
- if (daemon_quit)
- goto out;
- continue;
+ if (!daemon_quit)
+ continue;
}
- if (rv < 0) {
+ if (rv < 0)
log_error("poll errno %d", errno);
- goto out;
- }
/*
* read events from callbacks
@@ -1442,9 +1459,15 @@ void loop(void)
}
}
+ gettimeofday(&now, NULL);
+ ms = time_diff_ms(&last_dispatch, &now);
+ /* ms is unsigned long, so the conversion must be %lu; "%ul" is %u
+ followed by a literal 'l' and does not match the argument */
+ if (ms > 10000)
+ log_error("no cpg dispatch in %lu ms", ms);
+
if (got_error) {
fflush(stdout);
fflush(stderr);
+ daemon_dump_write();
exit(EXIT_FAILURE);
}
@@ -1452,16 +1475,6 @@ void loop(void)
* do things that create events (send messages, leave, fail)
*/
- if (!join_done && dispatch_count) {
- /* a confchg for our join should be the first event we
- see, so this shouldn't happen. without the
- dispatch_count condition this error triggers
- sometimes, indicating zero dispatch_count */
- log_error("event before join done dispatches %u",
- dispatch_count);
- continue;
- }
-
if (!sync_done) {
/* don't do things until we're synced */
continue;
@@ -1500,24 +1513,27 @@ void loop(void)
+/* Print the command line options, the format of the output lines and
+ usage notes on stdout. */
void print_usage(void)
{
- printf("Output:\n");
- printf("ERROR: <error string> (stderr)\n");
- printf("H: <event string> (stdout)\n");
- printf("D: <debug string> (stdout)\n");
- printf("\n");
printf("Options:\n");
printf(" -H [0|1] event history output [off|on] default 1\n");
printf(" -D [0|1] debug output [off|on] default 1\n");
printf(" -f [0|1] fail included in test [off|on] default 1\n");
printf(" -l [0|1] leave included in test [off|on] default 1\n");
+ printf(" -s <num> sync up to num events, default %d\n",
+ DEFAULT_SYNC_MAX);
printf(" -c continue after error\n");
-
- printf("to prevent history from periodically restarting from 0,\n"
- "keep one node from either leaving or failing with -f0 -l0\n");
+ printf("\n");
+ printf("Output:\n");
+ printf(" <time> ERROR: <error string> (stderr)\n");
+ printf(" <time> H: <event string> (stdout)\n");
+ printf(" <time> D: <debug string> (stdout)\n");
+ printf("\n");
+ printf("Notes:\n");
+ printf(" - to prevent history from periodically restarting from 0,\n"
+ " keep one node from leaving or failing with -f0 -l0\n");
+ printf(" - 8 nodes max, nodeids between 1 and 255\n");
+ printf(" - debug dump on error: %s\n", DUMP_WRITE_PATH);
}
-#define HISTORY_EVENTS (1024 * 1024)
-
int main(int argc, char **argv)
{
pid_t pid;
@@ -1526,7 +1542,7 @@ int main(int argc, char **argv)
int optchar;
while (cont) {
- optchar = getopt(argc, argv, "H:D:cf:l:h");
+ optchar = getopt(argc, argv, "H:D:f:l:s:ch");
switch (optchar) {
case 'H':
@@ -1545,6 +1561,10 @@ int main(int argc, char **argv)
opt_leave = atoi(optarg);
break;
+ case 's':
+ sync_max = atoi(optarg);
+ break;
+
case 'c':
continue_after_error = 1;
break;
@@ -1574,12 +1594,17 @@ int main(int argc, char **argv)
while (1) {
pid = fork();
-
if (!pid) {
/*
- * repeat join/work/leave until exit
+ * repeat join/work/leave until exit. loop() will
+ * write dump_buf before exiting on an error
*/
+ memset(dump_buf, 0, sizeof(dump_buf));
+ dump_point = 0;
+ dump_wrap = 0;
+ prog_begin = time(NULL);
+
while (1) {
loop();
sleep(rand_int(0, 3));
@@ -1608,3 +1633,46 @@ int main(int argc, char **argv)
return 1;
}
+/*
+ * Append the string currently held in daemon_debug_buf (without its
+ * terminating NUL) to the circular dump_buf.  When the write position
+ * reaches DUMP_SIZE it restarts at 0 and dump_wrap is set, so later output
+ * overwrites the oldest saved data.
+ */
+void daemon_dump_save(void)
+{
+	const char *src = daemon_debug_buf;
+	int left = strlen(src);
+
+	while (left > 0) {
+		/* copy as much as fits before the end of the buffer */
+		int room = DUMP_SIZE - dump_point;
+		int chunk = (left < room) ? left : room;
+
+		memcpy(dump_buf + dump_point, src, chunk);
+		src += chunk;
+		left -= chunk;
+		dump_point += chunk;
+
+		if (dump_point == DUMP_SIZE) {
+			dump_point = 0;
+			dump_wrap = 1;
+		}
+	}
+}
+
+/*
+ * Append the saved debug log to DUMP_WRITE_PATH, preceded by a line giving
+ * the time the program began and the current time.  Does nothing if the
+ * file cannot be opened.
+ *
+ * dump_buf is filled as a circular buffer and carries no terminating NUL,
+ * so it is written with explicit lengths: once it has wrapped, the oldest
+ * data runs from dump_point to the end of the buffer and the newest from
+ * the start of the buffer up to dump_point.
+ */
+void daemon_dump_write(void)
+{
+	char begin[64] = "";
+	char end[64] = "";
+	struct tm *tm;
+	time_t now;
+	FILE *fp;
+
+	fp = fopen(DUMP_WRITE_PATH, "a");
+	if (!fp)
+		return;
+
+	now = time(NULL);
+
+	/* localtime() may return NULL; leave that time stamp empty then */
+	tm = localtime(&prog_begin);
+	if (tm)
+		strftime(begin, sizeof(begin), "%b %d %T", tm);
+	tm = localtime(&now);
+	if (tm)
+		strftime(end, sizeof(end), "%b %d %T", tm);
+
+	fprintf(fp, "cpgx %s - %s\n", begin, end);
+
+	/* oldest part first; printing it with "%s" would run past the end
+	   of dump_buf because nothing terminates it there */
+	if (dump_wrap)
+		fwrite(dump_buf + dump_point, 1, DUMP_SIZE - dump_point, fp);
+
+	fwrite(dump_buf, 1, dump_point, fp);
+
+	/* fclose flushes the stream */
+	fclose(fp);
+}
+