diff options
author | Xavi Hernandez <xhernandez@redhat.com> | 2020-04-30 11:19:01 +0200 |
---|---|---|
committer | MOHIT AGRAWAL <moagrawa@redhat.com> | 2020-05-13 16:17:35 +0000 |
commit | 3da22f8cb08b05562a4c6bd2694f2f19199cff7f (patch) | |
tree | 9e8f69a2634fe6866aca1a82d46676c81dc08448 /libglusterfs/src | |
parent | 03a6b11179f325f6c7d91989c4f7b2acd6438d1a (diff) | |
download | glusterfs-3da22f8cb08b05562a4c6bd2694f2f19199cff7f.tar.gz glusterfs-3da22f8cb08b05562a4c6bd2694f2f19199cff7f.tar.xz glusterfs-3da22f8cb08b05562a4c6bd2694f2f19199cff7f.zip |
syncop: improve scaling and implement more tools
The current scaling of the syncop thread pool is not working properly
and can leave some tasks in the run queue more time than necessary
when the maximum number of threads is not reached.
This patch provides a better scaling condition to react faster to
pending work.
Condition variables and sleep in the context of a synctask have also
been implemented. Their purpose is to replace regular condition
variables and sleeps that block synctask threads and prevent other
tasks to be executed.
The new features have been applied to several places in glusterd.
Change-Id: Ic50b7c73c104f9e41f08101a357d30b95efccfbf
Fixes: #1116
Signed-off-by: Xavi Hernandez <xhernandez@redhat.com>
Diffstat (limited to 'libglusterfs/src')
-rw-r--r-- | libglusterfs/src/glusterfs/syncop.h | 53 | ||||
-rw-r--r-- | libglusterfs/src/libglusterfs.sym | 7 | ||||
-rw-r--r-- | libglusterfs/src/syncop.c | 306 |
3 files changed, 291 insertions, 75 deletions
diff --git a/libglusterfs/src/glusterfs/syncop.h b/libglusterfs/src/glusterfs/syncop.h index bfdec491ba..1a801a9144 100644 --- a/libglusterfs/src/glusterfs/syncop.h +++ b/libglusterfs/src/glusterfs/syncop.h @@ -16,6 +16,8 @@ #include <ucontext.h> #include "glusterfs/dict.h" // for dict_t #include "glusterfs/stack.h" // for call_frame_t, STACK_DESTROY, STACK_... +#include "glusterfs/timer.h" + #define SYNCENV_PROC_MAX 16 #define SYNCENV_PROC_MIN 2 #define SYNCPROC_IDLE_TIME 600 @@ -32,6 +34,7 @@ struct synctask; struct syncproc; struct syncenv; +struct synccond; typedef int (*synctask_cbk_t)(int ret, call_frame_t *frame, void *opaque); @@ -55,9 +58,12 @@ struct synctask { call_frame_t *opframe; synctask_cbk_t synccbk; synctask_fn_t syncfn; - synctask_state_t state; + struct timespec *delta; + gf_timer_t *timer; + struct synccond *synccond; void *opaque; void *stack; + synctask_state_t state; int woken; int slept; int ret; @@ -85,19 +91,21 @@ struct syncproc { /* hosts the scheduler thread and framework for executing synctasks */ struct syncenv { struct syncproc proc[SYNCENV_PROC_MAX]; - int procs; + + pthread_mutex_t mutex; + pthread_cond_t cond; struct list_head runq; - int runcount; struct list_head waitq; - int waitcount; + + int procs; + int procs_idle; + + int runcount; int procmin; int procmax; - pthread_mutex_t mutex; - pthread_cond_t cond; - size_t stacksize; int destroy; /* FLAG to mark syncenv is in destroy mode @@ -123,6 +131,13 @@ struct synclock { }; typedef struct synclock synclock_t; +struct synccond { + pthread_mutex_t pmutex; + pthread_cond_t pcond; + struct list_head waitq; +}; +typedef struct synccond synccond_t; + struct syncbarrier { gf_boolean_t initialized; /*Set on successful initialization*/ pthread_mutex_t guard; /* guard the remaining members, pair @cond */ @@ -219,7 +234,7 @@ struct syncopctx { #define __yield(args) \ do { \ if (args->task) { \ - synctask_yield(args->task); \ + synctask_yield(args->task, NULL); \ } else { \ pthread_mutex_lock(&args->mutex); \ { \ @@ -310,7 +325,9 @@ synctask_join(struct synctask *task); void synctask_wake(struct synctask *task); void -synctask_yield(struct synctask *task); +synctask_yield(struct synctask *task, struct timespec *delta); +void +synctask_sleep(int32_t secs); void synctask_waitfor(struct synctask *task, int count); @@ -408,6 +425,24 @@ synclock_trylock(synclock_t *lock); int synclock_unlock(synclock_t *lock); +int32_t +synccond_init(synccond_t *cond); + +void +synccond_destroy(synccond_t *cond); + +int +synccond_wait(synccond_t *cond, synclock_t *lock); + +int +synccond_timedwait(synccond_t *cond, synclock_t *lock, struct timespec *delta); + +void +synccond_signal(synccond_t *cond); + +void +synccond_broadcast(synccond_t *cond); + int syncbarrier_init(syncbarrier_t *barrier); int diff --git a/libglusterfs/src/libglusterfs.sym b/libglusterfs/src/libglusterfs.sym index 47e0872ca5..f5e273e4ca 100644 --- a/libglusterfs/src/libglusterfs.sym +++ b/libglusterfs/src/libglusterfs.sym @@ -937,6 +937,12 @@ syncbarrier_destroy syncbarrier_init syncbarrier_wait syncbarrier_wake +synccond_init +synccond_destroy +synccond_wait +synccond_timedwait +synccond_signal +synccond_broadcast syncenv_destroy syncenv_new synclock_destroy @@ -1014,6 +1020,7 @@ synctask_new synctask_new1 synctask_set synctask_setid +synctask_sleep synctask_wake synctask_yield sys_access diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c index 693970f0f3..71d37b76f2 100644 --- a/libglusterfs/src/syncop.c +++ b/libglusterfs/src/syncop.c @@ -154,10 +154,14 @@ out: return ret; } +void * +syncenv_processor(void *thdata); + static void __run(struct synctask *task) { struct syncenv *env = NULL; + int32_t total, ret, i; env = task->env; @@ -173,7 +177,6 @@ __run(struct synctask *task) env->runcount--; break; case SYNCTASK_WAIT: - env->waitcount--; break; case SYNCTASK_DONE: gf_msg(task->xl->name, GF_LOG_WARNING, 0, LG_MSG_COMPLETED_TASK, @@ -187,8 +190,27 @@ __run(struct synctask *task) } list_add_tail(&task->all_tasks, &env->runq); - env->runcount++; task->state = SYNCTASK_RUN; + + env->runcount++; + + total = env->procs + env->runcount - env->procs_idle; + if (total > env->procmax) { + total = env->procmax; + } + if (total > env->procs) { + for (i = 0; i < env->procmax; i++) { + if (env->proc[i].env == NULL) { + env->proc[i].env = env; + ret = gf_thread_create(&env->proc[i].processor, NULL, + syncenv_processor, &env->proc[i], + "sproc%d", i); + if ((ret < 0) || (++env->procs >= total)) { + break; + } + } + } + } } static void @@ -210,7 +232,6 @@ __wait(struct synctask *task) gf_msg(task->xl->name, GF_LOG_WARNING, 0, LG_MSG_REWAITING_TASK, "re-waiting already waiting " "task"); - env->waitcount--; break; case SYNCTASK_DONE: gf_msg(task->xl->name, GF_LOG_WARNING, 0, LG_MSG_COMPLETED_TASK, @@ -223,12 +244,11 @@ __wait(struct synctask *task) } list_add_tail(&task->all_tasks, &env->waitq); - env->waitcount++; task->state = SYNCTASK_WAIT; } void -synctask_yield(struct synctask *task) +synctask_yield(struct synctask *task, struct timespec *delta) { xlator_t *oldTHIS = THIS; @@ -237,6 +257,8 @@ synctask_yield(struct synctask *task) task->proc->sched.uc_flags &= ~_UC_TLSBASE; #endif + task->delta = delta; + if (task->state != SYNCTASK_DONE) { task->state = SYNCTASK_SUSPEND; } @@ -249,6 +271,35 @@ synctask_yield(struct synctask *task) } void +synctask_sleep(int32_t secs) +{ + struct timespec delta; + struct synctask *task; + + task = synctask_get(); + + if (task == NULL) { + sleep(secs); + } else { + delta.tv_sec = secs; + delta.tv_nsec = 0; + + synctask_yield(task, &delta); + } +} + +static void +__synctask_wake(struct synctask *task) +{ + task->woken = 1; + + if (task->slept) + __run(task); + + pthread_cond_broadcast(&task->env->cond); +} + +void synctask_wake(struct synctask *task) { struct syncenv *env = NULL; @@ -257,13 +308,18 @@ synctask_wake(struct synctask *task) pthread_mutex_lock(&env->mutex); { - task->woken = 1; + if (task->timer != NULL) { + if (gf_timer_call_cancel(task->xl->ctx, task->timer) != 0) { + goto unlock; + } - if (task->slept) - __run(task); + task->timer = NULL; + task->synccond = NULL; + } - pthread_cond_broadcast(&env->cond); + __synctask_wake(task); } +unlock: pthread_mutex_unlock(&env->mutex); } @@ -282,7 +338,7 @@ synctask_wrap(void) task->state = SYNCTASK_DONE; - synctask_yield(task); + synctask_yield(task, NULL); } void @@ -422,11 +478,6 @@ synctask_create(struct syncenv *env, size_t stacksize, synctask_fn_t fn, } synctask_wake(newtask); - /* - * Make sure someone's there to execute anything we just put on the - * run queue. - */ - syncenv_scale(env); return newtask; err: @@ -520,8 +571,12 @@ syncenv_task(struct syncproc *proc) goto unlock; } + env->procs_idle++; + sleep_till.tv_sec = time(NULL) + SYNCPROC_IDLE_TIME; ret = pthread_cond_timedwait(&env->cond, &env->mutex, &sleep_till); + + env->procs_idle--; } task = list_entry(env->runq.next, struct synctask, all_tasks); @@ -540,6 +595,34 @@ unlock: return task; } +static void +synctask_timer(void *data) +{ + struct synctask *task = data; + struct synccond *cond; + + cond = task->synccond; + if (cond != NULL) { + pthread_mutex_lock(&cond->pmutex); + + list_del_init(&task->waitq); + task->synccond = NULL; + + pthread_mutex_unlock(&cond->pmutex); + + task->ret = -ETIMEDOUT; + } + + pthread_mutex_lock(&task->env->mutex); + + gf_timer_call_cancel(task->xl->ctx, task->timer); + task->timer = NULL; + + __synctask_wake(task); + + pthread_mutex_unlock(&task->env->mutex); +} + void synctask_switchto(struct synctask *task) { @@ -572,7 +655,14 @@ synctask_switchto(struct synctask *task) } else { task->slept = 1; __wait(task); + + if (task->delta != NULL) { + task->timer = gf_timer_call_after(task->xl->ctx, *task->delta, + synctask_timer, task); + } } + + task->delta = NULL; } pthread_mutex_unlock(&env->mutex); } @@ -580,65 +670,18 @@ synctask_switchto(struct synctask *task) void * syncenv_processor(void *thdata) { - struct syncenv *env = NULL; struct syncproc *proc = NULL; struct synctask *task = NULL; proc = thdata; - env = proc->env; - - for (;;) { - task = syncenv_task(proc); - if (!task) - break; + while ((task = syncenv_task(proc)) != NULL) { synctask_switchto(task); - - syncenv_scale(env); } return NULL; } -void -syncenv_scale(struct syncenv *env) -{ - int diff = 0; - int scale = 0; - int i = 0; - int ret = 0; - - pthread_mutex_lock(&env->mutex); - { - if (env->procs > env->runcount) - goto unlock; - - scale = env->runcount; - if (scale > env->procmax) - scale = env->procmax; - if (scale > env->procs) - diff = scale - env->procs; - while (diff) { - diff--; - for (; (i < env->procmax); i++) { - if (env->proc[i].processor == 0) - break; - } - - env->proc[i].env = env; - ret = gf_thread_create(&env->proc[i].processor, NULL, - syncenv_processor, &env->proc[i], - "sproc%03hx", env->procs & 0x3ff); - if (ret) - break; - env->procs++; - i++; - } - } -unlock: - pthread_mutex_unlock(&env->mutex); -} - /* The syncenv threads are cleaned up in this routine. */ void @@ -715,12 +758,13 @@ syncenv_new(size_t stacksize, int procmin, int procmax) newenv->stacksize = stacksize; newenv->procmin = procmin; newenv->procmax = procmax; + newenv->procs_idle = 0; for (i = 0; i < newenv->procmin; i++) { newenv->proc[i].env = newenv; ret = gf_thread_create(&newenv->proc[i].processor, NULL, syncenv_processor, &newenv->proc[i], "sproc%d", - newenv->procs); + i); if (ret) break; newenv->procs++; @@ -810,7 +854,7 @@ __synclock_lock(struct synclock *lock) task->woken = 0; list_add_tail(&task->waitq, &lock->waitq); pthread_mutex_unlock(&lock->guard); - synctask_yield(task); + synctask_yield(task, NULL); /* task is removed from waitq in unlock, * under lock->guard.*/ pthread_mutex_lock(&lock->guard); @@ -963,6 +1007,136 @@ synclock_unlock(synclock_t *lock) return ret; } +/* Condition variables */ + +int32_t +synccond_init(synccond_t *cond) +{ + int32_t ret; + + INIT_LIST_HEAD(&cond->waitq); + + ret = pthread_mutex_init(&cond->pmutex, NULL); + if (ret != 0) { + return -ret; + } + + ret = pthread_cond_init(&cond->pcond, NULL); + if (ret != 0) { + pthread_mutex_destroy(&cond->pmutex); + } + + return -ret; +} + +void +synccond_destroy(synccond_t *cond) +{ + pthread_cond_destroy(&cond->pcond); + pthread_mutex_destroy(&cond->pmutex); +} + +int +synccond_timedwait(synccond_t *cond, synclock_t *lock, struct timespec *delta) +{ + struct timespec now; + struct synctask *task = NULL; + int ret; + + task = synctask_get(); + + if (task == NULL) { + if (delta != NULL) { + timespec_now_realtime(&now); + timespec_adjust_delta(&now, *delta); + } + + pthread_mutex_lock(&cond->pmutex); + + if (delta == NULL) { + ret = -pthread_cond_wait(&cond->pcond, &cond->pmutex); + } else { + ret = -pthread_cond_timedwait(&cond->pcond, &cond->pmutex, &now); + } + } else { + pthread_mutex_lock(&cond->pmutex); + + list_add_tail(&task->waitq, &cond->waitq); + task->synccond = cond; + + ret = synclock_unlock(lock); + if (ret == 0) { + pthread_mutex_unlock(&cond->pmutex); + + synctask_yield(task, delta); + + ret = synclock_lock(lock); + if (ret == 0) { + ret = task->ret; + } + task->ret = 0; + + return ret; + } + + list_del_init(&task->waitq); + } + + pthread_mutex_unlock(&cond->pmutex); + + return ret; +} + +int +synccond_wait(synccond_t *cond, synclock_t *lock) +{ + return synccond_timedwait(cond, lock, NULL); +} + +void +synccond_signal(synccond_t *cond) +{ + struct synctask *task; + + pthread_mutex_lock(&cond->pmutex); + + if (!list_empty(&cond->waitq)) { + task = list_first_entry(&cond->waitq, struct synctask, waitq); + list_del_init(&task->waitq); + + pthread_mutex_unlock(&cond->pmutex); + + synctask_wake(task); + } else { + pthread_cond_signal(&cond->pcond); + + pthread_mutex_unlock(&cond->pmutex); + } +} + +void +synccond_broadcast(synccond_t *cond) +{ + struct list_head list; + struct synctask *task; + + INIT_LIST_HEAD(&list); + + pthread_mutex_lock(&cond->pmutex); + + list_splice_init(&cond->waitq, &list); + pthread_cond_broadcast(&cond->pcond); + + pthread_mutex_unlock(&cond->pmutex); + + while (!list_empty(&list)) { + task = list_first_entry(&list, struct synctask, waitq); + list_del_init(&task->waitq); + + synctask_wake(task); + } +} + /* Barriers */ int @@ -1032,7 +1206,7 @@ __syncbarrier_wait(struct syncbarrier *barrier, int waitfor) /* called within a synctask */ list_add_tail(&task->waitq, &barrier->waitq); pthread_mutex_unlock(&barrier->guard); - synctask_yield(task); + synctask_yield(task, NULL); pthread_mutex_lock(&barrier->guard); } else { /* called by a non-synctask */ |