diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-10-14 11:01:21 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-10-14 11:01:21 +0200 |
commit | c5408da3d8f17691fb91282d031757ed041fec55 (patch) | |
tree | 4f932e801fac21ddc03616168106ac65411c340b /runtime/wtp.c | |
parent | 4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32 (diff) | |
download | rsyslog-c5408da3d8f17691fb91282d031757ed041fec55.tar.gz rsyslog-c5408da3d8f17691fb91282d031757ed041fec55.tar.xz rsyslog-c5408da3d8f17691fb91282d031757ed041fec55.zip |
new queue engine - initial commit (probably not 100% working!)
simplified and thus speeded up the queue engine, also fixed some
potential race conditions (in very unusual shutdown conditions)
along the way. The threading model has seriously changes, so there may
be some regressions.
NOTE: the code passed basic tests, but there is still more work
and testing to be done. This commit should be treated with care.
Diffstat (limited to 'runtime/wtp.c')
-rw-r--r-- | runtime/wtp.c | 35 |
1 files changed, 7 insertions, 28 deletions
diff --git a/runtime/wtp.c b/runtime/wtp.c index 93234819..3e76bb56 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -94,13 +94,8 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; pThis->pfGetDeqBatchSize = NotImplementedDummy; - pThis->pfIsIdle = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; pThis->pfObjProcessed = NotImplementedDummy; - pThis->pfOnIdle = NotImplementedDummy; - pThis->pfOnWorkerCancel = NotImplementedDummy; - pThis->pfOnWorkerStartup = NotImplementedDummy; - pThis->pfOnWorkerShutdown = NotImplementedDummy; ENDobjConstruct(wtp) @@ -160,22 +155,6 @@ CODESTARTobjDestruct(wtp) ENDobjDestruct(wtp) -/* wake up all worker threads. - * rgerhards, 2008-01-16 - */ -rsRetVal -wtpWakeupAllWrkr(wtp_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, wtp); - //d_pthread_mutex_lock(pThis->pmutUsr); - pthread_cond_broadcast(pThis->pcondBusy); - //d_pthread_mutex_unlock(pThis->pmutUsr); - RETiRet; -} - - /* Sent a specific state for the worker thread pool. -- rgerhards, 2008-01-21 * We do not need to do atomic instructions as set operations are only * called when terminating the pool, and then in strict sequence. So we @@ -211,8 +190,10 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex) wtpState = ATOMIC_FETCH_32BIT(pThis->wtpState); if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) { +RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_NOW"); ABORT_FINALIZE(RS_RET_TERMINATE_NOW); } else if(wtpState == wtpState_SHUTDOWN) { +RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_WHEN_IDLE"); ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE); } @@ -241,8 +222,11 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout ISOBJ_TYPE_assert(pThis, wtp); + /* lock mutex to prevent races (may otherwise happen during idle processing and such...) */ + d_pthread_mutex_lock(pThis->pmutUsr); wtpSetState(pThis, tShutdownCmd); - wtpWakeupAllWrkr(pThis); + pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */ + d_pthread_mutex_unlock(pThis->pmutUsr); /* wait for worker thread termination */ d_pthread_mutex_lock(&pThis->mutWtp); @@ -430,7 +414,7 @@ dbgprintf("wtpAdviseMaxWorkers, nmax: %d, curr %d, missing %d\n", nMaxWrkrTmp, p CHKiRet(wtpStartWrkr(pThis)); } } else { -dbgprintf("YYY: adivse signal cond busy"); +dbgprintf("YYY: wtpAdviseMaxWorkers, sufficient workers, just doing adivse signal cond busy\n"); pthread_cond_signal(pThis->pcondBusy); } @@ -450,13 +434,8 @@ DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t) DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) -DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*)) DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*)) DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*)) -DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)) -DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*)) -DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*)) -DEFpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*)) /* set the debug header message |