summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--proxy/src/gp_utils.h25
-rw-r--r--proxy/src/gp_workers.c149
2 files changed, 119 insertions, 55 deletions
diff --git a/proxy/src/gp_utils.h b/proxy/src/gp_utils.h
index 47d766d..38ca400 100644
--- a/proxy/src/gp_utils.h
+++ b/proxy/src/gp_utils.h
@@ -33,6 +33,31 @@
#define _(STRING) gettext(STRING)
+/* add element to list head */
+#define LIST_ADD(list, elem) do { \
+ elem->prev = NULL; \
+ elem->next = list; \
+ if (list) { \
+ list->prev = elem; \
+ } \
+ list = elem; \
+} while (0)
+
+/* remove element from list */
+#define LIST_DEL(list, elem) do { \
+ if (elem->next) { \
+ elem->next->prev = elem->prev; \
+ } \
+ if (elem->prev) { \
+ elem->prev->next = elem->next; \
+ } \
+ if (list == elem) { \
+ list = elem->next; \
+ } \
+ elem->prev = NULL; \
+ elem->next = NULL; \
+} while (0)
+
struct gp_config {
char *config_file; /* gssproxy configuration file */
bool daemonize; /* let gssproxy daemonize */
diff --git a/proxy/src/gp_workers.c b/proxy/src/gp_workers.c
index 5ccf20f..3dcefac 100644
--- a/proxy/src/gp_workers.c
+++ b/proxy/src/gp_workers.c
@@ -51,10 +51,11 @@ struct gp_query {
};
struct gp_thread {
+ struct gp_thread *prev;
+ struct gp_thread *next;
struct gp_workers *pool;
pthread_t tid;
- /* if query is assigned, then the thread is busy */
struct gp_query *query;
pthread_mutex_t cond_mutex;
pthread_cond_t cond_wakeup;
@@ -65,7 +66,8 @@ struct gp_workers {
bool shutdown;
struct gp_query *wait_list;
struct gp_query *reply_list;
- struct gp_thread *threads;
+ struct gp_thread *free_list;
+ struct gp_thread *busy_list;
int num_threads;
int sig_pipe[2];
};
@@ -79,6 +81,7 @@ static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev);
struct gp_workers *gp_workers_init(verto_ctx *vctx, struct gp_config *cfg)
{
struct gp_workers *w;
+ struct gp_thread *t;
pthread_attr_t attr;
verto_ev *ev;
int vflags;
@@ -103,32 +106,31 @@ struct gp_workers *gp_workers_init(verto_ctx *vctx, struct gp_config *cfg)
w->num_threads = DEFAULT_WORKER_THREADS_NUM;
}
- w->threads = calloc(w->num_threads, sizeof(struct gp_thread));
- if (!w->threads) {
- ret = -1;
- goto done;
- }
-
/* make thread joinable (portability) */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
/* init all workers */
for (i = 0; i < w->num_threads; i++) {
- ret = pthread_cond_init(&w->threads[i].cond_wakeup, NULL);
+ t = calloc(1, sizeof(struct gp_thread));
+ if (!t) {
+ ret = -1;
+ goto done;
+ }
+ t->pool = w;
+ ret = pthread_cond_init(&t->cond_wakeup, NULL);
if (ret) {
goto done;
}
- ret = pthread_mutex_init(&w->threads[i].cond_mutex, NULL);
+ ret = pthread_mutex_init(&t->cond_mutex, NULL);
if (ret) {
goto done;
}
- ret = pthread_create(&w->threads[i].tid, &attr,
- gp_worker_main, &w->threads[i]);
+ ret = pthread_create(&t->tid, &attr, gp_worker_main, t);
if (ret) {
goto done;
}
- w->threads[i].pool = w;
+ LIST_ADD(w->free_list, t);
}
/* add wakeup pipe, so that threads can hand back replies to the
@@ -157,79 +159,109 @@ done:
void gp_workers_free(struct gp_workers *w)
{
- int ret;
- int i;
+ struct gp_thread *t;
void *retval;
- ret = pthread_mutex_lock(&w->lock);
- if (ret) {
- syslog(LOG_CRIT, "Couldn't get mutex!");
- return;
- }
+ /* ======> POOL LOCK */
+ pthread_mutex_lock(&w->lock);
w->shutdown = true;
- ret = pthread_mutex_unlock(&w->lock);
- if (ret) {
- syslog(LOG_CRIT, "Can't release mutex?!");
- return;
- }
+ /* <====== POOL LOCK */
+ pthread_mutex_unlock(&w->lock);
- if (w->threads) {
- for (i = 0; i < w->num_threads; i++) {
- /* wake up threads, then join them */
- /* ======> COND_MUTEX */
- pthread_mutex_lock(&w->threads[i].cond_mutex);
- pthread_cond_signal(&w->threads[i].cond_wakeup);
- /* <====== COND_MUTEX */
- pthread_mutex_unlock(&w->threads[i].cond_mutex);
+ /* we do not run the following operations within
+ * the lock, or deadlocks may arise for threads
+ * that are just finishing doing some work */
- ret = pthread_join(w->threads[i].tid, &retval);
- }
+ /* we guarantee nobody is touching these lists by
+ * preventing workers from touching the free/busy
+ * lists when a 'shutdown' is in progress */
+
+ while (w->free_list) {
+ /* pick threads one by one */
+ t = w->free_list;
+ LIST_DEL(w->free_list, t);
+
+ /* wake up threads, then join them */
+ /* ======> COND_MUTEX */
+ pthread_mutex_lock(&t->cond_mutex);
+ pthread_cond_signal(&t->cond_wakeup);
+ /* <====== COND_MUTEX */
+ pthread_mutex_unlock(&t->cond_mutex);
- free(w->threads);
- w->threads = NULL;
+ pthread_join(t->tid, &retval);
+
+ pthread_mutex_destroy(&t->cond_mutex);
+ pthread_cond_destroy(&t->cond_wakeup);
+ free(t);
}
- ret = pthread_mutex_destroy(&w->lock);
- if (ret) {
- syslog(LOG_CRIT, "Failed to destroy mutex?!");
- return;
+ /* do the same with the busy list */
+ while (w->busy_list) {
+ /* pick threads one by one */
+ t = w->busy_list;
+ LIST_DEL(w->free_list, t);
+
+ /* wake up threads, then join them */
+ /* ======> COND_MUTEX */
+ pthread_mutex_lock(&t->cond_mutex);
+ pthread_cond_signal(&t->cond_wakeup);
+ /* <====== COND_MUTEX */
+ pthread_mutex_unlock(&t->cond_mutex);
+
+ pthread_join(t->tid, &retval);
+
+ pthread_mutex_destroy(&t->cond_mutex);
+ pthread_cond_destroy(&t->cond_wakeup);
+ free(t);
}
+ close(w->sig_pipe[0]);
+ close(w->sig_pipe[1]);
+
+ pthread_mutex_destroy(&w->lock);
+
free(w);
}
static void gp_query_assign(struct gp_workers *w, struct gp_query *q)
{
- int i;
+ struct gp_thread *t = NULL;
+
/* then either find a free thread or queue in the wait list */
- for (i = 0; q != NULL && i < w->num_threads; i++) {
- if (w->threads[i].query != NULL) continue;
+ /* ======> POOL LOCK */
+ pthread_mutex_lock(&w->lock);
+ if (w->free_list) {
+ t = w->free_list;
+ LIST_DEL(w->free_list, t);
+ LIST_ADD(w->busy_list, t);
+ }
+ /* <====== POOL LOCK */
+ pthread_mutex_unlock(&w->lock);
+
+ if (t) {
+ /* found free thread, assign work */
/* ======> COND_MUTEX */
- pthread_mutex_lock(&w->threads[i].cond_mutex);
+ pthread_mutex_lock(&t->cond_mutex);
- if (w->threads[i].query == NULL) {
- /* hand over the query */
- w->threads[i].query = q;
- q = NULL;
- pthread_cond_signal(&w->threads[i].cond_wakeup);
- }
+ /* hand over the query */
+ t->query = q;
+ pthread_cond_signal(&t->cond_wakeup);
/* <====== COND_MUTEX */
- pthread_mutex_unlock(&w->threads[i].cond_mutex);
- }
+ pthread_mutex_unlock(&t->cond_mutex);
+
+ } else {
- if (q) {
/* all threads are busy, store in wait list */
/* only the dispatcher handles wait_list
* so we do not need to lock around it */
q->next = w->wait_list;
w->wait_list = q;
- q = NULL;
}
}
@@ -373,6 +405,13 @@ static void *gp_worker_main(void *pvt)
q->next = t->pool->reply_list;
t->pool->reply_list = q;
+ /* add us back to the free list but only if we are not
+ * shutting down */
+ if (!t->pool->shutdown) {
+ LIST_DEL(t->pool->busy_list, t);
+ LIST_ADD(t->pool->free_list, t);
+ }
+
/* <====== POOL LOCK */
pthread_mutex_unlock(&t->pool->lock);