diff options
| author | Simo Sorce <simo@redhat.com> | 2012-01-18 11:35:03 -0500 |
|---|---|---|
| committer | Simo Sorce <simo@redhat.com> | 2012-01-18 11:35:03 -0500 |
| commit | 6ca28bbf8523713e34ed2a7378e21f23cf64e298 (patch) | |
| tree | 795e9a23510be415e115bdd488a28c8c507115e2 /proxy/src/gp_workers.c | |
| parent | 1d62ecb4261c30c8312f765f81ad9b4c75334a33 (diff) | |
| download | gss-proxy-6ca28bbf8523713e34ed2a7378e21f23cf64e298.tar.gz gss-proxy-6ca28bbf8523713e34ed2a7378e21f23cf64e298.tar.xz gss-proxy-6ca28bbf8523713e34ed2a7378e21f23cf64e298.zip | |
Organize workers in free and busy lists
This avoids going through an array to chase free threads, so that assigning
work is O(1) instead of O(n).
Will also make easier to later change the number of available workers
dynamically.
Diffstat (limited to 'proxy/src/gp_workers.c')
| -rw-r--r-- | proxy/src/gp_workers.c | 149 |
1 files changed, 94 insertions, 55 deletions
diff --git a/proxy/src/gp_workers.c b/proxy/src/gp_workers.c index 5ccf20f..3dcefac 100644 --- a/proxy/src/gp_workers.c +++ b/proxy/src/gp_workers.c @@ -51,10 +51,11 @@ struct gp_query { }; struct gp_thread { + struct gp_thread *prev; + struct gp_thread *next; struct gp_workers *pool; pthread_t tid; - /* if query is assigned, then the thread is busy */ struct gp_query *query; pthread_mutex_t cond_mutex; pthread_cond_t cond_wakeup; @@ -65,7 +66,8 @@ struct gp_workers { bool shutdown; struct gp_query *wait_list; struct gp_query *reply_list; - struct gp_thread *threads; + struct gp_thread *free_list; + struct gp_thread *busy_list; int num_threads; int sig_pipe[2]; }; @@ -79,6 +81,7 @@ static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev); struct gp_workers *gp_workers_init(verto_ctx *vctx, struct gp_config *cfg) { struct gp_workers *w; + struct gp_thread *t; pthread_attr_t attr; verto_ev *ev; int vflags; @@ -103,32 +106,31 @@ struct gp_workers *gp_workers_init(verto_ctx *vctx, struct gp_config *cfg) w->num_threads = DEFAULT_WORKER_THREADS_NUM; } - w->threads = calloc(w->num_threads, sizeof(struct gp_thread)); - if (!w->threads) { - ret = -1; - goto done; - } - /* make thread joinable (portability) */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); /* init all workers */ for (i = 0; i < w->num_threads; i++) { - ret = pthread_cond_init(&w->threads[i].cond_wakeup, NULL); + t = calloc(1, sizeof(struct gp_thread)); + if (!t) { + ret = -1; + goto done; + } + t->pool = w; + ret = pthread_cond_init(&t->cond_wakeup, NULL); if (ret) { goto done; } - ret = pthread_mutex_init(&w->threads[i].cond_mutex, NULL); + ret = pthread_mutex_init(&t->cond_mutex, NULL); if (ret) { goto done; } - ret = pthread_create(&w->threads[i].tid, &attr, - gp_worker_main, &w->threads[i]); + ret = pthread_create(&t->tid, &attr, gp_worker_main, t); if (ret) { goto done; } - w->threads[i].pool = w; + LIST_ADD(w->free_list, t); } /* add wakeup pipe, so that threads can hand back replies to the @@ -157,79 +159,109 @@ done: void gp_workers_free(struct gp_workers *w) { - int ret; - int i; + struct gp_thread *t; void *retval; - ret = pthread_mutex_lock(&w->lock); - if (ret) { - syslog(LOG_CRIT, "Couldn't get mutex!"); - return; - } + /* ======> POOL LOCK */ + pthread_mutex_lock(&w->lock); w->shutdown = true; - ret = pthread_mutex_unlock(&w->lock); - if (ret) { - syslog(LOG_CRIT, "Can't release mutex?!"); - return; - } + /* <====== POOL LOCK */ + pthread_mutex_unlock(&w->lock); - if (w->threads) { - for (i = 0; i < w->num_threads; i++) { - /* wake up threads, then join them */ - /* ======> COND_MUTEX */ - pthread_mutex_lock(&w->threads[i].cond_mutex); - pthread_cond_signal(&w->threads[i].cond_wakeup); - /* <====== COND_MUTEX */ - pthread_mutex_unlock(&w->threads[i].cond_mutex); + /* we do not run the following operations within + * the lock, or deadlocks may arise for threads + * that are just finishing doing some work */ - ret = pthread_join(w->threads[i].tid, &retval); - } + /* we guarantee nobody is touching these lists by + * preventing workers from touching the free/busy + * lists when a 'shutdown' is in progress */ + + while (w->free_list) { + /* pick threads one by one */ + t = w->free_list; + LIST_DEL(w->free_list, t); + + /* wake up threads, then join them */ + /* ======> COND_MUTEX */ + pthread_mutex_lock(&t->cond_mutex); + pthread_cond_signal(&t->cond_wakeup); + /* <====== COND_MUTEX */ + pthread_mutex_unlock(&t->cond_mutex); - free(w->threads); - w->threads = NULL; + pthread_join(t->tid, &retval); + + pthread_mutex_destroy(&t->cond_mutex); + pthread_cond_destroy(&t->cond_wakeup); + free(t); } - ret = pthread_mutex_destroy(&w->lock); - if (ret) { - syslog(LOG_CRIT, "Failed to destroy mutex?!"); - return; + /* do the same with the busy list */ + while (w->busy_list) { + /* pick threads one by one */ + t = w->busy_list; + LIST_DEL(w->free_list, t); + + /* wake up threads, then join them */ + /* ======> COND_MUTEX */ + pthread_mutex_lock(&t->cond_mutex); + pthread_cond_signal(&t->cond_wakeup); + /* <====== COND_MUTEX */ + pthread_mutex_unlock(&t->cond_mutex); + + pthread_join(t->tid, &retval); + + pthread_mutex_destroy(&t->cond_mutex); + pthread_cond_destroy(&t->cond_wakeup); + free(t); } + close(w->sig_pipe[0]); + close(w->sig_pipe[1]); + + pthread_mutex_destroy(&w->lock); + free(w); } static void gp_query_assign(struct gp_workers *w, struct gp_query *q) { - int i; + struct gp_thread *t = NULL; + /* then either find a free thread or queue in the wait list */ - for (i = 0; q != NULL && i < w->num_threads; i++) { - if (w->threads[i].query != NULL) continue; + /* ======> POOL LOCK */ + pthread_mutex_lock(&w->lock); + if (w->free_list) { + t = w->free_list; + LIST_DEL(w->free_list, t); + LIST_ADD(w->busy_list, t); + } + /* <====== POOL LOCK */ + pthread_mutex_unlock(&w->lock); + + if (t) { + /* found free thread, assign work */ /* ======> COND_MUTEX */ - pthread_mutex_lock(&w->threads[i].cond_mutex); + pthread_mutex_lock(&t->cond_mutex); - if (w->threads[i].query == NULL) { - /* hand over the query */ - w->threads[i].query = q; - q = NULL; - pthread_cond_signal(&w->threads[i].cond_wakeup); - } + /* hand over the query */ + t->query = q; + pthread_cond_signal(&t->cond_wakeup); /* <====== COND_MUTEX */ - pthread_mutex_unlock(&w->threads[i].cond_mutex); - } + pthread_mutex_unlock(&t->cond_mutex); + + } else { - if (q) { /* all threads are busy, store in wait list */ /* only the dispatcher handles wait_list * so we do not need to lock around it */ q->next = w->wait_list; w->wait_list = q; - q = NULL; } } @@ -373,6 +405,13 @@ static void *gp_worker_main(void *pvt) q->next = t->pool->reply_list; t->pool->reply_list = q; + /* add us back to the free list but only if we are not + * shutting down */ + if (!t->pool->shutdown) { + LIST_DEL(t->pool->busy_list, t); + LIST_ADD(t->pool->free_list, t); + } + /* <====== POOL LOCK */ pthread_mutex_unlock(&t->pool->lock); |
