summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-04-22 16:39:58 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-04-22 16:39:58 +0200
commite4b3f6d287d74b34d27b4e296c33cb3f1294a58c (patch)
treedd75cbbc5f857ccdb6d835fb2bf850ed1527b6ff /runtime/queue.c
parent7667845bd72b6f92eabc975318a4f288a77f2630 (diff)
downloadrsyslog-e4b3f6d287d74b34d27b4e296c33cb3f1294a58c.tar.gz
rsyslog-e4b3f6d287d74b34d27b4e296c33cb3f1294a58c.tar.xz
rsyslog-e4b3f6d287d74b34d27b4e296c33cb3f1294a58c.zip
now batches are handed down to the actual consumer
... but the action consumer does not do anything really intelligent with them. But the DA consumer is already done, as is the main message queue consumer.
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c30
1 files changed, 17 insertions, 13 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index c48eb724..ea8567ea 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -899,6 +899,8 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
+ aUsrp_t aUsrp;
+ obj_t *pMsgp;
DEFiRet;
ASSERT(pThis != NULL);
@@ -908,8 +910,13 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
* mode the consumer probably has a lot to convey (which get's lost in the other modes
* because they are asynchronous. But direct mode is deliberately synchronous.
* rgerhards, 2008-02-12
+ * We use our knowledge about the aUsrp_t structure below, but without that, we
+ * pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
- iRet = pThis->pConsumer(pThis->pUsr, pUsr);
+ pMsgp = (obj_t*) pUsr;
+ aUsrp.nElem = 1; /* there always is only one in direct mode */
+ aUsrp.pUsrp = &pMsgp;
+ iRet = pThis->pConsumer(pThis->pUsr, &aUsrp);
RETiRet;
}
@@ -959,7 +966,7 @@ UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
* rgerhards, 2008-01-29
*/
static rsRetVal
-qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
+GetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
{
DEFiRet;
@@ -1015,7 +1022,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr)
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
if(pThis->iUngottenObjs > 0) {
- iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr);
+ iRet = GetUngottenObj(pThis, (obj_t**) pUsr);
} else {
iRet = pThis->qDel(pThis, pUsr);
ATOMIC_DEC(pThis->iQueueSize);
@@ -1243,7 +1250,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1578,14 +1585,12 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
-// MULTIQUEUE: here we need to iterate through array! - or better pass it as whole? ... probably
- int i;
- for(i = 0 ; i < pWti->paUsrp->nElem ; i++)
- CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp->pUsrp[i]));
+ CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
*/
+//TODO: MULTIQUEUE: the following setting is no longer correct - need to think about how to do that...
if(pThis->iDeqSlowdown) {
dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n",
pThis->iDeqSlowdown);
@@ -1597,7 +1602,7 @@ finalize_it:
}
-/* This is a special consumer to feed the disk-queue in disk-assited mode.
+/* This is a special consumer to feed the disk-queue in disk-assisted mode.
* When active, our own queue more or less acts as a memory buffer to the disk.
* So this consumer just needs to drain the memory queue and submit entries
* to the disk queue. The disk queue will then call the actual consumer from
@@ -1609,18 +1614,17 @@ finalize_it:
static rsRetVal
ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
+ int i;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
-// MULTIQUEUE:
- int i;
+ /* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->paUsrp->nElem ; i++)
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i]));
-
finalize_it:
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
RETiRet;
@@ -1944,7 +1948,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
* to the regular files. -- rgerhards, 2008-01-29
*/
while(pThis->iUngottenObjs > 0) {
- CHKiRet(qqueueGetUngottenObj(pThis, &pUsr));
+ CHKiRet(GetUngottenObj(pThis, &pUsr));
CHKiRet((objSerialize(pUsr))(pUsr, psQIF));
objDestruct(pUsr);
}