diff options
Diffstat (limited to 'ctdb/common/ctdb_call.c')
-rw-r--r-- | ctdb/common/ctdb_call.c | 90 |
1 files changed, 46 insertions, 44 deletions
diff --git a/ctdb/common/ctdb_call.c b/ctdb/common/ctdb_call.c index 012e6758a5..476d86f911 100644 --- a/ctdb/common/ctdb_call.c +++ b/ctdb/common/ctdb_call.c @@ -211,15 +211,19 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db, tmp_ctx = talloc_new(ctdb); /* send the CTDB_REPLY_DMASTER */ - len = offsetof(struct ctdb_reply_dmaster, data) + data.dsize; + len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize; r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len, struct ctdb_reply_dmaster); CTDB_NO_MEMORY_FATAL(ctdb, r); r->hdr.destnode = new_dmaster; r->hdr.reqid = reqid; + r->rsn = header->rsn; + r->keylen = key.dsize; r->datalen = data.dsize; - memcpy(&r->data[0], data.dptr, data.dsize); + r->db_id = ctdb_db->db_id; + memcpy(&r->data[0], key.dptr, key.dsize); + memcpy(&r->data[key.dsize], data.dptr, data.dsize); ctdb_queue_packet(ctdb, &r->hdr); @@ -256,6 +260,7 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, r->hdr.destnode = lmaster; r->hdr.reqid = c->hdr.reqid; r->db_id = c->db_id; + r->rsn = header->rsn; r->dmaster = c->hdr.srcnode; r->keylen = key->dsize; r->datalen = data->dsize; @@ -276,39 +281,43 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, must be called with the chainlock held. This function releases the chainlock */ -static void ctdb_become_dmaster(struct ctdb_context *ctdb, - uint32_t reqid, TDB_DATA data) +static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db, + uint32_t reqid, TDB_DATA key, TDB_DATA data, + uint64_t rsn) { struct ctdb_call_state *state; - struct ctdb_db_context *ctdb_db; + struct ctdb_context *ctdb = ctdb_db->ctdb; + struct ctdb_ltdb_header header; + + DEBUG(2,("vnn %u dmaster response %08x\n", ctdb->vnn, ctdb_hash(&key))); + + ZERO_STRUCT(header); + header.rsn = rsn; + header.dmaster = ctdb->vnn; + + if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) { + ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n"); + ctdb_ltdb_unlock(ctdb_db, key); + return; + } state = ctdb_reqid_find(ctdb, reqid, struct ctdb_call_state); if (state == NULL) { + DEBUG(0,("vnn %u Invalid reqid %u in ctdb_become_dmaster\n", + ctdb->vnn, reqid)); + ctdb_ltdb_unlock(ctdb_db, key); return; } if (reqid != state->reqid) { /* we found a record but it was the wrong one */ - DEBUG(0, ("Dropped orphaned dmaster reply with reqid:%d\n",reqid)); - return; - } - - ctdb_db = state->ctdb_db; - - DEBUG(2,("vnn %u dmaster response %08x\n", - ctdb->vnn, ctdb_hash(&state->call.key))); - - /* we're now the dmaster - update our local ltdb with new header - and data */ - state->header.dmaster = ctdb->vnn; - - if (ctdb_ltdb_store(ctdb_db, state->call.key, &state->header, data) != 0) { - ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n"); + DEBUG(0, ("Dropped orphan in ctdb_become_dmaster with reqid:%d\n",reqid)); + ctdb_ltdb_unlock(ctdb_db, key); return; } - ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn); + ctdb_call_local(ctdb_db, &state->call, &header, &data, ctdb->vnn); ctdb_ltdb_unlock(ctdb_db, state->call.key); @@ -381,7 +390,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr /* check if the new dmaster is the lmaster, in which case we skip the dmaster reply */ if (c->dmaster == ctdb->vnn) { - ctdb_become_dmaster(ctdb, hdr->reqid, data); + ctdb_become_dmaster(ctdb_db, hdr->reqid, key, data, c->rsn); } else { ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid); ctdb_ltdb_unlock(ctdb_db, key); @@ -465,7 +474,6 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) struct ctdb_reply_call); CTDB_NO_MEMORY_FATAL(ctdb, r); r->hdr.destnode = hdr->srcnode; - r->hdr.srcnode = hdr->destnode; r->hdr.reqid = hdr->reqid; r->status = call.status; r->datalen = call.reply_data.dsize; @@ -498,7 +506,7 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) if (hdr->reqid != state->reqid) { /* we found a record but it was the wrong one */ - DEBUG(0, ("Dropped orphaned dmaster reply with reqid:%d\n",hdr->reqid)); + DEBUG(0, ("Dropped orphaned call reply with reqid:%d\n",hdr->reqid)); return; } @@ -525,26 +533,22 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr; - struct ctdb_call_state *state; struct ctdb_db_context *ctdb_db; - TDB_DATA data; + TDB_DATA key, data; int ret; - state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state); - - if (state == NULL) { - return; - } - - if (hdr->reqid != state->reqid) { - /* we found a record but it was the wrong one */ - DEBUG(0, ("Dropped orphaned dmaster reply with reqid:%d\n",hdr->reqid)); + ctdb_db = find_ctdb_db(ctdb, c->db_id); + if (ctdb_db == NULL) { + DEBUG(0,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id)); return; } + + key.dptr = c->data; + key.dsize = c->keylen; + data.dptr = &c->data[key.dsize]; + data.dsize = c->datalen; - ctdb_db = state->ctdb_db; - - ret = ctdb_ltdb_lock_requeue(ctdb_db, state->call.key, hdr, + ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, ctdb_recv_raw_pkt, ctdb); if (ret == -2) { return; @@ -554,10 +558,7 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) return; } - data.dptr = c->data; - data.dsize = c->datalen; - - ctdb_become_dmaster(ctdb, hdr->reqid, data); + ctdb_become_dmaster(ctdb_db, hdr->reqid, key, data, c->rsn); } @@ -571,12 +572,14 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state); if (state == NULL) { + DEBUG(0,("vnn %u Invalid reqid %u in ctdb_reply_error\n", + ctdb->vnn, hdr->reqid)); return; } if (hdr->reqid != state->reqid) { /* we found a record but it was the wrong one */ - DEBUG(0, ("Dropped orphaned dmaster reply with reqid:%d\n",hdr->reqid)); + DEBUG(0, ("Dropped orphaned error reply with reqid:%d\n",hdr->reqid)); return; } @@ -711,7 +714,6 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd state->call.key.dptr = &state->c->data[0]; state->state = CTDB_CALL_WAIT; - state->header = *header; state->ctdb_db = ctdb_db; ctdb_queue_packet(ctdb, &state->c->hdr); |