From 183b49015561890e148a50128c051a1cdd4491b9 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 17 Jul 2009 12:38:49 +0200 Subject: more code simplification, should also bring some performance enhancement reducing the number of thread cancellation state changes --- runtime/queue.c | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index e2641c5b..37ec3663 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -75,7 +75,7 @@ static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static int qqueueIsIdleDA(qqueue_t *pThis); -static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); +static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); /* some constants for queuePersist () */ @@ -471,7 +471,7 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA)); CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) StartDA)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) TurnOffDAMode)); @@ -1170,7 +1170,6 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr) static rsRetVal tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) { - DEFVARS_mutexProtection_uncond; struct timespec tTimeout; rsRetVal iRetLocal; DEFiRet; @@ -1178,7 +1177,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ - BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pThis->mut); /* some workers may be running in parallel! */ + d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */ if(getPhysicalQueueSize(pThis) > 0) { if(pThis->bRunsDA) { /* We may have waited on the low water mark. As it may have changed, we @@ -1187,7 +1186,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) wtpAdviseMaxWorkers(pThis->pWtpDA, 1); } } - END_MTX_PROTECTED_OPERATIONS_UNCOND(pThis->mut); + d_pthread_mutex_unlock(pThis->mut); /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ if(pThis->bRunsDA) { @@ -1259,7 +1258,6 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) static rsRetVal tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) { - DEFVARS_mutexProtection; struct timespec tTimeout; rsRetVal iRetLocal; DEFiRet; @@ -1280,9 +1278,9 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */ timeoutComp(&tTimeout, pThis->toActShutdown); - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */ if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); + d_pthread_mutex_unlock(pThis->mut); dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers\n"); iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { @@ -1293,12 +1291,12 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } /* we need to re-aquire the mutex for the next check in this case! */ - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); + d_pthread_mutex_lock(pThis->mut); } if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pqDA->pWtpReg, LOCK_MUTEX) > 0) { /* and now the same for the DA queue */ - END_MTX_PROTECTED_OPERATIONS(pThis->mut); + d_pthread_mutex_unlock(pThis->mut); dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n"); iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { @@ -1308,8 +1306,9 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue " "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } + } else { + d_pthread_mutex_unlock(pThis->mut); } - END_MTX_PROTECTED_OPERATIONS(pThis->mut); RETiRet; } @@ -1688,7 +1687,7 @@ finalize_it: * rgerhards, 2009-04-22 */ static rsRetVal -DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +DequeueConsumable(qqueue_t *pThis, wti_t *pWti) { DEFiRet; int iQueueSize = 0; /* keep the compiler happy... */ @@ -1714,7 +1713,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) // TODO: MULTI: check physical queue size? pthread_cond_signal(&pThis->notFull); d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); /* WE ARE NO LONGER PROTECTED BY THE MUTEX */ if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { @@ -1827,14 +1825,14 @@ RateLimiter(qqueue_t *pThis) * rgerhards, 2009-05-20 */ static inline rsRetVal -DequeueForConsumer(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +DequeueForConsumer(qqueue_t *pThis, wti_t *pWti) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); + CHKiRet(DequeueConsumable(pThis, pWti)); if(pWti->batch.nElem == 0) ABORT_FINALIZE(RS_RET_IDLE); @@ -1869,14 +1867,14 @@ dbgprintf("XXX: batchProcessed deletes %d records\n", pWti->batch.nElemDeq); * rgerhards, 2008-01-21 */ static rsRetVal -ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +ConsumerReg(qqueue_t *pThis, wti_t *pWti) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave)); + CHKiRet(DequeueForConsumer(pThis, pWti)); CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch)); /* we now need to check if we should deliberately delay processing a bit @@ -1905,7 +1903,7 @@ dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet * rgerhards, 2008-01-14 */ static rsRetVal -ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +ConsumerDA(qqueue_t *pThis, wti_t *pWti) { int i; DEFiRet; @@ -1913,7 +1911,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave)); + CHKiRet(DequeueForConsumer(pThis, pWti)); /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem ; i++) { /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs @@ -2134,7 +2132,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg)); CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg)); CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown)); -- cgit