summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-04-22 15:06:45 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-04-22 15:06:45 +0200
commit7667845bd72b6f92eabc975318a4f288a77f2630 (patch)
tree54b839ee7b3ae5347145bf4c721600ce5ca411b5 /runtime/queue.c
parentb0cfe39c851de0851d88e1b6a51bf40a76fb8304 (diff)
downloadrsyslog-7667845bd72b6f92eabc975318a4f288a77f2630.tar.gz
rsyslog-7667845bd72b6f92eabc975318a4f288a77f2630.tar.xz
rsyslog-7667845bd72b6f92eabc975318a4f288a77f2630.zip
first attempt at dequeueing multiple batches inside the queue
... but this code has serious problems when terminating the queue, also it is far from being optimal. I will commit a series of patches (hopefully) as I am on the path to the final implementation.
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c182
1 files changed, 111 insertions, 71 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index e5db866c..c48eb724 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -8,7 +8,11 @@
* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
* if you are getting aquainted to the object.
*
- * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ * NOTE: as of 2009-04-22, I have begin to remove the qqueue* prefix from static
+ * function names - this makes it really hard to read and does not provide much
+ * benefit, at least I (now) think so...
+ *
+ * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -63,12 +67,13 @@ DEFobjCurrIf(glbl)
/* forward-definitions */
rsRetVal qqueueChkPersist(qqueue_t *pThis);
static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
-static rsRetVal qqueueRateLimiter(qqueue_t *pThis);
+static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
+static rsRetVal GetDeqMaxAtOnce(qqueue_t *pThis, int *pVal);
static int qqueueIsIdleDA(qqueue_t *pThis);
-static rsRetVal qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
-static rsRetVal qqueueConsumerCancelCleanup(void *arg1, void *arg2);
-static rsRetVal qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
+static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
+static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2);
+static rsRetVal UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -369,9 +374,11 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
+ // MULTIQUEUE: TODO: this should be DA-specific!
+ CHKiRet(wtpSetpfGetDeqMaxAtOnce (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqMaxAtOnce));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerDA));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) qqueueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) ConsumerCancelCleanup));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
@@ -721,7 +728,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
while(iUngottenObjs > 0) {
/* fill the queue from disk */
CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL));
- qqueueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
+ UngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
--iUngottenObjs; /* one less */
}
@@ -922,7 +929,7 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute_
* rgerhards, 2008-01-20
*/
static rsRetVal
-qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
+UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
@@ -1266,6 +1273,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->pConsumer = pConsumer;
pThis->iNumWorkerThreads = iWorkerThreads;
pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
+ pThis->iDeqMaxAtOnce = 8; /* conservative default, should still provide good performance */
pThis->pszFilePrefix = NULL;
pThis->qType = qType;
@@ -1315,7 +1323,7 @@ finalize_it:
* rgerhards, 2008-01-16
*/
static rsRetVal
-qqueueConsumerCancelCleanup(void *arg1, void *arg2)
+ConsumerCancelCleanup(void *arg1, void *arg2)
{
DEFiRet;
@@ -1327,7 +1335,7 @@ qqueueConsumerCancelCleanup(void *arg1, void *arg2)
if(pUsr != NULL) {
/* make sure the data element is not lost */
dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
- CHKiRet(qqueueUngetObj(pThis, pUsr, LOCK_MUTEX));
+ CHKiRet(UngetObj(pThis, pUsr, LOCK_MUTEX));
}
finalize_it:
@@ -1376,38 +1384,68 @@ finalize_it:
}
+/* dequeue as many user points as are available, until we hit the configured
+ * upper limit of pointers.
+ * This must only be called when the queue mutex is LOOKED, otherwise serious
+ * malfunction will happen.
+ */
+static inline rsRetVal
+DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize)
+{
+ int nDequeued;
+ int iQueueSize;
+ void *pUsr;
+ rsRetVal localRet;
+ DEFiRet;
+
+ nDequeued = 0;
+ do {
+dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
+ CHKiRet(qqueueDel(pThis, &pUsr));
+ qqueueChkPersist(pThis); /* is is questionable if we should really need to call this every time... */
+ iQueueSize = qqueueGetOverallQueueSize(pThis);
+
+ /* check if we should discard this element */
+ localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr);
+ if(localRet == RS_RET_QUEUE_FULL)
+ continue;
+ else if(localRet != RS_RET_OK)
+ ABORT_FINALIZE(localRet);
+
+ /* all well, use this element */
+ pWti->paUsrp->pUsrp[nDequeued++] = pUsr;
+ } while(iQueueSize > 0 && nDequeued < pThis->iDeqMaxAtOnce);
+
+ //bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
+ pWti->paUsrp->nElem = nDequeued;
+ *iRemainingQueueSize = iQueueSize;
+
+finalize_it:
+ RETiRet;
+}
+
+
/* dequeue the queued object for the queue consumers.
* rgerhards, 2008-10-21
+ * I made a radical change - we now dequeue multiple elements, and store these objects in
+ * an array of user pointers. We expect that this increases performance.
+ * rgerhards, 2009-04-22
*/
static rsRetVal
-qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
- void *pUsr;
- int iQueueSize;
- int bRunsDA; /* cache for early mutex release */
+ int iQueueSize = 0; /* keep the compiler happy... */
- /* dequeue element (still protected from mutex) */
- iRet = qqueueDel(pThis, &pUsr);
- qqueueChkPersist(pThis);
- iQueueSize = qqueueGetOverallQueueSize(pThis); /* cache this for after mutex release */
- bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
-
- /* We now need to save the user pointer for the cancel cleanup handler, BUT ONLY
- * if we could successfully obtain a user pointer. Otherwise, we would bring the
- * cancel cleanup handler into big troubles (and we did ;)). Note that we can
- * NOT set the variable further below, as this may lead to an object leak. We
- * may get cancelled before we reach that part of the code, so the only
- * solution is to do it here. -- rgerhards, 2008-02-27
- */
- if(iRet == RS_RET_OK) {
- pWti->pUsrp = pUsr;
- }
+ /* dequeue element batch (still protected from mutex) */
+ iRet = DequeueConsumableElements(pThis, pWti, &iQueueSize);
/* awake some flow-controlled sources if we can do this right now */
/* TODO: this could be done better from a performance point of view -- do it only if
* we have someone waiting for the condition (or only when we hit the watermark right
* on the nail [exact value]) -- rgerhards, 2008-03-14
+ * now that we dequeue batches of pointers, this is much less an issue...
+ * rgerhards, 2009-04-22
*/
if(iQueueSize < pThis->iFullDlyMrk) {
pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk);
@@ -1417,37 +1455,16 @@ qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
}
- /* rgerhards, 2008-09-30: I reversed the order of cond_signal und mutex_unlock
- * as of the pthreads recommendation on predictable scheduling behaviour. I don't see
- * any problems caused by this, but I add this comment in case some will be seen
- * in the next time.
- */
pthread_cond_signal(&pThis->notFull);
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
- /* do actual processing (the lengthy part, runs in parallel)
- * If we had a problem while dequeing, we do not call the consumer,
- * but we otherwise ignore it. This is in the hopes that it will be
- * self-healing. However, this is really not a good thing.
- * rgerhards, 2008-01-03
- */
- if(iRet != RS_RET_OK)
- FINALIZE;
-
- /* we are running in normal, non-disk-assisted mode do a quick check if we need to drain the queue.
- * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
- * provide real-time creation of spool files.
- * Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong.
- */
- CHKiRet(qqueueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr));
-
-finalize_it:
if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
dbgoprint((obj_t*) pThis, "error %d dequeueing element - ignoring, but strange things "
"may happen\n", iRet);
}
+
RETiRet;
}
@@ -1490,7 +1507,7 @@ finalize_it:
* but you get the idea from the code above.
*/
static rsRetVal
-qqueueRateLimiter(qqueue_t *pThis)
+RateLimiter(qqueue_t *pThis)
{
DEFiRet;
int iDelay;
@@ -1553,15 +1570,18 @@ qqueueRateLimiter(qqueue_t *pThis)
* rgerhards, 2008-01-21
*/
static rsRetVal
-qqueueConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp));
+ CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+// MULTIQUEUE: here we need to iterate through array! - or better pass it as whole? ... probably
+ int i;
+ for(i = 0 ; i < pWti->paUsrp->nElem ; i++)
+ CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp->pUsrp[i]));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
@@ -1587,15 +1607,19 @@ finalize_it:
* rgerhards, 2008-01-14
*/
static rsRetVal
-qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
+ CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+// MULTIQUEUE:
+ int i;
+ for(i = 0 ; i < pWti->paUsrp->nElem ; i++)
+ CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i]));
+
finalize_it:
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
@@ -1653,12 +1677,27 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
* the DA queue
*/
static int
-qqueueChkStopWrkrReg(qqueue_t *pThis)
+ChkStooWrkrReg(qqueue_t *pThis)
{
return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0);
}
+/* return the configured "deq max at once" interval
+ * rgerhards, 2009-04-22
+ */
+static rsRetVal
+GetDeqMaxAtOnce(qqueue_t *pThis, int *pVal)
+{
+ DEFiRet;
+ assert(pVal != NULL);
+RUNLOG_VAR("%d", pThis->iDeqMaxAtOnce); // MULTIQUEUE: delete this when done
+
+ *pVal = pThis->iDeqMaxAtOnce;
+
+ RETiRet;
+}
+
/* must only be called when the queue mutex is locked, else results
* are not stable! DA queue version
*/
@@ -1673,7 +1712,7 @@ qqueueIsIdleDA(qqueue_t *pThis)
* are not stable! Regular queue version
*/
static int
-qqueueIsIdleReg(qqueue_t *pThis)
+IsIdleReg(qqueue_t *pThis)
{
#if 0 /* enable for performance testing */
int ret;
@@ -1701,7 +1740,7 @@ qqueueIsIdleReg(qqueue_t *pThis)
* I am telling this, because I, too, always get confused by those...
*/
static rsRetVal
-qqueueRegOnWrkrShutdown(qqueue_t *pThis)
+RegOnWrkrShutdown(qqueue_t *pThis)
{
DEFiRet;
@@ -1722,7 +1761,7 @@ qqueueRegOnWrkrShutdown(qqueue_t *pThis)
* hook to indicate in the parent queue (if we are a child) that we are not done yet.
*/
static rsRetVal
-qqueueRegOnWrkrStartup(qqueue_t *pThis)
+RegOnWrkrStartup(qqueue_t *pThis)
{
DEFiRet;
@@ -1788,13 +1827,14 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
- CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRateLimiter));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrReg));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleReg));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerReg));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))qqueueConsumerCancelCleanup));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrStartup));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrShutdown));
+ CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStooWrkrReg));
+ CHKiRet(wtpSetpfGetDeqMaxAtOnce (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqMaxAtOnce));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))ConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));