diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-04-22 16:39:58 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-04-22 16:39:58 +0200 |
commit | e4b3f6d287d74b34d27b4e296c33cb3f1294a58c (patch) | |
tree | dd75cbbc5f857ccdb6d835fb2bf850ed1527b6ff /runtime/queue.c | |
parent | 7667845bd72b6f92eabc975318a4f288a77f2630 (diff) | |
download | rsyslog-e4b3f6d287d74b34d27b4e296c33cb3f1294a58c.tar.gz rsyslog-e4b3f6d287d74b34d27b4e296c33cb3f1294a58c.tar.xz rsyslog-e4b3f6d287d74b34d27b4e296c33cb3f1294a58c.zip |
now batches are handed down to the actual consumer
... but the action consumer does not do anything really intelligent
with them. But the DA consumer is already done, as is the
main message queue consumer.
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 30 |
1 files changed, 17 insertions, 13 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index c48eb724..ea8567ea 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -899,6 +899,8 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) { + aUsrp_t aUsrp; + obj_t *pMsgp; DEFiRet; ASSERT(pThis != NULL); @@ -908,8 +910,13 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) * mode the consumer probably has a lot to convey (which get's lost in the other modes * because they are asynchronous. But direct mode is deliberately synchronous. * rgerhards, 2008-02-12 + * We use our knowledge about the aUsrp_t structure below, but without that, we + * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ - iRet = pThis->pConsumer(pThis->pUsr, pUsr); + pMsgp = (obj_t*) pUsr; + aUsrp.nElem = 1; /* there always is only one in direct mode */ + aUsrp.pUsrp = &pMsgp; + iRet = pThis->pConsumer(pThis->pUsr, &aUsrp); RETiRet; } @@ -959,7 +966,7 @@ UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex) * rgerhards, 2008-01-29 */ static rsRetVal -qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr) +GetUngottenObj(qqueue_t *pThis, obj_t **ppUsr) { DEFiRet; @@ -1015,7 +1022,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr) * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ if(pThis->iUngottenObjs > 0) { - iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr); + iRet = GetUngottenObj(pThis, (obj_t**) pUsr); } else { iRet = pThis->qDel(pThis, pUsr); ATOMIC_DEC(pThis->iQueueSize); @@ -1243,7 +1250,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*)) { DEFiRet; qqueue_t *pThis; @@ -1578,14 +1585,12 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); -// MULTIQUEUE: here we need to iterate through array! - or better pass it as whole? ... probably - int i; - for(i = 0 ; i < pWti->paUsrp->nElem ; i++) - CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp->pUsrp[i])); + CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 */ +//TODO: MULTIQUEUE: the following setting is no longer correct - need to think about how to do that... if(pThis->iDeqSlowdown) { dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n", pThis->iDeqSlowdown); @@ -1597,7 +1602,7 @@ finalize_it: } -/* This is a special consumer to feed the disk-queue in disk-assited mode. +/* This is a special consumer to feed the disk-queue in disk-assisted mode. * When active, our own queue more or less acts as a memory buffer to the disk. * So this consumer just needs to drain the memory queue and submit entries * to the disk queue. The disk queue will then call the actual consumer from @@ -1609,18 +1614,17 @@ finalize_it: static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { + int i; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); -// MULTIQUEUE: - int i; + /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->paUsrp->nElem ; i++) CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i])); - finalize_it: dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); RETiRet; @@ -1944,7 +1948,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) * to the regular files. -- rgerhards, 2008-01-29 */ while(pThis->iUngottenObjs > 0) { - CHKiRet(qqueueGetUngottenObj(pThis, &pUsr)); + CHKiRet(GetUngottenObj(pThis, &pUsr)); CHKiRet((objSerialize(pUsr))(pUsr, psQIF)); objDestruct(pUsr); } |