summaryrefslogtreecommitdiffstats
path: root/runtime/wtp.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-10-14 11:01:21 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-10-14 11:01:21 +0200
commitc5408da3d8f17691fb91282d031757ed041fec55 (patch)
tree4f932e801fac21ddc03616168106ac65411c340b /runtime/wtp.c
parent4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32 (diff)
downloadrsyslog-c5408da3d8f17691fb91282d031757ed041fec55.tar.gz
rsyslog-c5408da3d8f17691fb91282d031757ed041fec55.tar.xz
rsyslog-c5408da3d8f17691fb91282d031757ed041fec55.zip
new queue engine - initial commit (probably not 100% working!)
simplified and thus speeded up the queue engine, also fixed some potential race conditions (in very unusual shutdown conditions) along the way. The threading model has seriously changes, so there may be some regressions. NOTE: the code passed basic tests, but there is still more work and testing to be done. This commit should be treated with care.
Diffstat (limited to 'runtime/wtp.c')
-rw-r--r--runtime/wtp.c35
1 files changed, 7 insertions, 28 deletions
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 93234819..3e76bb56 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -94,13 +94,8 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
/* set all function pointers to "not implemented" dummy so that we can safely call them */
pThis->pfChkStopWrkr = NotImplementedDummy;
pThis->pfGetDeqBatchSize = NotImplementedDummy;
- pThis->pfIsIdle = NotImplementedDummy;
pThis->pfDoWork = NotImplementedDummy;
pThis->pfObjProcessed = NotImplementedDummy;
- pThis->pfOnIdle = NotImplementedDummy;
- pThis->pfOnWorkerCancel = NotImplementedDummy;
- pThis->pfOnWorkerStartup = NotImplementedDummy;
- pThis->pfOnWorkerShutdown = NotImplementedDummy;
ENDobjConstruct(wtp)
@@ -160,22 +155,6 @@ CODESTARTobjDestruct(wtp)
ENDobjDestruct(wtp)
-/* wake up all worker threads.
- * rgerhards, 2008-01-16
- */
-rsRetVal
-wtpWakeupAllWrkr(wtp_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, wtp);
- //d_pthread_mutex_lock(pThis->pmutUsr);
- pthread_cond_broadcast(pThis->pcondBusy);
- //d_pthread_mutex_unlock(pThis->pmutUsr);
- RETiRet;
-}
-
-
/* Sent a specific state for the worker thread pool. -- rgerhards, 2008-01-21
* We do not need to do atomic instructions as set operations are only
* called when terminating the pool, and then in strict sequence. So we
@@ -211,8 +190,10 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex)
wtpState = ATOMIC_FETCH_32BIT(pThis->wtpState);
if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) {
+RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_NOW");
ABORT_FINALIZE(RS_RET_TERMINATE_NOW);
} else if(wtpState == wtpState_SHUTDOWN) {
+RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_WHEN_IDLE");
ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE);
}
@@ -241,8 +222,11 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
ISOBJ_TYPE_assert(pThis, wtp);
+ /* lock mutex to prevent races (may otherwise happen during idle processing and such...) */
+ d_pthread_mutex_lock(pThis->pmutUsr);
wtpSetState(pThis, tShutdownCmd);
- wtpWakeupAllWrkr(pThis);
+ pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */
+ d_pthread_mutex_unlock(pThis->pmutUsr);
/* wait for worker thread termination */
d_pthread_mutex_lock(&pThis->mutWtp);
@@ -430,7 +414,7 @@ dbgprintf("wtpAdviseMaxWorkers, nmax: %d, curr %d, missing %d\n", nMaxWrkrTmp, p
CHKiRet(wtpStartWrkr(pThis));
}
} else {
-dbgprintf("YYY: adivse signal cond busy");
+dbgprintf("YYY: wtpAdviseMaxWorkers, sufficient workers, just doing adivse signal cond busy\n");
pthread_cond_signal(pThis->pcondBusy);
}
@@ -450,13 +434,8 @@ DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t)
DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*))
DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*))
-DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*))
DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*))
DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*))
-DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int))
-DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*))
-DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*))
-DEFpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*))
/* set the debug header message