summaryrefslogtreecommitdiffstats
path: root/runtime/wti.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-10-14 11:01:21 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-10-14 11:01:21 +0200
commitc5408da3d8f17691fb91282d031757ed041fec55 (patch)
tree4f932e801fac21ddc03616168106ac65411c340b /runtime/wti.c
parent4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32 (diff)
downloadrsyslog-c5408da3d8f17691fb91282d031757ed041fec55.tar.gz
rsyslog-c5408da3d8f17691fb91282d031757ed041fec55.tar.xz
rsyslog-c5408da3d8f17691fb91282d031757ed041fec55.zip
new queue engine - initial commit (probably not 100% working!)
simplified and thus speeded up the queue engine, also fixed some potential race conditions (in very unusual shutdown conditions) along the way. The threading model has seriously changes, so there may be some regressions. NOTE: the code passed basic tests, but there is still more work and testing to be done. This commit should be treated with care.
Diffstat (limited to 'runtime/wti.c')
-rw-r--r--runtime/wti.c34
1 files changed, 3 insertions, 31 deletions
diff --git a/runtime/wti.c b/runtime/wti.c
index c3ab0aba..24988cbe 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -189,6 +189,7 @@ finalize_it:
* the cancel cleanup handler (and have been cancelled).
* rgerhards, 2008-01-16
*/
+// TODO: REMOVE THIS FUNCTION, CURRENTLY ONLY PRESENT TO PROVIDE DEBUG OUTPUT -- rgerhards, 2009-10-14
static void
wtiWorkerCancelCleanup(void *arg)
{
@@ -202,9 +203,6 @@ wtiWorkerCancelCleanup(void *arg)
DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
- /* call user supplied handler */
- pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp);
-
ENDfunc
}
@@ -222,8 +220,6 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
BEGINfunc
DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
- pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
-
if(pThis->bAlwaysRunning) {
/* never shut down any started worker */
dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr);
@@ -235,6 +231,7 @@ dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr);
*pbInactivityTOOccured = 1; /* indicate we had a timeout */
}
}
+ dbgoprint((obj_t*) pThis, "worker awoke from idle processing\n");
ENDfunc
}
@@ -249,7 +246,6 @@ wtiWorker(wti_t *pThis)
int bInactivityTOOccured = 0;
rsRetVal localRet;
rsRetVal terminateRet;
- int iCancelStateSave;
DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
@@ -259,23 +255,18 @@ wtiWorker(wti_t *pThis)
dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
- pWtp->pfOnWorkerStartup(pWtp->pUsr);
-
/* now we have our identity, on to real processing */
while(1) { /* loop will be broken below - need to do mutex locks */
if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */
pWtp->pfRateLimiter(pWtp->pUsr);
}
-dbgprintf("YYY/ZZZ: pre lock mutex\n");
d_pthread_mutex_lock(pWtp->pmutUsr);
-dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr);
/* first check if we are in shutdown process (but evaluate a bit later) */
terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED);
-RUNLOG;
+RUNLOG_VAR("%d", terminateRet);
if(terminateRet == RS_RET_TERMINATE_NOW) {
-RUNLOG;
/* we now need to free the old batch */
localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n",
@@ -283,7 +274,6 @@ RUNLOG;
d_pthread_mutex_unlock(pWtp->pmutUsr);
break;
}
-RUNLOG;
/* try to execute and process whatever we have */
/* Note that this function releases and re-aquires the mutex. The returned
@@ -291,41 +281,23 @@ RUNLOG;
*/
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
-dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr);
if(localRet == RS_RET_IDLE) {
-RUNLOG;
if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
d_pthread_mutex_unlock(pWtp->pmutUsr);
break; /* end of loop */
}
-RUNLOG;
doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
-RUNLOG;
d_pthread_mutex_unlock(pWtp->pmutUsr);
-RUNLOG;
continue; /* request next iteration */
}
-RUNLOG;
d_pthread_mutex_unlock(pWtp->pmutUsr);
bInactivityTOOccured = 0; /* reset for next run */
}
/* indicate termination */
-RUNLOG;
- d_pthread_mutex_lock(pWtp->pmutUsr);
-RUNLOG;
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
-RUNLOG;
pthread_cleanup_pop(0); /* remove cleanup handler */
-RUNLOG;
- pWtp->pfOnWorkerShutdown(pWtp->pUsr);
-RUNLOG;
- pthread_setcancelstate(iCancelStateSave, NULL);
-RUNLOG;
- d_pthread_mutex_unlock(pWtp->pmutUsr);
-RUNLOG;
RETiRet;
}