diff options
-rw-r--r-- | action.c | 30 | ||||
-rw-r--r-- | runtime/queue.c | 30 | ||||
-rw-r--r-- | runtime/queue.h | 9 | ||||
-rw-r--r-- | tools/syslogd.c | 26 |
4 files changed, 66 insertions, 29 deletions
@@ -42,10 +42,11 @@ #include "cfsysline.h" #include "srUtils.h" #include "errmsg.h" +#include "wti.h" #include "datetime.h" /* forward definitions */ -rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg); +rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t*); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -255,7 +256,8 @@ actionConstructFinalize(action_t *pThis) * to be run on multiple threads. So far, this is forbidden by the interface * spec. -- rgerhards, 2008-01-30 */ - CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction)); + CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, + (rsRetVal (*)(void*,aUsrp_t*))actionCallDoActionMULTIQUEUE)); obj.SetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -415,6 +417,7 @@ rsRetVal actionDbgPrint(action_t *pThis) } +//MULTIQUEUE: think about these two functions below /* call the DoAction output plugin entry point * rgerhards, 2008-01-28 */ @@ -527,6 +530,29 @@ finalize_it: #pragma GCC diagnostic warning "-Wempty-body" +/* receive an array of to-process user pointers and submit them + * for processing. + * rgerhards, 2009-04-22 + */ +rsRetVal +actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp) +{ + int i; + msg_t *pMsg; + DEFiRet; + + assert(paUsrp != NULL); + + for(i = 0 ; i < paUsrp->nElem ; i++) { + pMsg = (msg_t*) paUsrp->pUsrp[i]; +dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg); + CHKiRet(actionCallDoAction(pAction, pMsg)); + } +finalize_it: + RETiRet; +} + + /* call the HUP handler for a given action, if such a handler is defined. The * action mutex is locked, because the HUP handler most probably needs to modify * some internal state information. diff --git a/runtime/queue.c b/runtime/queue.c index c48eb724..ea8567ea 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -899,6 +899,8 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) { + aUsrp_t aUsrp; + obj_t *pMsgp; DEFiRet; ASSERT(pThis != NULL); @@ -908,8 +910,13 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) * mode the consumer probably has a lot to convey (which get's lost in the other modes * because they are asynchronous. But direct mode is deliberately synchronous. * rgerhards, 2008-02-12 + * We use our knowledge about the aUsrp_t structure below, but without that, we + * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ - iRet = pThis->pConsumer(pThis->pUsr, pUsr); + pMsgp = (obj_t*) pUsr; + aUsrp.nElem = 1; /* there always is only one in direct mode */ + aUsrp.pUsrp = &pMsgp; + iRet = pThis->pConsumer(pThis->pUsr, &aUsrp); RETiRet; } @@ -959,7 +966,7 @@ UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex) * rgerhards, 2008-01-29 */ static rsRetVal -qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr) +GetUngottenObj(qqueue_t *pThis, obj_t **ppUsr) { DEFiRet; @@ -1015,7 +1022,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr) * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ if(pThis->iUngottenObjs > 0) { - iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr); + iRet = GetUngottenObj(pThis, (obj_t**) pUsr); } else { iRet = pThis->qDel(pThis, pUsr); ATOMIC_DEC(pThis->iQueueSize); @@ -1243,7 +1250,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*)) { DEFiRet; qqueue_t *pThis; @@ -1578,14 +1585,12 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); -// MULTIQUEUE: here we need to iterate through array! - or better pass it as whole? ... probably - int i; - for(i = 0 ; i < pWti->paUsrp->nElem ; i++) - CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp->pUsrp[i])); + CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 */ +//TODO: MULTIQUEUE: the following setting is no longer correct - need to think about how to do that... if(pThis->iDeqSlowdown) { dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n", pThis->iDeqSlowdown); @@ -1597,7 +1602,7 @@ finalize_it: } -/* This is a special consumer to feed the disk-queue in disk-assited mode. +/* This is a special consumer to feed the disk-queue in disk-assisted mode. * When active, our own queue more or less acts as a memory buffer to the disk. * So this consumer just needs to drain the memory queue and submit entries * to the disk queue. The disk queue will then call the actual consumer from @@ -1609,18 +1614,17 @@ finalize_it: static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { + int i; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); -// MULTIQUEUE: - int i; + /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->paUsrp->nElem ; i++) CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i])); - finalize_it: dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); RETiRet; @@ -1944,7 +1948,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) * to the regular files. -- rgerhards, 2008-01-29 */ while(pThis->iUngottenObjs > 0) { - CHKiRet(qqueueGetUngottenObj(pThis, &pUsr)); + CHKiRet(GetUngottenObj(pThis, &pUsr)); CHKiRet((objSerialize(pUsr))(pUsr, psQIF)); objDestruct(pUsr); } diff --git a/runtime/queue.h b/runtime/queue.h index 7ecb9294..4fb57d07 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -100,11 +100,10 @@ typedef struct queue_s { * the user really wanted...). -- rgerhards, 2008-04-02 */ /* end dequeue time window */ - rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */ + rsRetVal (*pConsumer)(void *,aUsrp_t*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the - * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer - * to message) - * rgerhards, 2008-01-28 + * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 + * is pointer to an array of message message pointers) */ /* type-specific handlers (set during construction) */ rsRetVal (*qConstruct)(struct queue_s *pThis); @@ -185,7 +184,7 @@ rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)); + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*)); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int); diff --git a/tools/syslogd.c b/tools/syslogd.c index 8c86c12e..f48fa759 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -129,6 +129,7 @@ #include "omfile.h" #include "omdiscard.h" #include "threads.h" +#include "wti.h" #include "queue.h" #include "stream.h" #include "conf.h" @@ -1202,22 +1203,29 @@ processMsg(msg_t *pMsg) /* The consumer of dequeued messages. This function is called by the * queue engine on dequeueing of a message. It runs on a SEPARATE - * THREAD. - * Please note: the message object is destructed by the queue itself! + * THREAD. It receives an array of pointers, which it must iterate + * over. We do not do any further batching, as this is of no benefit + * for the main queue. */ static rsRetVal -msgConsumer(void __attribute__((unused)) *notNeeded, void *pUsr) +msgConsumer(void __attribute__((unused)) *notNeeded, aUsrp_t *paUsrp) { + int i; + msg_t *pMsg; DEFiRet; - msg_t *pMsg = (msg_t*) pUsr; - assert(pMsg != NULL); + assert(paUsrp != NULL); - if((pMsg->msgFlags & NEEDS_PARSING) != 0) { - parseMsg(pMsg); + for(i = 0 ; i < paUsrp->nElem ; i++) { + pMsg = (msg_t*) paUsrp->pUsrp[i]; +dbgprintf("msgConsumer..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg); + if((pMsg->msgFlags & NEEDS_PARSING) != 0) { + parseMsg(pMsg); + } + processMsg(pMsg); + msgDestruct(&pMsg); } - processMsg(pMsg); - msgDestruct(&pMsg); +dbgprintf("DONE msgConsumer..MULTIQUEUE:\n"); RETiRet; } |