diff options
-rw-r--r-- | proxy/src/gp_utils.h | 25 | ||||
-rw-r--r-- | proxy/src/gp_workers.c | 149 |
2 files changed, 119 insertions, 55 deletions
diff --git a/proxy/src/gp_utils.h b/proxy/src/gp_utils.h index 47d766d..38ca400 100644 --- a/proxy/src/gp_utils.h +++ b/proxy/src/gp_utils.h @@ -33,6 +33,31 @@ #define _(STRING) gettext(STRING) +/* add element to list head */ +#define LIST_ADD(list, elem) do { \ + elem->prev = NULL; \ + elem->next = list; \ + if (list) { \ + list->prev = elem; \ + } \ + list = elem; \ +} while (0) + +/* remove element from list */ +#define LIST_DEL(list, elem) do { \ + if (elem->next) { \ + elem->next->prev = elem->prev; \ + } \ + if (elem->prev) { \ + elem->prev->next = elem->next; \ + } \ + if (list == elem) { \ + list = elem->next; \ + } \ + elem->prev = NULL; \ + elem->next = NULL; \ +} while (0) + struct gp_config { char *config_file; /* gssproxy configuration file */ bool daemonize; /* let gssproxy daemonize */ diff --git a/proxy/src/gp_workers.c b/proxy/src/gp_workers.c index 5ccf20f..3dcefac 100644 --- a/proxy/src/gp_workers.c +++ b/proxy/src/gp_workers.c @@ -51,10 +51,11 @@ struct gp_query { }; struct gp_thread { + struct gp_thread *prev; + struct gp_thread *next; struct gp_workers *pool; pthread_t tid; - /* if query is assigned, then the thread is busy */ struct gp_query *query; pthread_mutex_t cond_mutex; pthread_cond_t cond_wakeup; @@ -65,7 +66,8 @@ struct gp_workers { bool shutdown; struct gp_query *wait_list; struct gp_query *reply_list; - struct gp_thread *threads; + struct gp_thread *free_list; + struct gp_thread *busy_list; int num_threads; int sig_pipe[2]; }; @@ -79,6 +81,7 @@ static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev); struct gp_workers *gp_workers_init(verto_ctx *vctx, struct gp_config *cfg) { struct gp_workers *w; + struct gp_thread *t; pthread_attr_t attr; verto_ev *ev; int vflags; @@ -103,32 +106,31 @@ struct gp_workers *gp_workers_init(verto_ctx *vctx, struct gp_config *cfg) w->num_threads = DEFAULT_WORKER_THREADS_NUM; } - w->threads = calloc(w->num_threads, sizeof(struct gp_thread)); - if (!w->threads) { - ret = -1; - goto done; - } - /* make thread joinable (portability) */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); /* init all workers */ for (i = 0; i < w->num_threads; i++) { - ret = pthread_cond_init(&w->threads[i].cond_wakeup, NULL); + t = calloc(1, sizeof(struct gp_thread)); + if (!t) { + ret = -1; + goto done; + } + t->pool = w; + ret = pthread_cond_init(&t->cond_wakeup, NULL); if (ret) { goto done; } - ret = pthread_mutex_init(&w->threads[i].cond_mutex, NULL); + ret = pthread_mutex_init(&t->cond_mutex, NULL); if (ret) { goto done; } - ret = pthread_create(&w->threads[i].tid, &attr, - gp_worker_main, &w->threads[i]); + ret = pthread_create(&t->tid, &attr, gp_worker_main, t); if (ret) { goto done; } - w->threads[i].pool = w; + LIST_ADD(w->free_list, t); } /* add wakeup pipe, so that threads can hand back replies to the @@ -157,79 +159,109 @@ done: void gp_workers_free(struct gp_workers *w) { - int ret; - int i; + struct gp_thread *t; void *retval; - ret = pthread_mutex_lock(&w->lock); - if (ret) { - syslog(LOG_CRIT, "Couldn't get mutex!"); - return; - } + /* ======> POOL LOCK */ + pthread_mutex_lock(&w->lock); w->shutdown = true; - ret = pthread_mutex_unlock(&w->lock); - if (ret) { - syslog(LOG_CRIT, "Can't release mutex?!"); - return; - } + /* <====== POOL LOCK */ + pthread_mutex_unlock(&w->lock); - if (w->threads) { - for (i = 0; i < w->num_threads; i++) { - /* wake up threads, then join them */ - /* ======> COND_MUTEX */ - pthread_mutex_lock(&w->threads[i].cond_mutex); - pthread_cond_signal(&w->threads[i].cond_wakeup); - /* <====== COND_MUTEX */ - pthread_mutex_unlock(&w->threads[i].cond_mutex); + /* we do not run the following operations within + * the lock, or deadlocks may arise for threads + * that are just finishing doing some work */ - ret = pthread_join(w->threads[i].tid, &retval); - } + /* we guarantee nobody is touching these lists by + * preventing workers from touching the free/busy + * lists when a 'shutdown' is in progress */ + + while (w->free_list) { + /* pick threads one by one */ + t = w->free_list; + LIST_DEL(w->free_list, t); + + /* wake up threads, then join them */ + /* ======> COND_MUTEX */ + pthread_mutex_lock(&t->cond_mutex); + pthread_cond_signal(&t->cond_wakeup); + /* <====== COND_MUTEX */ + pthread_mutex_unlock(&t->cond_mutex); - free(w->threads); - w->threads = NULL; + pthread_join(t->tid, &retval); + + pthread_mutex_destroy(&t->cond_mutex); + pthread_cond_destroy(&t->cond_wakeup); + free(t); } - ret = pthread_mutex_destroy(&w->lock); - if (ret) { - syslog(LOG_CRIT, "Failed to destroy mutex?!"); - return; + /* do the same with the busy list */ + while (w->busy_list) { + /* pick threads one by one */ + t = w->busy_list; + LIST_DEL(w->free_list, t); + + /* wake up threads, then join them */ + /* ======> COND_MUTEX */ + pthread_mutex_lock(&t->cond_mutex); + pthread_cond_signal(&t->cond_wakeup); + /* <====== COND_MUTEX */ + pthread_mutex_unlock(&t->cond_mutex); + + pthread_join(t->tid, &retval); + + pthread_mutex_destroy(&t->cond_mutex); + pthread_cond_destroy(&t->cond_wakeup); + free(t); } + close(w->sig_pipe[0]); + close(w->sig_pipe[1]); + + pthread_mutex_destroy(&w->lock); + free(w); } static void gp_query_assign(struct gp_workers *w, struct gp_query *q) { - int i; + struct gp_thread *t = NULL; + /* then either find a free thread or queue in the wait list */ - for (i = 0; q != NULL && i < w->num_threads; i++) { - if (w->threads[i].query != NULL) continue; + /* ======> POOL LOCK */ + pthread_mutex_lock(&w->lock); + if (w->free_list) { + t = w->free_list; + LIST_DEL(w->free_list, t); + LIST_ADD(w->busy_list, t); + } + /* <====== POOL LOCK */ + pthread_mutex_unlock(&w->lock); + + if (t) { + /* found free thread, assign work */ /* ======> COND_MUTEX */ - pthread_mutex_lock(&w->threads[i].cond_mutex); + pthread_mutex_lock(&t->cond_mutex); - if (w->threads[i].query == NULL) { - /* hand over the query */ - w->threads[i].query = q; - q = NULL; - pthread_cond_signal(&w->threads[i].cond_wakeup); - } + /* hand over the query */ + t->query = q; + pthread_cond_signal(&t->cond_wakeup); /* <====== COND_MUTEX */ - pthread_mutex_unlock(&w->threads[i].cond_mutex); - } + pthread_mutex_unlock(&t->cond_mutex); + + } else { - if (q) { /* all threads are busy, store in wait list */ /* only the dispatcher handles wait_list * so we do not need to lock around it */ q->next = w->wait_list; w->wait_list = q; - q = NULL; } } @@ -373,6 +405,13 @@ static void *gp_worker_main(void *pvt) q->next = t->pool->reply_list; t->pool->reply_list = q; + /* add us back to the free list but only if we are not + * shutting down */ + if (!t->pool->shutdown) { + LIST_DEL(t->pool->busy_list, t); + LIST_ADD(t->pool->free_list, t); + } + /* <====== POOL LOCK */ pthread_mutex_unlock(&t->pool->lock); |