diff options
Diffstat (limited to 'wti.c')
-rw-r--r-- | wti.c | 53 |
1 files changed, 33 insertions, 20 deletions
@@ -73,18 +73,20 @@ wtiGetDbgHdr(wti_t *pThis) * bite in the long term... -- rgerhards, 2008-01-17 * TODO: may be performance optimized by atomic operations */ -static inline qWrkCmd_t +qWrkCmd_t wtiGetState(wti_t *pThis, int bLockMutex) { DEFVARS_mutexProtection; qWrkCmd_t tCmd; + BEGINfunc ISOBJ_TYPE_assert(pThis, wti); BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); tCmd = pThis->tCurrCmd; END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + ENDfunc return tCmd; } @@ -95,19 +97,20 @@ wtiGetState(wti_t *pThis, int bLockMutex) * in an active state. -- rgerhards, 2008-01-20 */ rsRetVal -wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly) +wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex) { DEFiRet; - DEFVARS_mutex_cancelsafeLock; + DEFVARS_mutexProtection; ISOBJ_TYPE_assert(pThis, wti); assert(tCmd <= eWRKTHRD_SHUTDOWN_IMMEDIATE); - mutex_cancelsafe_lock(&pThis->mut); + BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); +RUNLOG_VAR("%d", bActiveOnly); /* all worker states must be followed sequentially, only termination can be set in any state */ if( (bActiveOnly && (pThis->tCurrCmd < eWRKTHRD_RUN_CREATED)) - || (pThis->tCurrCmd > tCmd && tCmd != eWRKTHRD_TERMINATING)) { + || (pThis->tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) { dbgprintf("%s: command %d can not be accepted in current %d processing state - ignored\n", wtiGetDbgHdr(pThis), tCmd, pThis->tCurrCmd); } else { @@ -137,7 +140,7 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly) pThis->tCurrCmd = tCmd; /* apply the new state */ } - mutex_cancelsafe_unlock(&pThis->mut); + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); RETiRet; } @@ -188,15 +191,17 @@ ENDobjConstruct(wti) rsRetVal wtiConstructFinalize(wti_t *pThis) { + DEFiRet; + ISOBJ_TYPE_assert(pThis, wti); dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); /* initialize our thread instance descriptor */ - pThis->pUsr = NULL; + pThis->pUsrp = NULL; pThis->tCurrCmd = eWRKTHRD_STOPPED; - return RS_RET_OK; + RETiRet; } @@ -211,7 +216,9 @@ wtiJoinThrd(wti_t *pThis) ISOBJ_TYPE_assert(pThis, wti); dbgprintf("wti: waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd); pthread_join(pThis->thrdID, NULL); - wtiSetState(pThis, eWRKTHRD_STOPPED, 0); /* back to virgin... */ +RUNLOG; + wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* back to virgin... */ +RUNLOG; pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ dbgprintf("wti: worker %s has stopped\n", wtiGetDbgHdr(pThis)); @@ -261,6 +268,7 @@ wtiWorkerCancelCleanup(void *arg) wtp_t *pWtp; int iCancelStateSave; + BEGINfunc ISOBJ_TYPE_assert(pThis, wti); pWtp = pThis->pWtp; ISOBJ_TYPE_assert(pWtp, wtp); @@ -268,17 +276,17 @@ wtiWorkerCancelCleanup(void *arg) dbgprintf("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis)); /* call user supplied handler (that one e.g. requeues the element) */ - pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr); + pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pWtp->mut); - wtiSetState(pThis, eWRKTHRD_TERMINATING, 0); + wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); // TODO: sync access! pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ - pthread_cond_signal(&pWtp->condThrdTrm); /* activate anyone waiting on thread shutdown */ d_pthread_mutex_unlock(&pWtp->mut); pthread_setcancelstate(iCancelStateSave, NULL); + ENDfunc } @@ -318,8 +326,11 @@ wtiWorker(wti_t *pThis) pWtp = pThis->pWtp; /* shortcut */ ISOBJ_TYPE_assert(pWtp, wtp); + dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); + pWtp->pfOnWorkerStartup(pWtp->pUsr); + /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd); @@ -349,21 +360,22 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis), if(pWtp->toWrkShutdown == -1) { dbgprintf("worker never times out!\n"); // DEL /* never shut down any started worker */ - pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); + d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); } else { timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ - if(pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) { + if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) { dbgprintf("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis)); bInactivityTOOccured = 1; /* indicate we had a timeout */ } } - dbgprintf("%s: post condwait ->notEmpty\n", wtiGetDbgHdr(pThis)); // DEL + dbgprintf("%s: post condwait ->Busy or timeout\n", wtiGetDbgHdr(pThis)); // DEL END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); continue; /* request next iteration */ } /* if we reach this point, we have a non-empty queue (and are still protected by mutex) */ - pWtp->pfDoWork(pThis, iCancelStateSave); + dbgprintf("%s: calling consumer\n", wtiGetDbgHdr(pThis)); + pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave); /* TODO: move this above into one of the chck Term functions */ //if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0) @@ -371,6 +383,8 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis), // " %d messages to process.\n", wtiGetDbgHdr(pThis), pThis->iQueueSize); } + pWtp->pfOnWorkerShutdown(pWtp->pUsr); + /* indicate termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); dbgprintf("%s: worker waiting for mutex\n", wtiGetDbgHdr(pThis)); @@ -391,11 +405,9 @@ dbgprintf("%s: setting termination state\n", wtiGetDbgHdr(pThis)); wtiSetState(pWrkrInst, eWRKTHRD_TERMINATING, 0); } #else - wtiSetState(pThis, eWRKTHRD_TERMINATING, 0); + wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); #endif - // TODO: call, mutex: pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ - pthread_cond_signal(&pWtp->condThrdTrm); /* activate anyone waiting on thread shutdown */ d_pthread_mutex_unlock(&pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); @@ -404,6 +416,7 @@ dbgprintf("%s: setting termination state\n", wtiGetDbgHdr(pThis)); /* some simple object access methods */ +DEFpropSetMeth(wti, pWtp, wtp_t*); /* set the debug header message * The passed-in string is duplicated. So if the caller does not need @@ -441,7 +454,7 @@ finalize_it: * rgerhards, 2008-01-09 */ BEGINObjClassInit(wti, 1) /* one is the object version (most important for persisting) */ -ENDObjClassInit(queue) +ENDObjClassInit(wti) /* * vi:set ai: |