author     Raghavendra Gowdappa <rgowdapp@redhat.com>    2018-10-31 16:10:58 +0530
committer  Raghavendra G <rgowdapp@redhat.com>           2018-11-29 01:19:12 +0000
commit     95e380eca19b9f0d03a53429535f15556e5724ad (patch)
tree       be32fca4fbfa7d29e8571545af26e784d34e294c
parent     f0232d07f7e6543b56830be28f6e80f9085e6241 (diff)
rpcsvc: provide each request handler thread its own queue
A single global per program queue is contended by all request handler
threads and event threads. This can lead to high contention. So, reduce
the contention by providing each request handler thread its own private
queue.

Thanks to "Manoj Pillai"<mpillai@redhat.com> for the idea of pairing a
single queue with a fixed request-handler-thread and event-thread, which
brought down the performance regression due to overhead of queuing
significantly.

Thanks to "Xavi Hernandez"<xhernandez@redhat.com> for discussion on how
to communicate the event-thread death to request-handler-thread.

Thanks to "Karan Sandha"<ksandha@redhat.com> for voluntarily running the
perf benchmarks to qualify that performance regression introduced by
ping-timer-fixes is fixed with this patch and patiently running many
iterations of regression tests while RCAing the issue.

Thanks to "Milind Changire"<mchangir@redhat.com> for patiently running
the many iterations of perf benchmarking tests while RCAing the
regression caused by ping-timer-expiry fixes.

Change-Id: I578c3fc67713f4234bd3abbec5d3fbba19059ea5
Fixes: bz#1644629
Signed-off-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
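
The pairing described above is easier to see outside the patch context: each
request-handler thread owns one queue, and the event (poller) thread that
produced a request always feeds the same queue, selected through thread-local
storage. Below is a minimal, self-contained sketch of that idea using plain
pthreads; the names (queue_t, enqueue, handler, NQUEUES, and so on) are
illustrative stand-ins and are not the rpcsvc API.

    /* Build: cc -pthread queue-per-handler.c -- a toy model only. */
    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>

    #define NQUEUES 2

    typedef struct req { struct req *next; int id; } req_t;

    typedef struct queue {
        pthread_mutex_t lock; /* shared by exactly one producer/consumer pair */
        pthread_cond_t cond;
        req_t *head, *tail;
        int stop;             /* set when the paired event thread goes away */
    } queue_t;

    static queue_t queues[NQUEUES];
    static pthread_key_t queue_key; /* maps an event thread to its own queue */
    static int next_queue;
    static pthread_mutex_t alloc_lock = PTHREAD_MUTEX_INITIALIZER;

    /* Event-thread side: pick a queue once (cached in TLS), then append the
     * request while holding only that queue's lock. */
    static void enqueue(req_t *r)
    {
        queue_t *q = pthread_getspecific(queue_key);

        if (!q) { /* first request seen on this event thread */
            pthread_mutex_lock(&alloc_lock);
            q = &queues[next_queue++ % NQUEUES];
            pthread_mutex_unlock(&alloc_lock);
            pthread_setspecific(queue_key, q);
        }
        pthread_mutex_lock(&q->lock);
        r->next = NULL;
        if (q->tail) q->tail->next = r; else q->head = r;
        q->tail = r;
        pthread_cond_signal(&q->cond); /* wakes only the paired handler */
        pthread_mutex_unlock(&q->lock);
    }

    /* Handler side: drains exactly one queue, so it never contends with the
     * other handler threads. */
    static void *handler(void *arg)
    {
        queue_t *q = arg;

        for (;;) {
            pthread_mutex_lock(&q->lock);
            while (!q->head && !q->stop)
                pthread_cond_wait(&q->cond, &q->lock);
            if (!q->head && q->stop) { /* producer gone and queue drained */
                pthread_mutex_unlock(&q->lock);
                return NULL;
            }
            req_t *r = q->head;
            q->head = r->next;
            if (!q->head) q->tail = NULL;
            pthread_mutex_unlock(&q->lock);

            printf("handled request %d\n", r->id); /* stand-in for the actor */
            free(r);
        }
    }

    static void *event_thread(void *arg) /* produces a few requests */
    {
        for (int i = 0; i < 3; i++) {
            req_t *r = calloc(1, sizeof(*r));
            r->id = (int)(long)arg + i;
            enqueue(r);
        }
        return NULL;
    }

    int main(void)
    {
        pthread_t ev[NQUEUES], hnd[NQUEUES];

        pthread_key_create(&queue_key, NULL);
        for (int i = 0; i < NQUEUES; i++) {
            pthread_mutex_init(&queues[i].lock, NULL);
            pthread_cond_init(&queues[i].cond, NULL);
            pthread_create(&hnd[i], NULL, handler, &queues[i]);
            pthread_create(&ev[i], NULL, event_thread, (void *)(long)(i * 100));
        }
        for (int i = 0; i < NQUEUES; i++)
            pthread_join(ev[i], NULL);
        for (int i = 0; i < NQUEUES; i++) { /* simulate "event thread died" */
            pthread_mutex_lock(&queues[i].lock);
            queues[i].stop = 1;
            pthread_cond_signal(&queues[i].cond);
            pthread_mutex_unlock(&queues[i].lock);
        }
        for (int i = 0; i < NQUEUES; i++)
            pthread_join(hnd[i], NULL);
        return 0;
    }

In this sketch the stop flag plays the role of the patch's
RPCSVC_PROC_EVENT_THREAD_DEATH sentinel request: when the paired event thread
goes away, the handler drains whatever is left on its private queue and exits
instead of lingering, and no lock is ever shared by more than one
producer/consumer pair.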
-rw-r--r--  cli/src/cli-rl.c                        |   5
-rw-r--r--  libglusterfs/src/event-epoll.c          | 114
-rw-r--r--  libglusterfs/src/event-poll.c           |  10
-rw-r--r--  libglusterfs/src/event.c                |  10
-rw-r--r--  libglusterfs/src/gf-event.h             |  19
-rw-r--r--  rpc/rpc-lib/src/autoscale-threads.c     |   1
-rw-r--r--  rpc/rpc-lib/src/libgfrpc.sym            |   1
-rw-r--r--  rpc/rpc-lib/src/rpc-clnt.c              |   6
-rw-r--r--  rpc/rpc-lib/src/rpc-transport.c         |   4
-rw-r--r--  rpc/rpc-lib/src/rpc-transport.h         |   3
-rw-r--r--  rpc/rpc-lib/src/rpcsvc.c                | 412
-rw-r--r--  rpc/rpc-lib/src/rpcsvc.h                |  34
-rw-r--r--  rpc/rpc-transport/socket/src/socket.c   |  28
-rw-r--r--  xlators/protocol/server/src/server.c    |   9
14 files changed, 479 insertions, 177 deletions
diff --git a/cli/src/cli-rl.c b/cli/src/cli-rl.c
index 7831d0bcb4..38aa6f4b7a 100644
--- a/cli/src/cli-rl.c
+++ b/cli/src/cli-rl.c
@@ -104,7 +104,7 @@ cli_rl_process_line(char *line)
int
cli_rl_stdin(int fd, int idx, int gen, void *data, int poll_out, int poll_in,
- int poll_err)
+ int poll_err, char event_thread_died)
{
struct cli_state *state = NULL;
@@ -376,7 +376,8 @@ cli_rl_enable(struct cli_state *state)
goto out;
}
- ret = event_register(state->ctx->event_pool, 0, cli_rl_stdin, state, 1, 0);
+ ret = event_register(state->ctx->event_pool, 0, cli_rl_stdin, state, 1, 0,
+ 0);
if (ret == -1)
goto out;
diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c
index 9826cc9e27..041a7e6c58 100644
--- a/libglusterfs/src/event-epoll.c
+++ b/libglusterfs/src/event-epoll.c
@@ -30,6 +30,7 @@ struct event_slot_epoll {
int fd;
int events;
int gen;
+ int idx;
gf_atomic_t ref;
int do_close;
int in_handler;
@@ -37,6 +38,7 @@ struct event_slot_epoll {
void *data;
event_handler_t handler;
gf_lock_t lock;
+ struct list_head poller_death;
};
struct event_thread_data {
@@ -57,6 +59,7 @@ __event_newtable(struct event_pool *event_pool, int table_idx)
for (i = 0; i < EVENT_EPOLL_SLOTS; i++) {
table[i].fd = -1;
LOCK_INIT(&table[i].lock);
+ INIT_LIST_HEAD(&table[i].poller_death);
}
event_pool->ereg[table_idx] = table;
@@ -66,7 +69,8 @@ __event_newtable(struct event_pool *event_pool, int table_idx)
}
static int
-__event_slot_alloc(struct event_pool *event_pool, int fd)
+__event_slot_alloc(struct event_pool *event_pool, int fd,
+ char notify_poller_death)
{
int i = 0;
int table_idx = -1;
@@ -109,8 +113,15 @@ __event_slot_alloc(struct event_pool *event_pool, int fd)
table[i].gen = gen + 1;
LOCK_INIT(&table[i].lock);
+ INIT_LIST_HEAD(&table[i].poller_death);
table[i].fd = fd;
+ if (notify_poller_death) {
+ table[i].idx = table_idx * EVENT_EPOLL_SLOTS + i;
+ list_add_tail(&table[i].poller_death,
+ &event_pool->poller_death);
+ }
+
event_pool->slots_used[table_idx]++;
break;
@@ -121,13 +132,14 @@ __event_slot_alloc(struct event_pool *event_pool, int fd)
}
static int
-event_slot_alloc(struct event_pool *event_pool, int fd)
+event_slot_alloc(struct event_pool *event_pool, int fd,
+ char notify_poller_death)
{
int idx = -1;
pthread_mutex_lock(&event_pool->mutex);
{
- idx = __event_slot_alloc(event_pool, fd);
+ idx = __event_slot_alloc(event_pool, fd, notify_poller_death);
}
pthread_mutex_unlock(&event_pool->mutex);
@@ -155,6 +167,7 @@ __event_slot_dealloc(struct event_pool *event_pool, int idx)
slot->fd = -1;
slot->handled_error = 0;
slot->in_handler = 0;
+ list_del_init(&slot->poller_death);
event_pool->slots_used[table_idx]--;
return;
@@ -172,6 +185,15 @@ event_slot_dealloc(struct event_pool *event_pool, int idx)
return;
}
+static int
+event_slot_ref(struct event_slot_epoll *slot)
+{
+ if (!slot)
+ return -1;
+
+ return GF_ATOMIC_INC(slot->ref);
+}
+
static struct event_slot_epoll *
event_slot_get(struct event_pool *event_pool, int idx)
{
@@ -188,12 +210,41 @@ event_slot_get(struct event_pool *event_pool, int idx)
return NULL;
slot = &table[offset];
- GF_ATOMIC_INC(slot->ref);
+ event_slot_ref(slot);
return slot;
}
static void
+__event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot,
+ int idx)
+{
+ int ref = -1;
+ int fd = -1;
+ int do_close = 0;
+
+ ref = GF_ATOMIC_DEC(slot->ref);
+ if (ref)
+ /* slot still alive */
+ goto done;
+
+ LOCK(&slot->lock);
+ {
+ fd = slot->fd;
+ do_close = slot->do_close;
+ slot->do_close = 0;
+ }
+ UNLOCK(&slot->lock);
+
+ __event_slot_dealloc(event_pool, idx);
+
+ if (do_close)
+ sys_close(fd);
+done:
+ return;
+}
+
+static void
event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot,
int idx)
{
@@ -248,7 +299,7 @@ event_pool_new_epoll(int count, int eventthreadcount)
event_pool->fd = epfd;
event_pool->count = count;
-
+ INIT_LIST_HEAD(&event_pool->poller_death);
event_pool->eventthreadcount = eventthreadcount;
event_pool->auto_thread_count = 0;
@@ -297,7 +348,7 @@ __slot_update_events(struct event_slot_epoll *slot, int poll_in, int poll_out)
int
event_register_epoll(struct event_pool *event_pool, int fd,
event_handler_t handler, void *data, int poll_in,
- int poll_out)
+ int poll_out, char notify_poller_death)
{
int idx = -1;
int ret = -1;
@@ -328,7 +379,7 @@ event_register_epoll(struct event_pool *event_pool, int fd,
if (destroy == 1)
goto out;
- idx = event_slot_alloc(event_pool, fd);
+ idx = event_slot_alloc(event_pool, fd, notify_poller_death);
if (idx == -1) {
gf_msg("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND,
"could not find slot for fd=%d", fd);
@@ -591,7 +642,7 @@ pre_unlock:
ret = handler(fd, idx, gen, data,
(event->events & (EPOLLIN | EPOLLPRI)),
(event->events & (EPOLLOUT)),
- (event->events & (EPOLLERR | EPOLLHUP)));
+ (event->events & (EPOLLERR | EPOLLHUP)), 0);
}
out:
event_slot_unref(event_pool, slot, idx);
@@ -607,7 +658,9 @@ event_dispatch_epoll_worker(void *data)
struct event_thread_data *ev_data = data;
struct event_pool *event_pool;
int myindex = -1;
- int timetodie = 0;
+ int timetodie = 0, gen = 0;
+ struct list_head poller_death_notify;
+ struct event_slot_epoll *slot = NULL, *tmp = NULL;
GF_VALIDATE_OR_GOTO("event", ev_data, out);
@@ -619,7 +672,7 @@ event_dispatch_epoll_worker(void *data)
gf_msg("epoll", GF_LOG_INFO, 0, LG_MSG_STARTED_EPOLL_THREAD,
"Started"
" thread with index %d",
- myindex);
+ myindex - 1);
pthread_mutex_lock(&event_pool->mutex);
{
@@ -637,20 +690,55 @@ event_dispatch_epoll_worker(void *data)
pthread_mutex_lock(&event_pool->mutex);
{
if (event_pool->eventthreadcount < myindex) {
+ while (event_pool->poller_death_sliced) {
+ pthread_cond_wait(&event_pool->cond,
+ &event_pool->mutex);
+ }
+
+ INIT_LIST_HEAD(&poller_death_notify);
/* if found true in critical section,
* die */
event_pool->pollers[myindex - 1] = 0;
event_pool->activethreadcount--;
timetodie = 1;
+ gen = ++event_pool->poller_gen;
+ list_for_each_entry(slot, &event_pool->poller_death,
+ poller_death)
+ {
+ event_slot_ref(slot);
+ }
+
+ list_splice_init(&event_pool->poller_death,
+ &poller_death_notify);
+ event_pool->poller_death_sliced = 1;
pthread_cond_broadcast(&event_pool->cond);
}
}
pthread_mutex_unlock(&event_pool->mutex);
if (timetodie) {
+ list_for_each_entry(slot, &poller_death_notify, poller_death)
+ {
+ slot->handler(slot->fd, 0, gen, slot->data, 0, 0, 0, 1);
+ }
+
+ pthread_mutex_lock(&event_pool->mutex);
+ {
+ list_for_each_entry_safe(slot, tmp, &poller_death_notify,
+ poller_death)
+ {
+ __event_slot_unref(event_pool, slot, slot->idx);
+ }
+
+ list_splice(&poller_death_notify,
+ &event_pool->poller_death);
+ event_pool->poller_death_sliced = 0;
+ pthread_cond_broadcast(&event_pool->cond);
+ }
+ pthread_mutex_unlock(&event_pool->mutex);
+
gf_msg("epoll", GF_LOG_INFO, 0, LG_MSG_EXITED_EPOLL_THREAD,
- "Exited "
- "thread with index %d",
- myindex);
+ "Exited thread with index %d", myindex);
+
goto out;
}
}
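
The event-epoll.c hunks above also add the poller_death mechanism the commit
message credits to the discussion with Xavi Hernandez: slots registered with
notify_poller_death are kept on a per-pool list, and a worker thread that is
being scaled down walks that list and invokes each handler one extra time with
event_thread_died set, tagged with a generation number so duplicates can be
discarded. The sketch below shows only that handoff, with a plain array
standing in for the intrusive list; struct pool, notify_poller_death and
listener_handler are hypothetical names, not the libglusterfs event API.

    /* Build: cc -pthread poller-death.c -- a simplified model only. */
    #include <pthread.h>
    #include <stdio.h>

    #define MAX_SLOTS 8

    typedef void (*death_handler_t)(int fd, int gen, void *data);

    struct slot {
        int fd;
        void *data;
        death_handler_t handler;
    };

    struct pool {
        pthread_mutex_t mutex;
        struct slot slots[MAX_SLOTS]; /* stand-in for the poller_death list */
        int nslots;
        int poller_gen;               /* lets consumers drop duplicate notices */
    };

    /* What event_register(..., notify_poller_death = 1) boils down to. */
    static void register_for_death(struct pool *p, int fd, death_handler_t h,
                                   void *data)
    {
        pthread_mutex_lock(&p->mutex);
        p->slots[p->nslots++] = (struct slot){fd, data, h};
        pthread_mutex_unlock(&p->mutex);
    }

    /* What a dying poller thread does: snapshot the registrations under the
     * pool mutex, then notify outside it so handlers are free to re-enter
     * the event layer. */
    static void notify_poller_death(struct pool *p)
    {
        struct slot copy[MAX_SLOTS];
        int n, gen, i;

        pthread_mutex_lock(&p->mutex);
        n = p->nslots;
        gen = ++p->poller_gen;
        for (i = 0; i < n; i++)
            copy[i] = p->slots[i]; /* the real code splices + refs the slots */
        pthread_mutex_unlock(&p->mutex);

        for (i = 0; i < n; i++)
            copy[i].handler(copy[i].fd, gen, copy[i].data);
    }

    static void listener_handler(int fd, int gen, void *data)
    {
        /* In the patch, socket_server_event_handler() raises
         * RPC_TRANSPORT_EVENT_THREAD_DIED here, carrying gen so rpcsvc can
         * ignore a second notification with the same generation. */
        (void)data;
        printf("listener fd=%d: an event thread died (gen=%d)\n", fd, gen);
    }

    int main(void)
    {
        struct pool p = {.mutex = PTHREAD_MUTEX_INITIALIZER};

        register_for_death(&p, 24, listener_handler, NULL);
        notify_poller_death(&p); /* as if one epoll worker were scaled down */
        return 0;
    }

Copying the registrations out before calling the handlers mirrors why the real
code splices the poller_death list and takes a reference on each slot: the
handlers may unregister fds or touch the event pool again, so they must not
run while the pool mutex is held.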
diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c
index 727d2a000a..5bac4291c4 100644
--- a/libglusterfs/src/event-poll.c
+++ b/libglusterfs/src/event-poll.c
@@ -33,11 +33,11 @@ struct event_slot_poll {
static int
event_register_poll(struct event_pool *event_pool, int fd,
event_handler_t handler, void *data, int poll_in,
- int poll_out);
+ int poll_out, char notify_poller_death);
static int
__flush_fd(int fd, int idx, int gen, void *data, int poll_in, int poll_out,
- int poll_err)
+ int poll_err, char event_thread_died)
{
char buf[64];
int ret = -1;
@@ -146,7 +146,7 @@ event_pool_new_poll(int count, int eventthreadcount)
}
ret = event_register_poll(event_pool, event_pool->breaker[0], __flush_fd,
- NULL, 1, 0);
+ NULL, 1, 0, 0);
if (ret == -1) {
gf_msg("poll", GF_LOG_ERROR, 0, LG_MSG_REGISTER_PIPE_FAILED,
"could not register pipe fd with poll event loop");
@@ -180,7 +180,7 @@ event_pool_new_poll(int count, int eventthreadcount)
static int
event_register_poll(struct event_pool *event_pool, int fd,
event_handler_t handler, void *data, int poll_in,
- int poll_out)
+ int poll_out, char notify_poller_death)
{
int idx = -1;
@@ -378,7 +378,7 @@ unlock:
ret = handler(ufds[i].fd, idx, 0, data,
(ufds[i].revents & (POLLIN | POLLPRI)),
(ufds[i].revents & (POLLOUT)),
- (ufds[i].revents & (POLLERR | POLLHUP | POLLNVAL)));
+ (ufds[i].revents & (POLLERR | POLLHUP | POLLNVAL)), 0);
return ret;
}
diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c
index 49f70c8336..ddba9810b0 100644
--- a/libglusterfs/src/event.c
+++ b/libglusterfs/src/event.c
@@ -54,14 +54,14 @@ event_pool_new(int count, int eventthreadcount)
int
event_register(struct event_pool *event_pool, int fd, event_handler_t handler,
- void *data, int poll_in, int poll_out)
+ void *data, int poll_in, int poll_out, char notify_poller_death)
{
int ret = -1;
GF_VALIDATE_OR_GOTO("event", event_pool, out);
- ret = event_pool->ops->event_register(event_pool, fd, handler, data,
- poll_in, poll_out);
+ ret = event_pool->ops->event_register(
+ event_pool, fd, handler, data, poll_in, poll_out, notify_poller_death);
out:
return ret;
}
@@ -161,7 +161,7 @@ out:
int
poller_destroy_handler(int fd, int idx, int gen, void *data, int poll_out,
- int poll_in, int poll_err)
+ int poll_in, int poll_err, char event_thread_exit)
{
struct event_destroy_data *destroy = NULL;
int readfd = -1, ret = -1;
@@ -233,7 +233,7 @@ event_dispatch_destroy(struct event_pool *event_pool)
/* From the main thread register an event on the pipe fd[0],
*/
- idx = event_register(event_pool, fd[0], poller_destroy_handler, &data, 1,
+ idx = event_register(event_pool, fd[0], poller_destroy_handler, &data, 1, 0,
0);
if (idx < 0)
goto out;
diff --git a/libglusterfs/src/gf-event.h b/libglusterfs/src/gf-event.h
index 5c3724cc95..5d92a2dd28 100644
--- a/libglusterfs/src/gf-event.h
+++ b/libglusterfs/src/gf-event.h
@@ -12,6 +12,7 @@
#define _GF_EVENT_H_
#include <pthread.h>
+#include "list.h"
struct event_pool;
struct event_ops;
@@ -23,7 +24,8 @@ struct event_data {
} __attribute__((__packed__, __may_alias__));
typedef int (*event_handler_t)(int fd, int idx, int gen, void *data,
- int poll_in, int poll_out, int poll_err);
+ int poll_in, int poll_out, int poll_err,
+ char event_thread_exit);
#define EVENT_EPOLL_TABLES 1024
#define EVENT_EPOLL_SLOTS 1024
@@ -40,6 +42,13 @@ struct event_pool {
struct event_slot_epoll *ereg[EVENT_EPOLL_TABLES];
int slots_used[EVENT_EPOLL_TABLES];
+ struct list_head poller_death;
+ int poller_death_sliced; /* track whether the list of fds interested
+ * poller_death is sliced. If yes, new thread death
+ * notification has to wait till the list is added
+ * back
+ */
+ int poller_gen;
int used;
int changed;
@@ -52,8 +61,8 @@ struct event_pool {
/* NOTE: Currently used only when event processing is done using
* epoll. */
int eventthreadcount; /* number of event threads to execute. */
- pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store,
- * and live status */
+ pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store, and live
+ status */
int destroy;
int activethreadcount;
@@ -81,7 +90,7 @@ struct event_ops {
int (*event_register)(struct event_pool *event_pool, int fd,
event_handler_t handler, void *data, int poll_in,
- int poll_out);
+ int poll_out, char notify_poller_death);
int (*event_select_on)(struct event_pool *event_pool, int fd, int idx,
int poll_in, int poll_out);
@@ -107,7 +116,7 @@ event_select_on(struct event_pool *event_pool, int fd, int idx, int poll_in,
int poll_out);
int
event_register(struct event_pool *event_pool, int fd, event_handler_t handler,
- void *data, int poll_in, int poll_out);
+ void *data, int poll_in, int poll_out, char notify_poller_death);
int
event_unregister(struct event_pool *event_pool, int fd, int idx);
int
diff --git a/rpc/rpc-lib/src/autoscale-threads.c b/rpc/rpc-lib/src/autoscale-threads.c
index 337f002df1..d629a1cd43 100644
--- a/rpc/rpc-lib/src/autoscale-threads.c
+++ b/rpc/rpc-lib/src/autoscale-threads.c
@@ -19,5 +19,4 @@ rpcsvc_autoscale_threads(glusterfs_ctx_t *ctx, rpcsvc_t *rpc, int incr)
pool->auto_thread_count += incr;
(void)event_reconfigure_threads(pool, thread_count + incr);
- rpcsvc_ownthread_reconf(rpc, pool->eventthreadcount);
}
diff --git a/rpc/rpc-lib/src/libgfrpc.sym b/rpc/rpc-lib/src/libgfrpc.sym
index a7cb5f6b5c..4f42485044 100644
--- a/rpc/rpc-lib/src/libgfrpc.sym
+++ b/rpc/rpc-lib/src/libgfrpc.sym
@@ -51,7 +51,6 @@ rpcsvc_transport_connect
rpcsvc_transport_getpeeraddr
rpcsvc_unregister_notify
rpcsvc_volume_allowed
-rpcsvc_ownthread_reconf
rpc_transport_count
rpc_transport_connect
rpc_transport_disconnect
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index 2505998b3d..b26d645bb1 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -969,6 +969,12 @@ rpc_clnt_notify(rpc_transport_t *trans, void *mydata,
*/
ret = 0;
break;
+
+ case RPC_TRANSPORT_EVENT_THREAD_DIED:
+ /* only meaningful on a server, no need of handling this event on a
+ * client */
+ ret = 0;
+ break;
}
out:
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index d70334476c..54636dcbf0 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -266,6 +266,10 @@ rpc_transport_load(glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
goto fail;
}
+ if (dict_get(options, "notify-poller-death")) {
+ trans->notify_poller_death = 1;
+ }
+
gf_log("rpc-transport", GF_LOG_DEBUG, "attempt to load file %s", name);
handle = dlopen(name, RTLD_NOW);
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index c238501b5c..fd737d0c76 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -97,6 +97,7 @@ typedef enum {
RPC_TRANSPORT_MSG_RECEIVED, /* Complete rpc msg has been read */
RPC_TRANSPORT_CONNECT, /* client is connected to server */
RPC_TRANSPORT_MSG_SENT,
+ RPC_TRANSPORT_EVENT_THREAD_DIED /* event-thread has died */
} rpc_transport_event_t;
struct rpc_transport_msg {
@@ -213,6 +214,8 @@ struct rpc_transport {
* layer or in client management notification handler functions
*/
gf_boolean_t connect_failed;
+ char notify_poller_death;
+ char poller_death_accept;
};
struct rpc_transport_ops {
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index c6545193a1..d678bca43a 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -36,6 +36,7 @@
#include <fnmatch.h>
#include <stdarg.h>
#include <stdio.h>
+#include <math.h>
#ifdef IPV6_DEFAULT
#include <netconfig.h>
@@ -63,10 +64,76 @@ rpcsvc_get_listener(rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans);
int
rpcsvc_notify(rpc_transport_t *trans, void *mydata, rpc_transport_event_t event,
void *data, ...);
+void *
+rpcsvc_request_handler(void *arg);
static int
rpcsvc_match_subnet_v4(const char *addrtok, const char *ipaddr);
+void
+rpcsvc_toggle_queue_status(rpcsvc_program_t *prog,
+ rpcsvc_request_queue_t *queue, char status[])
+{
+ int queue_index = 0, status_index = 0, set_bit = 0;
+
+ if (queue != &prog->request_queue[0]) {
+ queue_index = (queue - &prog->request_queue[0]);
+ }
+
+ status_index = queue_index / 8;
+ set_bit = queue_index % 8;
+
+ status[status_index] ^= (1 << set_bit);
+
+ return;
+}
+
+static int
+get_rightmost_set_bit(int n)
+{
+ return log2(n & -n);
+}
+
+int
+rpcsvc_get_free_queue_index(rpcsvc_program_t *prog)
+{
+ int queue_index = 0, max_index = 0, i = 0;
+ unsigned int right_most_unset_bit = 0;
+
+ right_most_unset_bit = 8;
+
+ max_index = gf_roof(EVENT_MAX_THREADS, 8) / 8;
+ for (i = 0; i < max_index; i++) {
+ if (prog->request_queue_status[i] == 0) {
+ right_most_unset_bit = 0;
+ break;
+ } else {
+ right_most_unset_bit = get_rightmost_set_bit(
+ ~prog->request_queue_status[i]);
+ if (right_most_unset_bit < 8) {
+ break;
+ }
+ }
+ }
+
+ if (right_most_unset_bit > 7) {
+ queue_index = -1;
+ } else {
+ queue_index = i * 8;
+ queue_index += right_most_unset_bit;
+
+ if (queue_index > EVENT_MAX_THREADS) {
+ queue_index = -1;
+ }
+ }
+
+ if (queue_index != -1) {
+ prog->request_queue_status[i] |= (0x1 << right_most_unset_bit);
+ }
+
+ return queue_index;
+}
+
rpcsvc_notify_wrapper_t *
rpcsvc_notify_wrapper_alloc(void)
{
@@ -575,6 +642,73 @@ rpcsvc_check_and_reply_error(int ret, call_frame_t *frame, void *opaque)
return 0;
}
+void
+rpcsvc_queue_event_thread_death(rpcsvc_t *svc, rpcsvc_program_t *prog, int gen)
+{
+ rpcsvc_request_queue_t *queue = NULL;
+ int num = 0;
+ void *value = NULL;
+ rpcsvc_request_t *req = NULL;
+ char empty = 0;
+
+ value = pthread_getspecific(prog->req_queue_key);
+ if (value == NULL) {
+ return;
+ }
+
+ num = ((unsigned long)value) - 1;
+
+ queue = &prog->request_queue[num];
+
+ if (queue->gen == gen) {
+ /* duplicate event */
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "not queuing duplicate event thread death. "
+ "queue %d program %s",
+ num, prog->progname);
+ return;
+ }
+
+ rpcsvc_alloc_request(svc, req);
+ req->prognum = RPCSVC_INFRA_PROGRAM;
+ req->procnum = RPCSVC_PROC_EVENT_THREAD_DEATH;
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "queuing event thread death request to queue %d of program %s", num,
+ prog->progname);
+
+ pthread_mutex_lock(&queue->queue_lock);
+ {
+ empty = list_empty(&queue->request_queue);
+
+ list_add_tail(&req->request_list, &queue->request_queue);
+ queue->gen = gen;
+
+ if (empty && queue->waiting)
+ pthread_cond_signal(&queue->queue_cond);
+ }
+ pthread_mutex_unlock(&queue->queue_lock);
+
+ return;
+}
+
+int
+rpcsvc_handle_event_thread_death(rpcsvc_t *svc, rpc_transport_t *trans, int gen)
+{
+ rpcsvc_program_t *prog = NULL;
+
+ pthread_rwlock_rdlock(&svc->rpclock);
+ {
+ list_for_each_entry(prog, &svc->programs, program)
+ {
+ if (prog->ownthread)
+ rpcsvc_queue_event_thread_death(svc, prog, gen);
+ }
+ }
+ pthread_rwlock_unlock(&svc->rpclock);
+
+ return 0;
+}
+
int
rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans,
rpc_transport_pollin_t *msg)
@@ -585,9 +719,12 @@ rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans,
int ret = -1;
uint16_t port = 0;
gf_boolean_t is_unix = _gf_false, empty = _gf_false;
- gf_boolean_t unprivileged = _gf_false;
+ gf_boolean_t unprivileged = _gf_false, spawn_request_handler = 0;
drc_cached_op_t *reply = NULL;
rpcsvc_drc_globals_t *drc = NULL;
+ rpcsvc_request_queue_t *queue = NULL;
+ long num = 0;
+ void *value = NULL;
if (!trans || !svc)
return -1;
@@ -700,19 +837,81 @@ rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans,
ret = synctask_new(THIS->ctx->env, (synctask_fn_t)actor_fn,
rpcsvc_check_and_reply_error, NULL, req);
} else if (req->ownthread) {
- pthread_mutex_lock(&req->prog->queue_lock);
+ value = pthread_getspecific(req->prog->req_queue_key);
+ if (value == NULL) {
+ pthread_mutex_lock(&req->prog->thr_lock);
+ {
+ num = rpcsvc_get_free_queue_index(req->prog);
+ if (num != -1) {
+ num++;
+ value = (void *)num;
+ ret = pthread_setspecific(req->prog->req_queue_key,
+ value);
+ if (ret < 0) {
+ gf_log(GF_RPCSVC, GF_LOG_WARNING,
+ "setting request queue in TLS failed");
+ rpcsvc_toggle_queue_status(
+ req->prog, &req->prog->request_queue[num - 1],
+ req->prog->request_queue_status);
+ num = -1;
+ } else {
+ spawn_request_handler = 1;
+ }
+ }
+ }
+ pthread_mutex_unlock(&req->prog->thr_lock);
+ }
+
+ if (num == -1)
+ goto noqueue;
+
+ num = ((unsigned long)value) - 1;
+
+ queue = &req->prog->request_queue[num];
+
+ if (spawn_request_handler) {
+ ret = gf_thread_create(&queue->thread, NULL,
+ rpcsvc_request_handler, queue,
+ "rpcrqhnd");
+ if (!ret) {
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "spawned a request handler thread for queue %d",
+ (int)num);
+
+ req->prog->threadcount++;
+ } else {
+ gf_log(
+ GF_RPCSVC, GF_LOG_INFO,
+ "spawning a request handler thread for queue %d failed",
+ (int)num);
+ ret = pthread_setspecific(req->prog->req_queue_key, 0);
+ if (ret < 0) {
+ gf_log(GF_RPCSVC, GF_LOG_WARNING,
+ "resetting request queue in TLS failed");
+ }
+
+ rpcsvc_toggle_queue_status(
+ req->prog, &req->prog->request_queue[num - 1],
+ req->prog->request_queue_status);
+
+ goto noqueue;
+ }
+ }
+
+ pthread_mutex_lock(&queue->queue_lock);
{
- empty = list_empty(&req->prog->request_queue);
+ empty = list_empty(&queue->request_queue);
- list_add_tail(&req->request_list, &req->prog->request_queue);
+ list_add_tail(&req->request_list, &queue->request_queue);
- if (empty)
- pthread_cond_signal(&req->prog->queue_cond);
+ if (empty && queue->waiting)
+ pthread_cond_signal(&queue->queue_cond);
}
- pthread_mutex_unlock(&req->prog->queue_lock);
+ pthread_mutex_unlock(&queue->queue_lock);
ret = 0;
} else {
+ noqueue:
ret = actor_fn(req);
}
}
@@ -839,6 +1038,12 @@ rpcsvc_notify(rpc_transport_t *trans, void *mydata, rpc_transport_event_t event,
"got MAP_XID event, which should have not come");
ret = 0;
break;
+
+ case RPC_TRANSPORT_EVENT_THREAD_DIED:
+ rpcsvc_handle_event_thread_death(svc, trans,
+ (int)(unsigned long)data);
+ ret = 0;
+ break;
}
out:
@@ -1877,6 +2082,7 @@ rpcsvc_create_listeners(rpcsvc_t *svc, dict_t *options, char *name)
goto out;
}
+ dict_del(options, "notify-poller-death");
GF_FREE(transport_name);
transport_name = NULL;
count++;
@@ -1961,55 +2167,86 @@ out:
void *
rpcsvc_request_handler(void *arg)
{
- rpcsvc_program_t *program = arg;
- rpcsvc_request_t *req = NULL;
+ rpcsvc_request_queue_t *queue = NULL;
+ rpcsvc_program_t *program = NULL;
+ rpcsvc_request_t *req = NULL, *tmp_req = NULL;
rpcsvc_actor_t *actor = NULL;
gf_boolean_t done = _gf_false;
int ret = 0;
+ struct list_head tmp_list = {
+ 0,
+ };
+
+ queue = arg;
+ program = queue->program;
+
+ INIT_LIST_HEAD(&tmp_list);
if (!program)
return NULL;
while (1) {
- pthread_mutex_lock(&program->queue_lock);
+ pthread_mutex_lock(&queue->queue_lock);
{
- if (!program->alive && list_empty(&program->request_queue)) {
+ if (!program->alive && list_empty(&queue->request_queue)) {
done = 1;
goto unlock;
}
- while (list_empty(&program->request_queue) &&
- (program->threadcount <= program->eventthreadcount)) {
- pthread_cond_wait(&program->queue_cond, &program->queue_lock);
+ while (list_empty(&queue->request_queue)) {
+ queue->waiting = _gf_true;
+ pthread_cond_wait(&queue->queue_cond, &queue->queue_lock);
}
- if (program->threadcount > program->eventthreadcount) {
- done = 1;
- program->threadcount--;
+ queue->waiting = _gf_false;
- gf_log(GF_RPCSVC, GF_LOG_INFO,
- "program '%s' thread terminated; "
- "total count:%d",
- program->progname, program->threadcount);
- } else if (!list_empty(&program->request_queue)) {
- req = list_entry(program->request_queue.next, typeof(*req),
- request_list);
-
- list_del_init(&req->request_list);
+ if (!list_empty(&queue->request_queue)) {
+ INIT_LIST_HEAD(&tmp_list);
+ list_splice_init(&queue->request_queue, &tmp_list);
}
}
unlock:
- pthread_mutex_unlock(&program->queue_lock);
-
- if (req) {
- THIS = req->svc->xl;
- actor = rpcsvc_program_actor(req);
- ret = actor->actor(req);
+ pthread_mutex_unlock(&queue->queue_lock);
- if (ret != 0) {
- rpcsvc_check_and_reply_error(ret, NULL, req);
+ list_for_each_entry_safe(req, tmp_req, &tmp_list, request_list)
+ {
+ list_del_init(&req->request_list);
+
+ if (req) {
+ if (req->prognum == RPCSVC_INFRA_PROGRAM) {
+ switch (req->procnum) {
+ case RPCSVC_PROC_EVENT_THREAD_DEATH:
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "event thread died, exiting request handler "
+ "thread for queue %d of program %s",
+ (int)(queue - &program->request_queue[0]),
+ program->progname);
+ done = 1;
+ pthread_mutex_lock(&program->thr_lock);
+ {
+ rpcsvc_toggle_queue_status(
+ program, queue,
+ program->request_queue_status);
+ program->threadcount--;
+ }
+ pthread_mutex_unlock(&program->thr_lock);
+ rpcsvc_request_destroy(req);
+ break;
+
+ default:
+ break;
+ }
+ } else {
+ THIS = req->svc->xl;
+ actor = rpcsvc_program_actor(req);
+ ret = actor->actor(req);
+
+ if (ret != 0) {
+ rpcsvc_check_and_reply_error(ret, NULL, req);
+ }
+ req = NULL;
+ }
}
- req = NULL;
}
if (done)
@@ -2020,59 +2257,10 @@ rpcsvc_request_handler(void *arg)
}
int
-rpcsvc_spawn_threads(rpcsvc_t *svc, rpcsvc_program_t *program)
-{
- int ret = 0, delta = 0, creates = 0;
-
- if (!program || !svc)
- goto out;
-
- pthread_mutex_lock(&program->queue_lock);
- {
- delta = program->eventthreadcount - program->threadcount;
-
- if (delta >= 0) {
- while (delta--) {
- ret = gf_thread_create(&program->thread, NULL,
- rpcsvc_request_handler, program,
- "rpcrqhnd");
- if (!ret) {
- program->threadcount++;
- creates++;
- }
- }
-
- if (creates) {
- gf_log(GF_RPCSVC, GF_LOG_INFO,
- "spawned %d threads for program '%s'; "
- "total count:%d",
- creates, program->progname, program->threadcount);
- }
- } else {
- gf_log(GF_RPCSVC, GF_LOG_INFO,
- "terminating %d threads for program '%s'", -delta,
- program->progname);
-
- /* this signal is to just wake up the threads so they
- * test for the change in eventthreadcount and kill
- * themselves until the program thread count becomes
- * equal to the event thread count
- */
- pthread_cond_broadcast(&program->queue_cond);
- }
- }
- pthread_mutex_unlock(&program->queue_lock);
-
-out:
- return creates;
-}
-
-int
rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,
gf_boolean_t add_to_head)
{
- int ret = -1;
- int creates = -1;
+ int ret = -1, i = 0;
rpcsvc_program_t *newprog = NULL;
char already_registered = 0;
@@ -2110,9 +2298,16 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,
memcpy(newprog, program, sizeof(*program));
INIT_LIST_HEAD(&newprog->program);
- INIT_LIST_HEAD(&newprog->request_queue);
- pthread_mutex_init(&newprog->queue_lock, NULL);
- pthread_cond_init(&newprog->queue_cond, NULL);
+
+ for (i = 0; i < EVENT_MAX_THREADS; i++) {
+ INIT_LIST_HEAD(&newprog->request_queue[i].request_queue);
+ pthread_mutex_init(&newprog->request_queue[i].queue_lock, NULL);
+ pthread_cond_init(&newprog->request_queue[i].queue_cond, NULL);
+ newprog->request_queue[i].program = newprog;
+ }
+
+ pthread_mutex_init(&newprog->thr_lock, NULL);
+ pthread_cond_init(&newprog->thr_cond, NULL);
newprog->alive = _gf_true;
@@ -2121,12 +2316,11 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,
newprog->ownthread = _gf_false;
if (newprog->ownthread) {
- newprog->eventthreadcount = 1;
- creates = rpcsvc_spawn_threads(svc, newprog);
+ struct event_pool *ep = svc->ctx->event_pool;
+ newprog->eventthreadcount = ep->eventthreadcount;
- if (creates < 1) {
- goto out;
- }
+ pthread_key_create(&newprog->req_queue_key, NULL);
+ newprog->thr_queue = 1;
}
pthread_rwlock_wrlock(&svc->rpclock);
@@ -3003,38 +3197,6 @@ out:
return ret;
}
-/* During reconfigure, Make sure to call this function after event-threads are
- * reconfigured as programs' threadcount will be made equal to event threads.
- */
-
-int
-rpcsvc_ownthread_reconf(rpcsvc_t *svc, int new_eventthreadcount)
-{
- int ret = -1;
- rpcsvc_program_t *program = NULL;
-
- if (!svc) {
- ret = 0;
- goto out;
- }
-
- pthread_rwlock_wrlock(&svc->rpclock);
- {
- list_for_each_entry(program, &svc->programs, program)
- {
- if (program->ownthread) {
- program->eventthreadcount = new_eventthreadcount;
- rpcsvc_spawn_threads(svc, program);
- }
- }
- }
- pthread_rwlock_unlock(&svc->rpclock);
-
- ret = 0;
-out:
- return ret;
-}
-
rpcsvc_actor_t gluster_dump_actors[GF_DUMP_MAXVALUE] = {
[GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA},
[GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA},
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index ebb836fba3..8388dd404c 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -33,6 +33,16 @@
#define MAX_IOVEC 16
#endif
+/* TODO: we should store prognums at a centralized location to avoid conflict
+ or use a robust random number generator to avoid conflicts
+*/
+
+#define RPCSVC_INFRA_PROGRAM 7712846 /* random number */
+
+typedef enum {
+ RPCSVC_PROC_EVENT_THREAD_DEATH = 0,
+} rpcsvc_infra_procnum_t;
+
#define RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT \
64 /* Default for protocol/server */
#define RPCSVC_DEF_NFS_OUTSTANDING_RPC_LIMIT 16 /* Default for nfs/server */
@@ -362,6 +372,16 @@ typedef struct rpcsvc_actor_desc {
drc_op_type_t op_type;
} rpcsvc_actor_t;
+typedef struct rpcsvc_request_queue {
+ int gen;
+ struct list_head request_queue;
+ pthread_mutex_t queue_lock;
+ pthread_cond_t queue_cond;
+ pthread_t thread;
+ struct rpcsvc_program *program;
+ gf_boolean_t waiting;
+} rpcsvc_request_queue_t;
+
/* Describes a program and its version along with the function pointers
* required to handle the procedures/actors of each program/version.
* Never changed ever by any thread so no need for a lock.
@@ -421,11 +441,14 @@ struct rpcsvc_program {
gf_boolean_t synctask;
/* list member to link to list of registered services with rpcsvc */
struct list_head program;
- struct list_head request_queue;
- pthread_mutex_t queue_lock;
- pthread_cond_t queue_cond;
- pthread_t thread;
+ rpcsvc_request_queue_t request_queue[EVENT_MAX_THREADS];
+ char request_queue_status[EVENT_MAX_THREADS / 8 + 1];
+ pthread_mutex_t thr_lock;
+ pthread_cond_t thr_cond;
int threadcount;
+ int thr_queue;
+ pthread_key_t req_queue_key;
+
/* eventthreadcount is just a readonly copy of the actual value
* owned by the event sub-system
* It is used to control the scaling of rpcsvc_request_handler threads
@@ -652,9 +675,6 @@ rpcsvc_auth_array(rpcsvc_t *svc, char *volname, int *autharr, int arrlen);
rpcsvc_vector_sizer
rpcsvc_get_program_vector_sizer(rpcsvc_t *svc, uint32_t prognum,
uint32_t progver, int procnum);
-extern int
-rpcsvc_ownthread_reconf(rpcsvc_t *svc, int new_eventthreadcount);
-
void
rpcsvc_autoscale_threads(glusterfs_ctx_t *ctx, rpcsvc_t *rpc, int incr);
#endif
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index dc227137d5..776e647d4f 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -2859,7 +2859,7 @@ socket_complete_connection(rpc_transport_t *this)
/* reads rpc_requests during pollin */
static int
socket_event_handler(int fd, int idx, int gen, void *data, int poll_in,
- int poll_out, int poll_err)
+ int poll_out, int poll_err, char event_thread_died)
{
rpc_transport_t *this = NULL;
socket_private_t *priv = NULL;
@@ -2869,6 +2869,11 @@ socket_event_handler(int fd, int idx, int gen, void *data, int poll_in,
this = data;
+ if (event_thread_died) {
+ /* to avoid duplicate notifications, notify only for listener sockets */
+ return 0;
+ }
+
GF_VALIDATE_OR_GOTO("socket", this, out);
GF_VALIDATE_OR_GOTO("socket", this->private, out);
GF_VALIDATE_OR_GOTO("socket", this->xl, out);
@@ -2967,7 +2972,7 @@ out:
static int
socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
- int poll_out, int poll_err)
+ int poll_out, int poll_err, char event_thread_died)
{
rpc_transport_t *this = NULL;
socket_private_t *priv = NULL;
@@ -2991,6 +2996,12 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
priv = this->private;
ctx = this->ctx;
+ if (event_thread_died) {
+ rpc_transport_notify(this, RPC_TRANSPORT_EVENT_THREAD_DIED,
+ (void *)(unsigned long)gen);
+ return 0;
+ }
+
/* NOTE:
* We have done away with the critical section in this function. since
* there's little that it helps with. There's no other code that
@@ -3099,6 +3110,7 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
new_trans->mydata = this->mydata;
new_trans->notify = this->notify;
new_trans->listener = this;
+ new_trans->notify_poller_death = this->poller_death_accept;
new_priv = new_trans->private;
if (new_sockaddr.ss_family == AF_UNIX) {
@@ -3149,9 +3161,9 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
ret = rpc_transport_notify(this, RPC_TRANSPORT_ACCEPT, new_trans);
if (ret != -1) {
- new_priv->idx = event_register(ctx->event_pool, new_sock,
- socket_event_handler, new_trans,
- 1, 0);
+ new_priv->idx = event_register(
+ ctx->event_pool, new_sock, socket_event_handler, new_trans,
+ 1, 0, new_trans->notify_poller_death);
if (new_priv->idx == -1) {
ret = -1;
gf_log(this->name, GF_LOG_ERROR,
@@ -3530,7 +3542,8 @@ socket_connect(rpc_transport_t *this, int port)
this->listener = this;
priv->idx = event_register(ctx->event_pool, priv->sock,
- socket_event_handler, this, 1, 1);
+ socket_event_handler, this, 1, 1,
+ this->notify_poller_death);
if (priv->idx == -1) {
gf_log("", GF_LOG_WARNING,
"failed to register the event; "
@@ -3709,7 +3722,8 @@ socket_listen(rpc_transport_t *this)
rpc_transport_ref(this);
priv->idx = event_register(ctx->event_pool, priv->sock,
- socket_server_event_handler, this, 1, 0);
+ socket_server_event_handler, this, 1, 0,
+ this->notify_poller_death);
if (priv->idx == -1) {
gf_log(this->name, GF_LOG_WARNING,
diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c
index 8d8e8fc571..77e5d74e7c 100644
--- a/xlators/protocol/server/src/server.c
+++ b/xlators/protocol/server/src/server.c
@@ -929,12 +929,6 @@ do_rpc:
if (ret)
goto out;
- /* rpcsvc thread reconfigure should be after events thread
- * reconfigure
- */
- new_nthread = ((struct event_pool *)(this->ctx->event_pool))
- ->eventthreadcount;
- ret = rpcsvc_ownthread_reconf(rpc_conf, new_nthread);
out:
THIS = oldTHIS;
gf_msg_debug("", 0, "returning %d", ret);
@@ -1133,6 +1127,9 @@ server_init(xlator_t *this)
ret = -1;
goto out;
}
+
+ ret = dict_set_int32(this->options, "notify-poller-death", 1);
+
ret = rpcsvc_create_listeners(conf->rpc, this->options, this->name);
if (ret < 1) {
gf_msg(this->name, GF_LOG_WARNING, 0,