author     Rainer Gerhards <rgerhards@adiscon.com>  2009-05-20 15:12:49 +0200
committer  Rainer Gerhards <rgerhards@adiscon.com>  2009-05-20 15:12:49 +0200
commit     9f45b80ea9ea86d516c895d97fd8670df37e319e (patch)
tree       83a6ca3416c783c16801dfdc3651d52a3af6f4d1  /runtime/queue.c
parent     ad7ccabe5ec616a4bf9fda1472d8041eaf1bf815 (diff)
download   rsyslog-9f45b80ea9ea86d516c895d97fd8670df37e319e.tar.gz
           rsyslog-9f45b80ea9ea86d516c895d97fd8670df37e319e.tar.xz
           rsyslog-9f45b80ea9ea86d516c895d97fd8670df37e319e.zip
free last processed message in all cases
So far, the last processed message was only freed when the next one was processed. This has now been changed: a better algorithm has been selected for the queue worker process, one that also involves less overhead than the previous one. The fix for "free last processed message" was then more or less a side-effect (easy to do) of the new algorithm.
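As an illustration only (a hedged sketch, not the rsyslog source: queue_size, deq_one() and DEQ_BATCH_SIZE are made-up stand-ins for getLogicalQueueSize(), qqueueDeq() and iDeqBatchSize), the core of the change is the loop shape in DequeueConsumableElements(). The old do/while dequeued first and tested afterwards, so the previously processed batch could only be deleted once a next element was pulled; the new while tests queue size and batch limit before each dequeue and deletes the processed batch up front:

#include <stdio.h>

#define DEQ_BATCH_SIZE 4                     /* stand-in for pThis->iDeqBatchSize */

static int queue_size = 10;                  /* stand-in for getLogicalQueueSize() */
static void deq_one(void) { --queue_size; }  /* stand-in for qqueueDeq() */

int main(void)
{
	int nDequeued = 0;

	/* delete_processed_batch() would run here, up front, in the new
	 * algorithm -- this is what frees the last processed message even
	 * if no further message is ever dequeued. */

	/* new loop shape: test before dequeue, never pull from an empty queue */
	while(queue_size > 0 && nDequeued < DEQ_BATCH_SIZE) {
		deq_one();
		++nDequeued;
	}
	printf("dequeued %d element(s), %d left in queue\n", nDequeued, queue_size);
	return 0;
}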
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--  runtime/queue.c  51
1 file changed, 41 insertions(+), 10 deletions(-)
diff --git a/runtime/queue.c b/runtime/queue.c
index 3118bb19..0019297b 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -469,7 +469,7 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA));
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
@@ -1257,7 +1257,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
/* we need to re-aquire the mutex for the next check in this case! */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
}
- if(pThis->bIsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
+ if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
/* and now the same for the DA queue */
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA workers\n");
@@ -1548,18 +1548,19 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
{
int nDequeued;
int nDiscarded;
+ int nDeleted;
int iQueueSize;
void *pUsr;
rsRetVal localRet;
DEFiRet;
+ nDeleted = pWti->batch.nElemDeq;
DeleteProcessedBatch(pThis, &pWti->batch);
nDequeued = nDiscarded = 0;
- do {
+ while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
CHKiRet(qqueueDeq(pThis, &pUsr));
- iQueueSize = getLogicalQueueSize(pThis);
/* check if we should discard this element */
localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr);
@@ -1574,9 +1575,10 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
pWti->batch.pElem[nDequeued].pUsrp = pUsr;
pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
++nDequeued;
- } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize);
+ }
- qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */
+ /* it is sufficient to persist only when the bulk of work is done */
+ qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted);
pWti->batch.nElem = nDequeued;
pWti->batch.nElemDeq = nDequeued + nDiscarded;
@@ -1618,6 +1620,7 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
}
+ // TODO: MULTI: check physical queue size!
pthread_cond_signal(&pThis->notFull);
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -1727,6 +1730,27 @@ RateLimiter(qqueue_t *pThis)
}
+/* This dequeues the next batch and checks if the queue is empty. If it is
+ * empty, return RS_RET_IDLE. That will trigger termination of the function
+ * and tell the upper layer caller to initiate idle processing.
+ * rgerhards, 2009-05-20
+ */
+static inline rsRetVal
+DequeueForConsumer(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ ISOBJ_TYPE_assert(pWti, wti);
+
+ CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+
+ if(pWti->batch.nElem == 0)
+ ABORT_FINALIZE(RS_RET_IDLE);
+
+finalize_it:
+ RETiRet;
+}
/* This is the queue consumer in the regular (non-DA) case. It is
* protected by the queue mutex, but MUST release it as soon as possible.
@@ -1740,7 +1764,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave));
CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch));
/* we now need to check if we should deliberately delay processing a bit
@@ -1754,6 +1778,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
}
finalize_it:
+dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
}
@@ -1776,7 +1801,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave));
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem ; i++)
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->batch.pElem[i].pUsrp));
@@ -1855,6 +1880,8 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal)
RETiRet;
}
+
+/* common function for the idle functions that deletes the last batch if TODO MULTI */
/* must only be called when the queue mutex is locked, else results
* are not stable! DA queue version
*/
@@ -1990,7 +2017,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg));
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
@@ -2133,7 +2160,10 @@ static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates)
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
- assert(nUpdates > 0);
+ assert(nUpdates >= 0);
+
+ if(nUpdates == 0)
+ FINALIZE;
pThis->iUpdsSincePersist += nUpdates;
if(pThis->iPersistUpdCnt && pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
@@ -2141,6 +2171,7 @@ static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates)
pThis->iUpdsSincePersist = 0;
}
+finalize_it:
RETiRet;
}