From 01c7e801761cff3c58f3de7dd4e08c99da99d7aa Mon Sep 17 00:00:00 2001 From: Stephen Gallagher Date: Thu, 13 Aug 2009 11:17:08 -0400 Subject: Don't go to the backend for identical cache entry requests Currently, if an additional request comes in for a cache entry while that same entry is already in the process of being refreshed, we start a duplicate cache update request. This patch adds allows the cache to maintain a hash table of all in-progress requests and queue up multiple callbacks for updates in progress. Once the data is returned, all of these callbacks will fire. --- server/Makefile.am | 10 +- server/responder/common/responder.h | 3 + server/responder/common/responder_dp.c | 345 +++++++++++++++++++++++++++------ 3 files changed, 303 insertions(+), 55 deletions(-) diff --git a/server/Makefile.am b/server/Makefile.am index 9616a391d..8b552558f 100644 --- a/server/Makefile.am +++ b/server/Makefile.am @@ -91,6 +91,12 @@ INI_CFG_LIBS = \ -L$(builddir)/../common/ini/.libs/ \ -lini_config +DHASH_CFLAGS = \ + -I$(srcdir)/../common/dhash +DHASH_LIBS = \ + -L$(builddir)/../common/dhash/.libs/ \ + -ldhash + AM_CPPFLAGS = -Wall \ -Iinclude \ -I.. \ @@ -107,7 +113,8 @@ AM_CPPFLAGS = -Wall \ $(PCRE_CFLAGS) \ $(REPLACE_CFLAGS) \ $(COLLECTION_CFLAGS) \ - $(INI_CFG_CFLAGS)\ + $(INI_CFG_CFLAGS) \ + $(DHASH_CFLAGS) \ -DLIBDIR=\"$(libdir)\" \ -DVARDIR=\"$(localstatedir)\" \ -DSHLIBEXT=\"$(SHLIBEXT)\" \ @@ -174,6 +181,7 @@ SSSD_LIBS = \ $(PCRE_LIBS) \ $(INI_CFG_LIBS) \ $(COLLECTION_LIBS) \ + $(DHASH_LIBS) \ $(REPLACE_LIBS) \ $(NSS_LIBS) \ libsss_crypt.la diff --git a/server/responder/common/responder.h b/server/responder/common/responder.h index 881c33067..f5cdff13a 100644 --- a/server/responder/common/responder.h +++ b/server/responder/common/responder.h @@ -29,10 +29,13 @@ #include "talloc.h" #include "tevent.h" #include "ldb.h" +#include "dhash.h" #include "sbus/sssd_dbus.h" #include "../sss_client/sss_cli.h" #include "util/btreemap.h" +extern hash_table_t *dp_requests; + /* if there is a provider other than the special local */ #define NEED_CHECK_PROVIDER(provider) \ (provider != NULL && strcmp(provider, "local") != 0) diff --git a/server/responder/common/responder_dp.c b/server/responder/common/responder_dp.c index 847bedb7a..44f035582 100644 --- a/server/responder/common/responder_dp.c +++ b/server/responder/common/responder_dp.c @@ -21,6 +21,8 @@ struct sss_dp_pvt_ctx { int retries; }; +hash_table_t *dp_requests = NULL; + static int sss_dp_conn_destructor(void *data); static void sss_dp_reconnect(struct tevent_context *ev, struct tevent_timer *te, @@ -118,6 +120,7 @@ int sss_dp_init(struct resp_ctx *rctx, struct sbus_interface *dp_intf, uint16_t cli_type, uint16_t cli_version, const char *cli_name, const char *cli_domain) { + int ret; struct sss_dp_pvt_ctx *pvt; pvt = talloc_zero(rctx, struct sss_dp_pvt_ctx); @@ -132,79 +135,190 @@ int sss_dp_init(struct resp_ctx *rctx, struct sbus_interface *dp_intf, pvt->cli_domain = talloc_strdup(pvt, cli_domain); if (!pvt->cli_domain) return ENOMEM; + /* Create a hash table to handle queued update requests */ + ret = hash_create(10, &dp_requests, NULL); + if (ret != HASH_SUCCESS) { + fprintf(stderr, "cannot create hash table (%s)\n", hash_error_string(ret)); + talloc_zfree(pvt); + return EIO; + } + sss_dp_conn_reconnect(pvt); return EOK; } +struct nss_dp_req; -struct nss_dp_req { +struct nss_dp_callback { + struct nss_dp_callback *prev; + struct nss_dp_callback *next; nss_dp_callback_t callback; + struct nss_dp_req *ndp_req; void *callback_ctx; +}; + +struct nss_dp_req { + struct tevent_context *ev; + struct nss_dp_callback *cb_list; DBusPendingCall *pending_reply; + + char *key; + dbus_uint16_t err_maj; + dbus_uint32_t err_min; + char *err_msg; }; -static int nss_dp_req_destructor(void *ptr) +static int sss_dp_callback_destructor(void *ptr) { - struct nss_dp_req *req = talloc_get_type(ptr, struct nss_dp_req); + struct nss_dp_callback *cb = talloc_get_type(ptr, struct nss_dp_callback); - if (req->pending_reply) { - dbus_pending_call_cancel(req->pending_reply); - } + DLIST_REMOVE(cb->ndp_req->cb_list, cb); - return 0; + return EOK; +} + +static int nss_dp_req_destructor(void *ptr) +{ + struct nss_dp_req *ndp_req = talloc_get_type(ptr, struct nss_dp_req); + hash_key_t key; + + /* No callbacks to invoke. Destroy the hash entry */ + key.type = HASH_KEY_STRING; + key.str = ndp_req->key; + int hret = hash_delete(dp_requests, &key); + if (hret != HASH_SUCCESS) { + DEBUG(0, ("Could not clear entry from request queue\n")); + /* This should never happen */ + return EIO; + } + return EOK; } static int nss_dp_get_reply(DBusPendingCall *pending, dbus_uint16_t *err_maj, dbus_uint32_t *err_min, - const char **err_msg); + char **err_msg); -static void nss_dp_send_acct_callback(DBusPendingCall *pending, void *ptr) +static void sss_dp_invoke_callback(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval t, void *ptr) { struct nss_dp_req *ndp_req; - dbus_uint16_t err_maj; - dbus_uint32_t err_min; - const char *err_msg; - int ret; + struct nss_dp_callback *cb; + struct timeval tv; + struct tevent_timer *tev; ndp_req = talloc_get_type(ptr, struct nss_dp_req); + if (!ndp_req) { + /* We didn't receive an nss_dp_req? */ + return; + } - /* Remove request destructor */ - talloc_set_destructor(ndp_req, NULL); + cb = ndp_req->cb_list; + cb->callback(ndp_req->err_maj, + ndp_req->err_min, + ndp_req->err_msg, + cb->callback_ctx); + + /* Free the callback memory and remove it from the list */ + talloc_zfree(cb); + + /* Call the next callback if needed */ + if (ndp_req->cb_list != NULL) { + tv = tevent_timeval_current(); + tev = tevent_add_timer(ndp_req->ev, ndp_req, tv, + sss_dp_invoke_callback, ndp_req); + if (!te) { + /* Out of memory or other serious error */ + goto done; + } + + return; + } - ret = nss_dp_get_reply(pending, &err_maj, &err_min, &err_msg); + /* No more callbacks to invoke. Destroy the hash entry */ +done: + talloc_zfree(ndp_req); +} + +static void nss_dp_send_acct_callback(DBusPendingCall *pending, void *ptr) +{ + int ret; + struct nss_dp_req *ndp_req; + struct nss_dp_callback *cb; + struct timeval tv; + struct tevent_timer *te; + + ndp_req = talloc_get_type(ptr, struct nss_dp_req); + + ret = nss_dp_get_reply(pending, + &ndp_req->err_maj, + &ndp_req->err_min, + &ndp_req->err_msg); if (ret != EOK) { if (ret == ETIME) { - err_maj = DP_ERR_TIMEOUT; - err_min = ret; - err_msg = "Request timed out"; + ndp_req->err_maj = DP_ERR_TIMEOUT; + ndp_req->err_min = ret; + ndp_req->err_msg = talloc_strdup(ndp_req, "Request timed out"); } else { - err_maj = DP_ERR_FATAL; - err_min = ret; - err_msg = "Failed to get reply from Data Provider"; + ndp_req->err_maj = DP_ERR_FATAL; + ndp_req->err_min = ret; + ndp_req->err_msg = + talloc_strdup(ndp_req, + "Failed to get reply from Data Provider"); } } - ndp_req->callback(err_maj, err_min, err_msg, ndp_req->callback_ctx); + /* Check whether we need to issue any callbacks */ + cb = ndp_req->cb_list; + if (ndp_req->cb_list == NULL) { + if (cb == NULL) { + /* No callbacks to invoke. Destroy the hash entry */ + talloc_zfree(ndp_req); + return; + } + } + + /* Queue up all callbacks */ + tv = tevent_timeval_current(); + te = tevent_add_timer(ndp_req->ev, ndp_req, tv, + sss_dp_invoke_callback, ndp_req); + if (!te) { + /* Out of memory or other serious error */ + goto error; + } + + return; - talloc_free(ndp_req); +error: + talloc_zfree(ndp_req); } +static int nss_dp_send_acct_req_create(struct resp_ctx *rctx, + TALLOC_CTX *memctx, + const char *domain, + uint32_t be_type, + char *filter, + int timeout, + nss_dp_callback_t callback, + void *callback_ctx, + struct nss_dp_req **ndp); + int nss_dp_send_acct_req(struct resp_ctx *rctx, TALLOC_CTX *memctx, nss_dp_callback_t callback, void *callback_ctx, int timeout, const char *domain, int type, const char *opt_name, uint32_t opt_id) { - struct nss_dp_req *ndp_req; - DBusMessage *msg; - DBusPendingCall *pending_reply; - DBusConnection *dbus_conn; - dbus_bool_t ret; + int ret, hret; uint32_t be_type; - const char *attrs = "core"; char *filter; + hash_key_t key; + hash_value_t value; + TALLOC_CTX *tmp_ctx; + struct nss_dp_req *ndp_req; + struct nss_dp_callback *cb; /* either, or, not both */ if (opt_name && opt_id) { @@ -215,6 +329,11 @@ int nss_dp_send_acct_req(struct resp_ctx *rctx, TALLOC_CTX *memctx, return EINVAL; } + tmp_ctx = talloc_new(NULL); + if (!tmp_ctx) { + return ENOMEM; + } + switch (type) { case NSS_DP_USER: be_type = BE_REQ_USER; @@ -229,17 +348,114 @@ int nss_dp_send_acct_req(struct resp_ctx *rctx, TALLOC_CTX *memctx, return EINVAL; } + key.type = HASH_KEY_STRING; + key.str = NULL; + if (opt_name) { - filter = talloc_asprintf(memctx, "name=%s", opt_name); + filter = talloc_asprintf(tmp_ctx, "name=%s", opt_name); + key.str = talloc_asprintf(tmp_ctx, "%d%s@%s", type, opt_name, domain); } else if (opt_id) { - filter = talloc_asprintf(memctx, "idnumber=%u", opt_id); + filter = talloc_asprintf(tmp_ctx, "idnumber=%u", opt_id); + key.str = talloc_asprintf(tmp_ctx, "%d%d@%s", type, opt_id, domain); } else { - filter = talloc_strdup(memctx, "name=*"); + filter = talloc_strdup(tmp_ctx, "name=*"); + key.str = talloc_asprintf(tmp_ctx, "%d*@%s", type, domain); } - if (!filter) { + if (!filter || !key.str) { + talloc_zfree(tmp_ctx); return ENOMEM; } + /* Check whether there's already a request in progress */ + hret = hash_lookup(dp_requests, &key, &value); + switch (hret) { + case HASH_SUCCESS: + /* Request already in progress + * Add an additional callback if needed and return + */ + DEBUG(2, ("Identical request in progress\n")); + if (callback) { + /* We have a new request asking for a callback */ + ndp_req = talloc_get_type(value.ptr, struct nss_dp_req); + if (!ndp_req) { + DEBUG(0, ("Could not retrieve DP request context\n")); + ret = EIO; + goto done; + } + + cb = talloc_zero(memctx, struct nss_dp_callback); + if (!cb) { + ret = ENOMEM; + goto done; + } + + cb->callback = callback; + cb->callback_ctx = callback_ctx; + cb->ndp_req = ndp_req; + + DLIST_ADD_END(ndp_req->cb_list, cb, struct nss_dp_callback *); + talloc_set_destructor((TALLOC_CTX *)cb, sss_dp_callback_destructor); + } + ret = EOK; + goto done; + + case HASH_ERROR_KEY_NOT_FOUND: + /* No such request in progress + * Create a new request + */ + ret = nss_dp_send_acct_req_create(rctx, memctx, domain, + be_type, filter, timeout, + callback, callback_ctx, + &ndp_req); + if (ret == EOK) { + value.type = HASH_VALUE_PTR; + value.ptr = ndp_req; + hret = hash_enter(dp_requests, &key, &value); + if (hret != HASH_SUCCESS) { + DEBUG(0, ("Could not store request query (%s)", + hash_error_string(hret))); + ret = EIO; + goto done; + } + + ndp_req->key = talloc_strdup(ndp_req, key.str); + talloc_set_destructor((TALLOC_CTX *)ndp_req, nss_dp_req_destructor); + } + break; + + default: + DEBUG(0,("Could not query request list (%s)\n", + hash_error_string(hret))); + ret = EIO; + goto done; + } + + ret = EOK; + +done: + talloc_zfree(tmp_ctx); + return ret; +} + +static int nss_dp_send_acct_req_create(struct resp_ctx *rctx, + TALLOC_CTX *memctx, + const char *domain, + uint32_t be_type, + char *filter, + int timeout, + nss_dp_callback_t callback, + void *callback_ctx, + struct nss_dp_req **ndp) +{ + DBusConnection *dbus_conn; + DBusMessage *msg; + DBusPendingCall *pending_reply; + dbus_bool_t dbret; + struct nss_dp_callback *cb; + struct nss_dp_req *ndp_req; + + const char *attrs = "core"; + /* double check dp_ctx has actually been initialized. * in some pathological cases it may happen that nss starts up before * dp connection code is actually able to establish a connection. @@ -264,20 +480,20 @@ int nss_dp_send_acct_req(struct resp_ctx *rctx, TALLOC_CTX *memctx, DEBUG(4, ("Sending request for [%s][%u][%s][%s]\n", domain, be_type, attrs, filter)); - ret = dbus_message_append_args(msg, - DBUS_TYPE_STRING, &domain, - DBUS_TYPE_UINT32, &be_type, - DBUS_TYPE_STRING, &attrs, - DBUS_TYPE_STRING, &filter, - DBUS_TYPE_INVALID); - if (!ret) { + dbret = dbus_message_append_args(msg, + DBUS_TYPE_STRING, &domain, + DBUS_TYPE_UINT32, &be_type, + DBUS_TYPE_STRING, &attrs, + DBUS_TYPE_STRING, &filter, + DBUS_TYPE_INVALID); + if (!dbret) { DEBUG(1,("Failed to build message\n")); return EIO; } - ret = dbus_connection_send_with_reply(dbus_conn, msg, - &pending_reply, timeout); - if (!ret || pending_reply == NULL) { + dbret = dbus_connection_send_with_reply(dbus_conn, msg, + &pending_reply, timeout); + if (!dbret || pending_reply == NULL) { /* * Critical Failure * We can't communicate on this connection @@ -288,31 +504,52 @@ int nss_dp_send_acct_req(struct resp_ctx *rctx, TALLOC_CTX *memctx, return EIO; } - ndp_req = talloc_zero(memctx, struct nss_dp_req); + ndp_req = talloc_zero(NULL, struct nss_dp_req); if (!ndp_req) { dbus_message_unref(msg); return ENOMEM; } - ndp_req->callback = callback; - ndp_req->callback_ctx = callback_ctx; - /* set up destructor */ - ndp_req->pending_reply = pending_reply; - talloc_set_destructor((TALLOC_CTX *)ndp_req, nss_dp_req_destructor); + ndp_req->ev = rctx->ev; + + if (callback) { + cb = talloc_zero(memctx, struct nss_dp_callback); + if (!cb) { + dbus_message_unref(msg); + talloc_zfree(ndp_req); + return ENOMEM; + } + cb->callback = callback; + cb->callback_ctx = callback_ctx; + cb->ndp_req = ndp_req; + + DLIST_ADD(ndp_req->cb_list, cb); + talloc_set_destructor((TALLOC_CTX *)cb, sss_dp_callback_destructor); + } /* Set up the reply handler */ - dbus_pending_call_set_notify(pending_reply, - nss_dp_send_acct_callback, - ndp_req, NULL); + dbret = dbus_pending_call_set_notify(pending_reply, + nss_dp_send_acct_callback, + ndp_req, NULL); + if (!dbret) { + DEBUG(0, ("Could not queue up pending request!")); + talloc_zfree(ndp_req); + dbus_pending_call_cancel(pending_reply); + dbus_message_unref(msg); + return EIO; + } + dbus_message_unref(msg); + *ndp = ndp_req; + return EOK; } static int nss_dp_get_reply(DBusPendingCall *pending, dbus_uint16_t *err_maj, dbus_uint32_t *err_min, - const char **err_msg) + char **err_msg) { DBusMessage *reply; DBusError dbus_error; -- cgit