summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ctdb/common/ctdb_call.c90
-rw-r--r--ctdb/common/ctdb_client.c4
-rw-r--r--ctdb/common/ctdb_control.c2
-rw-r--r--ctdb/common/ctdb_daemon.c52
-rw-r--r--ctdb/common/ctdb_io.c22
-rw-r--r--ctdb/common/ctdb_util.c46
-rw-r--r--ctdb/direct/ctdbd.c20
-rw-r--r--ctdb/include/ctdb.h1
-rw-r--r--ctdb/include/ctdb_private.h6
9 files changed, 171 insertions, 72 deletions
diff --git a/ctdb/common/ctdb_call.c b/ctdb/common/ctdb_call.c
index 012e6758a5..476d86f911 100644
--- a/ctdb/common/ctdb_call.c
+++ b/ctdb/common/ctdb_call.c
@@ -211,15 +211,19 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
tmp_ctx = talloc_new(ctdb);
/* send the CTDB_REPLY_DMASTER */
- len = offsetof(struct ctdb_reply_dmaster, data) + data.dsize;
+ len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize;
r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
struct ctdb_reply_dmaster);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.destnode = new_dmaster;
r->hdr.reqid = reqid;
+ r->rsn = header->rsn;
+ r->keylen = key.dsize;
r->datalen = data.dsize;
- memcpy(&r->data[0], data.dptr, data.dsize);
+ r->db_id = ctdb_db->db_id;
+ memcpy(&r->data[0], key.dptr, key.dsize);
+ memcpy(&r->data[key.dsize], data.dptr, data.dsize);
ctdb_queue_packet(ctdb, &r->hdr);
@@ -256,6 +260,7 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
r->hdr.destnode = lmaster;
r->hdr.reqid = c->hdr.reqid;
r->db_id = c->db_id;
+ r->rsn = header->rsn;
r->dmaster = c->hdr.srcnode;
r->keylen = key->dsize;
r->datalen = data->dsize;
@@ -276,39 +281,43 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
must be called with the chainlock held. This function releases the chainlock
*/
-static void ctdb_become_dmaster(struct ctdb_context *ctdb,
- uint32_t reqid, TDB_DATA data)
+static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
+ uint32_t reqid, TDB_DATA key, TDB_DATA data,
+ uint64_t rsn)
{
struct ctdb_call_state *state;
- struct ctdb_db_context *ctdb_db;
+ struct ctdb_context *ctdb = ctdb_db->ctdb;
+ struct ctdb_ltdb_header header;
+
+ DEBUG(2,("vnn %u dmaster response %08x\n", ctdb->vnn, ctdb_hash(&key)));
+
+ ZERO_STRUCT(header);
+ header.rsn = rsn;
+ header.dmaster = ctdb->vnn;
+
+ if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
+ ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
+ ctdb_ltdb_unlock(ctdb_db, key);
+ return;
+ }
state = ctdb_reqid_find(ctdb, reqid, struct ctdb_call_state);
if (state == NULL) {
+ DEBUG(0,("vnn %u Invalid reqid %u in ctdb_become_dmaster\n",
+ ctdb->vnn, reqid));
+ ctdb_ltdb_unlock(ctdb_db, key);
return;
}
if (reqid != state->reqid) {
/* we found a record but it was the wrong one */
- DEBUG(0, ("Dropped orphaned dmaster reply with reqid:%d\n",reqid));
- return;
- }
-
- ctdb_db = state->ctdb_db;
-
- DEBUG(2,("vnn %u dmaster response %08x\n",
- ctdb->vnn, ctdb_hash(&state->call.key)));
-
- /* we're now the dmaster - update our local ltdb with new header
- and data */
- state->header.dmaster = ctdb->vnn;
-
- if (ctdb_ltdb_store(ctdb_db, state->call.key, &state->header, data) != 0) {
- ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
+ DEBUG(0, ("Dropped orphan in ctdb_become_dmaster with reqid:%d\n",reqid));
+ ctdb_ltdb_unlock(ctdb_db, key);
return;
}
- ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn);
+ ctdb_call_local(ctdb_db, &state->call, &header, &data, ctdb->vnn);
ctdb_ltdb_unlock(ctdb_db, state->call.key);
@@ -381,7 +390,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
/* check if the new dmaster is the lmaster, in which case we
skip the dmaster reply */
if (c->dmaster == ctdb->vnn) {
- ctdb_become_dmaster(ctdb, hdr->reqid, data);
+ ctdb_become_dmaster(ctdb_db, hdr->reqid, key, data, c->rsn);
} else {
ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
ctdb_ltdb_unlock(ctdb_db, key);
@@ -465,7 +474,6 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
struct ctdb_reply_call);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.destnode = hdr->srcnode;
- r->hdr.srcnode = hdr->destnode;
r->hdr.reqid = hdr->reqid;
r->status = call.status;
r->datalen = call.reply_data.dsize;
@@ -498,7 +506,7 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
if (hdr->reqid != state->reqid) {
/* we found a record but it was the wrong one */
- DEBUG(0, ("Dropped orphaned dmaster reply with reqid:%d\n",hdr->reqid));
+ DEBUG(0, ("Dropped orphaned call reply with reqid:%d\n",hdr->reqid));
return;
}
@@ -525,26 +533,22 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
- struct ctdb_call_state *state;
struct ctdb_db_context *ctdb_db;
- TDB_DATA data;
+ TDB_DATA key, data;
int ret;
- state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
-
- if (state == NULL) {
- return;
- }
-
- if (hdr->reqid != state->reqid) {
- /* we found a record but it was the wrong one */
- DEBUG(0, ("Dropped orphaned dmaster reply with reqid:%d\n",hdr->reqid));
+ ctdb_db = find_ctdb_db(ctdb, c->db_id);
+ if (ctdb_db == NULL) {
+ DEBUG(0,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
return;
}
+
+ key.dptr = c->data;
+ key.dsize = c->keylen;
+ data.dptr = &c->data[key.dsize];
+ data.dsize = c->datalen;
- ctdb_db = state->ctdb_db;
-
- ret = ctdb_ltdb_lock_requeue(ctdb_db, state->call.key, hdr,
+ ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
ctdb_recv_raw_pkt, ctdb);
if (ret == -2) {
return;
@@ -554,10 +558,7 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
return;
}
- data.dptr = c->data;
- data.dsize = c->datalen;
-
- ctdb_become_dmaster(ctdb, hdr->reqid, data);
+ ctdb_become_dmaster(ctdb_db, hdr->reqid, key, data, c->rsn);
}
@@ -571,12 +572,14 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
if (state == NULL) {
+ DEBUG(0,("vnn %u Invalid reqid %u in ctdb_reply_error\n",
+ ctdb->vnn, hdr->reqid));
return;
}
if (hdr->reqid != state->reqid) {
/* we found a record but it was the wrong one */
- DEBUG(0, ("Dropped orphaned dmaster reply with reqid:%d\n",hdr->reqid));
+ DEBUG(0, ("Dropped orphaned error reply with reqid:%d\n",hdr->reqid));
return;
}
@@ -711,7 +714,6 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
state->call.key.dptr = &state->c->data[0];
state->state = CTDB_CALL_WAIT;
- state->header = *header;
state->ctdb_db = ctdb_db;
ctdb_queue_packet(ctdb, &state->c->hdr);
diff --git a/ctdb/common/ctdb_client.c b/ctdb/common/ctdb_client.c
index 075f8553ae..7257dd5d29 100644
--- a/ctdb/common/ctdb_client.c
+++ b/ctdb/common/ctdb_client.c
@@ -78,7 +78,7 @@ static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_he
if (hdr->reqid != state->reqid) {
/* we found a record but it was the wrong one */
- DEBUG(0, ("Dropped orphaned reply with reqid:%d\n",hdr->reqid));
+ DEBUG(0, ("Dropped client call reply with reqid:%d\n",hdr->reqid));
return;
}
@@ -414,7 +414,6 @@ int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
CTDB_NO_MEMORY(ctdb, r);
r->hdr.destnode = vnn;
- r->hdr.srcnode = ctdb->vnn;
r->srvid = srvid;
r->datalen = data.dsize;
memcpy(&r->data[0], data.dptr, data.dsize);
@@ -674,7 +673,6 @@ int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid,
c->hdr.reqid = state->reqid;
c->hdr.destnode = destnode;
- c->hdr.srcnode = ctdb->vnn;
c->hdr.reqid = state->reqid;
c->opcode = opcode;
c->srvid = srvid;
diff --git a/ctdb/common/ctdb_control.c b/ctdb/common/ctdb_control.c
index 306323cceb..b24c2686cc 100644
--- a/ctdb/common/ctdb_control.c
+++ b/ctdb/common/ctdb_control.c
@@ -248,6 +248,8 @@ void ctdb_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_control_state);
if (state == NULL) {
+ DEBUG(0,("vnn %u Invalid reqid %u in ctdb_reply_control\n",
+ ctdb->vnn, hdr->reqid));
return;
}
diff --git a/ctdb/common/ctdb_daemon.c b/ctdb/common/ctdb_daemon.c
index d0d2c1a46d..a6d60fc00e 100644
--- a/ctdb/common/ctdb_daemon.c
+++ b/ctdb/common/ctdb_daemon.c
@@ -673,6 +673,48 @@ int ctdb_start(struct ctdb_context *ctdb)
return 0;
}
+
+/*
+ start the protocol going as a daemon
+*/
+int ctdb_start_daemon(struct ctdb_context *ctdb)
+{
+ int res;
+ struct fd_event *fde;
+ const char *domain_socket_name;
+
+ /* get rid of any old sockets */
+ unlink(ctdb->daemon.name);
+
+ /* create a unix domain stream socket to listen to */
+ res = ux_socket_bind(ctdb);
+ if (res!=0) {
+ DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
+ exit(10);
+ }
+
+ if (fork()) {
+ return 0;
+ }
+
+ tdb_reopen_all(False);
+
+ setsid();
+ block_signal(SIGPIPE);
+ block_signal(SIGCHLD);
+
+ /* ensure the socket is deleted on exit of the daemon */
+ domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
+ talloc_set_destructor(domain_socket_name, unlink_destructor);
+
+ ctdb->ev = event_context_init(NULL);
+ fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ,
+ ctdb_accept_client, ctdb);
+ ctdb_main_loop(ctdb);
+
+ return 0;
+}
+
/*
allocate a packet for use in client<->daemon communication
*/
@@ -685,6 +727,7 @@ struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
int size;
struct ctdb_req_header *hdr;
size = ((length+1)+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
+
hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
if (hdr == NULL) {
DEBUG(0,("Unable to allocate packet for operation %u of length %u\n",
@@ -692,11 +735,12 @@ struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
return NULL;
}
talloc_set_name_const(hdr, type);
- memset(hdr, 0, slength);
+ memset(hdr, 0, size);
hdr->operation = operation;
- hdr->length = length;
+ hdr->length = size;
hdr->ctdb_magic = CTDB_MAGIC;
hdr->ctdb_version = CTDB_VERSION;
+ hdr->srcnode = ctdb->vnn;
if (ctdb->vnn_map) {
hdr->generation = ctdb->vnn_map->generation;
}
@@ -724,9 +768,9 @@ struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
return NULL;
}
talloc_set_name_const(hdr, type);
- memset(hdr, 0, slength);
+ memset(hdr, 0, size);
hdr->operation = operation;
- hdr->length = length;
+ hdr->length = size;
hdr->ctdb_magic = CTDB_MAGIC;
hdr->ctdb_version = CTDB_VERSION;
hdr->generation = ctdb->vnn_map->generation;
diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 238f1701cf..6a5aa928b0 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -64,8 +64,10 @@ static void queue_io_read(struct ctdb_queue *queue)
ssize_t nread;
uint8_t *data, *data_base;
- if (ioctl(queue->fd, FIONREAD, &num_ready) != 0 ||
- num_ready == 0) {
+ if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
+ return;
+ }
+ if (num_ready == 0) {
/* the descriptor has been closed */
goto failed;
}
@@ -75,11 +77,14 @@ static void queue_io_read(struct ctdb_queue *queue)
num_ready + queue->partial.length);
if (queue->partial.data == NULL) {
+ DEBUG(0,("read error alloc failed for %u\n",
+ num_ready + queue->partial.length));
goto failed;
}
nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready);
if (nread <= 0) {
+ DEBUG(0,("read error nread=%d\n", nread));
goto failed;
}
@@ -106,6 +111,7 @@ static void queue_io_read(struct ctdb_queue *queue)
len = *(uint32_t *)data;
d2 = talloc_memdup(queue, data, len);
if (d2 == NULL) {
+ DEBUG(0,("read error memdup failed for %u\n", len));
/* sigh */
goto failed;
}
@@ -122,6 +128,8 @@ static void queue_io_read(struct ctdb_queue *queue)
} else {
queue->partial.data = talloc_memdup(queue, data, nread);
if (queue->partial.data == NULL) {
+ DEBUG(0,("read error memdup partial failed for %u\n",
+ nread));
goto failed;
}
queue->partial.length = nread;
@@ -155,8 +163,11 @@ static void queue_io_write(struct ctdb_queue *queue)
while (queue->out_queue) {
struct ctdb_queue_pkt *pkt = queue->out_queue;
ssize_t n;
-
- n = write(queue->fd, pkt->data, pkt->length);
+ if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
+ n = write(queue->fd, pkt->data, 1);
+ } else {
+ n = write(queue->fd, pkt->data, pkt->length);
+ }
if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
event_add_timed(queue->ctdb->ev, queue, timeval_zero(),
@@ -213,7 +224,8 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
/* if the queue is empty then try an immediate write, avoiding
queue overhead. This relies on non-blocking sockets */
- if (queue->out_queue == NULL && queue->fd != -1) {
+ if (queue->out_queue == NULL && queue->fd != -1 &&
+ !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
ssize_t n = write(queue->fd, data, length2);
if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
event_add_timed(queue->ctdb->ev, queue, timeval_zero(),
diff --git a/ctdb/common/ctdb_util.c b/ctdb/common/ctdb_util.c
index f39e8b6682..e77dc8990d 100644
--- a/ctdb/common/ctdb_util.c
+++ b/ctdb/common/ctdb_util.c
@@ -129,6 +129,51 @@ void ctdb_latency(double *latency, struct timeval t)
}
}
+#if 0
+struct idr_fake {
+ uint32_t size;
+ void **ptrs;
+};
+
+static void idr_fake_init(struct ctdb_context *ctdb)
+{
+ if (ctdb->fidr) return;
+ ctdb->fidr = talloc(ctdb, struct idr_fake);
+ ctdb->fidr->size = 0x10000;
+ ctdb->fidr->ptrs = talloc_zero_array(ctdb->fidr, void *,
+ ctdb->fidr->size);
+}
+
+uint32_t ctdb_reqid_new(struct ctdb_context *ctdb, void *state)
+{
+ uint32_t i;
+ idr_fake_init(ctdb);
+ for (i=0;i<ctdb->fidr->size;i++) {
+ if (ctdb->fidr->ptrs[i] == NULL) {
+ ctdb->fidr->ptrs[i] = state;
+ return i;
+ }
+ }
+ return (uint32_t)-1;
+}
+
+void *_ctdb_reqid_find(struct ctdb_context *ctdb, uint32_t reqid, const char *type, const char *location)
+{
+ idr_fake_init(ctdb);
+ if (ctdb->fidr->ptrs[reqid] == NULL) {
+ DEBUG(0,("bad fidr id %u\n", reqid));
+ }
+ return ctdb->fidr->ptrs[reqid];
+}
+
+
+void ctdb_reqid_remove(struct ctdb_context *ctdb, uint32_t reqid)
+{
+ idr_fake_init(ctdb);
+ ctdb->fidr->ptrs[reqid] = NULL;
+}
+
+#else
uint32_t ctdb_reqid_new(struct ctdb_context *ctdb, void *state)
{
uint32_t id;
@@ -161,3 +206,4 @@ void ctdb_reqid_remove(struct ctdb_context *ctdb, uint32_t reqid)
}
}
+#endif
diff --git a/ctdb/direct/ctdbd.c b/ctdb/direct/ctdbd.c
index 674b54d47a..8274c51531 100644
--- a/ctdb/direct/ctdbd.c
+++ b/ctdb/direct/ctdbd.c
@@ -56,7 +56,6 @@ int main(int argc, const char *argv[])
int opt;
const char **extra_argv;
int extra_argc = 0;
- int ret;
poptContext pc;
struct event_context *ev;
@@ -91,22 +90,13 @@ int main(int argc, const char *argv[])
ctdb_db = ctdb_attach(ctdb, tok, TDB_DEFAULT,
O_RDWR|O_CREAT|O_TRUNC, 0666);
if (!ctdb_db) {
- printf("ctdb_attach to '%s'failed - %s\n", tok,
- ctdb_errstr(ctdb));
+ DEBUG(0,("ctdb_attach to '%s'failed - %s\n", tok,
+ ctdb_errstr(ctdb)));
exit(1);
}
- printf("Attached to database '%s'\n", tok);
+ DEBUG(1, ("Attached to database '%s'\n", tok));
}
- /* start the protocol running */
- ret = ctdb_start(ctdb);
-
-/* event_loop_wait(ev);*/
- while (1) {
- event_loop_once(ev);
- }
-
- /* shut it down */
- talloc_free(ev);
- return 0;
+ /* start the protocol running (as a child) */
+ return ctdb_start_daemon(ctdb);
}
diff --git a/ctdb/include/ctdb.h b/ctdb/include/ctdb.h
index d9e53d30a9..c73f211f6b 100644
--- a/ctdb/include/ctdb.h
+++ b/ctdb/include/ctdb.h
@@ -107,6 +107,7 @@ int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist);
start the ctdb protocol
*/
int ctdb_start(struct ctdb_context *ctdb);
+int ctdb_start_daemon(struct ctdb_context *ctdb);
/*
attach to a ctdb database
diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h
index f9b869c4b2..8bbc35c995 100644
--- a/ctdb/include/ctdb_private.h
+++ b/ctdb/include/ctdb_private.h
@@ -195,6 +195,7 @@ struct ctdb_context {
struct ctdb_status status;
struct ctdb_vnn_map *vnn_map;
uint32_t num_clients;
+ struct idr_fake *fidr;
};
struct ctdb_db_context {
@@ -266,7 +267,6 @@ struct ctdb_call_state {
struct ctdb_db_context *ctdb_db;
const char *errmsg;
struct ctdb_call call;
- struct ctdb_ltdb_header header;
struct {
void (*fn)(struct ctdb_call_state *);
void *private_data;
@@ -347,6 +347,7 @@ struct ctdb_reply_error {
struct ctdb_req_dmaster {
struct ctdb_req_header hdr;
uint32_t db_id;
+ uint64_t rsn;
uint32_t dmaster;
uint32_t keylen;
uint32_t datalen;
@@ -355,6 +356,9 @@ struct ctdb_req_dmaster {
struct ctdb_reply_dmaster {
struct ctdb_req_header hdr;
+ uint32_t db_id;
+ uint64_t rsn;
+ uint32_t keylen;
uint32_t datalen;
uint8_t data[1];
};