diff options
-rw-r--r-- | ctdb/server/ctdb_call.c | 130 |
1 files changed, 116 insertions, 14 deletions
diff --git a/ctdb/server/ctdb_call.c b/ctdb/server/ctdb_call.c index 7d1dd56c6d..2df86b46c9 100644 --- a/ctdb/server/ctdb_call.c +++ b/ctdb/server/ctdb_call.c @@ -43,7 +43,6 @@ return ctdb_db; } - /* a varient of input packet that can be used in lock requeue */ @@ -489,6 +488,8 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) call->key.dsize = c->keylen; call->call_data.dptr = c->data + c->keylen; call->call_data.dsize = c->calldatalen; + call->reply_data.dptr = NULL; + call->reply_data.dsize = 0; /* determine if we are the dmaster for this key. This also fetches the record data (if any), thus avoiding a 2nd fetch of the data @@ -505,9 +506,36 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) return; } - /* if we are not the dmaster, then send a redirect to the - requesting node */ - if (header.dmaster != ctdb->pnn) { + /* Dont do READONLY if we dont have a tracking database */ + if ((c->flags & CTDB_WANT_READONLY) && ctdb_db->rottdb == NULL) { + c->flags &= ~CTDB_WANT_READONLY; + } + + if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) { + header.flags &= ~(CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE); + if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) { + ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag"); + } + } + + /* if we are revoking, we must defer all other calls until the revoke + * had completed. + */ + if (header.flags & CTDB_REC_RO_REVOKING_READONLY) { + talloc_free(data.dptr); + ret = ctdb_ltdb_unlock(ctdb_db, call->key); + + if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) { + ctdb_fatal(ctdb, "Failed to add deferred call for revoke child"); + } + talloc_free(call); + return; + } + + /* if we are not the dmaster and are not hosting any delegations, + then send a redirect to the requesting node */ + if ((header.dmaster != ctdb->pnn) + && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) { talloc_free(data.dptr); ctdb_call_send_redirect(ctdb, call->key, c, &header); @@ -518,6 +546,80 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) return; } + if ( (!(c->flags & CTDB_WANT_READONLY)) + && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) { + header.flags |= CTDB_REC_RO_REVOKING_READONLY; + if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) { + ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set"); + } + ret = ctdb_ltdb_unlock(ctdb_db, call->key); + + if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) { + ctdb_fatal(ctdb, "Failed to start record revoke"); + } + talloc_free(data.dptr); + + if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) { + ctdb_fatal(ctdb, "Failed to add deferred call for revoke child"); + } + talloc_free(call); + + return; + } + + /* If this is the first request for delegation. bump rsn and set + * the delegations flag + */ + if ((c->flags & CTDB_WANT_READONLY) + && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC) + && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) { + header.rsn += 3; + header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS; + if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) { + ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set"); + } + } + if ((c->flags & CTDB_WANT_READONLY) + && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) { + TDB_DATA tdata; + + tdata = tdb_fetch(ctdb_db->rottdb, call->key); + if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) { + ctdb_fatal(ctdb, "Failed to add node to trackingdb"); + } + if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) { + ctdb_fatal(ctdb, "Failed to store trackingdb data"); + } + free(tdata.dptr); + + ret = ctdb_ltdb_unlock(ctdb_db, call->key); + if (ret != 0) { + DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret)); + } + + len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header); + r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, + struct ctdb_reply_call); + CTDB_NO_MEMORY_FATAL(ctdb, r); + r->hdr.destnode = c->hdr.srcnode; + r->hdr.reqid = c->hdr.reqid; + r->status = 0; + r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header); + header.rsn -= 2; + header.flags |= CTDB_REC_RO_HAVE_READONLY; + header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS; + memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header)); + + if (data.dsize) { + memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize); + } + + ctdb_queue_packet(ctdb, &r->hdr); + + talloc_free(r); + return; + } + CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount); /* Try if possible to migrate the record off to the caller node. @@ -922,8 +1024,8 @@ struct revokechild_requeue_handle { void *ctx; }; -static void deferred_call_requeue(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private_data) +static void deferred_call_requeue(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) { struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle); @@ -968,10 +1070,10 @@ static int revokechild_destructor(struct revokechild_handle *rc) return 0; } -static void revokechild_handler(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private_data) +static void revokechild_handler(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private_data) { - struct revokechild_handle *rc = talloc_get_type(private_data, + struct revokechild_handle *rc = talloc_get_type(private_data, struct revokechild_handle); int ret; char c; @@ -1015,8 +1117,8 @@ static void update_record_cb(struct ctdb_client_control_state *state) revoke_state = state->async.private_data; state->async.fn = NULL; - ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL); - if ((ret != 0) || (res != 0)) { + ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL); + if ((ret != 0) || (res != 0)) { DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res)); revoke_state->status = -1; } @@ -1045,8 +1147,8 @@ static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *privat } -static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te, - struct timeval yt, void *private_data) +static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te, + struct timeval yt, void *private_data) { struct ctdb_revoke_state *state = private_data; @@ -1064,7 +1166,7 @@ static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db state->key = key; state->header = header; state->data = data; - + ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state); event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0), ctdb_revoke_timeout_handler, state); |