diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-10-14 11:01:21 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-10-14 11:01:21 +0200 |
commit | c5408da3d8f17691fb91282d031757ed041fec55 (patch) | |
tree | 4f932e801fac21ddc03616168106ac65411c340b /runtime/wti.c | |
parent | 4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32 (diff) | |
download | rsyslog-c5408da3d8f17691fb91282d031757ed041fec55.tar.gz rsyslog-c5408da3d8f17691fb91282d031757ed041fec55.tar.xz rsyslog-c5408da3d8f17691fb91282d031757ed041fec55.zip |
new queue engine - initial commit (probably not 100% working!)
simplified and thus speeded up the queue engine, also fixed some
potential race conditions (in very unusual shutdown conditions)
along the way. The threading model has seriously changes, so there may
be some regressions.
NOTE: the code passed basic tests, but there is still more work
and testing to be done. This commit should be treated with care.
Diffstat (limited to 'runtime/wti.c')
-rw-r--r-- | runtime/wti.c | 34 |
1 files changed, 3 insertions, 31 deletions
diff --git a/runtime/wti.c b/runtime/wti.c index c3ab0aba..24988cbe 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -189,6 +189,7 @@ finalize_it: * the cancel cleanup handler (and have been cancelled). * rgerhards, 2008-01-16 */ +// TODO: REMOVE THIS FUNCTION, CURRENTLY ONLY PRESENT TO PROVIDE DEBUG OUTPUT -- rgerhards, 2009-10-14 static void wtiWorkerCancelCleanup(void *arg) { @@ -202,9 +203,6 @@ wtiWorkerCancelCleanup(void *arg) DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis)); - /* call user supplied handler */ - pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp); - ENDfunc } @@ -222,8 +220,6 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) BEGINfunc DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis)); - pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); - if(pThis->bAlwaysRunning) { /* never shut down any started worker */ dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr); @@ -235,6 +231,7 @@ dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr); *pbInactivityTOOccured = 1; /* indicate we had a timeout */ } } + dbgoprint((obj_t*) pThis, "worker awoke from idle processing\n"); ENDfunc } @@ -249,7 +246,6 @@ wtiWorker(wti_t *pThis) int bInactivityTOOccured = 0; rsRetVal localRet; rsRetVal terminateRet; - int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, wti); @@ -259,23 +255,18 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); - pWtp->pfOnWorkerStartup(pWtp->pUsr); - /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */ pWtp->pfRateLimiter(pWtp->pUsr); } -dbgprintf("YYY/ZZZ: pre lock mutex\n"); d_pthread_mutex_lock(pWtp->pmutUsr); -dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr); /* first check if we are in shutdown process (but evaluate a bit later) */ terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED); -RUNLOG; +RUNLOG_VAR("%d", terminateRet); if(terminateRet == RS_RET_TERMINATE_NOW) { -RUNLOG; /* we now need to free the old batch */ localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n", @@ -283,7 +274,6 @@ RUNLOG; d_pthread_mutex_unlock(pWtp->pmutUsr); break; } -RUNLOG; /* try to execute and process whatever we have */ /* Note that this function releases and re-aquires the mutex. The returned @@ -291,41 +281,23 @@ RUNLOG; */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); -dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr); if(localRet == RS_RET_IDLE) { -RUNLOG; if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { d_pthread_mutex_unlock(pWtp->pmutUsr); break; /* end of loop */ } -RUNLOG; doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); -RUNLOG; d_pthread_mutex_unlock(pWtp->pmutUsr); -RUNLOG; continue; /* request next iteration */ } -RUNLOG; d_pthread_mutex_unlock(pWtp->pmutUsr); bInactivityTOOccured = 0; /* reset for next run */ } /* indicate termination */ -RUNLOG; - d_pthread_mutex_lock(pWtp->pmutUsr); -RUNLOG; - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); -RUNLOG; pthread_cleanup_pop(0); /* remove cleanup handler */ -RUNLOG; - pWtp->pfOnWorkerShutdown(pWtp->pUsr); -RUNLOG; - pthread_setcancelstate(iCancelStateSave, NULL); -RUNLOG; - d_pthread_mutex_unlock(pWtp->pmutUsr); -RUNLOG; RETiRet; } |