summaryrefslogtreecommitdiffstats
path: root/ctdb/common/ctdb_call.c
diff options
context:
space:
mode:
authorAndrew Tridgell <tridge@samba.org>2007-04-29 16:19:40 +0200
committerAndrew Tridgell <tridge@samba.org>2007-04-29 16:19:40 +0200
commite21f69107f423f24f5407adb9323fb0bc8aa3f64 (patch)
treef357af261e07dde8822d0d178f7906c7e1cc9b33 /ctdb/common/ctdb_call.c
parent10910f52eb1e62bf5393952f4c3a7380bf2dc548 (diff)
downloadsamba-e21f69107f423f24f5407adb9323fb0bc8aa3f64.tar.gz
samba-e21f69107f423f24f5407adb9323fb0bc8aa3f64.tar.xz
samba-e21f69107f423f24f5407adb9323fb0bc8aa3f64.zip
yay! finally fixed the bug that volker, ronnie and I have been chasing
for 2 days. The main bug was in smbd, but there was a secondary (and more subtle) bug in ctdb that the bug in smbd exposed. When we get sent a dmaster reply, we have to correctly update the dmaster in the recipient even if the original request has timed out, otherwise ctdbd can get into a loop fighting over who will handle a key. This patch also cleans up the packet allocation, and makes ctdbd become a real daemon. (This used to be ctdb commit 59405e59ef522b97d8e20e4b14310a217141ac7c)
Diffstat (limited to 'ctdb/common/ctdb_call.c')
-rw-r--r--ctdb/common/ctdb_call.c90
1 files changed, 46 insertions, 44 deletions
diff --git a/ctdb/common/ctdb_call.c b/ctdb/common/ctdb_call.c
index 012e6758a5..476d86f911 100644
--- a/ctdb/common/ctdb_call.c
+++ b/ctdb/common/ctdb_call.c
@@ -211,15 +211,19 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
tmp_ctx = talloc_new(ctdb);
/* send the CTDB_REPLY_DMASTER */
- len = offsetof(struct ctdb_reply_dmaster, data) + data.dsize;
+ len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize;
r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
struct ctdb_reply_dmaster);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.destnode = new_dmaster;
r->hdr.reqid = reqid;
+ r->rsn = header->rsn;
+ r->keylen = key.dsize;
r->datalen = data.dsize;
- memcpy(&r->data[0], data.dptr, data.dsize);
+ r->db_id = ctdb_db->db_id;
+ memcpy(&r->data[0], key.dptr, key.dsize);
+ memcpy(&r->data[key.dsize], data.dptr, data.dsize);
ctdb_queue_packet(ctdb, &r->hdr);
@@ -256,6 +260,7 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
r->hdr.destnode = lmaster;
r->hdr.reqid = c->hdr.reqid;
r->db_id = c->db_id;
+ r->rsn = header->rsn;
r->dmaster = c->hdr.srcnode;
r->keylen = key->dsize;
r->datalen = data->dsize;
@@ -276,39 +281,43 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
must be called with the chainlock held. This function releases the chainlock
*/
-static void ctdb_become_dmaster(struct ctdb_context *ctdb,
- uint32_t reqid, TDB_DATA data)
+static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
+ uint32_t reqid, TDB_DATA key, TDB_DATA data,
+ uint64_t rsn)
{
struct ctdb_call_state *state;
- struct ctdb_db_context *ctdb_db;
+ struct ctdb_context *ctdb = ctdb_db->ctdb;
+ struct ctdb_ltdb_header header;
+
+ DEBUG(2,("vnn %u dmaster response %08x\n", ctdb->vnn, ctdb_hash(&key)));
+
+ ZERO_STRUCT(header);
+ header.rsn = rsn;
+ header.dmaster = ctdb->vnn;
+
+ if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
+ ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
+ ctdb_ltdb_unlock(ctdb_db, key);
+ return;
+ }
state = ctdb_reqid_find(ctdb, reqid, struct ctdb_call_state);
if (state == NULL) {
+ DEBUG(0,("vnn %u Invalid reqid %u in ctdb_become_dmaster\n",
+ ctdb->vnn, reqid));
+ ctdb_ltdb_unlock(ctdb_db, key);
return;
}
if (reqid != state->reqid) {
/* we found a record but it was the wrong one */
- DEBUG(0, ("Dropped orphaned dmaster reply with reqid:%d\n",reqid));
- return;
- }
-
- ctdb_db = state->ctdb_db;
-
- DEBUG(2,("vnn %u dmaster response %08x\n",
- ctdb->vnn, ctdb_hash(&state->call.key)));
-
- /* we're now the dmaster - update our local ltdb with new header
- and data */
- state->header.dmaster = ctdb->vnn;
-
- if (ctdb_ltdb_store(ctdb_db, state->call.key, &state->header, data) != 0) {
- ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
+ DEBUG(0, ("Dropped orphan in ctdb_become_dmaster with reqid:%d\n",reqid));
+ ctdb_ltdb_unlock(ctdb_db, key);
return;
}
- ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn);
+ ctdb_call_local(ctdb_db, &state->call, &header, &data, ctdb->vnn);
ctdb_ltdb_unlock(ctdb_db, state->call.key);
@@ -381,7 +390,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
/* check if the new dmaster is the lmaster, in which case we
skip the dmaster reply */
if (c->dmaster == ctdb->vnn) {
- ctdb_become_dmaster(ctdb, hdr->reqid, data);
+ ctdb_become_dmaster(ctdb_db, hdr->reqid, key, data, c->rsn);
} else {
ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
ctdb_ltdb_unlock(ctdb_db, key);
@@ -465,7 +474,6 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
struct ctdb_reply_call);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.destnode = hdr->srcnode;
- r->hdr.srcnode = hdr->destnode;
r->hdr.reqid = hdr->reqid;
r->status = call.status;
r->datalen = call.reply_data.dsize;
@@ -498,7 +506,7 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
if (hdr->reqid != state->reqid) {
/* we found a record but it was the wrong one */
- DEBUG(0, ("Dropped orphaned dmaster reply with reqid:%d\n",hdr->reqid));
+ DEBUG(0, ("Dropped orphaned call reply with reqid:%d\n",hdr->reqid));
return;
}
@@ -525,26 +533,22 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
- struct ctdb_call_state *state;
struct ctdb_db_context *ctdb_db;
- TDB_DATA data;
+ TDB_DATA key, data;
int ret;
- state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
-
- if (state == NULL) {
- return;
- }
-
- if (hdr->reqid != state->reqid) {
- /* we found a record but it was the wrong one */
- DEBUG(0, ("Dropped orphaned dmaster reply with reqid:%d\n",hdr->reqid));
+ ctdb_db = find_ctdb_db(ctdb, c->db_id);
+ if (ctdb_db == NULL) {
+ DEBUG(0,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
return;
}
+
+ key.dptr = c->data;
+ key.dsize = c->keylen;
+ data.dptr = &c->data[key.dsize];
+ data.dsize = c->datalen;
- ctdb_db = state->ctdb_db;
-
- ret = ctdb_ltdb_lock_requeue(ctdb_db, state->call.key, hdr,
+ ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
ctdb_recv_raw_pkt, ctdb);
if (ret == -2) {
return;
@@ -554,10 +558,7 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
return;
}
- data.dptr = c->data;
- data.dsize = c->datalen;
-
- ctdb_become_dmaster(ctdb, hdr->reqid, data);
+ ctdb_become_dmaster(ctdb_db, hdr->reqid, key, data, c->rsn);
}
@@ -571,12 +572,14 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
if (state == NULL) {
+ DEBUG(0,("vnn %u Invalid reqid %u in ctdb_reply_error\n",
+ ctdb->vnn, hdr->reqid));
return;
}
if (hdr->reqid != state->reqid) {
/* we found a record but it was the wrong one */
- DEBUG(0, ("Dropped orphaned dmaster reply with reqid:%d\n",hdr->reqid));
+ DEBUG(0, ("Dropped orphaned error reply with reqid:%d\n",hdr->reqid));
return;
}
@@ -711,7 +714,6 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
state->call.key.dptr = &state->c->data[0];
state->state = CTDB_CALL_WAIT;
- state->header = *header;
state->ctdb_db = ctdb_db;
ctdb_queue_packet(ctdb, &state->c->hdr);