diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-17 12:38:49 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-17 12:38:49 +0200 |
commit | 183b49015561890e148a50128c051a1cdd4491b9 (patch) | |
tree | acdfddd9aab9ab51b590489a5216fd6fecd62d79 | |
parent | 511fd780a25b59b42e93bb2c9ebc03a4991f5c16 (diff) | |
download | rsyslog-183b49015561890e148a50128c051a1cdd4491b9.tar.gz rsyslog-183b49015561890e148a50128c051a1cdd4491b9.tar.xz rsyslog-183b49015561890e148a50128c051a1cdd4491b9.zip |
more code simplification, should also bring some performance enhancement
reducing the number of thread cancellation state changes
-rw-r--r-- | action.c | 11 | ||||
-rw-r--r-- | runtime/apc.c | 17 | ||||
-rw-r--r-- | runtime/queue.c | 38 | ||||
-rw-r--r-- | runtime/srUtils.h | 15 | ||||
-rw-r--r-- | runtime/stream.c | 2 | ||||
-rw-r--r-- | runtime/wti.c | 22 | ||||
-rw-r--r-- | runtime/wtp.c | 27 | ||||
-rw-r--r-- | runtime/wtp.h | 4 |
8 files changed, 45 insertions, 91 deletions
@@ -942,7 +942,6 @@ finalize_it: static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch) { - int iCancelStateSave; DEFiRet; assert(pBatch != NULL); @@ -952,10 +951,8 @@ processBatchMain(action_t *pAction, batch_t *pBatch) * if they notify us they are - functionality not yet implemented...). * rgerhards, 2008-01-30 */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pAction->mutActExec); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - pthread_setcancelstate(iCancelStateSave, NULL); iRet = processAction(pAction, pBatch); @@ -976,7 +973,6 @@ rsRetVal actionCallHUPHdlr(action_t *pAction) { DEFiRet; - int iCancelStateSave; ASSERT(pAction != NULL); DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP); @@ -985,10 +981,8 @@ actionCallHUPHdlr(action_t *pAction) FINALIZE; /* no HUP handler, so we are done ;) */ } - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pAction->mutActExec); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - pthread_setcancelstate(iCancelStateSave, NULL); CHKiRet(pAction->pMod->doHUP(pAction->pModData)); pthread_cleanup_pop(1); /* unlock mutex */ @@ -1233,7 +1227,6 @@ rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg) { DEFiRet; - int iCancelStateSave; ISOBJ_TYPE_assert(pMsg, msg); ASSERT(pAction != NULL); @@ -1242,15 +1235,11 @@ actionCallAction(action_t *pAction, msg_t *pMsg) * rgerhards, 2009-06-19 */ //if(pAction->f_ReduceRepeated == 1) { - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); LockObj(pAction); pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); - pthread_setcancelstate(iCancelStateSave, NULL); iRet = doActionCallAction(pAction, pMsg); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); UnlockObj(pAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ - pthread_setcancelstate(iCancelStateSave, NULL); //} else { //iRet = doActionCallAction(pAction, pMsg); //} diff --git a/runtime/apc.c b/runtime/apc.c index bc330e39..c2f61266 100644 --- a/runtime/apc.c +++ b/runtime/apc.c @@ -249,12 +249,11 @@ execScheduled(void) apc_list_t *pExecList; apc_list_t *pCurr; apc_list_t *pNext; - DEFVARS_mutexProtection_uncond; DEFiRet; - BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); + d_pthread_mutex_lock(&listMutex); iRet = unlistCurrent(&pExecList); - END_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); + d_pthread_mutex_unlock(&listMutex); CHKiRet(iRet); if(pExecList != NULL) { @@ -290,14 +289,12 @@ ENDobjConstruct(apc) static rsRetVal apcConstructFinalize(apc_t *pThis, apc_id_t *pID) { - DEFVARS_mutexProtection_uncond; DEFiRet; ISOBJ_TYPE_assert(pThis, apc); assert(pID != NULL); - BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); + d_pthread_mutex_lock(&listMutex); insertApc(pThis, pID); - END_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); -RUNLOG_STR("apcConstructFinalize post mutex unlock\n"); + d_pthread_mutex_unlock(&listMutex); RETiRet; } @@ -333,12 +330,10 @@ SetParam2(apc_t *pThis, void *param2) static rsRetVal CancelApc(apc_id_t id) { - DEFVARS_mutexProtection_uncond; - BEGINfunc - BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); + d_pthread_mutex_lock(&listMutex); deleteApc(id); - END_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); + d_pthread_mutex_unlock(&listMutex); ENDfunc return RS_RET_OK; } diff --git a/runtime/queue.c b/runtime/queue.c index e2641c5b..37ec3663 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -75,7 +75,7 @@ static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static int qqueueIsIdleDA(qqueue_t *pThis); -static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); +static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); /* some constants for queuePersist () */ @@ -471,7 +471,7 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA)); CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) StartDA)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) TurnOffDAMode)); @@ -1170,7 +1170,6 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr) static rsRetVal tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) { - DEFVARS_mutexProtection_uncond; struct timespec tTimeout; rsRetVal iRetLocal; DEFiRet; @@ -1178,7 +1177,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ - BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pThis->mut); /* some workers may be running in parallel! */ + d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */ if(getPhysicalQueueSize(pThis) > 0) { if(pThis->bRunsDA) { /* We may have waited on the low water mark. As it may have changed, we @@ -1187,7 +1186,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) wtpAdviseMaxWorkers(pThis->pWtpDA, 1); } } - END_MTX_PROTECTED_OPERATIONS_UNCOND(pThis->mut); + d_pthread_mutex_unlock(pThis->mut); /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ if(pThis->bRunsDA) { @@ -1259,7 +1258,6 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) static rsRetVal tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) { - DEFVARS_mutexProtection; struct timespec tTimeout; rsRetVal iRetLocal; DEFiRet; @@ -1280,9 +1278,9 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */ timeoutComp(&tTimeout, pThis->toActShutdown); - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */ if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); + d_pthread_mutex_unlock(pThis->mut); dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers\n"); iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { @@ -1293,12 +1291,12 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } /* we need to re-aquire the mutex for the next check in this case! */ - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); + d_pthread_mutex_lock(pThis->mut); } if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pqDA->pWtpReg, LOCK_MUTEX) > 0) { /* and now the same for the DA queue */ - END_MTX_PROTECTED_OPERATIONS(pThis->mut); + d_pthread_mutex_unlock(pThis->mut); dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n"); iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { @@ -1308,8 +1306,9 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue " "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } + } else { + d_pthread_mutex_unlock(pThis->mut); } - END_MTX_PROTECTED_OPERATIONS(pThis->mut); RETiRet; } @@ -1688,7 +1687,7 @@ finalize_it: * rgerhards, 2009-04-22 */ static rsRetVal -DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +DequeueConsumable(qqueue_t *pThis, wti_t *pWti) { DEFiRet; int iQueueSize = 0; /* keep the compiler happy... */ @@ -1714,7 +1713,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) // TODO: MULTI: check physical queue size? pthread_cond_signal(&pThis->notFull); d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); /* WE ARE NO LONGER PROTECTED BY THE MUTEX */ if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { @@ -1827,14 +1825,14 @@ RateLimiter(qqueue_t *pThis) * rgerhards, 2009-05-20 */ static inline rsRetVal -DequeueForConsumer(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +DequeueForConsumer(qqueue_t *pThis, wti_t *pWti) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); + CHKiRet(DequeueConsumable(pThis, pWti)); if(pWti->batch.nElem == 0) ABORT_FINALIZE(RS_RET_IDLE); @@ -1869,14 +1867,14 @@ dbgprintf("XXX: batchProcessed deletes %d records\n", pWti->batch.nElemDeq); * rgerhards, 2008-01-21 */ static rsRetVal -ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +ConsumerReg(qqueue_t *pThis, wti_t *pWti) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave)); + CHKiRet(DequeueForConsumer(pThis, pWti)); CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch)); /* we now need to check if we should deliberately delay processing a bit @@ -1905,7 +1903,7 @@ dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet * rgerhards, 2008-01-14 */ static rsRetVal -ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +ConsumerDA(qqueue_t *pThis, wti_t *pWti) { int i; DEFiRet; @@ -1913,7 +1911,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave)); + CHKiRet(DequeueForConsumer(pThis, pWti)); /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem ; i++) { /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs @@ -2134,7 +2132,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg)); CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg)); CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown)); diff --git a/runtime/srUtils.h b/runtime/srUtils.h index 6d5a784b..c4f73e16 100644 --- a/runtime/srUtils.h +++ b/runtime/srUtils.h @@ -110,11 +110,9 @@ rsRetVal getFileSize(uchar *pszName, off_t *pSize); /* some useful constants */ #define DEFVARS_mutexProtection\ - int iCancelStateSave; \ int bLockedOpIsLocked=0 #define BEGIN_MTX_PROTECTED_OPERATIONS(mut, bMustLock) \ if(bMustLock == LOCK_MUTEX) { \ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); \ d_pthread_mutex_lock(mut); \ assert(bLockedOpIsLocked == 0); \ bLockedOpIsLocked = 1; \ @@ -123,19 +121,6 @@ rsRetVal getFileSize(uchar *pszName, off_t *pSize); if(bLockedOpIsLocked) { \ d_pthread_mutex_unlock(mut); \ bLockedOpIsLocked = 0; \ - pthread_setcancelstate(iCancelStateSave, NULL); \ } -/* The unconditional versions of the macro always lock the mutex. They are preferred in - * complex scenarios, where the simple ones might get mixed up by multiple calls. - */ -#define DEFVARS_mutexProtection_uncond\ - int iCancelStateSave -#define BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(mut) \ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); \ - d_pthread_mutex_lock(mut); -#define END_MTX_PROTECTED_OPERATIONS_UNCOND(mut) \ - d_pthread_mutex_unlock(mut); \ - pthread_setcancelstate(iCancelStateSave, NULL); - #endif diff --git a/runtime/stream.c b/runtime/stream.c index d09531d1..7c96324a 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -839,7 +839,6 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) DEFiRet; ISOBJ_TYPE_assert(pThis, strm); -dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf); while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS) d_pthread_cond_wait(&pThis->notFull, &pThis->mut); @@ -850,7 +849,6 @@ dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf) if(++pThis->iCnt == 1) pthread_cond_signal(&pThis->notEmpty); -finalize_it: RETiRet; } diff --git a/runtime/wti.c b/runtime/wti.c index 0ba1fe54..c536e545 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -218,6 +218,8 @@ finalize_it: /* cancellation cleanup handler for queueWorker () * Updates admin structure and frees ressources. + * Keep in mind that cancellation is disabled if we run into + * the cancel cleanup handler (and have been cancelled). * rgerhards, 2008-01-16 */ static void @@ -225,7 +227,6 @@ wtiWorkerCancelCleanup(void *arg) { wti_t *pThis = (wti_t*) arg; wtp_t *pWtp; - int iCancelStateSave; BEGINfunc ISOBJ_TYPE_assert(pThis, wti); @@ -237,13 +238,10 @@ wtiWorkerCancelCleanup(void *arg) /* call user supplied handler (that one e.g. requeues the element) */ pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pWtp->mut); wtiSetState(pThis, eWRKTHRD_STOPPED, MUTEX_ALREADY_LOCKED); /* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */ - d_pthread_mutex_unlock(&pWtp->mut); - pthread_setcancelstate(iCancelStateSave, NULL); ENDfunc } @@ -282,12 +280,12 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) rsRetVal wtiWorker(wti_t *pThis) { - DEFVARS_mutexProtection_uncond; wtp_t *pWtp; /* our worker thread pool */ int bInactivityTOOccured = 0; rsRetVal localRet; rsRetVal terminateRet; bool bMutexIsLocked; + int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, wti); @@ -297,9 +295,9 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); - BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); + d_pthread_mutex_lock(pWtp->pmutUsr); pWtp->pfOnWorkerStartup(pWtp->pUsr); - END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); + d_pthread_mutex_unlock(pWtp->pmutUsr); /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ @@ -308,7 +306,7 @@ wtiWorker(wti_t *pThis) } wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */ - BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); + d_pthread_mutex_lock(pWtp->pmutUsr); bMutexIsLocked = TRUE; /* first check if we are in shutdown process (but evaluate a bit later) */ @@ -323,7 +321,7 @@ wtiWorker(wti_t *pThis) /* try to execute and process whatever we have */ /* This function must and does RELEASE the MUTEX! */ - localRet = pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave); + localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); bMutexIsLocked = FALSE; if(localRet == RS_RET_IDLE) { @@ -335,9 +333,9 @@ wtiWorker(wti_t *pThis) /* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */ break; /* end worker thread run */ } - BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); + d_pthread_mutex_lock(pWtp->pmutUsr); doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); - END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); + d_pthread_mutex_unlock(pWtp->pmutUsr); continue; /* request next iteration */ } @@ -346,7 +344,7 @@ wtiWorker(wti_t *pThis) /* if we exit the loop, the mutex may be locked and, if so, must be unlocked */ if(bMutexIsLocked) { - END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr); + d_pthread_mutex_unlock(pWtp->pmutUsr); } /* indicate termination */ diff --git a/runtime/wtp.c b/runtime/wtp.c index 470e0b03..beeaf01c 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -245,7 +245,6 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout { DEFiRet; int bTimedOut; - int iCancelStateSave; ISOBJ_TYPE_assert(pThis, wtp); @@ -253,10 +252,8 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout wtpWakeupAllWrkr(pThis); /* wait for worker thread termination */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pThis->mut); pthread_cleanup_push(mutexCancelCleanup, &pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n", @@ -345,7 +342,6 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in uchar *pszDbgHdr; uchar thrdName[32] = "rs:"; DEFiRet; - DEFVARS_mutexProtection; wti_t *pWti = (wti_t*) arg; wtp_t *pThis; sigset_t sigSet; @@ -366,24 +362,21 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in } # endif - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX); - - /* do some late initialization */ - + d_pthread_mutex_lock(&pThis->mut); pthread_cleanup_push(wtpWrkrExecCancelCleanup, pThis); - /* finally change to RUNNING state. We need to check if we actually should still run, + /* change to RUNNING state. We need to check if we actually should still run, * because someone may have requested us to shut down even before we got a chance to do * our init. That would be a bad race... -- rgerhards, 2008-01-16 */ wtiSetState(pWti, eWRKTHRD_RUNNING, MUTEX_ALREADY_LOCKED); /* we are running now! */ do { - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + d_pthread_mutex_unlock(&pThis->mut); iRet = wtiWorker(pWti); /* just to make sure: this is NOT protected by the mutex! */ - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX); + d_pthread_mutex_lock(&pThis->mut); } while(pThis->iCurNumWrkThrd == 1 && pThis->bInactivityGuard == 1); /* inactivity guard prevents shutdown of all workers while one should be running due to race * condition. It can lead to one more worker running than desired, but that is acceptable. After @@ -398,7 +391,7 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n", wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd); - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + d_pthread_mutex_unlock(&pThis->mut); ENDfunc pthread_exit(0); @@ -463,7 +456,6 @@ rsRetVal wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) { DEFiRet; - DEFVARS_mutexProtection; int nMissing; /* number workers missing to run */ int i; @@ -472,7 +464,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) if(nMaxWrkr == 0) FINALIZE; - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX); + d_pthread_mutex_lock(&pThis->mut); if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */ nMaxWrkr = pThis->iNumWorkerThreads; @@ -494,7 +486,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) finalize_it: - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + d_pthread_mutex_unlock(&pThis->mut); RETiRet; } @@ -510,7 +502,7 @@ DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*)) -DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)) +DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*)) DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*)) DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*)) @@ -592,6 +584,5 @@ BEGINObjClassInit(wtp, 1, OBJ_IS_CORE_MODULE) CHKiRet(objUse(glbl, CORE_COMPONENT)); ENDObjClassInit(wtp) -/* - * vi:set ai: +/* vi:set ai: */ diff --git a/runtime/wtp.h b/runtime/wtp.h index 6b4054b8..c933f337 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -70,7 +70,7 @@ struct wtp_s { rsRetVal (*pfObjProcessed)(void *pUsr, wti_t *pWti); /* indicate user object is processed */ rsRetVal (*pfRateLimiter)(void *pUsr); rsRetVal (*pfIsIdle)(void *pUsr, wtp_t *pWtp); - rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int); + rsRetVal (*pfDoWork)(void *pUsr, void *pWti); rsRetVal (*pfOnIdle)(void *pUsr, int); rsRetVal (*pfOnWorkerCancel)(void *pUsr, void*pWti); rsRetVal (*pfOnWorkerStartup)(void *pUsr); @@ -102,7 +102,7 @@ PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)); PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)); PROTOTYPEpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)); PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*)); -PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)); +PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*)); PROTOTYPEpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*)); PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)); PROTOTYPEpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*,void*)); |