summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c30
-rw-r--r--runtime/queue.c30
-rw-r--r--runtime/queue.h9
-rw-r--r--tools/syslogd.c26
4 files changed, 66 insertions, 29 deletions
diff --git a/action.c b/action.c
index 03073153..8395642c 100644
--- a/action.c
+++ b/action.c
@@ -42,10 +42,11 @@
#include "cfsysline.h"
#include "srUtils.h"
#include "errmsg.h"
+#include "wti.h"
#include "datetime.h"
/* forward definitions */
-rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg);
+rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t*);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -255,7 +256,8 @@ actionConstructFinalize(action_t *pThis)
* to be run on multiple threads. So far, this is forbidden by the interface
* spec. -- rgerhards, 2008-01-30
*/
- CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction));
+ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize,
+ (rsRetVal (*)(void*,aUsrp_t*))actionCallDoActionMULTIQUEUE));
obj.SetName((obj_t*) pThis->pQueue, pszQName);
/* ... set some properties ... */
@@ -415,6 +417,7 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
+//MULTIQUEUE: think about these two functions below
/* call the DoAction output plugin entry point
* rgerhards, 2008-01-28
*/
@@ -527,6 +530,29 @@ finalize_it:
#pragma GCC diagnostic warning "-Wempty-body"
+/* receive an array of to-process user pointers and submit them
+ * for processing.
+ * rgerhards, 2009-04-22
+ */
+rsRetVal
+actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp)
+{
+ int i;
+ msg_t *pMsg;
+ DEFiRet;
+
+ assert(paUsrp != NULL);
+
+ for(i = 0 ; i < paUsrp->nElem ; i++) {
+ pMsg = (msg_t*) paUsrp->pUsrp[i];
+dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg);
+ CHKiRet(actionCallDoAction(pAction, pMsg));
+ }
+finalize_it:
+ RETiRet;
+}
+
+
/* call the HUP handler for a given action, if such a handler is defined. The
* action mutex is locked, because the HUP handler most probably needs to modify
* some internal state information.
diff --git a/runtime/queue.c b/runtime/queue.c
index c48eb724..ea8567ea 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -899,6 +899,8 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
+ aUsrp_t aUsrp;
+ obj_t *pMsgp;
DEFiRet;
ASSERT(pThis != NULL);
@@ -908,8 +910,13 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
* mode the consumer probably has a lot to convey (which get's lost in the other modes
* because they are asynchronous. But direct mode is deliberately synchronous.
* rgerhards, 2008-02-12
+ * We use our knowledge about the aUsrp_t structure below, but without that, we
+ * pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
- iRet = pThis->pConsumer(pThis->pUsr, pUsr);
+ pMsgp = (obj_t*) pUsr;
+ aUsrp.nElem = 1; /* there always is only one in direct mode */
+ aUsrp.pUsrp = &pMsgp;
+ iRet = pThis->pConsumer(pThis->pUsr, &aUsrp);
RETiRet;
}
@@ -959,7 +966,7 @@ UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
* rgerhards, 2008-01-29
*/
static rsRetVal
-qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
+GetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
{
DEFiRet;
@@ -1015,7 +1022,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr)
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
if(pThis->iUngottenObjs > 0) {
- iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr);
+ iRet = GetUngottenObj(pThis, (obj_t**) pUsr);
} else {
iRet = pThis->qDel(pThis, pUsr);
ATOMIC_DEC(pThis->iQueueSize);
@@ -1243,7 +1250,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1578,14 +1585,12 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
-// MULTIQUEUE: here we need to iterate through array! - or better pass it as whole? ... probably
- int i;
- for(i = 0 ; i < pWti->paUsrp->nElem ; i++)
- CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp->pUsrp[i]));
+ CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
*/
+//TODO: MULTIQUEUE: the following setting is no longer correct - need to think about how to do that...
if(pThis->iDeqSlowdown) {
dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n",
pThis->iDeqSlowdown);
@@ -1597,7 +1602,7 @@ finalize_it:
}
-/* This is a special consumer to feed the disk-queue in disk-assited mode.
+/* This is a special consumer to feed the disk-queue in disk-assisted mode.
* When active, our own queue more or less acts as a memory buffer to the disk.
* So this consumer just needs to drain the memory queue and submit entries
* to the disk queue. The disk queue will then call the actual consumer from
@@ -1609,18 +1614,17 @@ finalize_it:
static rsRetVal
ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
+ int i;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
-// MULTIQUEUE:
- int i;
+ /* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->paUsrp->nElem ; i++)
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i]));
-
finalize_it:
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
RETiRet;
@@ -1944,7 +1948,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
* to the regular files. -- rgerhards, 2008-01-29
*/
while(pThis->iUngottenObjs > 0) {
- CHKiRet(qqueueGetUngottenObj(pThis, &pUsr));
+ CHKiRet(GetUngottenObj(pThis, &pUsr));
CHKiRet((objSerialize(pUsr))(pUsr, psQIF));
objDestruct(pUsr);
}
diff --git a/runtime/queue.h b/runtime/queue.h
index 7ecb9294..4fb57d07 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -100,11 +100,10 @@ typedef struct queue_s {
* the user really wanted...). -- rgerhards, 2008-04-02
*/
/* end dequeue time window */
- rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */
+ rsRetVal (*pConsumer)(void *,aUsrp_t*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
- * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer
- * to message)
- * rgerhards, 2008-01-28
+ * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2
+ * is pointer to an array of message message pointers)
*/
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
@@ -185,7 +184,7 @@ rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*));
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*));
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int);
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 8c86c12e..f48fa759 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -129,6 +129,7 @@
#include "omfile.h"
#include "omdiscard.h"
#include "threads.h"
+#include "wti.h"
#include "queue.h"
#include "stream.h"
#include "conf.h"
@@ -1202,22 +1203,29 @@ processMsg(msg_t *pMsg)
/* The consumer of dequeued messages. This function is called by the
* queue engine on dequeueing of a message. It runs on a SEPARATE
- * THREAD.
- * Please note: the message object is destructed by the queue itself!
+ * THREAD. It receives an array of pointers, which it must iterate
+ * over. We do not do any further batching, as this is of no benefit
+ * for the main queue.
*/
static rsRetVal
-msgConsumer(void __attribute__((unused)) *notNeeded, void *pUsr)
+msgConsumer(void __attribute__((unused)) *notNeeded, aUsrp_t *paUsrp)
{
+ int i;
+ msg_t *pMsg;
DEFiRet;
- msg_t *pMsg = (msg_t*) pUsr;
- assert(pMsg != NULL);
+ assert(paUsrp != NULL);
- if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
- parseMsg(pMsg);
+ for(i = 0 ; i < paUsrp->nElem ; i++) {
+ pMsg = (msg_t*) paUsrp->pUsrp[i];
+dbgprintf("msgConsumer..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg);
+ if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
+ parseMsg(pMsg);
+ }
+ processMsg(pMsg);
+ msgDestruct(&pMsg);
}
- processMsg(pMsg);
- msgDestruct(&pMsg);
+dbgprintf("DONE msgConsumer..MULTIQUEUE:\n");
RETiRet;
}