diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-04-23 12:50:07 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-04-23 12:50:07 +0200 |
commit | 5c0aeae8ab1f344a022d586dc26c5d78203f7e0b (patch) | |
tree | b81b80018149907c1cbb1a81d2e04db067deccd3 /runtime/queue.c | |
parent | 6feb86688546aff2d957e27d8516143b256371c3 (diff) | |
download | rsyslog-5c0aeae8ab1f344a022d586dc26c5d78203f7e0b.tar.gz rsyslog-5c0aeae8ab1f344a022d586dc26c5d78203f7e0b.tar.xz rsyslog-5c0aeae8ab1f344a022d586dc26c5d78203f7e0b.zip |
added $MainMsgQueueDequeueBatchSize and $ActionQueueDequeueBatchSize configuration directives
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 22 |
1 files changed, 10 insertions, 12 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index c5f9df81..f3d3fe71 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -69,7 +69,7 @@ rsRetVal qqueueChkPersist(qqueue_t *pThis); static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); -static rsRetVal GetDeqMaxAtOnce(qqueue_t *pThis, int *pVal); +static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static int qqueueIsIdleDA(qqueue_t *pThis); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2); @@ -375,7 +375,7 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); // MULTIQUEUE: TODO: this should be DA-specific! - CHKiRet(wtpSetpfGetDeqMaxAtOnce (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqMaxAtOnce)); + CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA)); CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) ConsumerCancelCleanup)); @@ -1280,7 +1280,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->pConsumer = pConsumer; pThis->iNumWorkerThreads = iWorkerThreads; pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */ - pThis->iDeqMaxAtOnce = 8; /* conservative default, should still provide good performance */ + pThis->iDeqBatchSize = 8; /* conservative default, should still provide good performance */ pThis->pszFilePrefix = NULL; pThis->qType = qType; @@ -1421,7 +1421,7 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); /* all well, use this element */ pWti->paUsrp->pUsrp[nDequeued++] = pUsr; - } while(iQueueSize > 0 && nDequeued < pThis->iDeqMaxAtOnce); + } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize); //bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ pWti->paUsrp->nElem = nDequeued; @@ -1691,14 +1691,11 @@ ChkStooWrkrReg(qqueue_t *pThis) * rgerhards, 2009-04-22 */ static rsRetVal -GetDeqMaxAtOnce(qqueue_t *pThis, int *pVal) +GetDeqBatchSize(qqueue_t *pThis, int *pVal) { DEFiRet; assert(pVal != NULL); -RUNLOG_VAR("%d", pThis->iDeqMaxAtOnce); // MULTIQUEUE: delete this when done - - *pVal = pThis->iDeqMaxAtOnce; - + *pVal = pThis->iDeqBatchSize; RETiRet; } @@ -1819,10 +1816,10 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d, " - "full delay %d, light delay %d starting\n", + "full delay %d, light delay %d, deq batch size %d starting\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1, - pThis->iFullDlyMrk, pThis->iLightDlyMrk); + pThis->iFullDlyMrk, pThis->iLightDlyMrk, pThis->iDeqBatchSize); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ @@ -1835,7 +1832,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStooWrkrReg)); - CHKiRet(wtpSetpfGetDeqMaxAtOnce (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqMaxAtOnce)); + CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))ConsumerCancelCleanup)); @@ -2301,6 +2298,7 @@ DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int) DEFpropSetMeth(qqueue, bSaveOnShutdown, int) DEFpropSetMeth(qqueue, pUsr, void*) DEFpropSetMeth(qqueue, iDeqSlowdown, int) +DEFpropSetMeth(qqueue, iDeqBatchSize, int) DEFpropSetMeth(qqueue, sizeOnDiskMax, int64) |