summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-17 12:38:49 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-17 12:38:49 +0200
commit183b49015561890e148a50128c051a1cdd4491b9 (patch)
treeacdfddd9aab9ab51b590489a5216fd6fecd62d79
parent511fd780a25b59b42e93bb2c9ebc03a4991f5c16 (diff)
downloadrsyslog-183b49015561890e148a50128c051a1cdd4491b9.tar.gz
rsyslog-183b49015561890e148a50128c051a1cdd4491b9.tar.xz
rsyslog-183b49015561890e148a50128c051a1cdd4491b9.zip
more code simplification, should also bring some performance enhancement
reducing the number of thread cancellation state changes
-rw-r--r--action.c11
-rw-r--r--runtime/apc.c17
-rw-r--r--runtime/queue.c38
-rw-r--r--runtime/srUtils.h15
-rw-r--r--runtime/stream.c2
-rw-r--r--runtime/wti.c22
-rw-r--r--runtime/wtp.c27
-rw-r--r--runtime/wtp.h4
8 files changed, 45 insertions, 91 deletions
diff --git a/action.c b/action.c
index ab89ffd3..8a29df2e 100644
--- a/action.c
+++ b/action.c
@@ -942,7 +942,6 @@ finalize_it:
static rsRetVal
processBatchMain(action_t *pAction, batch_t *pBatch)
{
- int iCancelStateSave;
DEFiRet;
assert(pBatch != NULL);
@@ -952,10 +951,8 @@ processBatchMain(action_t *pAction, batch_t *pBatch)
* if they notify us they are - functionality not yet implemented...).
* rgerhards, 2008-01-30
*/
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pAction->mutActExec);
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
- pthread_setcancelstate(iCancelStateSave, NULL);
iRet = processAction(pAction, pBatch);
@@ -976,7 +973,6 @@ rsRetVal
actionCallHUPHdlr(action_t *pAction)
{
DEFiRet;
- int iCancelStateSave;
ASSERT(pAction != NULL);
DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP);
@@ -985,10 +981,8 @@ actionCallHUPHdlr(action_t *pAction)
FINALIZE; /* no HUP handler, so we are done ;) */
}
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pAction->mutActExec);
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
- pthread_setcancelstate(iCancelStateSave, NULL);
CHKiRet(pAction->pMod->doHUP(pAction->pModData));
pthread_cleanup_pop(1); /* unlock mutex */
@@ -1233,7 +1227,6 @@ rsRetVal
actionCallAction(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
- int iCancelStateSave;
ISOBJ_TYPE_assert(pMsg, msg);
ASSERT(pAction != NULL);
@@ -1242,15 +1235,11 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
* rgerhards, 2009-06-19
*/
//if(pAction->f_ReduceRepeated == 1) {
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
LockObj(pAction);
pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
iRet = doActionCallAction(pAction, pMsg);
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
UnlockObj(pAction);
pthread_cleanup_pop(0); /* remove mutex cleanup handler */
- pthread_setcancelstate(iCancelStateSave, NULL);
//} else {
//iRet = doActionCallAction(pAction, pMsg);
//}
diff --git a/runtime/apc.c b/runtime/apc.c
index bc330e39..c2f61266 100644
--- a/runtime/apc.c
+++ b/runtime/apc.c
@@ -249,12 +249,11 @@ execScheduled(void)
apc_list_t *pExecList;
apc_list_t *pCurr;
apc_list_t *pNext;
- DEFVARS_mutexProtection_uncond;
DEFiRet;
- BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex);
+ d_pthread_mutex_lock(&listMutex);
iRet = unlistCurrent(&pExecList);
- END_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex);
+ d_pthread_mutex_unlock(&listMutex);
CHKiRet(iRet);
if(pExecList != NULL) {
@@ -290,14 +289,12 @@ ENDobjConstruct(apc)
static rsRetVal
apcConstructFinalize(apc_t *pThis, apc_id_t *pID)
{
- DEFVARS_mutexProtection_uncond;
DEFiRet;
ISOBJ_TYPE_assert(pThis, apc);
assert(pID != NULL);
- BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex);
+ d_pthread_mutex_lock(&listMutex);
insertApc(pThis, pID);
- END_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex);
-RUNLOG_STR("apcConstructFinalize post mutex unlock\n");
+ d_pthread_mutex_unlock(&listMutex);
RETiRet;
}
@@ -333,12 +330,10 @@ SetParam2(apc_t *pThis, void *param2)
static rsRetVal
CancelApc(apc_id_t id)
{
- DEFVARS_mutexProtection_uncond;
-
BEGINfunc
- BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex);
+ d_pthread_mutex_lock(&listMutex);
deleteApc(id);
- END_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex);
+ d_pthread_mutex_unlock(&listMutex);
ENDfunc
return RS_RET_OK;
}
diff --git a/runtime/queue.c b/runtime/queue.c
index e2641c5b..37ec3663 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -75,7 +75,7 @@ static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static int qqueueIsIdleDA(qqueue_t *pThis);
-static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
+static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
/* some constants for queuePersist () */
@@ -471,7 +471,7 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA));
CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) StartDA));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) TurnOffDAMode));
@@ -1170,7 +1170,6 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr)
static rsRetVal
tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
{
- DEFVARS_mutexProtection_uncond;
struct timespec tTimeout;
rsRetVal iRetLocal;
DEFiRet;
@@ -1178,7 +1177,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
- BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pThis->mut); /* some workers may be running in parallel! */
+ d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */
if(getPhysicalQueueSize(pThis) > 0) {
if(pThis->bRunsDA) {
/* We may have waited on the low water mark. As it may have changed, we
@@ -1187,7 +1186,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
}
}
- END_MTX_PROTECTED_OPERATIONS_UNCOND(pThis->mut);
+ d_pthread_mutex_unlock(pThis->mut);
/* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
if(pThis->bRunsDA) {
@@ -1259,7 +1258,6 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
static rsRetVal
tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
{
- DEFVARS_mutexProtection;
struct timespec tTimeout;
rsRetVal iRetLocal;
DEFiRet;
@@ -1280,9 +1278,9 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
/* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
timeoutComp(&tTimeout, pThis->toActShutdown);
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */
if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ d_pthread_mutex_unlock(pThis->mut);
dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers\n");
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
@@ -1293,12 +1291,12 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
"in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
/* we need to re-aquire the mutex for the next check in this case! */
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX);
+ d_pthread_mutex_lock(pThis->mut);
}
if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pqDA->pWtpReg, LOCK_MUTEX) > 0) {
/* and now the same for the DA queue */
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ d_pthread_mutex_unlock(pThis->mut);
dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n");
iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
@@ -1308,8 +1306,9 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue "
"in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
+ } else {
+ d_pthread_mutex_unlock(pThis->mut);
}
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
RETiRet;
}
@@ -1688,7 +1687,7 @@ finalize_it:
* rgerhards, 2009-04-22
*/
static rsRetVal
-DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
{
DEFiRet;
int iQueueSize = 0; /* keep the compiler happy... */
@@ -1714,7 +1713,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
// TODO: MULTI: check physical queue size?
pthread_cond_signal(&pThis->notFull);
d_pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
@@ -1827,14 +1825,14 @@ RateLimiter(qqueue_t *pThis)
* rgerhards, 2009-05-20
*/
static inline rsRetVal
-DequeueForConsumer(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+DequeueForConsumer(qqueue_t *pThis, wti_t *pWti)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(DequeueConsumable(pThis, pWti));
if(pWti->batch.nElem == 0)
ABORT_FINALIZE(RS_RET_IDLE);
@@ -1869,14 +1867,14 @@ dbgprintf("XXX: batchProcessed deletes %d records\n", pWti->batch.nElemDeq);
* rgerhards, 2008-01-21
*/
static rsRetVal
-ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+ConsumerReg(qqueue_t *pThis, wti_t *pWti)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave));
+ CHKiRet(DequeueForConsumer(pThis, pWti));
CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch));
/* we now need to check if we should deliberately delay processing a bit
@@ -1905,7 +1903,7 @@ dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet
* rgerhards, 2008-01-14
*/
static rsRetVal
-ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+ConsumerDA(qqueue_t *pThis, wti_t *pWti)
{
int i;
DEFiRet;
@@ -1913,7 +1911,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave));
+ CHKiRet(DequeueForConsumer(pThis, pWti));
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem ; i++) {
/* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
@@ -2134,7 +2132,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg));
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg));
CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
diff --git a/runtime/srUtils.h b/runtime/srUtils.h
index 6d5a784b..c4f73e16 100644
--- a/runtime/srUtils.h
+++ b/runtime/srUtils.h
@@ -110,11 +110,9 @@ rsRetVal getFileSize(uchar *pszName, off_t *pSize);
/* some useful constants */
#define DEFVARS_mutexProtection\
- int iCancelStateSave; \
int bLockedOpIsLocked=0
#define BEGIN_MTX_PROTECTED_OPERATIONS(mut, bMustLock) \
if(bMustLock == LOCK_MUTEX) { \
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); \
d_pthread_mutex_lock(mut); \
assert(bLockedOpIsLocked == 0); \
bLockedOpIsLocked = 1; \
@@ -123,19 +121,6 @@ rsRetVal getFileSize(uchar *pszName, off_t *pSize);
if(bLockedOpIsLocked) { \
d_pthread_mutex_unlock(mut); \
bLockedOpIsLocked = 0; \
- pthread_setcancelstate(iCancelStateSave, NULL); \
}
-/* The unconditional versions of the macro always lock the mutex. They are preferred in
- * complex scenarios, where the simple ones might get mixed up by multiple calls.
- */
-#define DEFVARS_mutexProtection_uncond\
- int iCancelStateSave
-#define BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(mut) \
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); \
- d_pthread_mutex_lock(mut);
-#define END_MTX_PROTECTED_OPERATIONS_UNCOND(mut) \
- d_pthread_mutex_unlock(mut); \
- pthread_setcancelstate(iCancelStateSave, NULL);
-
#endif
diff --git a/runtime/stream.c b/runtime/stream.c
index d09531d1..7c96324a 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -839,7 +839,6 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf)
DEFiRet;
ISOBJ_TYPE_assert(pThis, strm);
-dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf);
while(pThis->iCnt >= STREAM_ASYNC_NUMBUFS)
d_pthread_cond_wait(&pThis->notFull, &pThis->mut);
@@ -850,7 +849,6 @@ dbgprintf("XXX: doAsyncWriteInternal: strm %p, len %ld\n", pThis, (long) lenBuf)
if(++pThis->iCnt == 1)
pthread_cond_signal(&pThis->notEmpty);
-finalize_it:
RETiRet;
}
diff --git a/runtime/wti.c b/runtime/wti.c
index 0ba1fe54..c536e545 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -218,6 +218,8 @@ finalize_it:
/* cancellation cleanup handler for queueWorker ()
* Updates admin structure and frees ressources.
+ * Keep in mind that cancellation is disabled if we run into
+ * the cancel cleanup handler (and have been cancelled).
* rgerhards, 2008-01-16
*/
static void
@@ -225,7 +227,6 @@ wtiWorkerCancelCleanup(void *arg)
{
wti_t *pThis = (wti_t*) arg;
wtp_t *pWtp;
- int iCancelStateSave;
BEGINfunc
ISOBJ_TYPE_assert(pThis, wti);
@@ -237,13 +238,10 @@ wtiWorkerCancelCleanup(void *arg)
/* call user supplied handler (that one e.g. requeues the element) */
pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp);
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pWtp->mut);
wtiSetState(pThis, eWRKTHRD_STOPPED, MUTEX_ALREADY_LOCKED);
/* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */
-
d_pthread_mutex_unlock(&pWtp->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
ENDfunc
}
@@ -282,12 +280,12 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
rsRetVal
wtiWorker(wti_t *pThis)
{
- DEFVARS_mutexProtection_uncond;
wtp_t *pWtp; /* our worker thread pool */
int bInactivityTOOccured = 0;
rsRetVal localRet;
rsRetVal terminateRet;
bool bMutexIsLocked;
+ int iCancelStateSave;
DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
@@ -297,9 +295,9 @@ wtiWorker(wti_t *pThis)
dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
- BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
+ d_pthread_mutex_lock(pWtp->pmutUsr);
pWtp->pfOnWorkerStartup(pWtp->pUsr);
- END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
/* now we have our identity, on to real processing */
while(1) { /* loop will be broken below - need to do mutex locks */
@@ -308,7 +306,7 @@ wtiWorker(wti_t *pThis)
}
wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
- BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
+ d_pthread_mutex_lock(pWtp->pmutUsr);
bMutexIsLocked = TRUE;
/* first check if we are in shutdown process (but evaluate a bit later) */
@@ -323,7 +321,7 @@ wtiWorker(wti_t *pThis)
/* try to execute and process whatever we have */
/* This function must and does RELEASE the MUTEX! */
- localRet = pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
+ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
bMutexIsLocked = FALSE;
if(localRet == RS_RET_IDLE) {
@@ -335,9 +333,9 @@ wtiWorker(wti_t *pThis)
/* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */
break; /* end worker thread run */
}
- BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
+ d_pthread_mutex_lock(pWtp->pmutUsr);
doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
- END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
continue; /* request next iteration */
}
@@ -346,7 +344,7 @@ wtiWorker(wti_t *pThis)
/* if we exit the loop, the mutex may be locked and, if so, must be unlocked */
if(bMutexIsLocked) {
- END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
}
/* indicate termination */
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 470e0b03..beeaf01c 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -245,7 +245,6 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
{
DEFiRet;
int bTimedOut;
- int iCancelStateSave;
ISOBJ_TYPE_assert(pThis, wtp);
@@ -253,10 +252,8 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
wtpWakeupAllWrkr(pThis);
/* wait for worker thread termination */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pThis->mut);
pthread_cleanup_push(mutexCancelCleanup, &pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n",
@@ -345,7 +342,6 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
uchar *pszDbgHdr;
uchar thrdName[32] = "rs:";
DEFiRet;
- DEFVARS_mutexProtection;
wti_t *pWti = (wti_t*) arg;
wtp_t *pThis;
sigset_t sigSet;
@@ -366,24 +362,21 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
}
# endif
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
-
- /* do some late initialization */
-
+ d_pthread_mutex_lock(&pThis->mut);
pthread_cleanup_push(wtpWrkrExecCancelCleanup, pThis);
- /* finally change to RUNNING state. We need to check if we actually should still run,
+ /* change to RUNNING state. We need to check if we actually should still run,
* because someone may have requested us to shut down even before we got a chance to do
* our init. That would be a bad race... -- rgerhards, 2008-01-16
*/
wtiSetState(pWti, eWRKTHRD_RUNNING, MUTEX_ALREADY_LOCKED); /* we are running now! */
do {
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ d_pthread_mutex_unlock(&pThis->mut);
iRet = wtiWorker(pWti); /* just to make sure: this is NOT protected by the mutex! */
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
+ d_pthread_mutex_lock(&pThis->mut);
} while(pThis->iCurNumWrkThrd == 1 && pThis->bInactivityGuard == 1);
/* inactivity guard prevents shutdown of all workers while one should be running due to race
* condition. It can lead to one more worker running than desired, but that is acceptable. After
@@ -398,7 +391,7 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n",
wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd);
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ d_pthread_mutex_unlock(&pThis->mut);
ENDfunc
pthread_exit(0);
@@ -463,7 +456,6 @@ rsRetVal
wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
{
DEFiRet;
- DEFVARS_mutexProtection;
int nMissing; /* number workers missing to run */
int i;
@@ -472,7 +464,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
if(nMaxWrkr == 0)
FINALIZE;
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
+ d_pthread_mutex_lock(&pThis->mut);
if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */
nMaxWrkr = pThis->iNumWorkerThreads;
@@ -494,7 +486,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
finalize_it:
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ d_pthread_mutex_unlock(&pThis->mut);
RETiRet;
}
@@ -510,7 +502,7 @@ DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*))
DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*))
DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*))
-DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int))
+DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*))
DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*))
DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*))
@@ -592,6 +584,5 @@ BEGINObjClassInit(wtp, 1, OBJ_IS_CORE_MODULE)
CHKiRet(objUse(glbl, CORE_COMPONENT));
ENDObjClassInit(wtp)
-/*
- * vi:set ai:
+/* vi:set ai:
*/
diff --git a/runtime/wtp.h b/runtime/wtp.h
index 6b4054b8..c933f337 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -70,7 +70,7 @@ struct wtp_s {
rsRetVal (*pfObjProcessed)(void *pUsr, wti_t *pWti); /* indicate user object is processed */
rsRetVal (*pfRateLimiter)(void *pUsr);
rsRetVal (*pfIsIdle)(void *pUsr, wtp_t *pWtp);
- rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int);
+ rsRetVal (*pfDoWork)(void *pUsr, void *pWti);
rsRetVal (*pfOnIdle)(void *pUsr, int);
rsRetVal (*pfOnWorkerCancel)(void *pUsr, void*pWti);
rsRetVal (*pfOnWorkerStartup)(void *pUsr);
@@ -102,7 +102,7 @@ PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));
PROTOTYPEpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*));
PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*));
-PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int));
+PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*));
PROTOTYPEpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*));
PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*,void*));