summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ctdb/include/ctdb_private.h4
-rw-r--r--ctdb/server/ctdb_daemon.c207
-rw-r--r--ctdb/server/ctdb_ltdb_server.c11
3 files changed, 222 insertions, 0 deletions
diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h
index 7e59473013..f1818b98de 100644
--- a/ctdb/include/ctdb_private.h
+++ b/ctdb/include/ctdb_private.h
@@ -527,6 +527,10 @@ struct ctdb_db_context {
struct ctdb_ltdb_header *header,
TDB_DATA data);
+ /* used to track which records we are currently fetching
+ so we can avoid sending duplicate fetch requests
+ */
+ struct trbt_tree *deferred_fetch;
};
diff --git a/ctdb/server/ctdb_daemon.c b/ctdb/server/ctdb_daemon.c
index 88d12103f7..69fb6fb1f6 100644
--- a/ctdb/server/ctdb_daemon.c
+++ b/ctdb/server/ctdb_daemon.c
@@ -27,6 +27,7 @@
#include "system/wait.h"
#include "../include/ctdb_client.h"
#include "../include/ctdb_private.h"
+#include "../common/rb_tree.h"
#include <sys/socket.h>
struct ctdb_client_pid_list {
@@ -359,6 +360,190 @@ static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
daemon_incoming_packet(client, hdr);
}
+struct ctdb_deferred_fetch_call {
+ struct ctdb_deferred_fetch_call *next, *prev;
+ struct ctdb_req_call *c;
+ struct ctdb_daemon_packet_wrap *w;
+};
+
+struct ctdb_deferred_fetch_queue {
+ struct ctdb_deferred_fetch_call *deferred_calls;
+};
+
+struct ctdb_deferred_requeue {
+ struct ctdb_deferred_fetch_call *dfc;
+ struct ctdb_client *client;
+};
+
+/* called from a timer event and starts reprocessing the deferred call.*/
+static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private_data)
+{
+ struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
+ struct ctdb_client *client = dfr->client;
+
+ talloc_steal(client, dfr->dfc->c);
+ daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
+ talloc_free(dfr);
+}
+
+/* the referral context is destroyed either after a timeout or when the initial
+ fetch-lock has finished.
+ at this stage, immediately start reprocessing the queued up deferred
+ calls so they get reprocessed immediately (and since we are dmaster at
+ this stage, trigger the waiting smbd processes to pick up and aquire the
+ record right away.
+*/
+static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
+{
+
+ /* need to reprocess the packets from the queue explicitely instead of
+ just using a normal destructor since we want, need, to
+ call the clients in the same oder as the requests queued up
+ */
+ while (dfq->deferred_calls != NULL) {
+ struct ctdb_client *client;
+ struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
+ struct ctdb_deferred_requeue *dfr;
+
+ DLIST_REMOVE(dfq->deferred_calls, dfc);
+
+ client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
+ if (client == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
+ dfc->w->client_id));
+ continue;
+ }
+
+ /* process it by pushing it back onto the eventloop */
+ dfr = talloc(client, struct ctdb_deferred_requeue);
+ if (dfr == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
+ continue;
+ }
+
+ dfr->dfc = talloc_steal(dfr, dfc);
+ dfr->client = client;
+
+ event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
+ }
+
+ return 0;
+}
+
+/* insert the new deferral context into the rb tree.
+ there should never be a pre-existing context here, but check for it
+ warn and destroy the previous context if there is already a deferral context
+ for this key.
+*/
+static void *insert_dfq_callback(void *parm, void *data)
+{
+ if (data) {
+ DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
+ talloc_free(data);
+ }
+ return parm;
+}
+
+/* if the original fetch-lock did not complete within a reasonable time,
+ free the context and context for all deferred requests to cause them to be
+ re-inserted into the event system.
+*/
+static void dfq_timeout(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private_data)
+{
+ talloc_free(private_data);
+}
+
+/* This function is used in the local daemon to register a KEY in a database
+ for being "fetched"
+ While the remote fetch is in-flight, any futher attempts to re-fetch the
+ same record will be deferred until the fetch completes.
+*/
+static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
+{
+ uint32_t *k;
+ struct ctdb_deferred_fetch_queue *dfq;
+
+ k = talloc_zero_size(call, ((call->key.dsize + 3) & 0xfffffffc) + 4);
+ if (k == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
+ return -1;
+ }
+
+ k[0] = (call->key.dsize + 3) / 4 + 1;
+ memcpy(&k[1], call->key.dptr, call->key.dsize);
+
+ dfq = talloc(call, struct ctdb_deferred_fetch_queue);
+ if (dfq == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
+ talloc_free(k);
+ return -1;
+ }
+ dfq->deferred_calls = NULL;
+
+ trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
+
+ talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
+
+ /* if the fetch havent completed in 30 seconds, just tear it all down
+ and let it try again as the events are reissued */
+ event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
+
+ talloc_free(k);
+ return 0;
+}
+
+/* check if this is a duplicate request to a fetch already in-flight
+ if it is, make this call deferred to be reprocessed later when
+ the in-flight fetch completes.
+*/
+static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
+{
+ uint32_t *k;
+ struct ctdb_deferred_fetch_queue *dfq;
+ struct ctdb_deferred_fetch_call *dfc;
+
+ k = talloc_zero_size(c, ((key.dsize + 3) & 0xfffffffc) + 4);
+ if (k == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
+ return -1;
+ }
+
+ k[0] = (key.dsize + 3) / 4 + 1;
+ memcpy(&k[1], key.dptr, key.dsize);
+
+ dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
+ if (dfq == NULL) {
+ talloc_free(k);
+ return -1;
+ }
+
+
+ talloc_free(k);
+
+ dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
+ if (dfc == NULL) {
+ DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
+ return -1;
+ }
+
+ dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
+ if (dfc->w == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
+ talloc_free(dfc);
+ return -1;
+ }
+
+ dfc->c = talloc_steal(dfc, c);
+ dfc->w->ctdb = ctdb_db->ctdb;
+ dfc->w->client_id = client->client_id;
+
+ DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
+
+ return 0;
+}
+
/*
this is called when the ctdb daemon received a ctdb request call
@@ -424,6 +609,20 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
return;
}
+ if (c->flags & CTDB_IMMEDIATE_MIGRATION) {
+ /* check if this fetch-lock request is a duplicate for a
+ request we already have in flight. If so defer it until
+ the first request completes.
+ */
+ if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
+ ret = ctdb_ltdb_unlock(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+ }
+ return;
+ }
+ }
+
/* Dont do READONLY if we dont have a tracking database */
if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
c->flags &= ~CTDB_WANT_READONLY;
@@ -513,6 +712,14 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
state = ctdb_call_local_send(ctdb_db, call, &header, &data);
} else {
state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
+ if (call->flags & CTDB_IMMEDIATE_MIGRATION) {
+ /* This request triggered a remote fetch-lock.
+ set up a deferral for this key so any additional
+ fetch-locks are deferred until the current one
+ finishes.
+ */
+ setup_deferred_fetch_locks(ctdb_db, call);
+ }
}
ret = ctdb_ltdb_unlock(ctdb_db, key);
diff --git a/ctdb/server/ctdb_ltdb_server.c b/ctdb/server/ctdb_ltdb_server.c
index 745b3d256b..713606d759 100644
--- a/ctdb/server/ctdb_ltdb_server.c
+++ b/ctdb/server/ctdb_ltdb_server.c
@@ -962,6 +962,17 @@ again:
}
}
+ /* set up a rb tree we can use to track which records we have a
+ fetch-lock in-flight for so we can defer any additional calls
+ for the same record.
+ */
+ ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
+ if (ctdb_db->deferred_fetch == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
+ talloc_free(ctdb_db);
+ return -1;
+ }
+
DLIST_ADD(ctdb->db_list, ctdb_db);
/* setting this can help some high churn databases */