diff options
-rw-r--r-- | ctdb/include/ctdb_private.h | 4 | ||||
-rw-r--r-- | ctdb/server/ctdb_daemon.c | 207 | ||||
-rw-r--r-- | ctdb/server/ctdb_ltdb_server.c | 11 |
3 files changed, 222 insertions, 0 deletions
diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h
index 7e59473013..f1818b98de 100644
--- a/ctdb/include/ctdb_private.h
+++ b/ctdb/include/ctdb_private.h
@@ -527,6 +527,10 @@ struct ctdb_db_context {
 	struct ctdb_ltdb_header *header,
 	TDB_DATA data);
+	/* used to track which records we are currently fetching
+	   so we can avoid sending duplicate fetch requests.
+	   Looked up by record key; each entry is the
+	   struct ctdb_deferred_fetch_queue for that key.
+	*/
+	struct trbt_tree *deferred_fetch;
 };
diff --git a/ctdb/server/ctdb_daemon.c b/ctdb/server/ctdb_daemon.c
index 88d12103f7..69fb6fb1f6 100644
--- a/ctdb/server/ctdb_daemon.c
+++ b/ctdb/server/ctdb_daemon.c
@@ -27,6 +27,7 @@
 #include "system/wait.h"
 #include "../include/ctdb_client.h"
 #include "../include/ctdb_private.h"
+#include "../common/rb_tree.h"
 #include <sys/socket.h>
 struct ctdb_client_pid_list {
@@ -359,6 +360,190 @@ static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
 	daemon_incoming_packet(client, hdr);
 }
+struct ctdb_deferred_fetch_call { /* one client request parked behind an in-flight fetch */
+	struct ctdb_deferred_fetch_call *next, *prev; /* list links; the queue is maintained with DLIST_ADD_END/DLIST_REMOVE */
+	struct ctdb_req_call *c; /* the deferred request packet itself */
+	struct ctdb_daemon_packet_wrap *w; /* ctdb context and client_id used to find the client again later */
+};
+
+struct ctdb_deferred_fetch_queue { /* per-key entry stored in ctdb_db->deferred_fetch */
+	struct ctdb_deferred_fetch_call *deferred_calls; /* calls waiting for the in-flight fetch, in arrival order */
+};
+
+struct ctdb_deferred_requeue { /* private data handed to reprocess_deferred_call() */
+	struct ctdb_deferred_fetch_call *dfc; /* the deferred call being replayed */
+	struct ctdb_client *client; /* client the call is replayed for */
+};
+
+/* called from a timer event and starts reprocessing the deferred call.
+   private_data is the struct ctdb_deferred_requeue that was set up by
+   deferred_fetch_queue_destructor().
+   The request packet is re-parented onto the client before it is handed
+   to daemon_incoming_packet(), so that freeing the requeue context
+   afterwards does not take the packet with it.
+*/
+static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te,
+				  struct timeval t, void *private_data)
+{
+	struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
+	struct ctdb_client *client = dfr->client;
+
+	talloc_steal(client, dfr->dfc->c);
+	daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
+	talloc_free(dfr);
+}
+
+/* the referral context is destroyed either after a timeout or when the initial
+   fetch-lock has finished.
+   at this stage, immediately start reprocessing the queued up deferred
+   calls so they get reprocessed immediately (and since we are dmaster at
+   this stage, trigger the waiting smbd processes to pick up and acquire the
+   record right away).
+
+   Each deferred call is handed to a zero-delay timed event owned by its
+   client. A call whose client has disconnected, or for which the requeue
+   context cannot be allocated, is logged and left on the queue context
+   it was allocated from. A call whose timed event cannot be created is
+   logged and released explicitly.
+   Always returns 0.
+*/
+static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
+{
+
+	/* need to reprocess the packets from the queue explicitly instead of
+	   just using a normal destructor since we want, need, to
+	   call the clients in the same order as the requests queued up
+	*/
+	while (dfq->deferred_calls != NULL) {
+		struct ctdb_client *client;
+		struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
+		struct ctdb_deferred_requeue *dfr;
+		struct timed_event *te;
+
+		DLIST_REMOVE(dfq->deferred_calls, dfc);
+
+		client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
+		if (client == NULL) {
+			DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
+				 dfc->w->client_id));
+			continue;
+		}
+
+		/* process it by pushing it back onto the eventloop */
+		dfr = talloc(client, struct ctdb_deferred_requeue);
+		if (dfr == NULL) {
+			DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
+			continue;
+		}
+
+		/* the requeue context takes over the call (and the request
+		   hanging off it) from the queue that is being destroyed
+		*/
+		dfr->dfc    = talloc_steal(dfr, dfc);
+		dfr->client = client;
+
+		te = event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
+		if (te == NULL) {
+			/* without the event the call would never be reprocessed.
+			   report it and release the requeue context (which by
+			   now owns dfc and the request) instead of leaving it
+			   parked on the client.
+			*/
+			DEBUG(DEBUG_ERR,("Failed to add timed event to reprocess deferred fetch call\n"));
+			talloc_free(dfr);
+		}
+	}
+
+	return 0;
+}
+
+/* insert the new deferral context into the rb tree.
+   there should never be a pre-existing context here, but check for it:
+   warn and destroy the previous context if there is already a deferral
+   context for this key.
+   parm is the new context and is always returned; data is the context
+   already registered for the key, or NULL.
+*/
+static void *insert_dfq_callback(void *parm, void *data)
+{
+	if (data) {
+		DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
+		talloc_free(data);
+	}
+	return parm;
+}
+
+/* if the original fetch-lock did not complete within a reasonable time,
+   free the deferral context, and with it the contexts of all deferred
+   requests, to cause them to be re-inserted into the event system.
+*/
+static void dfq_timeout(struct event_context *ev, struct timed_event *te,
+				  struct timeval t, void *private_data)
+{
+	/* private_data is the deferred fetch queue registered by
+	   setup_deferred_fetch_locks(); freeing it runs
+	   deferred_fetch_queue_destructor()
+	*/
+	talloc_free(private_data);
+}
+
+/* This function is used in the local daemon to register a KEY in a database
+   for being "fetched"
+   While the remote fetch is in-flight, any further attempts to re-fetch the
+   same record will be deferred until the fetch completes.
+
+   The deferral context is allocated as a child of call and is torn down
+   by a 30 second timeout if it is still around by then.
+   Returns 0 when the deferral was set up and -1 when it was not (an
+   allocation failed or the timeout could not be scheduled).
+*/
+static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
+{
+	uint32_t *k;
+	struct ctdb_deferred_fetch_queue *dfq;
+	struct timed_event *te;
+
+	/* build the rb tree key as an array of 32-bit words: k[0] is the
+	   array length in words (including k[0] itself), followed by the
+	   record key bytes padded up to a whole word
+	*/
+	k = talloc_zero_size(call, ((call->key.dsize + 3) & 0xfffffffc) + 4);
+	if (k == NULL) {
+		DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
+		return -1;
+	}
+
+	k[0] = (call->key.dsize + 3) / 4 + 1;
+	memcpy(&k[1], call->key.dptr, call->key.dsize);
+
+	dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
+	if (dfq == NULL) {
+		DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch queue structure\n"));
+		talloc_free(k);
+		return -1;
+	}
+	dfq->deferred_calls = NULL;
+
+	trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
+
+	talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
+
+	/* if the fetch hasn't completed in 30 seconds, just tear it all down
+	   and let it try again as the events are reissued */
+	te = event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
+	if (te == NULL) {
+		/* a deferral without its timeout would stay registered for
+		   as long as call lives. rather run without a deferral for
+		   this key: free the context the same way dfq_timeout()
+		   would have done.
+		*/
+		DEBUG(DEBUG_ERR,("Failed to add timeout event for deferred fetch queue\n"));
+		talloc_free(dfq);
+		talloc_free(k);
+		return -1;
+	}
+
+	talloc_free(k);
+	return 0;
+}
+
+/* check if this is a duplicate request to a fetch already in-flight
+   if it is, make this call deferred to be reprocessed later when
+   the in-flight fetch completes.
+*/
+static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c) /* returns 0 if c was queued for later, -1 if it was not */
+{
+	uint32_t *k;
+	struct ctdb_deferred_fetch_queue *dfq;
+	struct ctdb_deferred_fetch_call *dfc;
+
+	k = talloc_zero_size(c, ((key.dsize + 3) & 0xfffffffc) + 4); /* one length word plus the key padded up to whole 32-bit words */
+	if (k == NULL) {
+		DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
+		return -1;
+	}
+
+	k[0] = (key.dsize + 3) / 4 + 1; /* array length in 32-bit words, counting k[0] itself */
+	memcpy(&k[1], key.dptr, key.dsize);
+
+	dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
+	if (dfq == NULL) { /* nothing registered for this key, so there is nothing to queue behind */
+		talloc_free(k);
+		return -1;
+	}
+
+
+	talloc_free(k);
+
+	dfc = talloc(dfq, struct ctdb_deferred_fetch_call); /* child of the queue, so it goes away with the queue unless it is handed on */
+	if (dfc == NULL) {
+		DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
+		return -1;
+	}
+
+	dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
+	if (dfc->w == NULL) {
+		DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
+		talloc_free(dfc);
+		return -1;
+	}
+
+	dfc->c = talloc_steal(dfc, c); /* the deferred call takes over the request packet */
+	dfc->w->ctdb = ctdb_db->ctdb;
+	dfc->w->client_id = client->client_id; /* remembered so the client can be looked up again when the call is replayed */
+
+	DLIST_ADD_END(dfq->deferred_calls, dfc, NULL); /* appended at the end so calls are replayed in arrival order */
+
+	return 0;
+}
+
 /*
   this is called when the ctdb daemon received a ctdb request call
@@ -424,6 +609,20 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
 		return;
 	}
+	if (c->flags & CTDB_IMMEDIATE_MIGRATION) {
+		/* check if this fetch-lock request is a duplicate for a
+		   request we already have in flight. If so defer it until
+		   the first request completes.
+		*/
+		if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
+			ret = ctdb_ltdb_unlock(ctdb_db, key);
+			if (ret != 0) {
+				DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+			}
+			return;
+		}
+	}
+
 	/* Dont do READONLY if we dont have a tracking database */
 	if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
 		c->flags &= ~CTDB_WANT_READONLY;
@@ -513,6 +712,14 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
 		state = ctdb_call_local_send(ctdb_db, call, &header, &data);
 	} else {
 		state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
+		if (call->flags & CTDB_IMMEDIATE_MIGRATION) {
+			/* This request triggered a remote fetch-lock.
+			   set up a deferral for this key so any additional
+			   fetch-locks are deferred until the current one
+			   finishes.
+			*/
+			setup_deferred_fetch_locks(ctdb_db, call);
+		}
 	}
 	ret = ctdb_ltdb_unlock(ctdb_db, key);
diff --git a/ctdb/server/ctdb_ltdb_server.c b/ctdb/server/ctdb_ltdb_server.c
index 745b3d256b..713606d759 100644
--- a/ctdb/server/ctdb_ltdb_server.c
+++ b/ctdb/server/ctdb_ltdb_server.c
@@ -962,6 +962,17 @@ again:
 		}
 	}
+	/* set up a rb tree we can use to track which records we have a
+	   fetch-lock in-flight for so we can defer any additional calls
+	   for the same record.
+	*/
+	ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
+	if (ctdb_db->deferred_fetch == NULL) {
+		DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
+		talloc_free(ctdb_db);
+		return -1;
+	}
+
 	DLIST_ADD(ctdb->db_list, ctdb_db);
 	/* setting this can help some high churn databases */
|