summaryrefslogtreecommitdiffstats
path: root/source3/lib
diff options
context:
space:
mode:
authorVolker Lendecke <vl@samba.org>2014-03-21 17:53:26 +0100
committerJeremy Allison <jra@samba.org>2014-03-27 06:06:11 +0100
commit84aa2ddd861549d6ec8d1ef15f4fd518e03f449b (patch)
treedc1137ef85e9d3c09416c30d38aafbb79b0a1d84 /source3/lib
parent17a60b98db92026ae0b7136a9c8b802bf936423a (diff)
downloadsamba-84aa2ddd861549d6ec8d1ef15f4fd518e03f449b.tar.gz
samba-84aa2ddd861549d6ec8d1ef15f4fd518e03f449b.tar.xz
samba-84aa2ddd861549d6ec8d1ef15f4fd518e03f449b.zip
pthreadpool: Avoid a malloc/free per job
pthreadpool_add_job is in our hottest code path for r/w intensive workloads, so we should avoid anything CPU-intensive. pthreadpool used to malloc each job and free it in the worker thread. This patch adds a FIFO queue for jobs that helper threads copy from, avoiding constant malloc/free. This cuts user space CPU in the local-bench-pthreadpool benchmark by roughly 10% on my system. Signed-off-by: Volker Lendecke <vl@samba.org> Reviewed-by: Jeremy Allison <jra@samba.org>
Diffstat (limited to 'source3/lib')
-rw-r--r--source3/lib/pthreadpool/pthreadpool.c145
1 files changed, 91 insertions, 54 deletions
diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c
index 654d420732f..d51e8083601 100644
--- a/source3/lib/pthreadpool/pthreadpool.c
+++ b/source3/lib/pthreadpool/pthreadpool.c
@@ -34,7 +34,6 @@
#include "lib/util/dlinklist.h"
struct pthreadpool_job {
- struct pthreadpool_job *next;
int id;
void (*fn)(void *private_data);
void *private_data;
@@ -57,9 +56,13 @@ struct pthreadpool {
pthread_cond_t condvar;
/*
- * List of work jobs
+ * Array of jobs
*/
- struct pthreadpool_job *jobs, *last_job;
+ size_t jobs_array_len;
+ struct pthreadpool_job *jobs;
+
+ size_t head;
+ size_t num_jobs;
/*
* pipe for signalling
@@ -113,9 +116,21 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
return ENOMEM;
}
+ pool->jobs_array_len = 4;
+ pool->jobs = calloc(
+ pool->jobs_array_len, sizeof(struct pthreadpool_job));
+
+ if (pool->jobs == NULL) {
+ free(pool);
+ return ENOMEM;
+ }
+
+ pool->head = pool->num_jobs = 0;
+
ret = pipe(pool->sig_pipe);
if (ret == -1) {
int err = errno;
+ free(pool->jobs);
free(pool);
return err;
}
@@ -124,6 +139,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
if (ret != 0) {
close(pool->sig_pipe[0]);
close(pool->sig_pipe[1]);
+ free(pool->jobs);
free(pool);
return ret;
}
@@ -133,12 +149,12 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
pthread_mutex_destroy(&pool->mutex);
close(pool->sig_pipe[0]);
close(pool->sig_pipe[1]);
+ free(pool->jobs);
free(pool);
return ret;
}
pool->shutdown = 0;
- pool->jobs = pool->last_job = NULL;
pool->num_threads = 0;
pool->num_exited = 0;
pool->exited = NULL;
@@ -151,6 +167,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
pthread_mutex_destroy(&pool->mutex);
close(pool->sig_pipe[0]);
close(pool->sig_pipe[1]);
+ free(pool->jobs);
free(pool);
return ret;
}
@@ -221,14 +238,8 @@ static void pthreadpool_child(void)
pool->exited = NULL;
pool->num_idle = 0;
-
- while (pool->jobs != NULL) {
- struct pthreadpool_job *job;
- job = pool->jobs;
- pool->jobs = job->next;
- free(job);
- }
- pool->last_job = NULL;
+ pool->head = 0;
+ pool->num_jobs = 0;
ret = pthread_mutex_unlock(&pool->mutex);
assert(ret == 0);
@@ -311,7 +322,7 @@ int pthreadpool_destroy(struct pthreadpool *pool)
return ret;
}
- if ((pool->jobs != NULL) || pool->shutdown) {
+ if ((pool->num_jobs != 0) || pool->shutdown) {
ret = pthread_mutex_unlock(&pool->mutex);
assert(ret == 0);
return EBUSY;
@@ -383,6 +394,7 @@ int pthreadpool_destroy(struct pthreadpool *pool)
pool->sig_pipe[1] = -1;
free(pool->exited);
+ free(pool->jobs);
free(pool);
return 0;
@@ -410,6 +422,61 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
pool->num_exited += 1;
}
+static bool pthreadpool_get_job(struct pthreadpool *p,
+ struct pthreadpool_job *job)
+{
+ if (p->num_jobs == 0) {
+ return false;
+ }
+ *job = p->jobs[p->head];
+ p->head = (p->head+1) % p->jobs_array_len;
+ p->num_jobs -= 1;
+ return true;
+}
+
+static bool pthreadpool_put_job(struct pthreadpool *p,
+ int id,
+ void (*fn)(void *private_data),
+ void *private_data)
+{
+ struct pthreadpool_job *job;
+
+ if (p->num_jobs == p->jobs_array_len) {
+ struct pthreadpool_job *tmp;
+ size_t new_len = p->jobs_array_len * 2;
+
+ tmp = realloc(
+ p->jobs, sizeof(struct pthreadpool_job) * new_len);
+ if (tmp == NULL) {
+ return false;
+ }
+ p->jobs = tmp;
+
+ /*
+ * We just doubled the jobs array. The array implements a FIFO
+ * queue with a modulo-based wraparound, so we have to memcpy
+ * the jobs that are logically at the queue end but physically
+ * before the queue head into the reallocated area. The new
+ * space starts at the current jobs_array_len, and we have to
+ * copy everything before the current head job into the new
+ * area.
+ */
+ memcpy(&p->jobs[p->jobs_array_len], p->jobs,
+ sizeof(struct pthreadpool_job) * p->head);
+
+ p->jobs_array_len = new_len;
+ }
+
+ job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
+ job->id = id;
+ job->fn = fn;
+ job->private_data = private_data;
+
+ p->num_jobs += 1;
+
+ return true;
+}
+
static void *pthreadpool_server(void *arg)
{
struct pthreadpool *pool = (struct pthreadpool *)arg;
@@ -422,7 +489,7 @@ static void *pthreadpool_server(void *arg)
while (1) {
struct timespec ts;
- struct pthreadpool_job *job;
+ struct pthreadpool_job job;
/*
* idle-wait at most 1 second. If nothing happens in that
@@ -432,7 +499,7 @@ static void *pthreadpool_server(void *arg)
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += 1;
- while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
+ while ((pool->num_jobs == 0) && (pool->shutdown == 0)) {
pool->num_idle += 1;
res = pthread_cond_timedwait(
@@ -441,7 +508,7 @@ static void *pthreadpool_server(void *arg)
if (res == ETIMEDOUT) {
- if (pool->jobs == NULL) {
+ if (pool->num_jobs == 0) {
/*
* we timed out and still no work for
* us. Exit.
@@ -456,19 +523,9 @@ static void *pthreadpool_server(void *arg)
assert(res == 0);
}
- job = pool->jobs;
-
- if (job != NULL) {
+ if (pthreadpool_get_job(pool, &job)) {
ssize_t written;
-
- /*
- * Ok, there's work for us to do, remove the job from
- * the pthreadpool list
- */
- pool->jobs = job->next;
- if (pool->last_job == job) {
- pool->last_job = NULL;
- }
+ int sig_pipe = pool->sig_pipe[1];
/*
* Do the work with the mutex unlocked
@@ -477,12 +534,8 @@ static void *pthreadpool_server(void *arg)
res = pthread_mutex_unlock(&pool->mutex);
assert(res == 0);
- job->fn(job->private_data);
-
- written = write(pool->sig_pipe[1], &job->id,
- sizeof(int));
-
- free(job);
+ job.fn(job.private_data);
+ written = write(sig_pipe, &job.id, sizeof(job.id));
res = pthread_mutex_lock(&pool->mutex);
assert(res == 0);
@@ -494,7 +547,7 @@ static void *pthreadpool_server(void *arg)
}
}
- if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
+ if ((pool->num_jobs == 0) && (pool->shutdown != 0)) {
/*
* No more work to do and we're asked to shut down, so
* exit
@@ -518,24 +571,12 @@ static void *pthreadpool_server(void *arg)
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data)
{
- struct pthreadpool_job *job;
pthread_t thread_id;
int res;
sigset_t mask, omask;
- job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
- if (job == NULL) {
- return ENOMEM;
- }
-
- job->fn = fn;
- job->private_data = private_data;
- job->id = job_id;
- job->next = NULL;
-
res = pthread_mutex_lock(&pool->mutex);
if (res != 0) {
- free(job);
return res;
}
@@ -546,7 +587,6 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
*/
res = pthread_mutex_unlock(&pool->mutex);
assert(res == 0);
- free(job);
return EINVAL;
}
@@ -558,13 +598,10 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
/*
* Add job to the end of the queue
*/
- if (pool->jobs == NULL) {
- pool->jobs = job;
- }
- else {
- pool->last_job->next = job;
+ if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
+ pthread_mutex_unlock(&pool->mutex);
+ return ENOMEM;
}
- pool->last_job = job;
if (pool->num_idle > 0) {
/*