summaryrefslogtreecommitdiffstats
path: root/daemons/cmirrord/cluster.c
diff options
context:
space:
mode:
authorJonathan Earl Brassow <jbrassow@redhat.com>2012-02-29 21:15:34 +0000
committerJonathan Earl Brassow <jbrassow@redhat.com>2012-02-29 21:15:34 +0000
commit62e38da133d9801cdf36b0f2aaec615ce14b9000 (patch)
tree5334c911580c6d36f737db08f9675c16a8434b83 /daemons/cmirrord/cluster.c
parent5b613cff974fce84890759e1f1d6cc809e84c398 (diff)
downloadlvm2-62e38da133d9801cdf36b0f2aaec615ce14b9000.tar.gz
lvm2-62e38da133d9801cdf36b0f2aaec615ce14b9000.tar.xz
lvm2-62e38da133d9801cdf36b0f2aaec615ce14b9000.zip
Allow cluster mirrors to handle the absence of the checkpoint lib (libSaCkpt).
The OpenAIS checkpoint library is going away; therefore, cmirrord must operate without it. The algorithms the handle the timing of when to send a checkpoint, the determination of what to send, and which ongoing cluster requests are relevent with respect to the checkpoints are unaffected. We need only replace the functions that actually perform the storing/transmitting and retrieving/receiving of the checkpoint data. Rather than store the checkpoint data in an OpenAIS checkpoint file, we simply transmit it along with the message that notifies the incoming node that the checkpoint is ready.
Diffstat (limited to 'daemons/cmirrord/cluster.c')
-rw-r--r--daemons/cmirrord/cluster.c137
1 files changed, 115 insertions, 22 deletions
diff --git a/daemons/cmirrord/cluster.c b/daemons/cmirrord/cluster.c
index ad1d7dce..f56cb816 100644
--- a/daemons/cmirrord/cluster.c
+++ b/daemons/cmirrord/cluster.c
@@ -20,10 +20,12 @@
#include <corosync/cpg.h>
#include <errno.h>
-#include <openais/saAis.h>
-#include <openais/saCkpt.h>
#include <signal.h>
#include <unistd.h>
+#if CMIRROR_HAS_CHECKPOINT
+#include <openais/saAis.h>
+#include <openais/saCkpt.h>
+#endif
/* Open AIS error codes */
#define str_ais_error(x) \
@@ -62,13 +64,13 @@
RQ_TYPE((x) & ~DM_ULOG_RESPONSE)
static uint32_t my_cluster_id = 0xDEAD;
+#if CMIRROR_HAS_CHECKPOINT
static SaCkptHandleT ckpt_handle = 0;
static SaCkptCallbacksT callbacks = { 0, 0 };
static SaVersionT version = { 'B', 1, 1 };
+#endif
#define DEBUGGING_HISTORY 100
-//static char debugging[DEBUGGING_HISTORY][128];
-//static int idx = 0;
#define LOG_SPRINT(cc, f, arg...) do { \
cc->idx++; \
cc->idx = cc->idx % DEBUGGING_HISTORY; \
@@ -77,6 +79,7 @@ static SaVersionT version = { 'B', 1, 1 };
static int log_resp_rec = 0;
+#define RECOVERING_REGION_SECTION_SIZE 64
struct checkpoint_data {
uint32_t requester;
char uuid[CPG_MAX_NAME_LENGTH];
@@ -128,7 +131,6 @@ static struct dm_list clog_cpg_list;
int cluster_send(struct clog_request *rq)
{
int r;
- int count=0;
int found = 0;
struct iovec iov;
struct clog_cpg *entry;
@@ -165,7 +167,10 @@ int cluster_send(struct clog_request *rq)
if (entry->cpg_state != VALID)
return -EINVAL;
+#if CMIRROR_HAS_CHECKPOINT
do {
+ int count = 0;
+
r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
if (r != SA_AIS_ERR_TRY_AGAIN)
break;
@@ -189,12 +194,14 @@ int cluster_send(struct clog_request *rq)
str_ais_error(r));
usleep(1000);
} while (1);
-
+#else
+ r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
+#endif
if (r == CPG_OK)
return 0;
/* error codes found in openais/cpg.h */
- LOG_ERROR("cpg_mcast_joined error: %s", str_ais_error(r));
+ LOG_ERROR("cpg_mcast_joined error: %d", r);
rq->u_rq.error = -EBADE;
return -EBADE;
@@ -419,6 +426,7 @@ static void free_checkpoint(struct checkpoint_data *cp)
free(cp);
}
+#if CMIRROR_HAS_CHECKPOINT
static int export_checkpoint(struct checkpoint_data *cp)
{
SaCkptCheckpointCreationAttributesT attr;
@@ -587,7 +595,54 @@ rr_create_retry:
return 0;
}
-static int import_checkpoint(struct clog_cpg *entry, int no_read)
+#else
+static int export_checkpoint(struct checkpoint_data *cp)
+{
+ int r, rq_size;
+ struct clog_request *rq;
+
+ rq_size = sizeof(*rq);
+ rq_size += RECOVERING_REGION_SECTION_SIZE;
+ rq_size += cp->bitmap_size * 2; /* clean|sync_bits */
+
+ rq = malloc(rq_size);
+ if (!rq) {
+ LOG_ERROR("export_checkpoint: "
+ "Unable to allocate transfer structs");
+ return -ENOMEM;
+ }
+ memset(rq, 0, rq_size);
+
+ dm_list_init(&rq->u.list);
+ rq->u_rq.request_type = DM_ULOG_CHECKPOINT_READY;
+ rq->originator = cp->requester;
+ strncpy(rq->u_rq.uuid, cp->uuid, CPG_MAX_NAME_LENGTH);
+ rq->u_rq.seq = my_cluster_id;
+ rq->u_rq.data_size = rq_size - sizeof(*rq);
+
+ /* Sync bits */
+ memcpy(rq->u_rq.data, cp->sync_bits, cp->bitmap_size);
+
+ /* Clean bits */
+ memcpy(rq->u_rq.data + cp->bitmap_size, cp->clean_bits, cp->bitmap_size);
+
+ /* Recovering region */
+ memcpy(rq->u_rq.data + (cp->bitmap_size * 2), cp->recovering_region,
+ strlen(cp->recovering_region));
+
+ r = cluster_send(rq);
+ if (r)
+ LOG_ERROR("Failed to send checkpoint ready notice: %s",
+ strerror(-r));
+
+ free(rq);
+ return 0;
+}
+#endif /* CMIRROR_HAS_CHECKPOINT */
+
+#if CMIRROR_HAS_CHECKPOINT
+static int import_checkpoint(struct clog_cpg *entry, int no_read,
+ struct clog_request *rq __attribute__((unused)))
{
int rtn = 0;
SaCkptCheckpointHandleT h;
@@ -742,6 +797,32 @@ no_read:
return rtn;
}
+#else
+static int import_checkpoint(struct clog_cpg *entry, int no_read,
+ struct clog_request *rq)
+{
+ int bitmap_size;
+
+ bitmap_size = (rq->u_rq.data_size - RECOVERING_REGION_SECTION_SIZE) / 2;
+ if (bitmap_size < 0) {
+ LOG_ERROR("Checkpoint has invalid payload size.");
+ return -EINVAL;
+ }
+
+ if (pull_state(entry->name.value, entry->luid, "sync_bits",
+ rq->u_rq.data, bitmap_size) ||
+ pull_state(entry->name.value, entry->luid, "clean_bits",
+ rq->u_rq.data + bitmap_size, bitmap_size) ||
+ pull_state(entry->name.value, entry->luid, "recovering_region",
+ rq->u_rq.data + (bitmap_size * 2),
+ RECOVERING_REGION_SECTION_SIZE)) {
+ LOG_ERROR("Error loading bitmap state from checkpoint.");
+ return -EIO;
+ }
+ return 0;
+}
+#endif /* CMIRROR_HAS_CHECKPOINT */
+
static void do_checkpoints(struct clog_cpg *entry, int leaving)
{
struct checkpoint_data *cp;
@@ -859,13 +940,13 @@ static int resend_requests(struct clog_cpg *entry)
static int do_cluster_work(void *data __attribute__((unused)))
{
- int r = SA_AIS_OK;
+ int r = CPG_OK;
struct clog_cpg *entry, *tmp;
dm_list_iterate_items_safe(entry, tmp, &clog_cpg_list) {
r = cpg_dispatch(entry->handle, CPG_DISPATCH_ALL);
- if (r != SA_AIS_OK)
- LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r));
+ if (r != CPG_OK)
+ LOG_ERROR("cpg_dispatch failed: %d", r);
if (entry->free_me) {
free(entry);
@@ -876,7 +957,7 @@ static int do_cluster_work(void *data __attribute__((unused)))
resend_requests(entry);
}
- return (r == SA_AIS_OK) ? 0 : -1; /* FIXME: good error number? */
+ return (r == CPG_OK) ? 0 : -1; /* FIXME: good error number? */
}
static int flush_startup_list(struct clog_cpg *entry)
@@ -941,16 +1022,19 @@ static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gna
struct clog_request *tmp_rq;
struct clog_cpg *match;
- if (clog_request_from_network(rq, msg_len) < 0)
- /* Error message comes from 'clog_request_from_network' */
- return;
-
match = find_clog_cpg(handle);
if (!match) {
LOG_ERROR("Unable to find clog_cpg for cluster message");
return;
}
+ /*
+ * Perform necessary endian and version compatibility conversions
+ */
+ if (clog_request_from_network(rq, msg_len) < 0)
+ /* Any error messages come from 'clog_request_from_network' */
+ return;
+
if ((nodeid == my_cluster_id) &&
!(rq->u_rq.request_type & DM_ULOG_RESPONSE) &&
(rq->u_rq.request_type != DM_ULOG_RESUME) &&
@@ -969,7 +1053,7 @@ static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gna
}
memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
dm_list_init(&tmp_rq->u.list);
- dm_list_add( &match->working_list, &tmp_rq->u.list);
+ dm_list_add(&match->working_list, &tmp_rq->u.list);
}
if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND) {
@@ -1022,7 +1106,8 @@ static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gna
/* Redundant checkpoints ignored if match->valid */
LOG_SPRINT(match, "[%s] CHECKPOINT_READY notification from %u",
SHORT_UUID(rq->u_rq.uuid), nodeid);
- if (import_checkpoint(match, (match->state != INVALID))) {
+ if (import_checkpoint(match,
+ (match->state != INVALID), rq)) {
LOG_SPRINT(match,
"[%s] Failed to import checkpoint from %u",
SHORT_UUID(rq->u_rq.uuid), nodeid);
@@ -1415,6 +1500,7 @@ cpg_callbacks_t cpg_callbacks = {
*/
static int remove_checkpoint(struct clog_cpg *entry)
{
+#if CMIRROR_HAS_CHECKPOINT
int len;
SaNameT name;
SaAisErrorT rv;
@@ -1454,6 +1540,10 @@ unlink_retry:
saCkptCheckpointClose(h);
return 1;
+#else
+ /* No checkpoint to remove, so 'success' */
+ return 1;
+#endif
}
int create_cluster_cpg(char *uuid, uint64_t luid)
@@ -1495,14 +1585,14 @@ int create_cluster_cpg(char *uuid, uint64_t luid)
SHORT_UUID(new->name.value));
r = cpg_initialize(&new->handle, &cpg_callbacks);
- if (r != SA_AIS_OK) {
+ if (r != CPG_OK) {
LOG_ERROR("cpg_initialize failed: Cannot join cluster");
free(new);
return -EPERM;
}
r = cpg_join(new->handle, &new->name);
- if (r != SA_AIS_OK) {
+ if (r != CPG_OK) {
LOG_ERROR("cpg_join failed: Cannot join cluster");
free(new);
return -EPERM;
@@ -1593,24 +1683,27 @@ int destroy_cluster_cpg(char *uuid)
int init_cluster(void)
{
+#if CMIRROR_HAS_CHECKPOINT
SaAisErrorT rv;
- dm_list_init(&clog_cpg_list);
rv = saCkptInitialize(&ckpt_handle, &callbacks, &version);
if (rv != SA_AIS_OK)
return EXIT_CLUSTER_CKPT_INIT;
-
+#endif
+ dm_list_init(&clog_cpg_list);
return 0;
}
void cleanup_cluster(void)
{
+#if CMIRROR_HAS_CHECKPOINT
SaAisErrorT err;
err = saCkptFinalize(ckpt_handle);
if (err != SA_AIS_OK)
LOG_ERROR("Failed to finalize checkpoint handle");
+#endif
}
void cluster_debug(void)