summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c239
1 files changed, 121 insertions, 118 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 4e017e84..f3d3fe71 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -8,7 +8,11 @@
* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
* if you are getting aquainted to the object.
*
- * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ * NOTE: as of 2009-04-22, I have begin to remove the qqueue* prefix from static
+ * function names - this makes it really hard to read and does not provide much
+ * benefit, at least I (now) think so...
+ *
+ * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -63,12 +67,13 @@ DEFobjCurrIf(glbl)
/* forward-definitions */
rsRetVal qqueueChkPersist(qqueue_t *pThis);
static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
-static rsRetVal qqueueRateLimiter(qqueue_t *pThis);
+static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
+static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static int qqueueIsIdleDA(qqueue_t *pThis);
-static rsRetVal qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
-static rsRetVal qqueueConsumerCancelCleanup(void *arg1, void *arg2);
-static rsRetVal qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
+static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
+static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2);
+static rsRetVal UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -369,9 +374,11 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
+ // MULTIQUEUE: TODO: this should be DA-specific!
+ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerDA));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) qqueueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) ConsumerCancelCleanup));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
@@ -607,28 +614,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis)
static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
{
DEFiRet;
-
iRet = qqueueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr);
-#if 0
- qLinkedList_t *pEntry;
-
- ASSERT(pThis != NULL);
- if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
-
- pEntry->pNext = NULL;
- pEntry->pUsr = pUsr;
-
- if(pThis->tVars.linklist.pRoot == NULL) {
- pThis->tVars.linklist.pRoot = pThis->tVars.linklist.pLast = pEntry;
- } else {
- pThis->tVars.linklist.pLast->pNext = pEntry;
- pThis->tVars.linklist.pLast = pEntry;
- }
-
-finalize_it:
-#endif
RETiRet;
}
@@ -636,24 +622,6 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis, obj_t **ppUsr)
{
DEFiRet;
iRet = qqueueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr);
-#if 0
- qLinkedList_t *pEntry;
-
- ASSERT(pThis != NULL);
- ASSERT(pThis->tVars.linklist.pRoot != NULL);
-
- pEntry = pThis->tVars.linklist.pRoot;
- *ppUsr = pEntry->pUsr;
-
- if(pThis->tVars.linklist.pRoot == pThis->tVars.linklist.pLast) {
- pThis->tVars.linklist.pRoot = NULL;
- pThis->tVars.linklist.pLast = NULL;
- } else {
- pThis->tVars.linklist.pRoot = pEntry->pNext;
- }
- free(pEntry);
-
-#endif
RETiRet;
}
@@ -760,7 +728,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
while(iUngottenObjs > 0) {
/* fill the queue from disk */
CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL));
- qqueueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
+ UngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
--iUngottenObjs; /* one less */
}
@@ -931,6 +899,8 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
+ aUsrp_t aUsrp;
+ obj_t *pMsgp;
DEFiRet;
ASSERT(pThis != NULL);
@@ -940,8 +910,13 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
* mode the consumer probably has a lot to convey (which get's lost in the other modes
* because they are asynchronous. But direct mode is deliberately synchronous.
* rgerhards, 2008-02-12
+ * We use our knowledge about the aUsrp_t structure below, but without that, we
+ * pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
- iRet = pThis->pConsumer(pThis->pUsr, pUsr);
+ pMsgp = (obj_t*) pUsr;
+ aUsrp.nElem = 1; /* there always is only one in direct mode */
+ aUsrp.pUsrp = &pMsgp;
+ iRet = pThis->pConsumer(pThis->pUsr, &aUsrp);
RETiRet;
}
@@ -961,7 +936,7 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute_
* rgerhards, 2008-01-20
*/
static rsRetVal
-qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
+UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
@@ -991,7 +966,7 @@ qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
* rgerhards, 2008-01-29
*/
static rsRetVal
-qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
+GetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
{
DEFiRet;
@@ -1047,7 +1022,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr)
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
if(pThis->iUngottenObjs > 0) {
- iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr);
+ iRet = GetUngottenObj(pThis, (obj_t**) pUsr);
} else {
iRet = pThis->qDel(pThis, pUsr);
ATOMIC_DEC(pThis->iQueueSize);
@@ -1275,7 +1250,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1305,6 +1280,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->pConsumer = pConsumer;
pThis->iNumWorkerThreads = iWorkerThreads;
pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
+ pThis->iDeqBatchSize = 8; /* conservative default, should still provide good performance */
pThis->pszFilePrefix = NULL;
pThis->qType = qType;
@@ -1354,7 +1330,7 @@ finalize_it:
* rgerhards, 2008-01-16
*/
static rsRetVal
-qqueueConsumerCancelCleanup(void *arg1, void *arg2)
+ConsumerCancelCleanup(void *arg1, void *arg2)
{
DEFiRet;
@@ -1366,7 +1342,7 @@ qqueueConsumerCancelCleanup(void *arg1, void *arg2)
if(pUsr != NULL) {
/* make sure the data element is not lost */
dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
- CHKiRet(qqueueUngetObj(pThis, pUsr, LOCK_MUTEX));
+ CHKiRet(UngetObj(pThis, pUsr, LOCK_MUTEX));
}
finalize_it:
@@ -1415,38 +1391,68 @@ finalize_it:
}
+/* dequeue as many user points as are available, until we hit the configured
+ * upper limit of pointers.
+ * This must only be called when the queue mutex is LOOKED, otherwise serious
+ * malfunction will happen.
+ */
+static inline rsRetVal
+DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize)
+{
+ int nDequeued;
+ int iQueueSize;
+ void *pUsr;
+ rsRetVal localRet;
+ DEFiRet;
+
+ nDequeued = 0;
+ do {
+dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
+ CHKiRet(qqueueDel(pThis, &pUsr));
+ qqueueChkPersist(pThis); /* is is questionable if we should really need to call this every time... */
+ iQueueSize = qqueueGetOverallQueueSize(pThis);
+
+ /* check if we should discard this element */
+ localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr);
+ if(localRet == RS_RET_QUEUE_FULL)
+ continue;
+ else if(localRet != RS_RET_OK)
+ ABORT_FINALIZE(localRet);
+
+ /* all well, use this element */
+ pWti->paUsrp->pUsrp[nDequeued++] = pUsr;
+ } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize);
+
+ //bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
+ pWti->paUsrp->nElem = nDequeued;
+ *iRemainingQueueSize = iQueueSize;
+
+finalize_it:
+ RETiRet;
+}
+
+
/* dequeue the queued object for the queue consumers.
* rgerhards, 2008-10-21
+ * I made a radical change - we now dequeue multiple elements, and store these objects in
+ * an array of user pointers. We expect that this increases performance.
+ * rgerhards, 2009-04-22
*/
static rsRetVal
-qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
- void *pUsr;
- int iQueueSize;
- int bRunsDA; /* cache for early mutex release */
+ int iQueueSize = 0; /* keep the compiler happy... */
- /* dequeue element (still protected from mutex) */
- iRet = qqueueDel(pThis, &pUsr);
- qqueueChkPersist(pThis);
- iQueueSize = qqueueGetOverallQueueSize(pThis); /* cache this for after mutex release */
- bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
-
- /* We now need to save the user pointer for the cancel cleanup handler, BUT ONLY
- * if we could successfully obtain a user pointer. Otherwise, we would bring the
- * cancel cleanup handler into big troubles (and we did ;)). Note that we can
- * NOT set the variable further below, as this may lead to an object leak. We
- * may get cancelled before we reach that part of the code, so the only
- * solution is to do it here. -- rgerhards, 2008-02-27
- */
- if(iRet == RS_RET_OK) {
- pWti->pUsrp = pUsr;
- }
+ /* dequeue element batch (still protected from mutex) */
+ iRet = DequeueConsumableElements(pThis, pWti, &iQueueSize);
/* awake some flow-controlled sources if we can do this right now */
/* TODO: this could be done better from a performance point of view -- do it only if
* we have someone waiting for the condition (or only when we hit the watermark right
* on the nail [exact value]) -- rgerhards, 2008-03-14
+ * now that we dequeue batches of pointers, this is much less an issue...
+ * rgerhards, 2009-04-22
*/
if(iQueueSize < pThis->iFullDlyMrk) {
pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk);
@@ -1456,37 +1462,16 @@ qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
}
- /* rgerhards, 2008-09-30: I reversed the order of cond_signal und mutex_unlock
- * as of the pthreads recommendation on predictable scheduling behaviour. I don't see
- * any problems caused by this, but I add this comment in case some will be seen
- * in the next time.
- */
pthread_cond_signal(&pThis->notFull);
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
- /* do actual processing (the lengthy part, runs in parallel)
- * If we had a problem while dequeing, we do not call the consumer,
- * but we otherwise ignore it. This is in the hopes that it will be
- * self-healing. However, this is really not a good thing.
- * rgerhards, 2008-01-03
- */
- if(iRet != RS_RET_OK)
- FINALIZE;
-
- /* we are running in normal, non-disk-assisted mode do a quick check if we need to drain the queue.
- * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
- * provide real-time creation of spool files.
- * Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong.
- */
- CHKiRet(qqueueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr));
-
-finalize_it:
if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
dbgoprint((obj_t*) pThis, "error %d dequeueing element - ignoring, but strange things "
"may happen\n", iRet);
}
+
RETiRet;
}
@@ -1529,7 +1514,7 @@ finalize_it:
* but you get the idea from the code above.
*/
static rsRetVal
-qqueueRateLimiter(qqueue_t *pThis)
+RateLimiter(qqueue_t *pThis)
{
DEFiRet;
int iDelay;
@@ -1592,19 +1577,20 @@ qqueueRateLimiter(qqueue_t *pThis)
* rgerhards, 2008-01-21
*/
static rsRetVal
-qqueueConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp));
+ CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
*/
+//TODO: MULTIQUEUE: the following setting is no longer correct - need to think about how to do that...
if(pThis->iDeqSlowdown) {
dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n",
pThis->iDeqSlowdown);
@@ -1616,7 +1602,7 @@ finalize_it:
}
-/* This is a special consumer to feed the disk-queue in disk-assited mode.
+/* This is a special consumer to feed the disk-queue in disk-assisted mode.
* When active, our own queue more or less acts as a memory buffer to the disk.
* So this consumer just needs to drain the memory queue and submit entries
* to the disk queue. The disk queue will then call the actual consumer from
@@ -1626,15 +1612,18 @@ finalize_it:
* rgerhards, 2008-01-14
*/
static rsRetVal
-qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
+ int i;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
+ CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+ /* iterate over returned results and enqueue them in DA queue */
+ for(i = 0 ; i < pWti->paUsrp->nElem ; i++)
+ CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i]));
finalize_it:
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
@@ -1692,12 +1681,24 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
* the DA queue
*/
static int
-qqueueChkStopWrkrReg(qqueue_t *pThis)
+ChkStooWrkrReg(qqueue_t *pThis)
{
return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0);
}
+/* return the configured "deq max at once" interval
+ * rgerhards, 2009-04-22
+ */
+static rsRetVal
+GetDeqBatchSize(qqueue_t *pThis, int *pVal)
+{
+ DEFiRet;
+ assert(pVal != NULL);
+ *pVal = pThis->iDeqBatchSize;
+ RETiRet;
+}
+
/* must only be called when the queue mutex is locked, else results
* are not stable! DA queue version
*/
@@ -1712,7 +1713,7 @@ qqueueIsIdleDA(qqueue_t *pThis)
* are not stable! Regular queue version
*/
static int
-qqueueIsIdleReg(qqueue_t *pThis)
+IsIdleReg(qqueue_t *pThis)
{
#if 0 /* enable for performance testing */
int ret;
@@ -1740,7 +1741,7 @@ qqueueIsIdleReg(qqueue_t *pThis)
* I am telling this, because I, too, always get confused by those...
*/
static rsRetVal
-qqueueRegOnWrkrShutdown(qqueue_t *pThis)
+RegOnWrkrShutdown(qqueue_t *pThis)
{
DEFiRet;
@@ -1761,7 +1762,7 @@ qqueueRegOnWrkrShutdown(qqueue_t *pThis)
* hook to indicate in the parent queue (if we are a child) that we are not done yet.
*/
static rsRetVal
-qqueueRegOnWrkrStartup(qqueue_t *pThis)
+RegOnWrkrStartup(qqueue_t *pThis)
{
DEFiRet;
@@ -1815,10 +1816,10 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d, "
- "full delay %d, light delay %d starting\n",
+ "full delay %d, light delay %d, deq batch size %d starting\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1,
- pThis->iFullDlyMrk, pThis->iLightDlyMrk);
+ pThis->iFullDlyMrk, pThis->iLightDlyMrk, pThis->iDeqBatchSize);
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
@@ -1829,13 +1830,14 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
- CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRateLimiter));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrReg));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleReg));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerReg));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))qqueueConsumerCancelCleanup));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrStartup));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrShutdown));
+ CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStooWrkrReg));
+ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))ConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
@@ -1945,7 +1947,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
* to the regular files. -- rgerhards, 2008-01-29
*/
while(pThis->iUngottenObjs > 0) {
- CHKiRet(qqueueGetUngottenObj(pThis, &pUsr));
+ CHKiRet(GetUngottenObj(pThis, &pUsr));
CHKiRet((objSerialize(pUsr))(pUsr, psQIF));
objDestruct(pUsr);
}
@@ -2296,6 +2298,7 @@ DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int)
DEFpropSetMeth(qqueue, bSaveOnShutdown, int)
DEFpropSetMeth(qqueue, pUsr, void*)
DEFpropSetMeth(qqueue, iDeqSlowdown, int)
+DEFpropSetMeth(qqueue, iDeqBatchSize, int)
DEFpropSetMeth(qqueue, sizeOnDiskMax, int64)