summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-17 12:38:49 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-17 12:38:49 +0200
commit183b49015561890e148a50128c051a1cdd4491b9 (patch)
treeacdfddd9aab9ab51b590489a5216fd6fecd62d79 /runtime/queue.c
parent511fd780a25b59b42e93bb2c9ebc03a4991f5c16 (diff)
downloadrsyslog-183b49015561890e148a50128c051a1cdd4491b9.tar.gz
rsyslog-183b49015561890e148a50128c051a1cdd4491b9.tar.xz
rsyslog-183b49015561890e148a50128c051a1cdd4491b9.zip
more code simplification, should also bring some performance enhancement
reducing the number of thread cancellation state changes
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c38
1 files changed, 18 insertions, 20 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index e2641c5b..37ec3663 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -75,7 +75,7 @@ static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static int qqueueIsIdleDA(qqueue_t *pThis);
-static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
+static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
/* some constants for queuePersist () */
@@ -471,7 +471,7 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA));
CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) StartDA));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) TurnOffDAMode));
@@ -1170,7 +1170,6 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr)
static rsRetVal
tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
{
- DEFVARS_mutexProtection_uncond;
struct timespec tTimeout;
rsRetVal iRetLocal;
DEFiRet;
@@ -1178,7 +1177,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
- BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pThis->mut); /* some workers may be running in parallel! */
+ d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */
if(getPhysicalQueueSize(pThis) > 0) {
if(pThis->bRunsDA) {
/* We may have waited on the low water mark. As it may have changed, we
@@ -1187,7 +1186,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
}
}
- END_MTX_PROTECTED_OPERATIONS_UNCOND(pThis->mut);
+ d_pthread_mutex_unlock(pThis->mut);
/* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
if(pThis->bRunsDA) {
@@ -1259,7 +1258,6 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
static rsRetVal
tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
{
- DEFVARS_mutexProtection;
struct timespec tTimeout;
rsRetVal iRetLocal;
DEFiRet;
@@ -1280,9 +1278,9 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
/* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
timeoutComp(&tTimeout, pThis->toActShutdown);
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+ d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */
if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ d_pthread_mutex_unlock(pThis->mut);
dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers\n");
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
@@ -1293,12 +1291,12 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
"in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
/* we need to re-aquire the mutex for the next check in this case! */
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX);
+ d_pthread_mutex_lock(pThis->mut);
}
if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pqDA->pWtpReg, LOCK_MUTEX) > 0) {
/* and now the same for the DA queue */
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ d_pthread_mutex_unlock(pThis->mut);
dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n");
iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
@@ -1308,8 +1306,9 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue "
"in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
+ } else {
+ d_pthread_mutex_unlock(pThis->mut);
}
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
RETiRet;
}
@@ -1688,7 +1687,7 @@ finalize_it:
* rgerhards, 2009-04-22
*/
static rsRetVal
-DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
{
DEFiRet;
int iQueueSize = 0; /* keep the compiler happy... */
@@ -1714,7 +1713,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
// TODO: MULTI: check physical queue size?
pthread_cond_signal(&pThis->notFull);
d_pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
@@ -1827,14 +1825,14 @@ RateLimiter(qqueue_t *pThis)
* rgerhards, 2009-05-20
*/
static inline rsRetVal
-DequeueForConsumer(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+DequeueForConsumer(qqueue_t *pThis, wti_t *pWti)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(DequeueConsumable(pThis, pWti));
if(pWti->batch.nElem == 0)
ABORT_FINALIZE(RS_RET_IDLE);
@@ -1869,14 +1867,14 @@ dbgprintf("XXX: batchProcessed deletes %d records\n", pWti->batch.nElemDeq);
* rgerhards, 2008-01-21
*/
static rsRetVal
-ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+ConsumerReg(qqueue_t *pThis, wti_t *pWti)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave));
+ CHKiRet(DequeueForConsumer(pThis, pWti));
CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch));
/* we now need to check if we should deliberately delay processing a bit
@@ -1905,7 +1903,7 @@ dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet
* rgerhards, 2008-01-14
*/
static rsRetVal
-ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+ConsumerDA(qqueue_t *pThis, wti_t *pWti)
{
int i;
DEFiRet;
@@ -1913,7 +1911,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave));
+ CHKiRet(DequeueForConsumer(pThis, pWti));
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem ; i++) {
/* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
@@ -2134,7 +2132,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg));
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg));
CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));