summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-04-23 12:50:07 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-04-23 12:50:07 +0200
commit5c0aeae8ab1f344a022d586dc26c5d78203f7e0b (patch)
treeb81b80018149907c1cbb1a81d2e04db067deccd3 /runtime/queue.c
parent6feb86688546aff2d957e27d8516143b256371c3 (diff)
downloadrsyslog-5c0aeae8ab1f344a022d586dc26c5d78203f7e0b.tar.gz
rsyslog-5c0aeae8ab1f344a022d586dc26c5d78203f7e0b.tar.xz
rsyslog-5c0aeae8ab1f344a022d586dc26c5d78203f7e0b.zip
added $MainMsgQueueDequeueBatchSize and $ActionQueueDequeueBatchSize configuration directives
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c22
1 files changed, 10 insertions, 12 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index c5f9df81..f3d3fe71 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -69,7 +69,7 @@ rsRetVal qqueueChkPersist(qqueue_t *pThis);
static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
-static rsRetVal GetDeqMaxAtOnce(qqueue_t *pThis, int *pVal);
+static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static int qqueueIsIdleDA(qqueue_t *pThis);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2);
@@ -375,7 +375,7 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
// MULTIQUEUE: TODO: this should be DA-specific!
- CHKiRet(wtpSetpfGetDeqMaxAtOnce (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqMaxAtOnce));
+ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA));
CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) ConsumerCancelCleanup));
@@ -1280,7 +1280,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->pConsumer = pConsumer;
pThis->iNumWorkerThreads = iWorkerThreads;
pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
- pThis->iDeqMaxAtOnce = 8; /* conservative default, should still provide good performance */
+ pThis->iDeqBatchSize = 8; /* conservative default, should still provide good performance */
pThis->pszFilePrefix = NULL;
pThis->qType = qType;
@@ -1421,7 +1421,7 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
/* all well, use this element */
pWti->paUsrp->pUsrp[nDequeued++] = pUsr;
- } while(iQueueSize > 0 && nDequeued < pThis->iDeqMaxAtOnce);
+ } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize);
//bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
pWti->paUsrp->nElem = nDequeued;
@@ -1691,14 +1691,11 @@ ChkStooWrkrReg(qqueue_t *pThis)
* rgerhards, 2009-04-22
*/
static rsRetVal
-GetDeqMaxAtOnce(qqueue_t *pThis, int *pVal)
+GetDeqBatchSize(qqueue_t *pThis, int *pVal)
{
DEFiRet;
assert(pVal != NULL);
-RUNLOG_VAR("%d", pThis->iDeqMaxAtOnce); // MULTIQUEUE: delete this when done
-
- *pVal = pThis->iDeqMaxAtOnce;
-
+ *pVal = pThis->iDeqBatchSize;
RETiRet;
}
@@ -1819,10 +1816,10 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d, "
- "full delay %d, light delay %d starting\n",
+ "full delay %d, light delay %d, deq batch size %d starting\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1,
- pThis->iFullDlyMrk, pThis->iLightDlyMrk);
+ pThis->iFullDlyMrk, pThis->iLightDlyMrk, pThis->iDeqBatchSize);
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
@@ -1835,7 +1832,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStooWrkrReg));
- CHKiRet(wtpSetpfGetDeqMaxAtOnce (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqMaxAtOnce));
+ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))ConsumerCancelCleanup));
@@ -2301,6 +2298,7 @@ DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int)
DEFpropSetMeth(qqueue, bSaveOnShutdown, int)
DEFpropSetMeth(qqueue, pUsr, void*)
DEFpropSetMeth(qqueue, iDeqSlowdown, int)
+DEFpropSetMeth(qqueue, iDeqBatchSize, int)
DEFpropSetMeth(qqueue, sizeOnDiskMax, int64)