summaryrefslogtreecommitdiffstats
path: root/runtime/wti.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/wti.c')
-rw-r--r--runtime/wti.c125
1 files changed, 24 insertions, 101 deletions
diff --git a/runtime/wti.c b/runtime/wti.c
index c536e545..b6a09c65 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -76,74 +76,32 @@ wtiGetDbgHdr(wti_t *pThis)
}
-/* get the current worker state. For simplicity and speed, we have
- * NOT used our regular calling interface this time. I hope that won't
- * bite in the long term... -- rgerhards, 2008-01-17
- * TODO: may be performance optimized by atomic operations
+/* return the current worker processing state. For the sake of
+ * simplicity, we do not use the iRet interface. -- rgerhards, 2009-07-17
*/
-qWrkCmd_t
-wtiGetState(wti_t *pThis, int bLockMutex)
+bool
+wtiGetState(wti_t *pThis)
{
- DEFVARS_mutexProtection;
- qWrkCmd_t tCmd;
-
- BEGINfunc
- ISOBJ_TYPE_assert(pThis, wti);
-
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
- tCmd = pThis->tCurrCmd;
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
-
- ENDfunc
- return tCmd;
+ return pThis->bIsRunning;
}
-/* send a command to a specific thread
- * rgerhards, 2008-01-20
+/* Set status (thread is running or not), actually an property of
+ * use for wtp, but we need to have it per thread instance (thus it
+ * is inside wti). -- rgerhards, 2009-07-17
*/
rsRetVal
-wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bLockMutex)
+wtiSetState(wti_t *pThis, bool bNewVal)
{
- DEFiRet;
- qWrkCmd_t tCurrCmd;
- DEFVARS_mutexProtection;
-
ISOBJ_TYPE_assert(pThis, wti);
- assert(tCmd <= eWRKTHRD_SHUTDOWN_IMMEDIATE);
-
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
-
- tCurrCmd = pThis->tCurrCmd;
- /* all worker states must be followed sequentially, only termination can be set in any state */
- if(tCurrCmd > tCmd && !(tCmd == eWRKTHRD_STOPPED)) {
- DBGPRINTF("%s: command %d can not be accepted in current %d processing state - ignored\n",
- wtiGetDbgHdr(pThis), tCmd, tCurrCmd);
- } else {
- DBGPRINTF("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd);
- /* we could replace this with a simple if, but we leave the switch in in case we need
- * to add something at a later stage. -- rgerhards, 2008-09-30
- */
- if(tCmd == eWRKTHRD_STOPPED) {
- dbgprintf("%s: worker almost stopped, assuming it has\n", wtiGetDbgHdr(pThis));
- pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
- }
- /* apply the new state */
-dbgprintf("worker terminator will write stateval %d\n", tCmd);
- unsigned val = ATOMIC_CAS_VAL(pThis->tCurrCmd, tCurrCmd, tCmd);
- if(val != tCurrCmd) {
- DBGPRINTF("wtiSetState PROBLEM, tCurrCmd %d overwritten with %d, wanted to set %d\n", tCurrCmd, val, tCmd);
- }
- }
-
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
- RETiRet;
+ pThis->bIsRunning = bNewVal;
+ return RS_RET_OK;
}
-/* Cancel the thread. If the thread is already cancelled or terminated,
- * we do not again cancel it. But it is save and legal to call wtiCancelThrd() in
- * such situations.
+/* Cancel the thread. If the thread is not running. But it is save and legal to
+ * call wtiCancelThrd() in such situations.
+ * IMPORTANT: WTP mutex must be locked while this function is called!
* rgerhards, 2008-02-26
*/
rsRetVal
@@ -153,17 +111,11 @@ wtiCancelThrd(wti_t *pThis)
ISOBJ_TYPE_assert(pThis, wti);
- d_pthread_mutex_lock(&pThis->mut);
-
- if(pThis->tCurrCmd != eWRKTHRD_STOPPED) {
- dbgoprint((obj_t*) pThis, "canceling worker thread, curr stat %d\n", pThis->tCurrCmd);
+ if(pThis->bIsRunning) {
+ dbgoprint((obj_t*) pThis, "canceling worker thread\n");
pthread_cancel(pThis->thrdID);
- /* TODO: check: the following check should automatically be done by cancel cleanup handler! 2009-07-08 rgerhards */
- wtiSetState(pThis, eWRKTHRD_STOPPED, MUTEX_ALREADY_LOCKED);
}
- d_pthread_mutex_unlock(&pThis->mut);
-
RETiRet;
}
@@ -171,14 +123,7 @@ wtiCancelThrd(wti_t *pThis)
/* Destructor */
BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(wti)
- if(Debug && wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) {
- dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - ignoring\n",
- wtiGetDbgHdr(pThis), pThis);
- }
-
/* actual destruction */
- pthread_mutex_destroy(&pThis->mut);
-
free(pThis->batch.pElem);
free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
@@ -187,7 +132,6 @@ ENDobjDestruct(wti)
/* Standard-Constructor for the wti object
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
- pthread_mutex_init(&pThis->mut, NULL);
ENDobjConstruct(wti)
@@ -205,7 +149,7 @@ wtiConstructFinalize(wti_t *pThis)
dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis));
/* initialize our thread instance descriptor */
- pThis->tCurrCmd = eWRKTHRD_STOPPED;
+ pThis->bIsRunning = FALSE;
/* we now alloc the array for user pointers. We obtain the max from the queue itself. */
CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
@@ -238,17 +182,13 @@ wtiWorkerCancelCleanup(void *arg)
/* call user supplied handler (that one e.g. requeues the element) */
pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp);
- d_pthread_mutex_lock(&pWtp->mut);
- wtiSetState(pThis, eWRKTHRD_STOPPED, MUTEX_ALREADY_LOCKED);
- /* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */
- d_pthread_mutex_unlock(&pWtp->mut);
ENDfunc
}
/* wait for queue to become non-empty or timeout
- * helper to wtiWorker
- * IMPORTANT: mutex must be locked when this code is called!
+ * helper to wtiWorker. Note the the predicate is
+ * re-tested by the caller, so it is OK to NOT do it here.
* rgerhards, 2009-05-20
*/
static inline void
@@ -258,8 +198,10 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
BEGINfunc
DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
+
pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
+ d_pthread_mutex_lock(pWtp->pmutUsr);
if(pWtp->toWrkShutdown == -1) {
/* never shut down any started worker */
d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
@@ -270,6 +212,7 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
*pbInactivityTOOccured = 1; /* indicate we had a timeout */
}
}
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
ENDfunc
}
@@ -284,7 +227,6 @@ wtiWorker(wti_t *pThis)
int bInactivityTOOccured = 0;
rsRetVal localRet;
rsRetVal terminateRet;
- bool bMutexIsLocked;
int iCancelStateSave;
DEFiRet;
@@ -305,9 +247,7 @@ wtiWorker(wti_t *pThis)
pWtp->pfRateLimiter(pWtp->pUsr);
}
- wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
d_pthread_mutex_lock(pWtp->pmutUsr);
- bMutexIsLocked = TRUE;
/* first check if we are in shutdown process (but evaluate a bit later) */
terminateRet = wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED);
@@ -316,46 +256,29 @@ wtiWorker(wti_t *pThis)
localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n",
localRet);
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
break;
}
/* try to execute and process whatever we have */
/* This function must and does RELEASE the MUTEX! */
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
- bMutexIsLocked = FALSE;
if(localRet == RS_RET_IDLE) {
- if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE) {
+ if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
break; /* end of loop */
}
-
- if(bInactivityTOOccured) {
- /* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */
- break; /* end worker thread run */
- }
- d_pthread_mutex_lock(pWtp->pmutUsr);
doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
- d_pthread_mutex_unlock(pWtp->pmutUsr);
continue; /* request next iteration */
}
bInactivityTOOccured = 0; /* reset for next run */
}
- /* if we exit the loop, the mutex may be locked and, if so, must be unlocked */
- if(bMutexIsLocked) {
- d_pthread_mutex_unlock(pWtp->pmutUsr);
- }
-
/* indicate termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pThis->mut);
pthread_cleanup_pop(0); /* remove cleanup handler */
-
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
-
- wtiSetState(pThis, eWRKTHRD_STOPPED, MUTEX_ALREADY_LOCKED);
- d_pthread_mutex_unlock(&pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
RETiRet;