diff options
| author    | David Teigland <teigland@redhat.com> | 2009-05-22 16:06:40 -0500 |
| committer | David Teigland <teigland@redhat.com> | 2009-05-22 16:06:40 -0500 |
| commit    | f32d252bbbe6c63fee2248c0b0667351bdb303bb (patch) |
| tree      | dcce40287df56b1ba2ba76fae3e5bce055cbd3ba |
cpgx: initial import
Signed-off-by: David Teigland <teigland@redhat.com>
| -rw-r--r-- | cpgx/Makefile |    4 |
| -rw-r--r-- | cpgx/cpgx.c   | 1610 |
| -rw-r--r-- | cpgx/list.h   |  336 |

3 files changed, 1950 insertions, 0 deletions
diff --git a/cpgx/Makefile b/cpgx/Makefile new file mode 100644 index 0000000..54e3860 --- /dev/null +++ b/cpgx/Makefile @@ -0,0 +1,4 @@ +cpgx: cpgx.c + gcc -L/usr/lib64 cpgx.c -g -o cpgx -lcpg + + diff --git a/cpgx/cpgx.c b/cpgx/cpgx.c new file mode 100644 index 0000000..bc6887f --- /dev/null +++ b/cpgx/cpgx.c @@ -0,0 +1,1610 @@ +/* + * Copyright (c) 2009 David Teigland + * All Rights Reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License V2 + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it would be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/* + Initial outline: + + recv time message + - assert from a member + if sync_done + - history[event_num++] = contents + - assert leave_done == 0 + - assert join_done == 1 + else + - save message + + recv sync message + if !sync_done and to us + - copy contents into history, set event_num + - recv saved time messages + - sync_done = 1 + if !sync_done and not to us + - ignore + if sync_done + - set needs_sync = 0 for node sent to + + confchg + - history[event_num++] = members/joined/left + - if we joined: join_done = 1, sync_done = 0 + - if we left: leave_done = 1 + - for each joined node, set needs_sync = 1 + - if any nodes need sync, and we are synced and we are low_nodeid, + send sync message to each specific node with history from the event_num + when they were removed up to the event_num where they were added + + send time message + - send buf with our time and last event_num + + New history begins at zero every time a 
new group is formed (all members go + away). Otherwise, when formning a new group we'd have to wait for all + members to join and gather the latest history from all so we know where to + begin. This also means that a node clears its history before every join, + since history may have been restarted while it was away. +*/ + +/* Limitations: max 8 nodes, nodeids between 1 and 255 */ + +#include <stdio.h> +#include <stdlib.h> +#include <stddef.h> +#include <stdint.h> +#include <unistd.h> +#include <string.h> +#include <limits.h> +#include <time.h> +#include <errno.h> +#include <signal.h> +#include <sys/poll.h> +#include <sys/time.h> +#include <sys/wait.h> +#include <sys/types.h> +#include <corosync/cpg.h> +#include "list.h" + +struct client { + int fd; + void *workfn; + void *deadfn; +}; + +#define CLIENT_NALLOC 2 + +static int client_maxi; +static int client_size = 0; +static struct client *client = NULL; +static struct pollfd *pollfd = NULL; + +struct cpg_name dct_cpg_name; +cpg_handle_t dct_cpg_handle; +int dct_cpg_client; +int dct_cpg_fd; + +int daemon_quit; +int cluster_down; +int opt_leave = 1; +int opt_fail = 1; +int got_error = 0; +int continue_after_error = 0; +int opt_print_event = 1; +int opt_print_debug = 1; + +int join_wait; +int join_done; +int sync_wait; +int sync_done; +int leave_wait; +int leave_done; + +uint8_t our_nodeid; +uint32_t eventid; +uint32_t last_config_eventid; +uint32_t dispatch_count; + +#define MAX_NODES 8 + +#define EV_CONFCHG 1 +#define EV_MSGTIME 2 +#define EV_MSGSYNC 3 + +struct dct_header { + uint8_t type; /* EV_MSGTIME, EV_MSGSYNC */ + uint8_t nodeid; /* sender */ + uint8_t to_nodeid; /* for MSGSYNC */ + uint8_t pad; + uint8_t synced_nodes[8];/* for MSGSYNC, nodes in sync */ + uint32_t tv_sec; + uint32_t tv_usec; + uint32_t last_config; /* last config eventid we've seen */ + uint32_t event_count; /* this many events after header */ +}; /* 28 bytes */ + +/* time msg is followed by N event structs from history for run-time checks */ 
+/* sync msg is followed by M event structs from history to sync new node, + the final one is the confchg event that to_nodeid joined in */ + +struct dct_config { + uint8_t type; /* EV_CONFCHG */ + uint8_t memb_count; + uint8_t join_count; + uint8_t left_count; + uint8_t memb[8]; + uint8_t join[8]; + uint8_t left[8]; +}; /* 28 bytes */ + +struct event { + uint32_t eventid; + union { + struct dct_header header; + struct dct_config config; + char buf[28]; + }; +}; /* 32 bytes */ + +int history_len; +struct event *history; +struct dct_config *last_config; + +struct node { + struct list_head list; + uint8_t nodeid; + uint8_t sync_from; /* node that will send the sync to this node */ + int is_member; + int needs_sync; /* when this node joins we set this to 1, and clear + it when we see sync message for it */ + int sent_sync; /* we sent a sync */ + uint32_t join_eventid; + uint32_t last_check_eventid; /* the eventid from the check event in the + most recent time message we received + from this node. not used for anything */ +}; + +struct list_head nodes; + +struct save_event { + struct list_head list; + int type; + int we_join; + int len; + char buf[0]; +}; + +struct list_head saved_events; + +static void process_message(struct dct_header *hd, int len); +void send_sync(uint8_t nodeid); + +#define log_error(fmt, args...) \ +do { \ + fprintf(stderr, "ERROR: " fmt "\n", ##args); \ + got_error = 1; \ +} while (0) + +#define log_history(fmt, args...) \ +do { \ + if (opt_print_event) \ + fprintf(stdout, "H: " fmt "\n", ##args); \ +} while (0) + +#define log_debug(fmt, args...) 
\ +do { \ + if (opt_print_debug) \ + fprintf(stdout, "D: " fmt "\n", ##args); \ +} while (0) + +static void _log_config(struct dct_config *c, uint32_t id, int error) +{ + char m_buf[32]; + char j_buf[32]; + char l_buf[32]; + int i, off; + + memset(m_buf, 0, sizeof(m_buf)); + memset(j_buf, 0, sizeof(j_buf)); + memset(l_buf, 0, sizeof(l_buf)); + + off = 0; + for (i = 0; i < c->memb_count; i++) { + off += sprintf(m_buf+off, " %u", c->memb[i]); + } + + off = 0; + for (i = 0; i < c->join_count; i++) { + off += sprintf(j_buf+off, " %u", c->join[i]); + } + + off = 0; + for (i = 0; i < c->left_count; i++) { + off += sprintf(l_buf+off, " %u", c->left[i]); + } + + if (error) + log_error("%08u conf %u %u %u memb%s join%s left%s", + id, c->memb_count, c->join_count, c->left_count, + m_buf, j_buf, l_buf); + else + log_history("%08u conf %u %u %u memb%s join%s left%s", + id, c->memb_count, c->join_count, c->left_count, + m_buf, j_buf, l_buf); + +} +#define log_config(_c, _er) _log_config(_c, 0, _er) + +static void _log_header(struct dct_header *h, uint32_t id, int error) +{ + if (error && h->type == EV_MSGTIME) + log_error("%08u time %u tv %u.%06u config %u", + id, h->nodeid, h->tv_sec, h->tv_usec, + h->last_config); + + else if (error && h->type == EV_MSGSYNC) + log_error("%08u sync %u tv %u.%06u config %u to %u count %u", + id, h->nodeid, h->tv_sec, h->tv_usec, + h->last_config, h->to_nodeid, h->event_count); + + else if (!error && h->type == EV_MSGTIME) + log_history("%08u time %u tv %u.%06u config %u", + id, h->nodeid, h->tv_sec, h->tv_usec, + h->last_config); + + else if (!error && h->type == EV_MSGSYNC) + log_history("%08u sync %u tv %u.%06u config %u to %u count %u", + id, h->nodeid, h->tv_sec, h->tv_usec, + h->last_config, h->to_nodeid, h->event_count); + + else + log_error("%08u unknown message type %d", id, h->type); +} +#define log_header(_h, _er) _log_header(_h, 0, _er) + +static void log_event(struct event *ev, int error) +{ + struct dct_config *c = &ev->config; + + 
if (c->type == EV_CONFCHG) + _log_config(&ev->config, ev->eventid, error); + else + _log_header(&ev->header, ev->eventid, error); +} + +static int rand_int(int a, int b) +{ + return a + (int) (((float)(b - a + 1)) * random() / (RAND_MAX+1.0)); +} + +static unsigned long time_diff_ms(struct timeval *begin, struct timeval *end) +{ + struct timeval result; + timersub(end, begin, &result); + return (result.tv_sec * 1000) + (result.tv_usec / 1000); +} + +static void client_alloc(void) +{ + int i; + + if (!client) { + client = malloc(CLIENT_NALLOC * sizeof(struct client)); + pollfd = malloc(CLIENT_NALLOC * sizeof(struct pollfd)); + } else { + client = realloc(client, (client_size + CLIENT_NALLOC) * + sizeof(struct client)); + pollfd = realloc(pollfd, (client_size + CLIENT_NALLOC) * + sizeof(struct pollfd)); + if (!pollfd) + log_error("can't alloc for pollfd"); + } + if (!client || !pollfd) + log_error("can't alloc for client array"); + + for (i = client_size; i < client_size + CLIENT_NALLOC; i++) { + client[i].workfn = NULL; + client[i].deadfn = NULL; + client[i].fd = -1; + pollfd[i].fd = -1; + pollfd[i].revents = 0; + } + client_size += CLIENT_NALLOC; +} + +static void client_dead(int ci) +{ + close(client[ci].fd); + client[ci].workfn = NULL; + client[ci].fd = -1; + pollfd[ci].fd = -1; +} + +static int client_add(int fd, void (*workfn)(int ci), void (*deadfn)(int ci)) +{ + int i; + + if (!client) + client_alloc(); + again: + for (i = 0; i < client_size; i++) { + if (client[i].fd == -1) { + client[i].workfn = workfn; + if (deadfn) + client[i].deadfn = deadfn; + else + client[i].deadfn = client_dead; + client[i].fd = fd; + pollfd[i].fd = fd; + pollfd[i].events = POLLIN; + if (i > client_maxi) + client_maxi = i; + return i; + } + } + + client_alloc(); + goto again; +} + +static void sigterm_handler(int sig) +{ + daemon_quit = 1; +} + +void cluster_dead(int ci) +{ + if (!cluster_down) + log_error("cluster is down, exiting"); + daemon_quit = 1; + cluster_down = 1; +} + 
+struct node *get_node(uint8_t nodeid) +{ + struct node *node; + + list_for_each_entry(node, &nodes, list) { + if (node->nodeid == nodeid) + return node; + } + return NULL; +} + +struct node *add_node(uint8_t nodeid) +{ + struct node *node; + + node = malloc(sizeof(struct node)); + if (!node) + return NULL; + memset(node, 0, sizeof(struct node)); + node->nodeid = nodeid; + + list_add_tail(&node->list, &nodes); + return node; +} + +void set_node_synced(uint8_t nodeid) +{ + struct node *node; + + node = get_node(nodeid); + node->needs_sync = 0; + log_debug("nodeid %d needs_sync 0", nodeid); +} + +int in_memb(struct dct_config *c, uint8_t nodeid) +{ + int i; + + for (i = 0; i < c->memb_count; i++) { + if (c->memb[i] == nodeid) + return 1; + } + return 0; +} + +int in_join(struct dct_config *c, uint8_t nodeid) +{ + int i; + + for (i = 0; i < c->join_count; i++) { + if (c->join[i] == nodeid) + return 1; + } + return 0; +} + +int in_left(struct dct_config *c, uint8_t nodeid) +{ + int i; + + for (i = 0; i < c->left_count; i++) { + if (c->left[i] == nodeid) + return 1; + } + return 0; +} + + +void print_nodes_list(void) +{ + struct node *node; + + list_for_each_entry(node, &nodes, list) { + log_debug("nodeid %u is_member %d needs_sync %d join %u check %u", + node->nodeid, node->is_member, node->needs_sync, + node->join_eventid, node->last_check_eventid); + } +} + +void init_join_eventid(struct node *node) +{ + struct event *ev; + struct dct_config *c; + int i = eventid - 1; + + while (1) { + ev = &history[i]; + c = &ev->config; + + if ((c->type == EV_CONFCHG) && in_join(c, node->nodeid)) { + node->join_eventid = ev->eventid; + return; + } + + if (!i) + break; + i--; + } +} + +/* hd is the header of our sync message, c is the config for our join event */ + +void init_nodes_list(struct dct_header *hd, struct dct_config *c) +{ + struct node *node; + uint8_t nodeid; + int i; + + if (!list_empty(&nodes)) { + log_error("init_nodes_list nodes not empty"); + return; + } + + for (i 
= 0; i < c->memb_count; i++) { + node = add_node(c->memb[i]); + node->is_member = 1; + node->needs_sync = 1; + } + + for (i = 0; i < 8; i++) { + nodeid = hd->synced_nodes[i]; + if (!nodeid) + break; + + node = get_node(nodeid); + if (!node) + log_error("init_nodes_list synced_nodes %u", nodeid); + else + node->needs_sync = 0; + } + + /* I don't think there's any case where we'll need to know the + join_eventid of an earlier joining node to send it a sync */ +#if 0 + list_for_each_entry(node, &nodes, list) + init_join_eventid(node); +#endif + + log_debug("init_nodes_list"); + print_nodes_list(); +} + +void free_nodes_list(void) +{ + struct node *node, *safe; + + list_for_each_entry_safe(node, safe, &nodes, list) { + list_del(&node->list); + free(node); + } +} + +void update_nodes_list(struct dct_config *c, uint32_t id) +{ + struct node *node; + int i, is_memb, is_join, is_left; + + list_for_each_entry(node, &nodes, list) { + + is_memb = in_memb(c, node->nodeid); + is_join = in_join(c, node->nodeid); + is_left = in_left(c, node->nodeid); + + if (is_memb && node->is_member) { + if (is_join || is_left) + log_error("member list off %d %d %d %d %u", + is_memb, is_join, is_left, + node->is_member, node->nodeid); + continue; + } + + if (!is_memb && !node->is_member) { + if (is_join || is_left) + log_error("member list off %d %d %d %d %u", + is_memb, is_join, is_left, + node->is_member, node->nodeid); + continue; + } + + /* node has failed/left */ + + if (!is_memb & node->is_member) { + if (is_join || !is_left) + log_error("member list off %d %d %d %d %u", + is_memb, is_join, is_left, + node->is_member, node->nodeid); + + node->is_member = 0; + node->sync_from = 0; + node->sent_sync = 0; + continue; + } + + /* old node has joined again */ + + if (is_memb && !node->is_member) { + if (!is_join || is_left) + log_error("member list off %d %d %d %d %u", + is_memb, is_join, is_left, + node->is_member, node->nodeid); + + node->is_member = 1; + node->needs_sync = 1; + 
node->join_eventid = id; + continue; + } + + log_error("update_nodes_list shouldn't get here %d %d %d %d %u", + is_memb, is_join, is_left, + node->is_member, node->nodeid); + } + + /* add new node we've not seen before */ + + for (i = 0; i < c->memb_count; i++) { + node = get_node(c->memb[i]); + if (node) + continue; + + node = add_node(c->memb[i]); + node->is_member = 1; + node->needs_sync = 1; + node->join_eventid = id; + + is_join = in_join(c, node->nodeid); + if (!is_join) + log_error("join list off %d", node->nodeid); + } + + log_debug("update_nodes_list"); + print_nodes_list(); +} + +/* a scheme where new nodes are synced from the low synced nodeid needs to + consider that the low nodeid may fail before syncing the new nodes */ + +void update_nodes_sync_from(void) +{ + struct node *node; + uint8_t low = 0; + + list_for_each_entry(node, &nodes, list) { + if (!node->is_member) + continue; + if (node->needs_sync) + continue; + if (!low || node->nodeid < low) + low = node->nodeid; + } + + list_for_each_entry(node, &nodes, list) { + if (!node->is_member) + continue; + if (!node->needs_sync) + continue; + + node->sync_from = low; + } +} + +void send_syncs(void) +{ + struct node *node; + + list_for_each_entry(node, &nodes, list) { + if (!node->is_member) + continue; + if (!node->needs_sync) + continue; + if (node->sync_from != our_nodeid) + continue; + if (node->sent_sync) + continue; + + send_sync(node->nodeid); + node->sent_sync = 1; + } +} + +static int _send_message(cpg_handle_t h, char *buf, int len, int type) +{ + struct iovec iov; + cpg_error_t error; + int retries = 0; + + iov.iov_base = buf; + iov.iov_len = len; + + retry: + error = cpg_mcast_joined(h, CPG_TYPE_AGREED, &iov, 1); + if (error == CPG_ERR_TRY_AGAIN) { + retries++; + usleep(1000); + if (!(retries % 100)) + log_debug("cpg_mcast_joined retry %d %d", + retries, type); + goto retry; + } + if (error != CPG_OK) { + log_error("cpg_mcast_joined error %d handle %llx %d", + error, (unsigned long long)h, 
type); + return -1; + } + + return 0; +} + +#define CHECK_COUNT 8 + +void send_time(void) +{ + char *buf; + struct timeval tv; + struct dct_header *hd; + struct event *ev; + uint32_t start_eventid, end_eventid; /* inclusive */ + int i, count, event_count, len; + + event_count = eventid; + + if (event_count < CHECK_COUNT) { + count = event_count; + end_eventid = eventid - 1; + start_eventid = 0; + } else { + count = CHECK_COUNT; + end_eventid = eventid - 1; + start_eventid = end_eventid - count + 1; + } + len = sizeof(struct dct_header) + count * sizeof(struct event); + + buf = malloc(len); + if (!buf) { + log_error("send_time no mem %d", len); + return; + } + memset(buf, 0, len); + + hd = (struct dct_header *)buf; + ev = (struct event *)(buf + sizeof(struct dct_header)); + + gettimeofday(&tv, NULL); + + hd->type = EV_MSGTIME; + hd->nodeid = our_nodeid; + hd->tv_sec = tv.tv_sec; + hd->tv_usec = tv.tv_usec; + hd->last_config = last_config_eventid; + hd->event_count = count; + + for (i = start_eventid; i < end_eventid + 1; i++) { + memcpy(ev, &history[i], sizeof(struct event)); + ev++; + } + + _send_message(dct_cpg_handle, buf, len, EV_MSGTIME); + + free(buf); +} + +/* TODO: ability to send full history, cpg max message sizes limit us */ + +#define SYNC_MAX 1000 + +void send_sync(uint8_t nodeid) +{ + char *buf; + struct timeval tv; + struct dct_header *hd; + struct event *ev; + struct node *node; + uint32_t start_eventid, end_eventid; /* inclusive */ + int i, count, event_count, len; + + node = get_node(nodeid); + if (!node) { + log_error("send_sync no node %u", nodeid); + return; + } + + if (!node->join_eventid) { + log_error("send_sync nodeid %d zero join_eventid", nodeid); + return; + } + + event_count = node->join_eventid + 1; + + if (event_count > SYNC_MAX) { + count = SYNC_MAX; + end_eventid = node->join_eventid; + start_eventid = end_eventid - count + 1; + } else { + count = event_count; + end_eventid = node->join_eventid; + start_eventid = 0; + } + len = 
sizeof(struct dct_header) + count * sizeof(struct event); + + buf = malloc(len); + if (!buf) { + log_error("send_sync no mem %d", len); + return; + } + memset(buf, 0, len); + + hd = (struct dct_header *)buf; + ev = (struct event *)(buf + sizeof(struct dct_header)); + + gettimeofday(&tv, NULL); + + hd->type = EV_MSGSYNC; + hd->nodeid = our_nodeid; + hd->to_nodeid = nodeid; + hd->tv_sec = tv.tv_sec; + hd->tv_usec = tv.tv_usec; + hd->last_config = last_config_eventid; + hd->event_count = count; + + /* which nodes are synced is part of the replicated state that needs to + be synced to new nodes */ + + i = 0; + list_for_each_entry(node, &nodes, list) { + if (!node->is_member) + continue; + if (!node->needs_sync) + hd->synced_nodes[i++] = node->nodeid; + } + + for (i = start_eventid; i < end_eventid + 1; i++) { + memcpy(ev, &history[i], sizeof(struct event)); + ev++; + } + + log_debug("send_sync to %u len %d count %d events %u-%u", + nodeid, len, count, start_eventid, end_eventid); + + _send_message(dct_cpg_handle, buf, len, EV_MSGSYNC); + + free(buf); +} + +void check_event(struct event *ev_buf) +{ + if (memcmp(ev_buf, &history[ev_buf->eventid], sizeof(struct event))) { + log_error("check_event %u", ev_buf->eventid); + log_event(ev_buf, 1); + log_event(&history[ev_buf->eventid], 1); + } +} + +/* last event read should be the one in which we joined, + after that, we process saved events, after that we + add the sync message these events are being read from */ + +void read_events(struct dct_header *hd, int len, int check_only) +{ + struct event *ev_buf, *ev_his; + struct dct_config *c1, *c2; + struct save_event *se; + uint32_t start_eventid, end_eventid; /* inclusive */ + int count = hd->event_count; + int i; + + if (len != sizeof(struct dct_header) + count * sizeof(struct event)) { + log_error("read_events bad len %d count %d", len, count); + return; + } + + ev_buf = (struct event *)((char *)hd + sizeof(struct dct_header)); + + start_eventid = ev_buf->eventid; + + for (i 
= 0; i < count; i++) { + ev_his = &history[ev_buf->eventid]; + + if (check_only || ev_his->eventid) + check_event(ev_buf); + else + memcpy(ev_his, ev_buf, sizeof(struct event)); + + end_eventid = ev_buf->eventid; + ev_buf++; + } + + log_debug("read_events from %u to %u len %d count %d events %u-%u %s", + hd->nodeid, hd->to_nodeid, len, count, start_eventid, + end_eventid, check_only ? "check" : "copy"); + + if (check_only) + return; + + /* verify last event in sync series is our join. it should match the + first saved event which should be our join config. we remove this + first saved event which overlaps what we got from sync */ + + ev_his = &history[end_eventid]; + c1 = &ev_his->config; + + if (c1->type != EV_CONFCHG) { + log_error("read_events history event not confchg"); + log_event(ev_his, 1); + } + + se = list_first_entry(&saved_events, struct save_event, list); + c2 = (struct dct_config *)se->buf; + + if (se->type != EV_CONFCHG) { + log_error("read_events saved event not confchg"); + log_event((struct event *)se->buf, 1); + } + + if (!se->we_join) { + log_error("read_events first entry not our join"); + log_config(c2, 1); + } + + if (memcmp(c1, c2, sizeof(struct dct_config))) { + log_error("read_events no config match"); + log_config(c1, 1); + log_config(c2, 1); + } + + eventid = end_eventid + 1; + last_config_eventid = end_eventid; + last_config = c1; + + list_del(&se->list); + free(se); + + init_nodes_list(hd, c1); +} + +void add_history_confchg(struct dct_config *c) +{ + struct event *ev; + + ev = &history[eventid]; + ev->eventid = eventid++; + + memcpy(&ev->config, c, sizeof(struct dct_config)); + + _log_config(c, ev->eventid, 0); + + last_config_eventid = ev->eventid; + last_config = &ev->config; +} + +void add_history_message(struct dct_header *h, int len) +{ + struct event *ev; + + ev = &history[eventid]; + ev->eventid = eventid++; + + memcpy(&ev->header, h, sizeof(struct dct_header)); + + _log_header(h, ev->eventid, 0); +} + +/* process events that 
occured between our join event (which is processed in + receive_sync and removed from saved_events) and our sync event */ + +void read_saved_events(void) +{ + struct save_event *se, *safe; + int count_c = 0, count_m = 0; + + log_debug("read_saved_events"); + + list_for_each_entry_safe(se, safe, &saved_events, list) { + if (se->type == EV_CONFCHG) { + add_history_confchg((struct dct_config *)&se->buf); + update_nodes_list(last_config, last_config_eventid); + count_c++; + } else { + process_message((struct dct_header *)&se->buf, se->len); + count_m++; + } + + list_del(&se->list); + free(se); + } + + log_debug("read_saved_events confchg %d message %d", count_c, count_m); +} + +void save_message(struct dct_header *hd, int len, int type) +{ + struct save_event *se; + + se = malloc(sizeof(struct save_event) + len); + if (!se) { + log_error("save_message no mem %d", len); + return; + } + memset(se, 0, sizeof(struct save_event) + len); + + se->type = type; + se->len = len; + memcpy(&se->buf, hd, len); + + list_add_tail(&se->list, &saved_events); +} + +void save_confchg(struct dct_config *c, int we_join) +{ + struct save_event *se; + int len; + + len = sizeof(struct save_event) + sizeof(struct dct_config); + + se = malloc(len); + if (!se) { + log_error("save_confchg no mem %d", len); + return; + } + memset(se, 0, len); + + se->type = EV_CONFCHG; + se->we_join = we_join; + memcpy(&se->buf, c, sizeof(struct dct_config)); + + list_add_tail(&se->list, &saved_events); +} + +static void confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, + const struct cpg_address *memb_list, + size_t memb_list_entries, + const struct cpg_address *left_list, + size_t left_list_entries, + const struct cpg_address *join_list, + size_t join_list_entries) +{ + struct dct_config c; + struct node *node; + int we_left, we_join; + int i; + + dispatch_count++; + + memset(&c, 0, sizeof(struct dct_config)); + + c.type = EV_CONFCHG; + + c.memb_count = memb_list_entries; + c.left_count = 
left_list_entries; + c.join_count = join_list_entries; + + for (i = 0; i < memb_list_entries; i++) + c.memb[i] = (uint8_t)memb_list[i].nodeid; + for (i = 0; i < left_list_entries; i++) + c.left[i] = (uint8_t)left_list[i].nodeid; + for (i = 0; i < join_list_entries; i++) + c.join[i] = (uint8_t)join_list[i].nodeid; + + if (leave_done) { + /* the we_left confchg should be the very last event we see */ + log_error("confchg_cb after leave_done"); + log_config(&c, 1); + return; + } + + we_left = leave_wait && in_left(&c, our_nodeid); + we_join = join_wait && in_memb(&c, our_nodeid); + + if (we_left) { + leave_wait = 0; + leave_done = 1; + cpg_finalize(dct_cpg_handle); + client_dead(dct_cpg_client); + } + + if (!we_left && !in_memb(&c, our_nodeid)) { + log_error("confchg_cb without our_nodeid %u", our_nodeid); + log_config(&c, 1); + return; + } + + if (we_join) { + join_wait = 0; + join_done = 1; + sync_wait = 1; + sync_done = 0; + } + + /* shortcut to bootstrap things, doesn't work if more than one node + join in the first confchg. If it happens, we'll see the initial + members all sitting in sync_wait for an existing member (there is + none) to send a sync message. To do this properly we'd probably + need to have everyone exchange state messages for each confchg so we + could detect when multiple new nodes join in the first confchg of a + newly formed cpg. Or, rely on the join list to be the same as the + member list? 
*/ + + if (we_join && c.memb_count == 1) { + sync_wait = 0; + sync_done = 1; + node = add_node(our_nodeid); + node->is_member = 1; + node->needs_sync = 0; + add_history_confchg(&c); + return; + } + + if (sync_wait) { + save_confchg(&c, we_join); + return; + } + + add_history_confchg(&c); + update_nodes_list(last_config, last_config_eventid); + update_nodes_sync_from(); +} + +static void receive_sync(struct dct_header *hd, int len) +{ + if (sync_wait) { + if (hd->to_nodeid != our_nodeid) { + /* save events to add to history after we're synced */ + save_message(hd, len, EV_MSGSYNC); + } else { + sync_wait = 0; + sync_done = 1; + + read_events(hd, len, 0); + read_saved_events(); + + add_history_message(hd, len); + set_node_synced(our_nodeid); + } + } else { + if (hd->to_nodeid == our_nodeid) + log_debug("receive_sync from %d redundant", hd->nodeid); + + /* check we agree with past events being synced to new node */ + read_events(hd, len, 1); + + add_history_message(hd, len); + set_node_synced(hd->to_nodeid); + } +} + +static void receive_time(struct dct_header *hd, int len) +{ + struct event *ev_buf; + struct node *node; + + if (sync_wait) { + /* save events to add to history after we're synced */ + save_message(hd, len, EV_MSGTIME); + return; + } + + /* check if the sender is a member of the last configuration */ + + if (!in_memb(last_config, hd->nodeid)) { + log_error("receive_time from non member"); + log_header(hd, 1); + log_config(last_config, 1); + return; + } + + /* check our scheme of syncing nodes, where nodes don't send time + messages until they're synced */ + + node = get_node(hd->nodeid); + if (!node) { + log_error("receive_time no node %u", hd->nodeid); + log_header(hd, 1); + return; + } + + if (node->needs_sync) { + log_error("receive_time from %u needs_sync", hd->nodeid); + log_header(hd, 1); + return; + } + + add_history_message(hd, len); + + /* an event from past history (which everyone should know about) is + included in time messages as a way to 
check that nodes are staying + in sync as we go */ + + ev_buf = (struct event *)((char *)hd + sizeof(struct dct_header)); + + check_event(ev_buf); + + node->last_check_eventid = ev_buf->eventid; + + /* this check currently fails with corosync */ +#if 0 + /* Track whether messages sent in configuration C1 are delivered in C1 + instead of a subsequent C2. Recent reading led me to believe that + this was one of the VS guarantees, but I'm not certain. */ + + if (hd->last_config != last_config_eventid) { + log_error("receive_time in other config"); + log_header(hd, 1); + log_config(last_config, 1); + } +#endif +} + +static void process_message(struct dct_header *hd, int len) +{ + switch (hd->type) { + case EV_MSGTIME: + receive_time(hd, len); + break; + + case EV_MSGSYNC: + receive_sync(hd, len); + break; + }; +} + +static void deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name, + uint32_t nodeid, uint32_t pid, void *data, size_t len) +{ + struct dct_header *hd = data; + + dispatch_count++; + + if (len < sizeof(struct dct_header)) { + log_error("deliver_cb short message %d", len); + log_header(hd, 1); + return; + } + + if (hd->nodeid != (uint8_t)nodeid) { + log_error("bad msg nodeid %u %u", hd->nodeid, nodeid); + log_header(hd, 1); + return; + } + + if (join_wait) { + /* the we_joined confchg should be the first event we see */ + log_error("deliver_cb before joined"); + log_header(hd, 1); + return; + } + + if (leave_done) { + /* the we_left confchg should be the last event we see */ + log_error("deliver_cb after left"); + log_header(hd, 1); + return; + } + + process_message(hd, (int)len); +} + +static cpg_callbacks_t cpg_callbacks = { + .cpg_deliver_fn = deliver_cb, + .cpg_confchg_fn = confchg_cb, +}; + +static void process_cpg(int ci) +{ + cpg_error_t error; + + error = cpg_dispatch(dct_cpg_handle, CPG_DISPATCH_ALL); + if (error != CPG_OK) { + log_error("cpg_dispatch error %d", error); + return; + } + + /* can't send from dispatch, so just flag the nodes 
that need + syncing during the dispatch and send the sync messages now */ + + send_syncs(); +} + +int do_join(void) +{ + cpg_error_t error; + cpg_handle_t h; + int i = 0, fd, ci, rv; + int unused; + uint32_t nodeid; + + sprintf(dct_cpg_name.value, "dct_cpg"); + dct_cpg_name.length = 8; + + error = cpg_initialize(&h, &cpg_callbacks); + if (error != CPG_OK) { + log_error("cpg_initialize error %d", error); + rv = -1; + goto fail_out; + } + + error = cpg_local_get(h, &nodeid); + if (error != CPG_OK) { + log_error("cpg_local_get error %d", error); + rv = -1; + goto fail_fin; + } + + if (nodeid < 1 || nodeid > 255) { + log_error("nodeids must be between 1 and 255"); + rv = -1; + goto fail_fin; + } + our_nodeid = (uint8_t)nodeid; + + cpg_fd_get(h, &fd); + + ci = client_add(fd, process_cpg, cluster_dead); + + dct_cpg_handle = h; + dct_cpg_client = ci; + dct_cpg_fd = fd; + + log_debug("do join our_nodeid %u", our_nodeid); + retry: + error = cpg_join(h, &dct_cpg_name); + if (error == CPG_ERR_TRY_AGAIN) { + sleep(1); + if (!(++i % 10)) + log_debug("cpg_join error retrying"); + goto retry; + } + if (error != CPG_OK) { + log_error("cpg_join error %d", error); + cpg_finalize(h); + rv = -1; + goto fail; + } + + return 0; + + fail: + client_dead(ci); + fail_fin: + cpg_finalize(h); + fail_out: + return rv; +} + +int do_leave(void) +{ + cpg_error_t error; + int i = 0; + + retry: + error = cpg_leave(dct_cpg_handle, &dct_cpg_name); + if (error == CPG_ERR_TRY_AGAIN) { + sleep(1); + if (!(++i % 10)) + log_debug("cpg_leave error retrying"); + goto retry; + } + if (error != CPG_OK) + log_error("cpg_leave error %d", error); + + return 0; +} + +static struct timeval last_send; +static int wait_send; + +/* random interval from 5 to 100 ms between every send */ + +int we_should_send(void) +{ + struct timeval now; + unsigned long ms; + + gettimeofday(&now, NULL); + + ms = time_diff_ms(&last_send, &now); + + if (ms >= wait_send) { + last_send = now; + wait_send = rand_int(5, 100); + return 1; + 
} + return 0; +} + +int we_should_leave(void) +{ + static unsigned int tries; + int rv; + + if (!opt_leave) + return 0; + + tries++; + + rv = rand_int(1, 2000); + if (rv == 200) { + log_debug("do leave %u", tries); + tries = 0; + return 1; + } + return 0; +} + +int we_should_fail(void) +{ + static unsigned int tries; + int rv; + + if (!opt_fail) + return 0; + + tries++; + + rv = rand_int(1, 2000); + if (rv == 100) { + log_debug("do fail %u", tries); + return 1; + } + return 0; +} + +void loop(void) +{ + void (*workfn) (int ci); + void (*deadfn) (int ci); + int poll_timeout = 5; /* ms */ + int rv, i; + + srandom(time(NULL)); + + sync_wait = 0; + sync_done = 0; + leave_wait = 0; + leave_done = 0; + + join_wait = 1; + join_done = 0; + + eventid = 0; + + memset(history, 0, history_len); + free_nodes_list(); + + do_join(); + + for (;;) { + rv = poll(pollfd, client_maxi + 1, poll_timeout); + if (rv == -1 && errno == EINTR) { + if (daemon_quit) + goto out; + continue; + } + if (rv < 0) { + log_error("poll errno %d", errno); + goto out; + } + + /* + * read events from callbacks + */ + + for (i = 0; i <= client_maxi; i++) { + if (client[i].fd < 0) + continue; + if (pollfd[i].revents & POLLIN) { + workfn = client[i].workfn; + workfn(i); + } + if (pollfd[i].revents & (POLLERR | POLLHUP | POLLNVAL)){ + deadfn = client[i].deadfn; + deadfn(i); + } + } + + if (got_error) { + fflush(stdout); + fflush(stderr); + exit(EXIT_FAILURE); + } + + /* + * do things that create events (send messages, leave, fail) + */ + + if (!join_done && dispatch_count) { + /* a confchg for our join should be the first event we + see, so this shouldn't happen. 
without the + dispatch_count condition this error triggers + sometimes, indicating zero dispatch_count */ + log_error("event before join done dispatches %u", + dispatch_count); + continue; + } + + if (!sync_done) { + /* don't do things until we're synced */ + continue; + } + + if (leave_done) + break; + + if (leave_wait) { + /* don't send messages while waiting for our leave + to complete, but we must keep reading events until + we've left, which is when a confchg for our leave + arrives. */ + continue; + } + + if (we_should_leave()) { + leave_wait = 1; + leave_done = 0; + do_leave(); + continue; + } + + if (we_should_send()) + send_time(); + + if (we_should_fail()) { + fflush(stdout); + fflush(stderr); + exit(2); + } + } + out: + return; +} + +void print_usage(void) +{ + printf("Output:\n"); + printf("ERROR: <error string> (stderr)\n"); + printf("H: <event string> (stdout)\n"); + printf("D: <debug string> (stdout)\n"); + printf("\n"); + printf("Options:\n"); + printf(" -H [0|1] event history output [off|on] default 1\n"); + printf(" -D [0|1] debug output [off|on] default 1\n"); + printf(" -f [0|1] fail included in test [off|on] default 1\n"); + printf(" -l [0|1] leave included in test [off|on] default 1\n"); + printf(" -c continue after error\n"); + + printf("to prevent history from periodically restarting from 0,\n" + "keep one node from either leaving or failing with -f0 -l0\n"); +} + +#define HISTORY_EVENTS (1024 * 1024) + +int main(int argc, char **argv) +{ + pid_t pid; + int status, code; + int cont = 1; + int optchar; + + while (cont) { + optchar = getopt(argc, argv, "H:D:cf:l:h"); + + switch (optchar) { + case 'H': + opt_print_event = atoi(optarg); + break; + + case 'D': + opt_print_debug = atoi(optarg); + break; + + case 'f': + opt_fail = atoi(optarg); + break; + + case 'l': + opt_leave = atoi(optarg); + break; + + case 'c': + continue_after_error = 1; + break; + + case 'h': + print_usage(); + exit(EXIT_SUCCESS); + + case EOF: + cont = 0; + break; + }; + } 
+ + srandom(time(NULL)); + + history_len = HISTORY_EVENTS * sizeof(struct event); + + history = malloc(history_len); + if (!history) { + log_error("history no mem %d", history_len); + exit(-1); + } + + INIT_LIST_HEAD(&nodes); + INIT_LIST_HEAD(&saved_events); + + while (1) { + pid = fork(); + + if (!pid) { + /* + * repeat join/work/leave until exit + */ + + while (1) { + loop(); + sleep(rand_int(0, 3)); + } + } + + /* + * parent waits for exit, + * exit 1 is error; stop to see what went wrong + * exit 2 is intentional part of test, keep going + */ + + waitpid(pid, &status, 0); + + if (WIFEXITED(status)) { + code = WEXITSTATUS(status); + if (code == 1 && !continue_after_error) + break; + } else { + printf("not WIFEXITED\n"); + break; + } + sleep(rand_int(0, 3)); + } + + return 1; +} + diff --git a/cpgx/list.h b/cpgx/list.h new file mode 100644 index 0000000..8100cbc --- /dev/null +++ b/cpgx/list.h @@ -0,0 +1,336 @@ +/* Copied from include/linux/list.h */ + +#ifndef _LINUX_LIST_H +#define _LINUX_LIST_H + +/** + * container_of - cast a member of a structure out to the containing structure + * + * @ptr: the pointer to the member. + * @type: the type of the container struct this is embedded in. + * @member: the name of the member within the struct. + * + */ +#define container_of(ptr, type, member) ({ \ + const typeof( ((type *)0)->member ) *__mptr = (ptr); \ + (type *)( (char *)__mptr - offsetof(type,member) );}) + + +/* + * These are non-NULL pointers that will result in page faults + * under normal circumstances, used to verify that nobody uses + * non-initialized list entries. + */ +#define LIST_POISON1 ((void *) 0x00100100) +#define LIST_POISON2 ((void *) 0x00200200) + +/* + * Simple doubly linked list implementation. 
+ * + * Some of the internal functions ("__xxx") are useful when + * manipulating whole lists rather than single entries, as + * sometimes we already know the next/prev entries and we can + * generate better code by using them directly rather than + * using the generic single-entry routines. + */ + +struct list_head { + struct list_head *next, *prev; +}; + +#define LIST_HEAD_INIT(name) { &(name), &(name) } + +#define LIST_HEAD(name) \ + struct list_head name = LIST_HEAD_INIT(name) + +#define INIT_LIST_HEAD(ptr) do { \ + (ptr)->next = (ptr); (ptr)->prev = (ptr); \ +} while (0) + +/* + * Insert a new entry between two known consecutive entries. + * + * This is only for internal list manipulation where we know + * the prev/next entries already! + */ +static inline void __list_add(struct list_head *new, + struct list_head *prev, + struct list_head *next) +{ + next->prev = new; + new->next = next; + new->prev = prev; + prev->next = new; +} + +/** + * list_add - add a new entry + * @new: new entry to be added + * @head: list head to add it after + * + * Insert a new entry after the specified head. + * This is good for implementing stacks. + */ +static inline void list_add(struct list_head *new, struct list_head *head) +{ + __list_add(new, head, head->next); +} + +/** + * list_add_tail - add a new entry + * @new: new entry to be added + * @head: list head to add it before + * + * Insert a new entry before the specified head. + * This is useful for implementing queues. + */ +static inline void list_add_tail(struct list_head *new, struct list_head *head) +{ + __list_add(new, head->prev, head); +} + +/* + * Delete a list entry by making the prev/next entries + * point to each other. + * + * This is only for internal list manipulation where we know + * the prev/next entries already! + */ +static inline void __list_del(struct list_head * prev, struct list_head * next) +{ + next->prev = prev; + prev->next = next; +} + +/** + * list_del - deletes entry from list. 
+ * @entry: the element to delete from the list. + * Note: list_empty on entry does not return true after this, the entry is + * in an undefined state. + */ +static inline void list_del(struct list_head *entry) +{ + __list_del(entry->prev, entry->next); + entry->next = LIST_POISON1; + entry->prev = LIST_POISON2; +} + +/** + * list_del_init - deletes entry from list and reinitialize it. + * @entry: the element to delete from the list. + */ +static inline void list_del_init(struct list_head *entry) +{ + __list_del(entry->prev, entry->next); + INIT_LIST_HEAD(entry); +} + +/** + * list_move - delete from one list and add as another's head + * @list: the entry to move + * @head: the head that will precede our entry + */ +static inline void list_move(struct list_head *list, struct list_head *head) +{ + __list_del(list->prev, list->next); + list_add(list, head); +} + +/** + * list_move_tail - delete from one list and add as another's tail + * @list: the entry to move + * @head: the head that will follow our entry + */ +static inline void list_move_tail(struct list_head *list, + struct list_head *head) +{ + __list_del(list->prev, list->next); + list_add_tail(list, head); +} + +/** + * list_empty - tests whether a list is empty + * @head: the list to test. + */ +static inline int list_empty(const struct list_head *head) +{ + return head->next == head; +} + +/** + * list_empty_careful - tests whether a list is + * empty _and_ checks that no other CPU might be + * in the process of still modifying either member + * + * NOTE: using list_empty_careful() without synchronization + * can only be safe if the only activity that can happen + * to the list entry is list_del_init(). Eg. it cannot be used + * if another CPU could re-list_add() it. + * + * @head: the list to test. 
+ */ +static inline int list_empty_careful(const struct list_head *head) +{ + struct list_head *next = head->next; + return (next == head) && (next == head->prev); +} + +static inline void __list_splice(struct list_head *list, + struct list_head *head) +{ + struct list_head *first = list->next; + struct list_head *last = list->prev; + struct list_head *at = head->next; + + first->prev = head; + head->next = first; + + last->next = at; + at->prev = last; +} + +/** + * list_splice - join two lists + * @list: the new list to add. + * @head: the place to add it in the first list. + */ +static inline void list_splice(struct list_head *list, struct list_head *head) +{ + if (!list_empty(list)) + __list_splice(list, head); +} + +/** + * list_splice_init - join two lists and reinitialise the emptied list. + * @list: the new list to add. + * @head: the place to add it in the first list. + * + * The list at @list is reinitialised + */ +static inline void list_splice_init(struct list_head *list, + struct list_head *head) +{ + if (!list_empty(list)) { + __list_splice(list, head); + INIT_LIST_HEAD(list); + } +} + +/** + * list_entry - get the struct for this entry + * @ptr: the &struct list_head pointer. + * @type: the type of the struct this is embedded in. + * @member: the name of the list_struct within the struct. + */ +#define list_entry(ptr, type, member) \ + container_of(ptr, type, member) + +/** + * list_first_entry - get the first element from a list + * @ptr: the list head to take the element from. + * @type: the type of the struct this is embedded in. + * @member: the name of the list_struct within the struct. + * + * Note, that list is expected to be not empty. + */ +#define list_first_entry(ptr, type, member) \ + list_entry((ptr)->next, type, member) + +/** + * list_for_each - iterate over a list + * @pos: the &struct list_head to use as a loop counter. + * @head: the head for your list. 
+ */ +#define list_for_each(pos, head) \ + for (pos = (head)->next; pos != (head); pos = pos->next) + +/** + * __list_for_each - iterate over a list + * @pos: the &struct list_head to use as a loop counter. + * @head: the head for your list. + * + * This variant differs from list_for_each() in that it's the + * simplest possible list iteration code, no prefetching is done. + * Use this for code that knows the list to be very short (empty + * or 1 entry) most of the time. + */ +#define __list_for_each(pos, head) \ + for (pos = (head)->next; pos != (head); pos = pos->next) + +/** + * list_for_each_prev - iterate over a list backwards + * @pos: the &struct list_head to use as a loop counter. + * @head: the head for your list. + */ +#define list_for_each_prev(pos, head) \ + for (pos = (head)->prev; pos != (head); pos = pos->prev) + +/** + * list_for_each_safe - iterate over a list safe against removal of list entry + * @pos: the &struct list_head to use as a loop counter. + * @n: another &struct list_head to use as temporary storage + * @head: the head for your list. + */ +#define list_for_each_safe(pos, n, head) \ + for (pos = (head)->next, n = pos->next; pos != (head); \ + pos = n, n = pos->next) + +/** + * list_for_each_entry - iterate over list of given type + * @pos: the type * to use as a loop counter. + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + */ +#define list_for_each_entry(pos, head, member) \ + for (pos = list_entry((head)->next, typeof(*pos), member); \ + &pos->member != (head); \ + pos = list_entry(pos->member.next, typeof(*pos), member)) + +/** + * list_for_each_entry_reverse - iterate backwards over list of given type. + * @pos: the type * to use as a loop counter. + * @head: the head for your list. + * @member: the name of the list_struct within the struct. 
+ */ +#define list_for_each_entry_reverse(pos, head, member) \ + for (pos = list_entry((head)->prev, typeof(*pos), member); \ + &pos->member != (head); \ + pos = list_entry(pos->member.prev, typeof(*pos), member)) + +/** + * list_prepare_entry - prepare a pos entry for use as a start point in + * list_for_each_entry_continue + * @pos: the type * to use as a start point + * @head: the head of the list + * @member: the name of the list_struct within the struct. + */ +#define list_prepare_entry(pos, head, member) \ + ((pos) ? : list_entry(head, typeof(*pos), member)) + +/** + * list_for_each_entry_continue - iterate over list of given type + * continuing after existing point + * @pos: the type * to use as a loop counter. + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + */ +#define list_for_each_entry_continue(pos, head, member) \ + for (pos = list_entry(pos->member.next, typeof(*pos), member); \ + &pos->member != (head); \ + pos = list_entry(pos->member.next, typeof(*pos), member)) + +/** + * list_for_each_entry_safe - iterate over list of given type safe against removal of list entry + * @pos: the type * to use as a loop counter. + * @n: another type * to use as temporary storage + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + */ +#define list_for_each_entry_safe(pos, n, head, member) \ + for (pos = list_entry((head)->next, typeof(*pos), member), \ + n = list_entry(pos->member.next, typeof(*pos), member); \ + &pos->member != (head); \ + pos = n, n = list_entry(n->member.next, typeof(*n), member)) + + +#endif |
