diff options
Diffstat (limited to 'daemons/cmirrord/cluster.c')
-rw-r--r-- | daemons/cmirrord/cluster.c | 1661 |
1 file changed, 1661 insertions, 0 deletions
diff --git a/daemons/cmirrord/cluster.c b/daemons/cmirrord/cluster.c new file mode 100644 index 00000000..0cfe5a9e --- /dev/null +++ b/daemons/cmirrord/cluster.c @@ -0,0 +1,1661 @@ +/* + * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. + * + * This copyrighted material is made available to anyone wishing to use, + * modify, copy, or redistribute it subject to the terms and conditions + * of the GNU Lesser General Public License v.2.1. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include <errno.h> +#include <string.h> +#include <sys/types.h> +#include <unistd.h> +#include <stdint.h> +#include <stdlib.h> +#include <signal.h> +#include <sys/socket.h> /* These are for OpenAIS CPGs */ +#include <sys/select.h> +#include <sys/un.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <openais/saAis.h> +#include <openais/cpg.h> +#include <openais/saCkpt.h> + +#include "dm-log-userspace.h" +#include "libdevmapper.h" +#include "functions.h" +#include "local.h" +#include "common.h" +#include "logging.h" +#include "link_mon.h" +#include "cluster.h" + +/* Open AIS error codes */ +#define str_ais_error(x) \ + ((x) == SA_AIS_OK) ? "SA_AIS_OK" : \ + ((x) == SA_AIS_ERR_LIBRARY) ? "SA_AIS_ERR_LIBRARY" : \ + ((x) == SA_AIS_ERR_VERSION) ? "SA_AIS_ERR_VERSION" : \ + ((x) == SA_AIS_ERR_INIT) ? "SA_AIS_ERR_INIT" : \ + ((x) == SA_AIS_ERR_TIMEOUT) ? "SA_AIS_ERR_TIMEOUT" : \ + ((x) == SA_AIS_ERR_TRY_AGAIN) ? "SA_AIS_ERR_TRY_AGAIN" : \ + ((x) == SA_AIS_ERR_INVALID_PARAM) ? "SA_AIS_ERR_INVALID_PARAM" : \ + ((x) == SA_AIS_ERR_NO_MEMORY) ? "SA_AIS_ERR_NO_MEMORY" : \ + ((x) == SA_AIS_ERR_BAD_HANDLE) ? "SA_AIS_ERR_BAD_HANDLE" : \ + ((x) == SA_AIS_ERR_BUSY) ? "SA_AIS_ERR_BUSY" : \ + ((x) == SA_AIS_ERR_ACCESS) ? "SA_AIS_ERR_ACCESS" : \ + ((x) == SA_AIS_ERR_NOT_EXIST) ? 
"SA_AIS_ERR_NOT_EXIST" : \ + ((x) == SA_AIS_ERR_NAME_TOO_LONG) ? "SA_AIS_ERR_NAME_TOO_LONG" : \ + ((x) == SA_AIS_ERR_EXIST) ? "SA_AIS_ERR_EXIST" : \ + ((x) == SA_AIS_ERR_NO_SPACE) ? "SA_AIS_ERR_NO_SPACE" : \ + ((x) == SA_AIS_ERR_INTERRUPT) ? "SA_AIS_ERR_INTERRUPT" : \ + ((x) == SA_AIS_ERR_NAME_NOT_FOUND) ? "SA_AIS_ERR_NAME_NOT_FOUND" : \ + ((x) == SA_AIS_ERR_NO_RESOURCES) ? "SA_AIS_ERR_NO_RESOURCES" : \ + ((x) == SA_AIS_ERR_NOT_SUPPORTED) ? "SA_AIS_ERR_NOT_SUPPORTED" : \ + ((x) == SA_AIS_ERR_BAD_OPERATION) ? "SA_AIS_ERR_BAD_OPERATION" : \ + ((x) == SA_AIS_ERR_FAILED_OPERATION) ? "SA_AIS_ERR_FAILED_OPERATION" : \ + ((x) == SA_AIS_ERR_MESSAGE_ERROR) ? "SA_AIS_ERR_MESSAGE_ERROR" : \ + ((x) == SA_AIS_ERR_QUEUE_FULL) ? "SA_AIS_ERR_QUEUE_FULL" : \ + ((x) == SA_AIS_ERR_QUEUE_NOT_AVAILABLE) ? "SA_AIS_ERR_QUEUE_NOT_AVAILABLE" : \ + ((x) == SA_AIS_ERR_BAD_FLAGS) ? "SA_AIS_ERR_BAD_FLAGS" : \ + ((x) == SA_AIS_ERR_TOO_BIG) ? "SA_AIS_ERR_TOO_BIG" : \ + ((x) == SA_AIS_ERR_NO_SECTIONS) ? "SA_AIS_ERR_NO_SECTIONS" : \ + "ais_error_unknown" + +#define DM_ULOG_RESPONSE 0x1000 /* in last byte of 32-bit value */ +#define DM_ULOG_CHECKPOINT_READY 21 +#define DM_ULOG_MEMBER_JOIN 22 + +#define _RQ_TYPE(x) \ + ((x) == DM_ULOG_CHECKPOINT_READY) ? "DM_ULOG_CHECKPOINT_READY": \ + ((x) == DM_ULOG_MEMBER_JOIN) ? "DM_ULOG_MEMBER_JOIN": \ + RQ_TYPE((x) & ~DM_ULOG_RESPONSE) + +static uint32_t my_cluster_id = 0xDEAD; +static SaCkptHandleT ckpt_handle = 0; +static SaCkptCallbacksT callbacks = { 0, 0 }; +static SaVersionT version = { 'B', 1, 1 }; + +#define DEBUGGING_HISTORY 100 +//static char debugging[DEBUGGING_HISTORY][128]; +//static int idx = 0; +#define LOG_SPRINT(cc, f, arg...) 
do { \ + cc->idx++; \ + cc->idx = cc->idx % DEBUGGING_HISTORY; \ + sprintf(cc->debugging[cc->idx], f, ## arg); \ + } while (0) + +static int log_resp_rec = 0; + +struct checkpoint_data { + uint32_t requester; + char uuid[CPG_MAX_NAME_LENGTH]; + + int bitmap_size; /* in bytes */ + char *sync_bits; + char *clean_bits; + char *recovering_region; + struct checkpoint_data *next; +}; + +#define INVALID 0 +#define VALID 1 +#define LEAVING 2 + +#define MAX_CHECKPOINT_REQUESTERS 10 +struct clog_cpg { + struct dm_list list; + + uint32_t lowest_id; + cpg_handle_t handle; + struct cpg_name name; + uint64_t luid; + + /* Are we the first, or have we received checkpoint? */ + int state; + int cpg_state; /* FIXME: debugging */ + int free_me; + int delay; + int resend_requests; + struct dm_list startup_list; + struct dm_list working_list; + + int checkpoints_needed; + uint32_t checkpoint_requesters[MAX_CHECKPOINT_REQUESTERS]; + struct checkpoint_data *checkpoint_list; + int idx; + char debugging[DEBUGGING_HISTORY][128]; +}; + +static struct dm_list clog_cpg_list; + +/* + * cluster_send + * @rq + * + * Returns: 0 on success, -Exxx on error + */ +int cluster_send(struct clog_request *rq) +{ + int r; + int count=0; + int found; + struct iovec iov; + struct clog_cpg *entry; + + dm_list_iterate_items(entry, &clog_cpg_list) + if (!strncmp(entry->name.value, rq->u_rq.uuid, + CPG_MAX_NAME_LENGTH)) { + found = 1; + break; + } + + if (!found) { + rq->u_rq.error = -ENOENT; + return -ENOENT; + } + + /* + * Once the request heads for the cluster, the luid looses + * all its meaning. 
+ */ + rq->u_rq.luid = 0; + + iov.iov_base = rq; + iov.iov_len = sizeof(struct clog_request) + rq->u_rq.data_size; + + if (entry->cpg_state != VALID) + return -EINVAL; + + do { + r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1); + if (r != SA_AIS_ERR_TRY_AGAIN) + break; + count++; + if (count < 10) + LOG_PRINT("[%s] Retry #%d of cpg_mcast_joined: %s", + SHORT_UUID(rq->u_rq.uuid), count, + str_ais_error(r)); + else if ((count < 100) && !(count % 10)) + LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s", + SHORT_UUID(rq->u_rq.uuid), count, + str_ais_error(r)); + else if ((count < 1000) && !(count % 100)) + LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s", + SHORT_UUID(rq->u_rq.uuid), count, + str_ais_error(r)); + else if ((count < 10000) && !(count % 1000)) + LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s - " + "OpenAIS not handling the load?", + SHORT_UUID(rq->u_rq.uuid), count, + str_ais_error(r)); + usleep(1000); + } while (1); + + if (r == CPG_OK) + return 0; + + /* error codes found in openais/cpg.h */ + LOG_ERROR("cpg_mcast_joined error: %s", str_ais_error(r)); + + rq->u_rq.error = -EBADE; + return -EBADE; +} + +static struct clog_request *get_matching_rq(struct clog_request *rq, + struct dm_list *l) +{ + struct clog_request *match, *n; + + dm_list_iterate_items_safe(match, n, l) + if (match->u_rq.seq == rq->u_rq.seq) { + dm_list_del(&match->list); + return match; + } + + return NULL; +} + +static char rq_buffer[DM_ULOG_REQUEST_SIZE]; +static int handle_cluster_request(struct clog_cpg *entry, + struct clog_request *rq, int server) +{ + int r = 0; + struct clog_request *tmp = (struct clog_request *)rq_buffer; + + /* + * We need a separate dm_ulog_request struct, one that can carry + * a return payload. Otherwise, the memory address after + * rq will be altered - leading to problems + */ + memset(rq_buffer, 0, sizeof(rq_buffer)); + memcpy(tmp, rq, sizeof(struct clog_request) + rq->u_rq.data_size); + + /* + * With resumes, we only handle our own. 
+ * Resume is a special case that requires + * local action (to set up CPG), followed by + * a cluster action to co-ordinate reading + * the disk and checkpointing + */ + if (tmp->u_rq.request_type == DM_ULOG_RESUME) { + if (tmp->originator == my_cluster_id) { + r = do_request(tmp, server); + + r = kernel_send(&tmp->u_rq); + if (r < 0) + LOG_ERROR("Failed to send resume response to kernel"); + } + return r; + } + + r = do_request(tmp, server); + + if (server && + (tmp->u_rq.request_type != DM_ULOG_CLEAR_REGION) && + (tmp->u_rq.request_type != DM_ULOG_POSTSUSPEND)) { + tmp->u_rq.request_type |= DM_ULOG_RESPONSE; + + /* + * Errors from previous functions are in the rq struct. + */ + r = cluster_send(tmp); + if (r < 0) + LOG_ERROR("cluster_send failed: %s", strerror(-r)); + } + + return r; +} + +static int handle_cluster_response(struct clog_cpg *entry, + struct clog_request *rq) +{ + int r = 0; + struct clog_request *orig_rq; + + /* + * If I didn't send it, then I don't care about the response + */ + if (rq->originator != my_cluster_id) + return 0; + + rq->u_rq.request_type &= ~DM_ULOG_RESPONSE; + orig_rq = get_matching_rq(rq, &entry->working_list); + + if (!orig_rq) { + /* Unable to find match for response */ + + LOG_ERROR("[%s] No match for cluster response: %s:%u", + SHORT_UUID(rq->u_rq.uuid), + _RQ_TYPE(rq->u_rq.request_type), + rq->u_rq.seq); + + LOG_ERROR("Current local list:"); + if (dm_list_empty(&entry->working_list)) + LOG_ERROR(" [none]"); + + dm_list_iterate_items(orig_rq, &entry->working_list) + LOG_ERROR(" [%s] %s:%u", + SHORT_UUID(orig_rq->u_rq.uuid), + _RQ_TYPE(orig_rq->u_rq.request_type), + orig_rq->u_rq.seq); + + return -EINVAL; + } + + if (log_resp_rec > 0) { + LOG_COND(log_resend_requests, + "[%s] Response received to %s/#%u", + SHORT_UUID(rq->u_rq.uuid), + _RQ_TYPE(rq->u_rq.request_type), + rq->u_rq.seq); + log_resp_rec--; + } + + /* FIXME: Ensure memcpy cannot explode */ + memcpy(orig_rq, rq, sizeof(*rq) + rq->u_rq.data_size); + + r = 
kernel_send(&orig_rq->u_rq); + if (r) + LOG_ERROR("Failed to send response to kernel"); + + free(orig_rq); + return r; +} + +static struct clog_cpg *find_clog_cpg(cpg_handle_t handle) +{ + struct clog_cpg *match; + + dm_list_iterate_items(match, &clog_cpg_list) + if (match->handle == handle) + return match; + + return NULL; +} + +/* + * prepare_checkpoint + * @entry: clog_cpg describing the log + * @cp_requester: nodeid requesting the checkpoint + * + * Creates and fills in a new checkpoint_data struct. + * + * Returns: checkpoint_data on success, NULL on error + */ +static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry, + uint32_t cp_requester) +{ + int r; + struct checkpoint_data *new; + + if (entry->state != VALID) { + /* + * We can't store bitmaps yet, because the log is not + * valid yet. + */ + LOG_ERROR("Forced to refuse checkpoint for nodeid %u - log not valid yet", + cp_requester); + return NULL; + } + + new = malloc(sizeof(*new)); + if (!new) { + LOG_ERROR("Unable to create checkpoint data for %u", + cp_requester); + return NULL; + } + memset(new, 0, sizeof(*new)); + new->requester = cp_requester; + strncpy(new->uuid, entry->name.value, entry->name.length); + + new->bitmap_size = push_state(entry->name.value, entry->luid, + "clean_bits", + &new->clean_bits, cp_requester); + if (new->bitmap_size <= 0) { + LOG_ERROR("Failed to store clean_bits to checkpoint for node %u", + new->requester); + free(new); + return NULL; + } + + new->bitmap_size = push_state(entry->name.value, entry->luid, + "sync_bits", + &new->sync_bits, cp_requester); + if (new->bitmap_size <= 0) { + LOG_ERROR("Failed to store sync_bits to checkpoint for node %u", + new->requester); + free(new->clean_bits); + free(new); + return NULL; + } + + r = push_state(entry->name.value, entry->luid, + "recovering_region", + &new->recovering_region, cp_requester); + if (r <= 0) { + LOG_ERROR("Failed to store recovering_region to checkpoint for node %u", + new->requester); + 
free(new->sync_bits); + free(new->clean_bits); + free(new); + return NULL; + } + LOG_DBG("[%s] Checkpoint prepared for node %u:", + SHORT_UUID(new->uuid), new->requester); + LOG_DBG(" bitmap_size = %d", new->bitmap_size); + + return new; +} + +/* + * free_checkpoint + * @cp: the checkpoint_data struct to free + * + */ +static void free_checkpoint(struct checkpoint_data *cp) +{ + free(cp->recovering_region); + free(cp->sync_bits); + free(cp->clean_bits); + free(cp); +} + +static int export_checkpoint(struct checkpoint_data *cp) +{ + SaCkptCheckpointCreationAttributesT attr; + SaCkptCheckpointHandleT h; + SaCkptSectionIdT section_id; + SaCkptSectionCreationAttributesT section_attr; + SaCkptCheckpointOpenFlagsT flags; + SaNameT name; + SaAisErrorT rv; + struct clog_request *rq; + int len, r = 0; + char buf[32]; + + LOG_DBG("Sending checkpointed data to %u", cp->requester); + + len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, + "bitmaps_%s_%u", SHORT_UUID(cp->uuid), cp->requester); + name.length = len; + + len = strlen(cp->recovering_region) + 1; + + attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS; + attr.checkpointSize = cp->bitmap_size * 2 + len; + + attr.retentionDuration = SA_TIME_MAX; + attr.maxSections = 4; /* don't know why we need +1 */ + + attr.maxSectionSize = (cp->bitmap_size > len) ? 
cp->bitmap_size : len; + attr.maxSectionIdSize = 22; + + flags = SA_CKPT_CHECKPOINT_READ | + SA_CKPT_CHECKPOINT_WRITE | + SA_CKPT_CHECKPOINT_CREATE; + +open_retry: + rv = saCkptCheckpointOpen(ckpt_handle, &name, &attr, flags, 0, &h); + if (rv == SA_AIS_ERR_TRY_AGAIN) { + LOG_ERROR("export_checkpoint: ckpt open retry"); + usleep(1000); + goto open_retry; + } + + if (rv == SA_AIS_ERR_EXIST) { + LOG_DBG("export_checkpoint: checkpoint already exists"); + return -EEXIST; + } + + if (rv != SA_AIS_OK) { + LOG_ERROR("[%s] Failed to open checkpoint for %u: %s", + SHORT_UUID(cp->uuid), cp->requester, + str_ais_error(rv)); + return -EIO; /* FIXME: better error */ + } + + /* + * Add section for sync_bits + */ + section_id.idLen = snprintf(buf, 32, "sync_bits"); + section_id.id = (unsigned char *)buf; + section_attr.sectionId = §ion_id; + section_attr.expirationTime = SA_TIME_END; + +sync_create_retry: + rv = saCkptSectionCreate(h, §ion_attr, + cp->sync_bits, cp->bitmap_size); + if (rv == SA_AIS_ERR_TRY_AGAIN) { + LOG_ERROR("Sync checkpoint section create retry"); + usleep(1000); + goto sync_create_retry; + } + + if (rv == SA_AIS_ERR_EXIST) { + LOG_DBG("Sync checkpoint section already exists"); + saCkptCheckpointClose(h); + return -EEXIST; + } + + if (rv != SA_AIS_OK) { + LOG_ERROR("Sync checkpoint section creation failed: %s", + str_ais_error(rv)); + saCkptCheckpointClose(h); + return -EIO; /* FIXME: better error */ + } + + /* + * Add section for clean_bits + */ + section_id.idLen = snprintf(buf, 32, "clean_bits"); + section_id.id = (unsigned char *)buf; + section_attr.sectionId = §ion_id; + section_attr.expirationTime = SA_TIME_END; + +clean_create_retry: + rv = saCkptSectionCreate(h, §ion_attr, cp->clean_bits, cp->bitmap_size); + if (rv == SA_AIS_ERR_TRY_AGAIN) { + LOG_ERROR("Clean checkpoint section create retry"); + usleep(1000); + goto clean_create_retry; + } + + if (rv == SA_AIS_ERR_EXIST) { + LOG_DBG("Clean checkpoint section already exists"); + 
saCkptCheckpointClose(h); + return -EEXIST; + } + + if (rv != SA_AIS_OK) { + LOG_ERROR("Clean checkpoint section creation failed: %s", + str_ais_error(rv)); + saCkptCheckpointClose(h); + return -EIO; /* FIXME: better error */ + } + + /* + * Add section for recovering_region + */ + section_id.idLen = snprintf(buf, 32, "recovering_region"); + section_id.id = (unsigned char *)buf; + section_attr.sectionId = §ion_id; + section_attr.expirationTime = SA_TIME_END; + +rr_create_retry: + rv = saCkptSectionCreate(h, §ion_attr, cp->recovering_region, + strlen(cp->recovering_region) + 1); + if (rv == SA_AIS_ERR_TRY_AGAIN) { + LOG_ERROR("RR checkpoint section create retry"); + usleep(1000); + goto rr_create_retry; + } + + if (rv == SA_AIS_ERR_EXIST) { + LOG_DBG("RR checkpoint section already exists"); + saCkptCheckpointClose(h); + return -EEXIST; + } + + if (rv != SA_AIS_OK) { + LOG_ERROR("RR checkpoint section creation failed: %s", + str_ais_error(rv)); + saCkptCheckpointClose(h); + return -EIO; /* FIXME: better error */ + } + + LOG_DBG("export_checkpoint: closing checkpoint"); + saCkptCheckpointClose(h); + + rq = malloc(DM_ULOG_REQUEST_SIZE); + if (!rq) { + LOG_ERROR("export_checkpoint: Unable to allocate transfer structs"); + return -ENOMEM; + } + memset(rq, 0, sizeof(*rq)); + + dm_list_init(&rq->list); + rq->u_rq.request_type = DM_ULOG_CHECKPOINT_READY; + rq->originator = cp->requester; /* FIXME: hack to overload meaning of originator */ + strncpy(rq->u_rq.uuid, cp->uuid, CPG_MAX_NAME_LENGTH); + rq->u_rq.seq = my_cluster_id; + + r = cluster_send(rq); + if (r) + LOG_ERROR("Failed to send checkpoint ready notice: %s", + strerror(-r)); + + free(rq); + return 0; +} + +static int import_checkpoint(struct clog_cpg *entry, int no_read) +{ + int rtn = 0; + SaCkptCheckpointHandleT h; + SaCkptSectionIterationHandleT itr; + SaCkptSectionDescriptorT desc; + SaCkptIOVectorElementT iov; + SaNameT name; + SaAisErrorT rv; + char *bitmap = NULL; + int len; + + bitmap = malloc(1024*1024); + 
if (!bitmap) + return -ENOMEM; + + len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u", + SHORT_UUID(entry->name.value), my_cluster_id); + name.length = len; + +open_retry: + rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL, + SA_CKPT_CHECKPOINT_READ, 0, &h); + if (rv == SA_AIS_ERR_TRY_AGAIN) { + LOG_ERROR("import_checkpoint: ckpt open retry"); + usleep(1000); + goto open_retry; + } + + if (rv != SA_AIS_OK) { + LOG_ERROR("[%s] Failed to open checkpoint: %s", + SHORT_UUID(entry->name.value), str_ais_error(rv)); + return -EIO; /* FIXME: better error */ + } + +unlink_retry: + rv = saCkptCheckpointUnlink(ckpt_handle, &name); + if (rv == SA_AIS_ERR_TRY_AGAIN) { + LOG_ERROR("import_checkpoint: ckpt unlink retry"); + usleep(1000); + goto unlink_retry; + } + + if (no_read) { + LOG_DBG("Checkpoint for this log already received"); + goto no_read; + } + +init_retry: + rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY, + SA_TIME_END, &itr); + if (rv == SA_AIS_ERR_TRY_AGAIN) { + LOG_ERROR("import_checkpoint: sync create retry"); + usleep(1000); + goto init_retry; + } + + if (rv != SA_AIS_OK) { + LOG_ERROR("[%s] Sync checkpoint section creation failed: %s", + SHORT_UUID(entry->name.value), str_ais_error(rv)); + return -EIO; /* FIXME: better error */ + } + + len = 0; + while (1) { + rv = saCkptSectionIterationNext(itr, &desc); + if (rv == SA_AIS_OK) + len++; + else if ((rv == SA_AIS_ERR_NO_SECTIONS) && len) + break; + else if (rv != SA_AIS_ERR_TRY_AGAIN) { + LOG_ERROR("saCkptSectionIterationNext failure: %d", rv); + break; + } + } + saCkptSectionIterationFinalize(itr); + if (len != 3) { + LOG_ERROR("import_checkpoint: %d checkpoint sections found", + len); + usleep(1000); + goto init_retry; + } + saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY, + SA_TIME_END, &itr); + + while (1) { + rv = saCkptSectionIterationNext(itr, &desc); + if (rv == SA_AIS_ERR_NO_SECTIONS) + break; + + if (rv == SA_AIS_ERR_TRY_AGAIN) { + 
LOG_ERROR("import_checkpoint: ckpt iternext retry"); + usleep(1000); + continue; + } + + if (rv != SA_AIS_OK) { + LOG_ERROR("import_checkpoint: clean checkpoint section " + "creation failed: %s", str_ais_error(rv)); + rtn = -EIO; /* FIXME: better error */ + goto fail; + } + + if (!desc.sectionSize) { + LOG_ERROR("Checkpoint section empty"); + continue; + } + + memset(bitmap, 0, sizeof(*bitmap)); + iov.sectionId = desc.sectionId; + iov.dataBuffer = bitmap; + iov.dataSize = desc.sectionSize; + iov.dataOffset = 0; + + read_retry: + rv = saCkptCheckpointRead(h, &iov, 1, NULL); + if (rv == SA_AIS_ERR_TRY_AGAIN) { + LOG_ERROR("ckpt read retry"); + usleep(1000); + goto read_retry; + } + + if (rv != SA_AIS_OK) { + LOG_ERROR("import_checkpoint: ckpt read error: %s", + str_ais_error(rv)); + rtn = -EIO; /* FIXME: better error */ + goto fail; + } + + if (iov.readSize) { + if (pull_state(entry->name.value, entry->luid, + (char *)desc.sectionId.id, bitmap, + iov.readSize)) { + LOG_ERROR("Error loading state"); + rtn = -EIO; + goto fail; + } + } else { + /* Need to request new checkpoint */ + rtn = -EAGAIN; + goto fail; + } + } + +fail: + saCkptSectionIterationFinalize(itr); +no_read: + saCkptCheckpointClose(h); + + free(bitmap); + return rtn; +} + +static void do_checkpoints(struct clog_cpg *entry, int leaving) +{ + struct checkpoint_data *cp; + + for (cp = entry->checkpoint_list; cp;) { + /* + * FIXME: Check return code. Could send failure + * notice in rq in export_checkpoint function + * by setting rq->error + */ + switch (export_checkpoint(cp)) { + case -EEXIST: + LOG_SPRINT(entry, "[%s] Checkpoint for %u already handled%s", + SHORT_UUID(entry->name.value), cp->requester, + (leaving) ? "(L)": ""); + LOG_COND(log_checkpoint, + "[%s] Checkpoint for %u already handled%s", + SHORT_UUID(entry->name.value), cp->requester, + (leaving) ? 
"(L)": ""); + entry->checkpoint_list = cp->next; + free_checkpoint(cp); + cp = entry->checkpoint_list; + break; + case 0: + LOG_SPRINT(entry, "[%s] Checkpoint data available for node %u%s", + SHORT_UUID(entry->name.value), cp->requester, + (leaving) ? "(L)": ""); + LOG_COND(log_checkpoint, + "[%s] Checkpoint data available for node %u%s", + SHORT_UUID(entry->name.value), cp->requester, + (leaving) ? "(L)": ""); + entry->checkpoint_list = cp->next; + free_checkpoint(cp); + cp = entry->checkpoint_list; + break; + default: + /* FIXME: Skipping will cause list corruption */ + LOG_ERROR("[%s] Failed to export checkpoint for %u%s", + SHORT_UUID(entry->name.value), cp->requester, + (leaving) ? "(L)": ""); + } + } +} + +static int resend_requests(struct clog_cpg *entry) +{ + int r = 0; + struct clog_request *rq, *n; + + if (!entry->resend_requests || entry->delay) + return 0; + + if (entry->state != VALID) + return 0; + + entry->resend_requests = 0; + + dm_list_iterate_items_safe(rq, n, &entry->working_list) { + dm_list_del(&rq->list); + + if (strcmp(entry->name.value, rq->u_rq.uuid)) { + LOG_ERROR("[%s] Stray request from another log (%s)", + SHORT_UUID(entry->name.value), + SHORT_UUID(rq->u_rq.uuid)); + free(rq); + continue; + } + + switch (rq->u_rq.request_type) { + case DM_ULOG_SET_REGION_SYNC: + /* + * Some requests simply do not need to be resent. + * If it is a request that just changes log state, + * then it doesn't need to be resent (everyone makes + * updates). + */ + LOG_COND(log_resend_requests, + "[%s] Skipping resend of %s/#%u...", + SHORT_UUID(entry->name.value), + _RQ_TYPE(rq->u_rq.request_type), + rq->u_rq.seq); + LOG_SPRINT(entry, "### No resend: [%s] %s/%u ###", + SHORT_UUID(entry->name.value), + _RQ_TYPE(rq->u_rq.request_type), + rq->u_rq.seq); + + rq->u_rq.data_size = 0; + kernel_send(&rq->u_rq); + + break; + + default: + /* + * If an action or a response is required, then + * the request must be resent. 
+ */ + LOG_COND(log_resend_requests, + "[%s] Resending %s(#%u) due to new server(%u)", + SHORT_UUID(entry->name.value), + _RQ_TYPE(rq->u_rq.request_type), + rq->u_rq.seq, entry->lowest_id); + LOG_SPRINT(entry, "*** Resending: [%s] %s/%u ***", + SHORT_UUID(entry->name.value), + _RQ_TYPE(rq->u_rq.request_type), + rq->u_rq.seq); + r = cluster_send(rq); + if (r < 0) + LOG_ERROR("Failed resend"); + } + free(rq); + } + + return r; +} + +static int do_cluster_work(void *data) +{ + int r = SA_AIS_OK; + struct clog_cpg *entry; + + dm_list_iterate_items(entry, &clog_cpg_list) { + r = cpg_dispatch(entry->handle, CPG_DISPATCH_ALL); + if (r != SA_AIS_OK) + LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r)); + + if (entry->free_me) { + free(entry); + continue; + } + do_checkpoints(entry, 0); + + resend_requests(entry); + } + + return (r == SA_AIS_OK) ? 0 : -1; /* FIXME: good error number? */ +} + +static int flush_startup_list(struct clog_cpg *entry) +{ + int r = 0; + int i_was_server; + struct clog_request *rq, *n; + struct checkpoint_data *new; + + dm_list_iterate_items_safe(rq, n, &entry->startup_list) { + dm_list_del(&rq->list); + + if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) { + new = prepare_checkpoint(entry, rq->originator); + if (!new) { + /* + * FIXME: Need better error handling. Other nodes + * will be trying to send the checkpoint too, and we + * must continue processing the list; so report error + * but continue. 
+ */ + LOG_ERROR("Failed to prepare checkpoint for %u!!!", + rq->originator); + free(rq); + continue; + } + LOG_SPRINT(entry, "[%s] Checkpoint prepared for %u", + SHORT_UUID(entry->name.value), rq->originator); + LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u", + SHORT_UUID(entry->name.value), rq->originator); + new->next = entry->checkpoint_list; + entry->checkpoint_list = new; + } else { + LOG_DBG("[%s] Processing delayed request: %s", + SHORT_UUID(rq->u_rq.uuid), + _RQ_TYPE(rq->u_rq.request_type)); + i_was_server = (rq->pit_server == my_cluster_id) ? 1 : 0; + r = handle_cluster_request(entry, rq, i_was_server); + + if (r) + /* + * FIXME: If we error out here, we will never get + * another opportunity to retry these requests + */ + LOG_ERROR("Error while processing delayed CPG message"); + } + free(rq); + } + + return 0; +} + +static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname, + uint32_t nodeid, uint32_t pid, + void *msg, int msg_len) +{ + int i; + int r = 0; + int i_am_server; + int response = 0; + struct clog_request *rq = msg; + struct clog_request *tmp_rq; + struct clog_cpg *match; + + match = find_clog_cpg(handle); + if (!match) { + LOG_ERROR("Unable to find clog_cpg for cluster message"); + return; + } + + if ((nodeid == my_cluster_id) && + !(rq->u_rq.request_type & DM_ULOG_RESPONSE) && + (rq->u_rq.request_type != DM_ULOG_RESUME) && + (rq->u_rq.request_type != DM_ULOG_CLEAR_REGION) && + (rq->u_rq.request_type != DM_ULOG_CHECKPOINT_READY)) { + tmp_rq = malloc(DM_ULOG_REQUEST_SIZE); + if (!tmp_rq) { + /* + * FIXME: It may be possible to continue... 
but we + * would not be able to resend any messages that might + * be necessary during membership changes + */ + LOG_ERROR("[%s] Unable to record request: -ENOMEM", + SHORT_UUID(rq->u_rq.uuid)); + return; + } + memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size); + dm_list_init(&tmp_rq->list); + dm_list_add( &match->working_list, &tmp_rq->list); + } + + if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND) { + /* + * If the server (lowest_id) indicates it is leaving, + * then we must resend any outstanding requests. However, + * we do not want to resend them if the next server in + * line is in the process of leaving. + */ + if (nodeid == my_cluster_id) { + LOG_COND(log_resend_requests, "[%s] I am leaving.1.....", + SHORT_UUID(rq->u_rq.uuid)); + } else { + if (nodeid < my_cluster_id) { + if (nodeid == match->lowest_id) { + match->resend_requests = 1; + LOG_COND(log_resend_requests, "[%s] %u is leaving, resend required%s", + SHORT_UUID(rq->u_rq.uuid), nodeid, + (dm_list_empty(&match->working_list)) ? " -- working_list empty": ""); + + dm_list_iterate_items(tmp_rq, &match->working_list) + LOG_COND(log_resend_requests, + "[%s] %s/%u", + SHORT_UUID(tmp_rq->u_rq.uuid), + _RQ_TYPE(tmp_rq->u_rq.request_type), + tmp_rq->u_rq.seq); + } + + match->delay++; + LOG_COND(log_resend_requests, "[%s] %u is leaving, delay = %d", + SHORT_UUID(rq->u_rq.uuid), nodeid, match->delay); + } + rq->originator = nodeid; /* don't really need this, but nice for debug */ + goto out; + } + } + + /* + * We can receive messages after we do a cpg_leave but before we + * get our config callback. However, since we can't respond after + * leaving, we simply return. + */ + if (match->state == LEAVING) + return; + + i_am_server = (my_cluster_id == match->lowest_id) ? 
1 : 0; + + if (rq->u_rq.request_type == DM_ULOG_CHECKPOINT_READY) { + if (my_cluster_id == rq->originator) { + /* Redundant checkpoints ignored if match->valid */ + LOG_SPRINT(match, "[%s] CHECKPOINT_READY notification from %u", + SHORT_UUID(rq->u_rq.uuid), nodeid); + if (import_checkpoint(match, (match->state != INVALID))) { + LOG_SPRINT(match, + "[%s] Failed to import checkpoint from %u", + SHORT_UUID(rq->u_rq.uuid), nodeid); + LOG_ERROR("[%s] Failed to import checkpoint from %u", + SHORT_UUID(rq->u_rq.uuid), nodeid); + kill(getpid(), SIGUSR1); + /* Could we retry? */ + goto out; + } else if (match->state == INVALID) { + LOG_SPRINT(match, + "[%s] Checkpoint data received from %u. Log is now valid", + SHORT_UUID(match->name.value), nodeid); + LOG_COND(log_checkpoint, + "[%s] Checkpoint data received from %u. Log is now valid", + SHORT_UUID(match->name.value), nodeid); + match->state = VALID; + + flush_startup_list(match); + } else { + LOG_SPRINT(match, + "[%s] Redundant checkpoint from %u ignored.", + SHORT_UUID(rq->u_rq.uuid), nodeid); + } + } + goto out; + } + + if (rq->u_rq.request_type & DM_ULOG_RESPONSE) { + response = 1; + r = handle_cluster_response(match, rq); + } else { + rq->originator = nodeid; + + if (match->state == LEAVING) { + LOG_ERROR("[%s] Ignoring %s from %u. Reason: I'm leaving", + SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type), + rq->originator); + goto out; + } + + if (match->state == INVALID) { + LOG_DBG("Log not valid yet, storing request"); + tmp_rq = malloc(DM_ULOG_REQUEST_SIZE); + if (!tmp_rq) { + LOG_ERROR("cpg_message_callback: Unable to" + " allocate transfer structs"); + r = -ENOMEM; /* FIXME: Better error #? 
*/ + goto out; + } + + memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size); + tmp_rq->pit_server = match->lowest_id; + dm_list_init(&tmp_rq->list); + dm_list_add(&match->startup_list, &tmp_rq->list); + goto out; + } + + r = handle_cluster_request(match, rq, i_am_server); + } + + /* + * If the log is now valid, we can queue the checkpoints + */ + for (i = match->checkpoints_needed; i; ) { + struct checkpoint_data *new; + + if (log_get_state(&rq->u_rq) != LOG_RESUMED) { + LOG_DBG("[%s] Withholding checkpoints until log is valid (%s from %u)", + SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type), nodeid); + break; + } + + i--; + new = prepare_checkpoint(match, match->checkpoint_requesters[i]); + if (!new) { + /* FIXME: Need better error handling */ + LOG_ERROR("[%s] Failed to prepare checkpoint for %u!!!", + SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]); + break; + } + LOG_SPRINT(match, "[%s] Checkpoint prepared for %u* (%s)", + SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i], + (log_get_state(&rq->u_rq) != LOG_RESUMED)? "LOG_RESUMED": "LOG_SUSPENDED"); + LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u*", + SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]); + match->checkpoints_needed--; + + new->next = match->checkpoint_list; + match->checkpoint_list = new; + } + +out: + /* nothing happens after this point. It is just for debugging */ + if (r) { + LOG_ERROR("[%s] Error while processing CPG message, %s: %s", + SHORT_UUID(rq->u_rq.uuid), + _RQ_TYPE(rq->u_rq.request_type & ~DM_ULOG_RESPONSE), + strerror(-r)); + LOG_ERROR("[%s] Response : %s", SHORT_UUID(rq->u_rq.uuid), + (response) ? 
"YES" : "NO"); + LOG_ERROR("[%s] Originator: %u", + SHORT_UUID(rq->u_rq.uuid), rq->originator); + if (response) + LOG_ERROR("[%s] Responder : %u", + SHORT_UUID(rq->u_rq.uuid), nodeid); + + LOG_ERROR("HISTORY::"); + for (i = 0; i < DEBUGGING_HISTORY; i++) { + match->idx++; + match->idx = match->idx % DEBUGGING_HISTORY; + if (match->debugging[match->idx][0] == '\0') + continue; + LOG_ERROR("%d:%d) %s", i, match->idx, + match->debugging[match->idx]); + } + } else if (!(rq->u_rq.request_type & DM_ULOG_RESPONSE) || + (rq->originator == my_cluster_id)) { + if (!response) + LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s", + rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid), + _RQ_TYPE(rq->u_rq.request_type), + rq->originator, (response) ? "YES" : "NO"); + else + LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s, RSPR=%u", + rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid), + _RQ_TYPE(rq->u_rq.request_type), + rq->originator, (response) ? "YES" : "NO", + nodeid); + } +} + +static void cpg_join_callback(struct clog_cpg *match, + struct cpg_address *joined, + struct cpg_address *member_list, + int member_list_entries) +{ + int i; + int my_pid = getpid(); + uint32_t lowest = match->lowest_id; + struct clog_request *rq; + char dbuf[32]; + + /* Assign my_cluster_id */ + if ((my_cluster_id == 0xDEAD) && (joined->pid == my_pid)) + my_cluster_id = joined->nodeid; + + /* Am I the very first to join? 
*/ + if (member_list_entries == 1) { + match->lowest_id = joined->nodeid; + match->state = VALID; + } + + /* If I am part of the joining list, I do not send checkpoints */ + if (joined->nodeid == my_cluster_id) + goto out; + + memset(dbuf, 0, sizeof(dbuf)); + for (i = 0; i < (member_list_entries-1); i++) + sprintf(dbuf+strlen(dbuf), "%u-", member_list[i].nodeid); + sprintf(dbuf+strlen(dbuf), "(%u)", joined->nodeid); + LOG_COND(log_checkpoint, "[%s] Joining node, %u needs checkpoint [%s]", + SHORT_UUID(match->name.value), joined->nodeid, dbuf); + + /* + * FIXME: remove checkpoint_requesters/checkpoints_needed, and use + * the startup_list interface exclusively + */ + if (dm_list_empty(&match->startup_list) && (match->state == VALID) && + (match->checkpoints_needed < MAX_CHECKPOINT_REQUESTERS)) { + match->checkpoint_requesters[match->checkpoints_needed++] = joined->nodeid; + goto out; + } + + rq = malloc(DM_ULOG_REQUEST_SIZE); + if (!rq) { + LOG_ERROR("cpg_config_callback: " + "Unable to allocate transfer structs"); + LOG_ERROR("cpg_config_callback: " + "Unable to perform checkpoint"); + goto out; + } + rq->u_rq.request_type = DM_ULOG_MEMBER_JOIN; + rq->originator = joined->nodeid; + dm_list_init(&rq->list); + dm_list_add(&match->startup_list, &rq->list); + +out: + /* Find the lowest_id, i.e. the server */ + match->lowest_id = member_list[0].nodeid; + for (i = 0; i < member_list_entries; i++) + if (match->lowest_id > member_list[i].nodeid) + match->lowest_id = member_list[i].nodeid; + + if (lowest == 0xDEAD) + LOG_COND(log_membership_change, "[%s] Server change <none> -> %u (%u %s)", + SHORT_UUID(match->name.value), match->lowest_id, + joined->nodeid, (member_list_entries == 1) ? 
+ "is first to join" : "joined"); + else if (lowest != match->lowest_id) + LOG_COND(log_membership_change, "[%s] Server change %u -> %u (%u joined)", + SHORT_UUID(match->name.value), lowest, + match->lowest_id, joined->nodeid); + else + LOG_COND(log_membership_change, "[%s] Server unchanged at %u (%u joined)", + SHORT_UUID(match->name.value), + lowest, joined->nodeid); + LOG_SPRINT(match, "+++ UUID=%s %u join +++", + SHORT_UUID(match->name.value), joined->nodeid); +} + +static void cpg_leave_callback(struct clog_cpg *match, + struct cpg_address *left, + struct cpg_address *member_list, + int member_list_entries) +{ + int i, j, fd; + uint32_t lowest = match->lowest_id; + struct clog_request *rq, *n; + struct checkpoint_data *p_cp, *c_cp; + + LOG_SPRINT(match, "--- UUID=%s %u left ---", + SHORT_UUID(match->name.value), left->nodeid); + + /* Am I leaving? */ + if (my_cluster_id == left->nodeid) { + LOG_DBG("Finalizing leave..."); + dm_list_del(&match->list); + + cpg_fd_get(match->handle, &fd); + links_unregister(fd); + + cluster_postsuspend(match->name.value, match->luid); + + dm_list_iterate_items_safe(rq, n, &match->working_list) { + dm_list_del(&rq->list); + + if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND) + kernel_send(&rq->u_rq); + free(rq); + } + + cpg_finalize(match->handle); + + match->free_me = 1; + match->lowest_id = 0xDEAD; + match->state = INVALID; + } + + /* Remove any pending checkpoints for the leaving node. 
*/ + for (p_cp = NULL, c_cp = match->checkpoint_list; + c_cp && (c_cp->requester != left->nodeid); + p_cp = c_cp, c_cp = c_cp->next); + if (c_cp) { + if (p_cp) + p_cp->next = c_cp->next; + else + match->checkpoint_list = c_cp->next; + + LOG_COND(log_checkpoint, + "[%s] Removing pending checkpoint (%u is leaving)", + SHORT_UUID(match->name.value), left->nodeid); + free_checkpoint(c_cp); + } + dm_list_iterate_items_safe(rq, n, &match->startup_list) { + if ((rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) && + (rq->originator == left->nodeid)) { + LOG_COND(log_checkpoint, + "[%s] Removing pending ckpt from startup list (%u is leaving)", + SHORT_UUID(match->name.value), left->nodeid); + dm_list_del(&rq->list); + free(rq); + } + } + for (i = 0, j = 0; i < match->checkpoints_needed; i++, j++) { + match->checkpoint_requesters[j] = match->checkpoint_requesters[i]; + if (match->checkpoint_requesters[i] == left->nodeid) { + LOG_ERROR("[%s] Removing pending ckpt from needed list (%u is leaving)", + SHORT_UUID(match->name.value), left->nodeid); + j--; + } + } + match->checkpoints_needed = j; + + if (left->nodeid < my_cluster_id) { + match->delay = (match->delay > 0) ? match->delay - 1 : 0; + if (!match->delay && dm_list_empty(&match->working_list)) + match->resend_requests = 0; + LOG_COND(log_resend_requests, "[%s] %u has left, delay = %d%s", + SHORT_UUID(match->name.value), left->nodeid, + match->delay, (dm_list_empty(&match->working_list)) ? + " -- working_list empty": ""); + } + + /* Find the lowest_id, i.e. 
the server */ + if (!member_list_entries) { + match->lowest_id = 0xDEAD; + LOG_COND(log_membership_change, "[%s] Server change %u -> <none> " + "(%u is last to leave)", + SHORT_UUID(match->name.value), left->nodeid, + left->nodeid); + return; + } + + match->lowest_id = member_list[0].nodeid; + for (i = 0; i < member_list_entries; i++) + if (match->lowest_id > member_list[i].nodeid) + match->lowest_id = member_list[i].nodeid; + + if (lowest != match->lowest_id) { + LOG_COND(log_membership_change, "[%s] Server change %u -> %u (%u left)", + SHORT_UUID(match->name.value), lowest, + match->lowest_id, left->nodeid); + } else + LOG_COND(log_membership_change, "[%s] Server unchanged at %u (%u left)", + SHORT_UUID(match->name.value), lowest, left->nodeid); + + if ((match->state == INVALID) && !match->free_me) { + /* + * If all CPG members are waiting for checkpoints and they + * are all present in my startup_list, then I was the first to + * join and I must assume control. + * + * We do not normally end up here, but if there was a quick + * 'resume -> suspend -> resume' across the cluster, we may + * have initially thought we were not the first to join because + * of the presence of out-going (and unable to respond) members. + */ + + i = 1; /* We do not have a DM_ULOG_MEMBER_JOIN entry of our own */ + dm_list_iterate_items(rq, &match->startup_list) + if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) + i++; + + if (i == member_list_entries) { + /* + * Last node who could have given me a checkpoint just left. + * Setting log state to VALID and acting as 'first join'. 
+ */ + match->state = VALID; + flush_startup_list(match); + } + } +} + +static void cpg_config_callback(cpg_handle_t handle, struct cpg_name *gname, + struct cpg_address *member_list, + int member_list_entries, + struct cpg_address *left_list, + int left_list_entries, + struct cpg_address *joined_list, + int joined_list_entries) +{ + struct clog_cpg *match; + int found = 0; + + dm_list_iterate_items(match, &clog_cpg_list) + if (match->handle == handle) { + found = 1; + break; + } + + if (!found) { + LOG_ERROR("Unable to find match for CPG config callback"); + return; + } + + if ((joined_list_entries + left_list_entries) > 1) + LOG_ERROR("[%s] More than one node joining/leaving", + SHORT_UUID(match->name.value)); + + if (joined_list_entries) + cpg_join_callback(match, joined_list, + member_list, member_list_entries); + else + cpg_leave_callback(match, left_list, + member_list, member_list_entries); +} + +cpg_callbacks_t cpg_callbacks = { + .cpg_deliver_fn = cpg_message_callback, + .cpg_confchg_fn = cpg_config_callback, +}; + +/* + * remove_checkpoint + * @entry + * + * Returns: 1 if checkpoint removed, 0 if no checkpoints, -EXXX on error + */ +int remove_checkpoint(struct clog_cpg *entry) +{ + int len; + SaNameT name; + SaAisErrorT rv; + SaCkptCheckpointHandleT h; + + len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u", + SHORT_UUID(entry->name.value), my_cluster_id); + name.length = len; + +open_retry: + rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL, + SA_CKPT_CHECKPOINT_READ, 0, &h); + if (rv == SA_AIS_ERR_TRY_AGAIN) { + LOG_ERROR("abort_startup: ckpt open retry"); + usleep(1000); + goto open_retry; + } + + if (rv != SA_AIS_OK) + return 0; + + LOG_DBG("[%s] Removing checkpoint", SHORT_UUID(entry->name.value)); +unlink_retry: + rv = saCkptCheckpointUnlink(ckpt_handle, &name); + if (rv == SA_AIS_ERR_TRY_AGAIN) { + LOG_ERROR("abort_startup: ckpt unlink retry"); + usleep(1000); + goto unlink_retry; + } + + if (rv != SA_AIS_OK) { + 
LOG_ERROR("[%s] Failed to unlink checkpoint: %s", + SHORT_UUID(entry->name.value), str_ais_error(rv)); + return -EIO; + } + + saCkptCheckpointClose(h); + + return 1; +} + +int create_cluster_cpg(char *uuid, uint64_t luid) +{ + int r; + int size; + struct clog_cpg *new = NULL; + struct clog_cpg *tmp; + + dm_list_iterate_items(tmp, &clog_cpg_list) + if (!strncmp(tmp->name.value, uuid, CPG_MAX_NAME_LENGTH)) { + LOG_ERROR("Log entry already exists: %s", uuid); + return -EEXIST; + } + + new = malloc(sizeof(*new)); + if (!new) { + LOG_ERROR("Unable to allocate memory for clog_cpg"); + return -ENOMEM; + } + memset(new, 0, sizeof(*new)); + dm_list_init(&new->list); + new->lowest_id = 0xDEAD; + dm_list_init(&new->startup_list); + dm_list_init(&new->working_list); + + size = ((strlen(uuid) + 1) > CPG_MAX_NAME_LENGTH) ? + CPG_MAX_NAME_LENGTH : (strlen(uuid) + 1); + strncpy(new->name.value, uuid, size); + new->name.length = size; + new->luid = luid; + + /* + * Ensure there are no stale checkpoints around before we join + */ + if (remove_checkpoint(new) == 1) + LOG_COND(log_checkpoint, + "[%s] Removing checkpoints left from previous session", + SHORT_UUID(new->name.value)); + + r = cpg_initialize(&new->handle, &cpg_callbacks); + if (r != SA_AIS_OK) { + LOG_ERROR("cpg_initialize failed: Cannot join cluster"); + free(new); + return -EPERM; + } + + r = cpg_join(new->handle, &new->name); + if (r != SA_AIS_OK) { + LOG_ERROR("cpg_join failed: Cannot join cluster"); + free(new); + return -EPERM; + } + + new->cpg_state = VALID; + dm_list_add(&clog_cpg_list, &new->list); + LOG_DBG("New handle: %llu", (unsigned long long)new->handle); + LOG_DBG("New name: %s", new->name.value); + + /* FIXME: better variable */ + cpg_fd_get(new->handle, &r); + links_register(r, "cluster", do_cluster_work, NULL); + + return 0; +} + +static void abort_startup(struct clog_cpg *del) +{ + struct clog_request *rq, *n; + + LOG_DBG("[%s] CPG teardown before checkpoint received", + SHORT_UUID(del->name.value)); + 
	/* Drain and free every request queued during startup */
	dm_list_iterate_items_safe(rq, n, &del->startup_list) {
		dm_list_del(&rq->list);

		LOG_DBG("[%s] Ignoring request from %u: %s",
			SHORT_UUID(del->name.value), rq->originator,
			_RQ_TYPE(rq->u_rq.request_type));
		free(rq);
	}

	remove_checkpoint(del);
}

/*
 * _destroy_cluster_cpg
 * @del: log instance to tear down
 *
 * Leave the CPG group for one log instance: push out any checkpoints we
 * still owe to joining nodes, mark the instance LEAVING, and call
 * cpg_leave.  Final cleanup happens later in the leave callback.
 *
 * Returns: 0 (errors from cpg_leave are logged, not propagated)
 */
static int _destroy_cluster_cpg(struct clog_cpg *del)
{
	int r;
	int state;

	LOG_COND(log_resend_requests, "[%s] I am leaving.2.....",
		 SHORT_UUID(del->name.value));

	/*
	 * We must send any left over checkpoints before
	 * leaving.  If we don't, an incoming node could
	 * be stuck with no checkpoint and stall.
	 do_checkpoints(del); --- THIS COULD BE CAUSING OUR PROBLEMS:

	 - Incoming node deletes old checkpoints before joining
	 - A stale checkpoint is issued here by leaving node
	 - (leaving node leaves)
	 - Incoming node joins cluster and finds stale checkpoint.
	 - (leaving node leaves - option 2)
	 */
	do_checkpoints(del, 1);

	/* Remember the pre-teardown state before we clobber it below */
	state = del->state;

	del->cpg_state = INVALID;
	del->state = LEAVING;

	/*
	 * If the state is VALID, we might be processing the
	 * startup list.  If so, we certainly don't want to
	 * clear the startup_list here by calling abort_startup
	 */
	if (!dm_list_empty(&del->startup_list) && (state != VALID))
		abort_startup(del);

	r = cpg_leave(del->handle, &del->name);
	if (r != CPG_OK)
		LOG_ERROR("Error leaving CPG!");
	return 0;
}

/*
 * destroy_cluster_cpg
 * @uuid: log uuid identifying the CPG group(s) to leave
 *
 * Tear down every registered log instance matching @uuid.
 *
 * Returns: 0
 */
int destroy_cluster_cpg(char *uuid)
{
	struct clog_cpg *del, *tmp;

	dm_list_iterate_items_safe(del, tmp, &clog_cpg_list)
		if (!strncmp(del->name.value, uuid, CPG_MAX_NAME_LENGTH))
			_destroy_cluster_cpg(del);

	return 0;
}

/*
 * init_cluster
 *
 * Initialize the global log list and the checkpoint service handle.
 *
 * Returns: 0 on success, EXIT_CLUSTER_CKPT_INIT on failure
 */
int init_cluster(void)
{
	SaAisErrorT rv;

	dm_list_init(&clog_cpg_list);
	rv = saCkptInitialize(&ckpt_handle, &callbacks, &version);

	if (rv != SA_AIS_OK)
		return EXIT_CLUSTER_CKPT_INIT;

	return 0;
}

/*
 * cleanup_cluster
 *
 * Release the checkpoint service handle acquired in init_cluster().
 */
void cleanup_cluster(void)
{
	SaAisErrorT err;

	err = saCkptFinalize(ckpt_handle);
	if (err != SA_AIS_OK)
		LOG_ERROR("Failed to finalize checkpoint handle");
}

/*
 * cluster_debug
 *
 * Dump the state of every registered log instance (server id, state
 * flags, pending checkpoints, working/startup queues, and the circular
 * command-history buffer) to the error log.
 */
void cluster_debug(void)
{
	struct checkpoint_data *cp;
	struct clog_cpg *entry;
	struct clog_request *rq;
	int i;

	LOG_ERROR("");
	LOG_ERROR("CLUSTER COMPONENT DEBUGGING::");
	dm_list_iterate_items(entry, &clog_cpg_list) {
		LOG_ERROR("%s::", SHORT_UUID(entry->name.value));
		LOG_ERROR(" lowest_id : %u", entry->lowest_id);
		LOG_ERROR(" state : %s", (entry->state == INVALID) ?
			  "INVALID" : (entry->state == VALID) ? "VALID" :
			  (entry->state == LEAVING) ? "LEAVING" : "UNKNOWN");
		LOG_ERROR(" cpg_state : %d", entry->cpg_state);
		LOG_ERROR(" free_me : %d", entry->free_me);
		LOG_ERROR(" delay : %d", entry->delay);
		LOG_ERROR(" resend_requests : %d", entry->resend_requests);
		LOG_ERROR(" checkpoints_needed: %d", entry->checkpoints_needed);
		/* Count pending checkpoints (bounded walk of the list) */
		for (i = 0, cp = entry->checkpoint_list;
		     i < MAX_CHECKPOINT_REQUESTERS; i++)
			if (cp)
				cp = cp->next;
			else
				break;
		LOG_ERROR(" CKPTs waiting : %d", i);
		LOG_ERROR(" Working list:");
		dm_list_iterate_items(rq, &entry->working_list)
			LOG_ERROR(" %s/%u", _RQ_TYPE(rq->u_rq.request_type),
				  rq->u_rq.seq);

		LOG_ERROR(" Startup list:");
		dm_list_iterate_items(rq, &entry->startup_list)
			LOG_ERROR(" %s/%u", _RQ_TYPE(rq->u_rq.request_type),
				  rq->u_rq.seq);

		LOG_ERROR("Command History:");
		/* Walk the circular debugging buffer; idx advances and
		 * wraps so the entries print in chronological order */
		for (i = 0; i < DEBUGGING_HISTORY; i++) {
			entry->idx++;
			entry->idx = entry->idx % DEBUGGING_HISTORY;
			if (entry->debugging[entry->idx][0] == '\0')
				continue;
			LOG_ERROR("%d:%d) %s", i, entry->idx,
				  entry->debugging[entry->idx]);
		}
	}
}