diff options
-rw-r--r-- | ctdb/common/ctdb_call.c | 90 | ||||
-rw-r--r-- | ctdb/common/ctdb_client.c | 4 | ||||
-rw-r--r-- | ctdb/common/ctdb_control.c | 2 | ||||
-rw-r--r-- | ctdb/common/ctdb_daemon.c | 52 | ||||
-rw-r--r-- | ctdb/common/ctdb_io.c | 22 | ||||
-rw-r--r-- | ctdb/common/ctdb_util.c | 46 | ||||
-rw-r--r-- | ctdb/direct/ctdbd.c | 20 | ||||
-rw-r--r-- | ctdb/include/ctdb.h | 1 | ||||
-rw-r--r-- | ctdb/include/ctdb_private.h | 6 |
9 files changed, 171 insertions, 72 deletions
diff --git a/ctdb/common/ctdb_call.c b/ctdb/common/ctdb_call.c index 012e6758a5..476d86f911 100644 --- a/ctdb/common/ctdb_call.c +++ b/ctdb/common/ctdb_call.c @@ -211,15 +211,19 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db, tmp_ctx = talloc_new(ctdb); /* send the CTDB_REPLY_DMASTER */ - len = offsetof(struct ctdb_reply_dmaster, data) + data.dsize; + len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize; r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len, struct ctdb_reply_dmaster); CTDB_NO_MEMORY_FATAL(ctdb, r); r->hdr.destnode = new_dmaster; r->hdr.reqid = reqid; + r->rsn = header->rsn; + r->keylen = key.dsize; r->datalen = data.dsize; - memcpy(&r->data[0], data.dptr, data.dsize); + r->db_id = ctdb_db->db_id; + memcpy(&r->data[0], key.dptr, key.dsize); + memcpy(&r->data[key.dsize], data.dptr, data.dsize); ctdb_queue_packet(ctdb, &r->hdr); @@ -256,6 +260,7 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, r->hdr.destnode = lmaster; r->hdr.reqid = c->hdr.reqid; r->db_id = c->db_id; + r->rsn = header->rsn; r->dmaster = c->hdr.srcnode; r->keylen = key->dsize; r->datalen = data->dsize; @@ -276,39 +281,43 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, must be called with the chainlock held. This function releases the chainlock */ -static void ctdb_become_dmaster(struct ctdb_context *ctdb, - uint32_t reqid, TDB_DATA data) +static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db, + uint32_t reqid, TDB_DATA key, TDB_DATA data, + uint64_t rsn) { struct ctdb_call_state *state; - struct ctdb_db_context *ctdb_db; + struct ctdb_context *ctdb = ctdb_db->ctdb; + struct ctdb_ltdb_header header; + + DEBUG(2,("vnn %u dmaster response %08x\n", ctdb->vnn, ctdb_hash(&key))); + + ZERO_STRUCT(header); + header.rsn = rsn; + header.dmaster = ctdb->vnn; + + if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) { + ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n"); + ctdb_ltdb_unlock(ctdb_db, key); + return; + } state = ctdb_reqid_find(ctdb, reqid, struct ctdb_call_state); if (state == NULL) { + DEBUG(0,("vnn %u Invalid reqid %u in ctdb_become_dmaster\n", + ctdb->vnn, reqid)); + ctdb_ltdb_unlock(ctdb_db, key); return; } if (reqid != state->reqid) { /* we found a record but it was the wrong one */ - DEBUG(0, ("Dropped orphaned dmaster reply with reqid:%d\n",reqid)); - return; - } - - ctdb_db = state->ctdb_db; - - DEBUG(2,("vnn %u dmaster response %08x\n", - ctdb->vnn, ctdb_hash(&state->call.key))); - - /* we're now the dmaster - update our local ltdb with new header - and data */ - state->header.dmaster = ctdb->vnn; - - if (ctdb_ltdb_store(ctdb_db, state->call.key, &state->header, data) != 0) { - ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n"); + DEBUG(0, ("Dropped orphan in ctdb_become_dmaster with reqid:%d\n",reqid)); + ctdb_ltdb_unlock(ctdb_db, key); return; } - ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn); + ctdb_call_local(ctdb_db, &state->call, &header, &data, ctdb->vnn); ctdb_ltdb_unlock(ctdb_db, state->call.key); @@ -381,7 +390,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr /* check if the new dmaster is the lmaster, in which case we skip the dmaster reply */ if (c->dmaster == ctdb->vnn) { - ctdb_become_dmaster(ctdb, hdr->reqid, data); + ctdb_become_dmaster(ctdb_db, hdr->reqid, key, data, c->rsn); } else { ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid); ctdb_ltdb_unlock(ctdb_db, key); @@ -465,7 +474,6 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) struct ctdb_reply_call); CTDB_NO_MEMORY_FATAL(ctdb, r); r->hdr.destnode = hdr->srcnode; - r->hdr.srcnode = hdr->destnode; r->hdr.reqid = hdr->reqid; r->status = call.status; r->datalen = call.reply_data.dsize; @@ -498,7 +506,7 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) if (hdr->reqid != state->reqid) { /* we found a record but it was the wrong one */ - DEBUG(0, ("Dropped orphaned dmaster reply with reqid:%d\n",hdr->reqid)); + DEBUG(0, ("Dropped orphaned call reply with reqid:%d\n",hdr->reqid)); return; } @@ -525,26 +533,22 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr; - struct ctdb_call_state *state; struct ctdb_db_context *ctdb_db; - TDB_DATA data; + TDB_DATA key, data; int ret; - state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state); - - if (state == NULL) { - return; - } - - if (hdr->reqid != state->reqid) { - /* we found a record but it was the wrong one */ - DEBUG(0, ("Dropped orphaned dmaster reply with reqid:%d\n",hdr->reqid)); + ctdb_db = find_ctdb_db(ctdb, c->db_id); + if (ctdb_db == NULL) { + DEBUG(0,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id)); return; } + + key.dptr = c->data; + key.dsize = c->keylen; + data.dptr = &c->data[key.dsize]; + data.dsize = c->datalen; - ctdb_db = state->ctdb_db; - - ret = ctdb_ltdb_lock_requeue(ctdb_db, state->call.key, hdr, + ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, ctdb_recv_raw_pkt, ctdb); if (ret == -2) { return; @@ -554,10 +558,7 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) return; } - data.dptr = c->data; - data.dsize = c->datalen; - - ctdb_become_dmaster(ctdb, hdr->reqid, data); + ctdb_become_dmaster(ctdb_db, hdr->reqid, key, data, c->rsn); } @@ -571,12 +572,14 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state); if (state == NULL) { + DEBUG(0,("vnn %u Invalid reqid %u in ctdb_reply_error\n", + ctdb->vnn, hdr->reqid)); return; } if (hdr->reqid != state->reqid) { /* we found a record but it was the wrong one */ - DEBUG(0, ("Dropped orphaned dmaster reply with reqid:%d\n",hdr->reqid)); + DEBUG(0, ("Dropped orphaned error reply with reqid:%d\n",hdr->reqid)); return; } @@ -711,7 +714,6 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd state->call.key.dptr = &state->c->data[0]; state->state = CTDB_CALL_WAIT; - state->header = *header; state->ctdb_db = ctdb_db; ctdb_queue_packet(ctdb, &state->c->hdr); diff --git a/ctdb/common/ctdb_client.c b/ctdb/common/ctdb_client.c index 075f8553ae..7257dd5d29 100644 --- a/ctdb/common/ctdb_client.c +++ b/ctdb/common/ctdb_client.c @@ -78,7 +78,7 @@ static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_he if (hdr->reqid != state->reqid) { /* we found a record but it was the wrong one */ - DEBUG(0, ("Dropped orphaned reply with reqid:%d\n",hdr->reqid)); + DEBUG(0, ("Dropped client call reply with reqid:%d\n",hdr->reqid)); return; } @@ -414,7 +414,6 @@ int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn, CTDB_NO_MEMORY(ctdb, r); r->hdr.destnode = vnn; - r->hdr.srcnode = ctdb->vnn; r->srvid = srvid; r->datalen = data.dsize; memcpy(&r->data[0], data.dptr, data.dsize); @@ -674,7 +673,6 @@ int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, c->hdr.reqid = state->reqid; c->hdr.destnode = destnode; - c->hdr.srcnode = ctdb->vnn; c->hdr.reqid = state->reqid; c->opcode = opcode; c->srvid = srvid; diff --git a/ctdb/common/ctdb_control.c b/ctdb/common/ctdb_control.c index 306323cceb..b24c2686cc 100644 --- a/ctdb/common/ctdb_control.c +++ b/ctdb/common/ctdb_control.c @@ -248,6 +248,8 @@ void ctdb_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_control_state); if (state == NULL) { + DEBUG(0,("vnn %u Invalid reqid %u in ctdb_reply_control\n", + ctdb->vnn, hdr->reqid)); return; } diff --git a/ctdb/common/ctdb_daemon.c b/ctdb/common/ctdb_daemon.c index d0d2c1a46d..a6d60fc00e 100644 --- a/ctdb/common/ctdb_daemon.c +++ b/ctdb/common/ctdb_daemon.c @@ -673,6 +673,48 @@ int ctdb_start(struct ctdb_context *ctdb) return 0; } + +/* + start the protocol going as a daemon +*/ +int ctdb_start_daemon(struct ctdb_context *ctdb) +{ + int res; + struct fd_event *fde; + const char *domain_socket_name; + + /* get rid of any old sockets */ + unlink(ctdb->daemon.name); + + /* create a unix domain stream socket to listen to */ + res = ux_socket_bind(ctdb); + if (res!=0) { + DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n")); + exit(10); + } + + if (fork()) { + return 0; + } + + tdb_reopen_all(False); + + setsid(); + block_signal(SIGPIPE); + block_signal(SIGCHLD); + + /* ensure the socket is deleted on exit of the daemon */ + domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name); + talloc_set_destructor(domain_socket_name, unlink_destructor); + + ctdb->ev = event_context_init(NULL); + fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, + ctdb_accept_client, ctdb); + ctdb_main_loop(ctdb); + + return 0; +} + /* allocate a packet for use in client<->daemon communication */ @@ -685,6 +727,7 @@ struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb, int size; struct ctdb_req_header *hdr; size = ((length+1)+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1); + hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size); if (hdr == NULL) { DEBUG(0,("Unable to allocate packet for operation %u of length %u\n", @@ -692,11 +735,12 @@ struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb, return NULL; } talloc_set_name_const(hdr, type); - memset(hdr, 0, slength); + memset(hdr, 0, size); hdr->operation = operation; - hdr->length = length; + hdr->length = size; hdr->ctdb_magic = CTDB_MAGIC; hdr->ctdb_version = CTDB_VERSION; + hdr->srcnode = ctdb->vnn; if (ctdb->vnn_map) { hdr->generation = ctdb->vnn_map->generation; } @@ -724,9 +768,9 @@ struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb, return NULL; } talloc_set_name_const(hdr, type); - memset(hdr, 0, slength); + memset(hdr, 0, size); hdr->operation = operation; - hdr->length = length; + hdr->length = size; hdr->ctdb_magic = CTDB_MAGIC; hdr->ctdb_version = CTDB_VERSION; hdr->generation = ctdb->vnn_map->generation; diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c index 238f1701cf..6a5aa928b0 100644 --- a/ctdb/common/ctdb_io.c +++ b/ctdb/common/ctdb_io.c @@ -64,8 +64,10 @@ static void queue_io_read(struct ctdb_queue *queue) ssize_t nread; uint8_t *data, *data_base; - if (ioctl(queue->fd, FIONREAD, &num_ready) != 0 || - num_ready == 0) { + if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) { + return; + } + if (num_ready == 0) { /* the descriptor has been closed */ goto failed; } @@ -75,11 +77,14 @@ static void queue_io_read(struct ctdb_queue *queue) num_ready + queue->partial.length); if (queue->partial.data == NULL) { + DEBUG(0,("read error alloc failed for %u\n", + num_ready + queue->partial.length)); goto failed; } nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready); if (nread <= 0) { + DEBUG(0,("read error nread=%d\n", nread)); goto failed; } @@ -106,6 +111,7 @@ static void queue_io_read(struct ctdb_queue *queue) len = *(uint32_t *)data; d2 = talloc_memdup(queue, data, len); if (d2 == NULL) { + DEBUG(0,("read error memdup failed for %u\n", len)); /* sigh */ goto failed; } @@ -122,6 +128,8 @@ static void queue_io_read(struct ctdb_queue *queue) } else { queue->partial.data = talloc_memdup(queue, data, nread); if (queue->partial.data == NULL) { + DEBUG(0,("read error memdup partial failed for %u\n", + nread)); goto failed; } queue->partial.length = nread; @@ -155,8 +163,11 @@ static void queue_io_write(struct ctdb_queue *queue) while (queue->out_queue) { struct ctdb_queue_pkt *pkt = queue->out_queue; ssize_t n; - - n = write(queue->fd, pkt->data, pkt->length); + if (queue->ctdb->flags & CTDB_FLAG_TORTURE) { + n = write(queue->fd, pkt->data, 1); + } else { + n = write(queue->fd, pkt->data, pkt->length); + } if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { event_add_timed(queue->ctdb->ev, queue, timeval_zero(), @@ -213,7 +224,8 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length) /* if the queue is empty then try an immediate write, avoiding queue overhead. This relies on non-blocking sockets */ - if (queue->out_queue == NULL && queue->fd != -1) { + if (queue->out_queue == NULL && queue->fd != -1 && + !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) { ssize_t n = write(queue->fd, data, length2); if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { event_add_timed(queue->ctdb->ev, queue, timeval_zero(), diff --git a/ctdb/common/ctdb_util.c b/ctdb/common/ctdb_util.c index f39e8b6682..e77dc8990d 100644 --- a/ctdb/common/ctdb_util.c +++ b/ctdb/common/ctdb_util.c @@ -129,6 +129,51 @@ void ctdb_latency(double *latency, struct timeval t) } } +#if 0 +struct idr_fake { + uint32_t size; + void **ptrs; +}; + +static void idr_fake_init(struct ctdb_context *ctdb) +{ + if (ctdb->fidr) return; + ctdb->fidr = talloc(ctdb, struct idr_fake); + ctdb->fidr->size = 0x10000; + ctdb->fidr->ptrs = talloc_zero_array(ctdb->fidr, void *, + ctdb->fidr->size); +} + +uint32_t ctdb_reqid_new(struct ctdb_context *ctdb, void *state) +{ + uint32_t i; + idr_fake_init(ctdb); + for (i=0;i<ctdb->fidr->size;i++) { + if (ctdb->fidr->ptrs[i] == NULL) { + ctdb->fidr->ptrs[i] = state; + return i; + } + } + return (uint32_t)-1; +} + +void *_ctdb_reqid_find(struct ctdb_context *ctdb, uint32_t reqid, const char *type, const char *location) +{ + idr_fake_init(ctdb); + if (ctdb->fidr->ptrs[reqid] == NULL) { + DEBUG(0,("bad fidr id %u\n", reqid)); + } + return ctdb->fidr->ptrs[reqid]; +} + + +void ctdb_reqid_remove(struct ctdb_context *ctdb, uint32_t reqid) +{ + idr_fake_init(ctdb); + ctdb->fidr->ptrs[reqid] = NULL; +} + +#else uint32_t ctdb_reqid_new(struct ctdb_context *ctdb, void *state) { uint32_t id; @@ -161,3 +206,4 @@ void ctdb_reqid_remove(struct ctdb_context *ctdb, uint32_t reqid) } } +#endif diff --git a/ctdb/direct/ctdbd.c b/ctdb/direct/ctdbd.c index 674b54d47a..8274c51531 100644 --- a/ctdb/direct/ctdbd.c +++ b/ctdb/direct/ctdbd.c @@ -56,7 +56,6 @@ int main(int argc, const char *argv[]) int opt; const char **extra_argv; int extra_argc = 0; - int ret; poptContext pc; struct event_context *ev; @@ -91,22 +90,13 @@ int main(int argc, const char *argv[]) ctdb_db = ctdb_attach(ctdb, tok, TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666); if (!ctdb_db) { - printf("ctdb_attach to '%s'failed - %s\n", tok, - ctdb_errstr(ctdb)); + DEBUG(0,("ctdb_attach to '%s'failed - %s\n", tok, + ctdb_errstr(ctdb))); exit(1); } - printf("Attached to database '%s'\n", tok); + DEBUG(1, ("Attached to database '%s'\n", tok)); } - /* start the protocol running */ - ret = ctdb_start(ctdb); - -/* event_loop_wait(ev);*/ - while (1) { - event_loop_once(ev); - } - - /* shut it down */ - talloc_free(ev); - return 0; + /* start the protocol running (as a child) */ + return ctdb_start_daemon(ctdb); } diff --git a/ctdb/include/ctdb.h b/ctdb/include/ctdb.h index d9e53d30a9..c73f211f6b 100644 --- a/ctdb/include/ctdb.h +++ b/ctdb/include/ctdb.h @@ -107,6 +107,7 @@ int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist); start the ctdb protocol */ int ctdb_start(struct ctdb_context *ctdb); +int ctdb_start_daemon(struct ctdb_context *ctdb); /* attach to a ctdb database diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h index f9b869c4b2..8bbc35c995 100644 --- a/ctdb/include/ctdb_private.h +++ b/ctdb/include/ctdb_private.h @@ -195,6 +195,7 @@ struct ctdb_context { struct ctdb_status status; struct ctdb_vnn_map *vnn_map; uint32_t num_clients; + struct idr_fake *fidr; }; struct ctdb_db_context { @@ -266,7 +267,6 @@ struct ctdb_call_state { struct ctdb_db_context *ctdb_db; const char *errmsg; struct ctdb_call call; - struct ctdb_ltdb_header header; struct { void (*fn)(struct ctdb_call_state *); void *private_data; @@ -347,6 +347,7 @@ struct ctdb_reply_error { struct ctdb_req_dmaster { struct ctdb_req_header hdr; uint32_t db_id; + uint64_t rsn; uint32_t dmaster; uint32_t keylen; uint32_t datalen; @@ -355,6 +356,9 @@ struct ctdb_req_dmaster { struct ctdb_reply_dmaster { struct ctdb_req_header hdr; + uint32_t db_id; + uint64_t rsn; + uint32_t keylen; uint32_t datalen; uint8_t data[1]; }; |