summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimo Sorce <simo@redhat.com>2012-01-18 11:35:03 -0500
committerSimo Sorce <simo@redhat.com>2012-01-18 11:35:03 -0500
commit6ca28bbf8523713e34ed2a7378e21f23cf64e298 (patch)
tree795e9a23510be415e115bdd488a28c8c507115e2
parent1d62ecb4261c30c8312f765f81ad9b4c75334a33 (diff)
downloadgss-proxy-6ca28bbf8523713e34ed2a7378e21f23cf64e298.tar.gz
gss-proxy-6ca28bbf8523713e34ed2a7378e21f23cf64e298.tar.xz
gss-proxy-6ca28bbf8523713e34ed2a7378e21f23cf64e298.zip
Organize workers in free and busy lists
This avoids going through an array to chase free threads, so that assigning work is O(1) instead of O(n). It will also make it easier to change the number of available workers dynamically later.
-rw-r--r--proxy/src/gp_utils.h25
-rw-r--r--proxy/src/gp_workers.c149
2 files changed, 119 insertions, 55 deletions
diff --git a/proxy/src/gp_utils.h b/proxy/src/gp_utils.h
index 47d766d..38ca400 100644
--- a/proxy/src/gp_utils.h
+++ b/proxy/src/gp_utils.h
@@ -33,6 +33,31 @@
#define _(STRING) gettext(STRING)
/* Add ELEM at the head of the doubly-linked list LIST.
 * LIST must be an lvalue pointer to the list head; it is updated in place.
 * NOTE: both arguments are evaluated more than once, so pass only
 * side-effect-free expressions. Arguments are parenthesized so that
 * non-trivial pointer expressions (e.g. &node) expand correctly. */
#define LIST_ADD(list, elem) do { \
    (elem)->prev = NULL; \
    (elem)->next = (list); \
    if (list) { \
        (list)->prev = (elem); \
    } \
    (list) = (elem); \
} while (0)
+
/* Unlink ELEM from the doubly-linked list LIST.
 * LIST must be an lvalue pointer to the list head; it is updated in
 * place when ELEM is the head. ELEM's prev/next are reset to NULL so a
 * stale element can never be followed back into the list.
 * NOTE: both arguments are evaluated more than once, so pass only
 * side-effect-free expressions. Arguments are parenthesized so that
 * non-trivial pointer expressions (e.g. &node) expand correctly. */
#define LIST_DEL(list, elem) do { \
    if ((elem)->next) { \
        (elem)->next->prev = (elem)->prev; \
    } \
    if ((elem)->prev) { \
        (elem)->prev->next = (elem)->next; \
    } \
    if ((list) == (elem)) { \
        (list) = (elem)->next; \
    } \
    (elem)->prev = NULL; \
    (elem)->next = NULL; \
} while (0)
+
struct gp_config {
char *config_file; /* gssproxy configuration file */
bool daemonize; /* let gssproxy daemonize */
diff --git a/proxy/src/gp_workers.c b/proxy/src/gp_workers.c
index 5ccf20f..3dcefac 100644
--- a/proxy/src/gp_workers.c
+++ b/proxy/src/gp_workers.c
@@ -51,10 +51,11 @@ struct gp_query {
};
struct gp_thread {
+ struct gp_thread *prev;
+ struct gp_thread *next;
struct gp_workers *pool;
pthread_t tid;
- /* if query is assigned, then the thread is busy */
struct gp_query *query;
pthread_mutex_t cond_mutex;
pthread_cond_t cond_wakeup;
@@ -65,7 +66,8 @@ struct gp_workers {
bool shutdown;
struct gp_query *wait_list;
struct gp_query *reply_list;
- struct gp_thread *threads;
+ struct gp_thread *free_list;
+ struct gp_thread *busy_list;
int num_threads;
int sig_pipe[2];
};
@@ -79,6 +81,7 @@ static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev);
struct gp_workers *gp_workers_init(verto_ctx *vctx, struct gp_config *cfg)
{
struct gp_workers *w;
+ struct gp_thread *t;
pthread_attr_t attr;
verto_ev *ev;
int vflags;
@@ -103,32 +106,31 @@ struct gp_workers *gp_workers_init(verto_ctx *vctx, struct gp_config *cfg)
w->num_threads = DEFAULT_WORKER_THREADS_NUM;
}
- w->threads = calloc(w->num_threads, sizeof(struct gp_thread));
- if (!w->threads) {
- ret = -1;
- goto done;
- }
-
/* make thread joinable (portability) */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
/* init all workers */
for (i = 0; i < w->num_threads; i++) {
- ret = pthread_cond_init(&w->threads[i].cond_wakeup, NULL);
+ t = calloc(1, sizeof(struct gp_thread));
+ if (!t) {
+ ret = -1;
+ goto done;
+ }
+ t->pool = w;
+ ret = pthread_cond_init(&t->cond_wakeup, NULL);
if (ret) {
goto done;
}
- ret = pthread_mutex_init(&w->threads[i].cond_mutex, NULL);
+ ret = pthread_mutex_init(&t->cond_mutex, NULL);
if (ret) {
goto done;
}
- ret = pthread_create(&w->threads[i].tid, &attr,
- gp_worker_main, &w->threads[i]);
+ ret = pthread_create(&t->tid, &attr, gp_worker_main, t);
if (ret) {
goto done;
}
- w->threads[i].pool = w;
+ LIST_ADD(w->free_list, t);
}
/* add wakeup pipe, so that threads can hand back replies to the
@@ -157,79 +159,109 @@ done:
void gp_workers_free(struct gp_workers *w)
{
- int ret;
- int i;
+ struct gp_thread *t;
void *retval;
- ret = pthread_mutex_lock(&w->lock);
- if (ret) {
- syslog(LOG_CRIT, "Couldn't get mutex!");
- return;
- }
+ /* ======> POOL LOCK */
+ pthread_mutex_lock(&w->lock);
w->shutdown = true;
- ret = pthread_mutex_unlock(&w->lock);
- if (ret) {
- syslog(LOG_CRIT, "Can't release mutex?!");
- return;
- }
+ /* <====== POOL LOCK */
+ pthread_mutex_unlock(&w->lock);
- if (w->threads) {
- for (i = 0; i < w->num_threads; i++) {
- /* wake up threads, then join them */
- /* ======> COND_MUTEX */
- pthread_mutex_lock(&w->threads[i].cond_mutex);
- pthread_cond_signal(&w->threads[i].cond_wakeup);
- /* <====== COND_MUTEX */
- pthread_mutex_unlock(&w->threads[i].cond_mutex);
+ /* we do not run the following operations within
+ * the lock, or deadlocks may arise for threads
+ * that are just finishing doing some work */
- ret = pthread_join(w->threads[i].tid, &retval);
- }
+ /* we guarantee nobody is touching these lists by
+ * preventing workers from touching the free/busy
+ * lists when a 'shutdown' is in progress */
+
+ while (w->free_list) {
+ /* pick threads one by one */
+ t = w->free_list;
+ LIST_DEL(w->free_list, t);
+
+ /* wake up threads, then join them */
+ /* ======> COND_MUTEX */
+ pthread_mutex_lock(&t->cond_mutex);
+ pthread_cond_signal(&t->cond_wakeup);
+ /* <====== COND_MUTEX */
+ pthread_mutex_unlock(&t->cond_mutex);
- free(w->threads);
- w->threads = NULL;
+ pthread_join(t->tid, &retval);
+
+ pthread_mutex_destroy(&t->cond_mutex);
+ pthread_cond_destroy(&t->cond_wakeup);
+ free(t);
}
- ret = pthread_mutex_destroy(&w->lock);
- if (ret) {
- syslog(LOG_CRIT, "Failed to destroy mutex?!");
- return;
+ /* do the same with the busy list */
+ while (w->busy_list) {
+ /* pick threads one by one */
+ t = w->busy_list;
+ LIST_DEL(w->free_list, t);
+
+ /* wake up threads, then join them */
+ /* ======> COND_MUTEX */
+ pthread_mutex_lock(&t->cond_mutex);
+ pthread_cond_signal(&t->cond_wakeup);
+ /* <====== COND_MUTEX */
+ pthread_mutex_unlock(&t->cond_mutex);
+
+ pthread_join(t->tid, &retval);
+
+ pthread_mutex_destroy(&t->cond_mutex);
+ pthread_cond_destroy(&t->cond_wakeup);
+ free(t);
}
+ close(w->sig_pipe[0]);
+ close(w->sig_pipe[1]);
+
+ pthread_mutex_destroy(&w->lock);
+
free(w);
}
static void gp_query_assign(struct gp_workers *w, struct gp_query *q)
{
- int i;
+ struct gp_thread *t = NULL;
+
/* then either find a free thread or queue in the wait list */
- for (i = 0; q != NULL && i < w->num_threads; i++) {
- if (w->threads[i].query != NULL) continue;
+ /* ======> POOL LOCK */
+ pthread_mutex_lock(&w->lock);
+ if (w->free_list) {
+ t = w->free_list;
+ LIST_DEL(w->free_list, t);
+ LIST_ADD(w->busy_list, t);
+ }
+ /* <====== POOL LOCK */
+ pthread_mutex_unlock(&w->lock);
+
+ if (t) {
+ /* found free thread, assign work */
/* ======> COND_MUTEX */
- pthread_mutex_lock(&w->threads[i].cond_mutex);
+ pthread_mutex_lock(&t->cond_mutex);
- if (w->threads[i].query == NULL) {
- /* hand over the query */
- w->threads[i].query = q;
- q = NULL;
- pthread_cond_signal(&w->threads[i].cond_wakeup);
- }
+ /* hand over the query */
+ t->query = q;
+ pthread_cond_signal(&t->cond_wakeup);
/* <====== COND_MUTEX */
- pthread_mutex_unlock(&w->threads[i].cond_mutex);
- }
+ pthread_mutex_unlock(&t->cond_mutex);
+
+ } else {
- if (q) {
/* all threads are busy, store in wait list */
/* only the dispatcher handles wait_list
* so we do not need to lock around it */
q->next = w->wait_list;
w->wait_list = q;
- q = NULL;
}
}
@@ -373,6 +405,13 @@ static void *gp_worker_main(void *pvt)
q->next = t->pool->reply_list;
t->pool->reply_list = q;
+ /* add us back to the free list but only if we are not
+ * shutting down */
+ if (!t->pool->shutdown) {
+ LIST_DEL(t->pool->busy_list, t);
+ LIST_ADD(t->pool->free_list, t);
+ }
+
/* <====== POOL LOCK */
pthread_mutex_unlock(&t->pool->lock);