summaryrefslogtreecommitdiffstats
path: root/runtime/wti.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-17 18:40:28 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-17 18:40:28 +0200
commit4c9eded44dbae1701bb3b8f255865892b19e7f72 (patch)
tree0610fa71ff86cba87cce2e1c6c36df26c854ae0b /runtime/wti.c
parent183b49015561890e148a50128c051a1cdd4491b9 (diff)
downloadrsyslog-4c9eded44dbae1701bb3b8f255865892b19e7f72.tar.gz
rsyslog-4c9eded44dbae1701bb3b8f255865892b19e7f72.tar.xz
rsyslog-4c9eded44dbae1701bb3b8f255865892b19e7f72.zip
further code simplification
... could even remove one mutex by using a better algorithm. I think I also spotted some situation in which a hang could have happened. As I can't fix it in v4 and less without moving to the new engine, I make no effort in testing this out. Hangs occur during shutdown, only (if at all). The code changes should also result in some mild performance improvement. Some bug potential, but overall the bug potential should have been greatly reduced.
Diffstat (limited to 'runtime/wti.c')
-rw-r--r--runtime/wti.c125
1 files changed, 24 insertions, 101 deletions
diff --git a/runtime/wti.c b/runtime/wti.c
index c536e545..b6a09c65 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -76,74 +76,32 @@ wtiGetDbgHdr(wti_t *pThis)
}
-/* get the current worker state. For simplicity and speed, we have
- * NOT used our regular calling interface this time. I hope that won't
- * bite in the long term... -- rgerhards, 2008-01-17
- * TODO: may be performance optimized by atomic operations
+/* return the current worker processing state. For the sake of
+ * simplicity, we do not use the iRet interface. -- rgerhards, 2009-07-17
*/
-qWrkCmd_t
-wtiGetState(wti_t *pThis, int bLockMutex)
+bool
+wtiGetState(wti_t *pThis)
{
- DEFVARS_mutexProtection;
- qWrkCmd_t tCmd;
-
- BEGINfunc
- ISOBJ_TYPE_assert(pThis, wti);
-
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
- tCmd = pThis->tCurrCmd;
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
-
- ENDfunc
- return tCmd;
+ return pThis->bIsRunning;
}
-/* send a command to a specific thread
- * rgerhards, 2008-01-20
+/* Set status (thread is running or not), actually an property of
+ * use for wtp, but we need to have it per thread instance (thus it
+ * is inside wti). -- rgerhards, 2009-07-17
*/
rsRetVal
-wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bLockMutex)
+wtiSetState(wti_t *pThis, bool bNewVal)
{
- DEFiRet;
- qWrkCmd_t tCurrCmd;
- DEFVARS_mutexProtection;
-
ISOBJ_TYPE_assert(pThis, wti);
- assert(tCmd <= eWRKTHRD_SHUTDOWN_IMMEDIATE);
-
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
-
- tCurrCmd = pThis->tCurrCmd;
- /* all worker states must be followed sequentially, only termination can be set in any state */
- if(tCurrCmd > tCmd && !(tCmd == eWRKTHRD_STOPPED)) {
- DBGPRINTF("%s: command %d can not be accepted in current %d processing state - ignored\n",
- wtiGetDbgHdr(pThis), tCmd, tCurrCmd);
- } else {
- DBGPRINTF("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd);
- /* we could replace this with a simple if, but we leave the switch in in case we need
- * to add something at a later stage. -- rgerhards, 2008-09-30
- */
- if(tCmd == eWRKTHRD_STOPPED) {
- dbgprintf("%s: worker almost stopped, assuming it has\n", wtiGetDbgHdr(pThis));
- pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
- }
- /* apply the new state */
-dbgprintf("worker terminator will write stateval %d\n", tCmd);
- unsigned val = ATOMIC_CAS_VAL(pThis->tCurrCmd, tCurrCmd, tCmd);
- if(val != tCurrCmd) {
- DBGPRINTF("wtiSetState PROBLEM, tCurrCmd %d overwritten with %d, wanted to set %d\n", tCurrCmd, val, tCmd);
- }
- }
-
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
- RETiRet;
+ pThis->bIsRunning = bNewVal;
+ return RS_RET_OK;
}
-/* Cancel the thread. If the thread is already cancelled or terminated,
- * we do not again cancel it. But it is save and legal to call wtiCancelThrd() in
- * such situations.
+/* Cancel the thread. If the thread is not running. But it is save and legal to
+ * call wtiCancelThrd() in such situations.
+ * IMPORTANT: WTP mutex must be locked while this function is called!
* rgerhards, 2008-02-26
*/
rsRetVal
@@ -153,17 +111,11 @@ wtiCancelThrd(wti_t *pThis)
ISOBJ_TYPE_assert(pThis, wti);
- d_pthread_mutex_lock(&pThis->mut);
-
- if(pThis->tCurrCmd != eWRKTHRD_STOPPED) {
- dbgoprint((obj_t*) pThis, "canceling worker thread, curr stat %d\n", pThis->tCurrCmd);
+ if(pThis->bIsRunning) {
+ dbgoprint((obj_t*) pThis, "canceling worker thread\n");
pthread_cancel(pThis->thrdID);
- /* TODO: check: the following check should automatically be done by cancel cleanup handler! 2009-07-08 rgerhards */
- wtiSetState(pThis, eWRKTHRD_STOPPED, MUTEX_ALREADY_LOCKED);
}
- d_pthread_mutex_unlock(&pThis->mut);
-
RETiRet;
}
@@ -171,14 +123,7 @@ wtiCancelThrd(wti_t *pThis)
/* Destructor */
BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(wti)
- if(Debug && wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) {
- dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - ignoring\n",
- wtiGetDbgHdr(pThis), pThis);
- }
-
/* actual destruction */
- pthread_mutex_destroy(&pThis->mut);
-
free(pThis->batch.pElem);
free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
@@ -187,7 +132,6 @@ ENDobjDestruct(wti)
/* Standard-Constructor for the wti object
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
- pthread_mutex_init(&pThis->mut, NULL);
ENDobjConstruct(wti)
@@ -205,7 +149,7 @@ wtiConstructFinalize(wti_t *pThis)
dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis));
/* initialize our thread instance descriptor */
- pThis->tCurrCmd = eWRKTHRD_STOPPED;
+ pThis->bIsRunning = FALSE;
/* we now alloc the array for user pointers. We obtain the max from the queue itself. */
CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
@@ -238,17 +182,13 @@ wtiWorkerCancelCleanup(void *arg)
/* call user supplied handler (that one e.g. requeues the element) */
pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp);
- d_pthread_mutex_lock(&pWtp->mut);
- wtiSetState(pThis, eWRKTHRD_STOPPED, MUTEX_ALREADY_LOCKED);
- /* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */
- d_pthread_mutex_unlock(&pWtp->mut);
ENDfunc
}
/* wait for queue to become non-empty or timeout
- * helper to wtiWorker
- * IMPORTANT: mutex must be locked when this code is called!
+ * helper to wtiWorker. Note the the predicate is
+ * re-tested by the caller, so it is OK to NOT do it here.
* rgerhards, 2009-05-20
*/
static inline void
@@ -258,8 +198,10 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
BEGINfunc
DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
+
pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
+ d_pthread_mutex_lock(pWtp->pmutUsr);
if(pWtp->toWrkShutdown == -1) {
/* never shut down any started worker */
d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
@@ -270,6 +212,7 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
*pbInactivityTOOccured = 1; /* indicate we had a timeout */
}
}
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
ENDfunc
}
@@ -284,7 +227,6 @@ wtiWorker(wti_t *pThis)
int bInactivityTOOccured = 0;
rsRetVal localRet;
rsRetVal terminateRet;
- bool bMutexIsLocked;
int iCancelStateSave;
DEFiRet;
@@ -305,9 +247,7 @@ wtiWorker(wti_t *pThis)
pWtp->pfRateLimiter(pWtp->pUsr);
}
- wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
d_pthread_mutex_lock(pWtp->pmutUsr);
- bMutexIsLocked = TRUE;
/* first check if we are in shutdown process (but evaluate a bit later) */
terminateRet = wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED);
@@ -316,46 +256,29 @@ wtiWorker(wti_t *pThis)
localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n",
localRet);
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
break;
}
/* try to execute and process whatever we have */
/* This function must and does RELEASE the MUTEX! */
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
- bMutexIsLocked = FALSE;
if(localRet == RS_RET_IDLE) {
- if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE) {
+ if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
break; /* end of loop */
}
-
- if(bInactivityTOOccured) {
- /* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */
- break; /* end worker thread run */
- }
- d_pthread_mutex_lock(pWtp->pmutUsr);
doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
- d_pthread_mutex_unlock(pWtp->pmutUsr);
continue; /* request next iteration */
}
bInactivityTOOccured = 0; /* reset for next run */
}
- /* if we exit the loop, the mutex may be locked and, if so, must be unlocked */
- if(bMutexIsLocked) {
- d_pthread_mutex_unlock(pWtp->pmutUsr);
- }
-
/* indicate termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pThis->mut);
pthread_cleanup_pop(0); /* remove cleanup handler */
-
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
-
- wtiSetState(pThis, eWRKTHRD_STOPPED, MUTEX_ALREADY_LOCKED);
- d_pthread_mutex_unlock(&pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
RETiRet;