summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-04-22 16:39:58 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-04-22 16:39:58 +0200
commite4b3f6d287d74b34d27b4e296c33cb3f1294a58c (patch)
treedd75cbbc5f857ccdb6d835fb2bf850ed1527b6ff
parent7667845bd72b6f92eabc975318a4f288a77f2630 (diff)
downloadrsyslog-e4b3f6d287d74b34d27b4e296c33cb3f1294a58c.tar.gz
rsyslog-e4b3f6d287d74b34d27b4e296c33cb3f1294a58c.tar.xz
rsyslog-e4b3f6d287d74b34d27b4e296c33cb3f1294a58c.zip
now batches are handed down to the actual consumer
... but the action consumer does not do anything really intelligent with them. But the DA consumer is already done, as is the main message queue consumer.
-rw-r--r--action.c30
-rw-r--r--runtime/queue.c30
-rw-r--r--runtime/queue.h9
-rw-r--r--tools/syslogd.c26
4 files changed, 66 insertions, 29 deletions
diff --git a/action.c b/action.c
index 03073153..8395642c 100644
--- a/action.c
+++ b/action.c
@@ -42,10 +42,11 @@
#include "cfsysline.h"
#include "srUtils.h"
#include "errmsg.h"
+#include "wti.h"
#include "datetime.h"
/* forward definitions */
-rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg);
+rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t*);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -255,7 +256,8 @@ actionConstructFinalize(action_t *pThis)
* to be run on multiple threads. So far, this is forbidden by the interface
* spec. -- rgerhards, 2008-01-30
*/
- CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction));
+ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize,
+ (rsRetVal (*)(void*,aUsrp_t*))actionCallDoActionMULTIQUEUE));
obj.SetName((obj_t*) pThis->pQueue, pszQName);
/* ... set some properties ... */
@@ -415,6 +417,7 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
+//MULTIQUEUE: think about these two functions below
/* call the DoAction output plugin entry point
* rgerhards, 2008-01-28
*/
@@ -527,6 +530,29 @@ finalize_it:
#pragma GCC diagnostic warning "-Wempty-body"
+/* receive an array of to-process user pointers and submit them
+ * for processing.
+ * rgerhards, 2009-04-22
+ */
+rsRetVal
+actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp)
+{
+ int i;
+ msg_t *pMsg;
+ DEFiRet;
+
+ assert(paUsrp != NULL);
+
+ for(i = 0 ; i < paUsrp->nElem ; i++) {
+ pMsg = (msg_t*) paUsrp->pUsrp[i];
+dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg);
+ CHKiRet(actionCallDoAction(pAction, pMsg));
+ }
+finalize_it:
+ RETiRet;
+}
+
+
/* call the HUP handler for a given action, if such a handler is defined. The
* action mutex is locked, because the HUP handler most probably needs to modify
* some internal state information.
diff --git a/runtime/queue.c b/runtime/queue.c
index c48eb724..ea8567ea 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -899,6 +899,8 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
+ aUsrp_t aUsrp;
+ obj_t *pMsgp;
DEFiRet;
ASSERT(pThis != NULL);
@@ -908,8 +910,13 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
* mode the consumer probably has a lot to convey (which get's lost in the other modes
* because they are asynchronous. But direct mode is deliberately synchronous.
* rgerhards, 2008-02-12
+ * We use our knowledge about the aUsrp_t structure below, but without that, we
+ * pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
- iRet = pThis->pConsumer(pThis->pUsr, pUsr);
+ pMsgp = (obj_t*) pUsr;
+ aUsrp.nElem = 1; /* there always is only one in direct mode */
+ aUsrp.pUsrp = &pMsgp;
+ iRet = pThis->pConsumer(pThis->pUsr, &aUsrp);
RETiRet;
}
@@ -959,7 +966,7 @@ UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
* rgerhards, 2008-01-29
*/
static rsRetVal
-qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
+GetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
{
DEFiRet;
@@ -1015,7 +1022,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr)
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
if(pThis->iUngottenObjs > 0) {
- iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr);
+ iRet = GetUngottenObj(pThis, (obj_t**) pUsr);
} else {
iRet = pThis->qDel(pThis, pUsr);
ATOMIC_DEC(pThis->iQueueSize);
@@ -1243,7 +1250,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1578,14 +1585,12 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
-// MULTIQUEUE: here we need to iterate through array! - or better pass it as whole? ... probably
- int i;
- for(i = 0 ; i < pWti->paUsrp->nElem ; i++)
- CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp->pUsrp[i]));
+ CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
*/
+//TODO: MULTIQUEUE: the following setting is no longer correct - need to think about how to do that...
if(pThis->iDeqSlowdown) {
dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n",
pThis->iDeqSlowdown);
@@ -1597,7 +1602,7 @@ finalize_it:
}
-/* This is a special consumer to feed the disk-queue in disk-assited mode.
+/* This is a special consumer to feed the disk-queue in disk-assisted mode.
* When active, our own queue more or less acts as a memory buffer to the disk.
* So this consumer just needs to drain the memory queue and submit entries
* to the disk queue. The disk queue will then call the actual consumer from
@@ -1609,18 +1614,17 @@ finalize_it:
static rsRetVal
ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
+ int i;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
-// MULTIQUEUE:
- int i;
+ /* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->paUsrp->nElem ; i++)
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i]));
-
finalize_it:
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
RETiRet;
@@ -1944,7 +1948,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
* to the regular files. -- rgerhards, 2008-01-29
*/
while(pThis->iUngottenObjs > 0) {
- CHKiRet(qqueueGetUngottenObj(pThis, &pUsr));
+ CHKiRet(GetUngottenObj(pThis, &pUsr));
CHKiRet((objSerialize(pUsr))(pUsr, psQIF));
objDestruct(pUsr);
}
diff --git a/runtime/queue.h b/runtime/queue.h
index 7ecb9294..4fb57d07 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -100,11 +100,10 @@ typedef struct queue_s {
* the user really wanted...). -- rgerhards, 2008-04-02
*/
/* end dequeue time window */
- rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */
+ rsRetVal (*pConsumer)(void *,aUsrp_t*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
- * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer
- * to message)
- * rgerhards, 2008-01-28
+ * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2
+ * is pointer to an array of message message pointers)
*/
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
@@ -185,7 +184,7 @@ rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*));
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*));
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int);
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 8c86c12e..f48fa759 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -129,6 +129,7 @@
#include "omfile.h"
#include "omdiscard.h"
#include "threads.h"
+#include "wti.h"
#include "queue.h"
#include "stream.h"
#include "conf.h"
@@ -1202,22 +1203,29 @@ processMsg(msg_t *pMsg)
/* The consumer of dequeued messages. This function is called by the
* queue engine on dequeueing of a message. It runs on a SEPARATE
- * THREAD.
- * Please note: the message object is destructed by the queue itself!
+ * THREAD. It receives an array of pointers, which it must iterate
+ * over. We do not do any further batching, as this is of no benefit
+ * for the main queue.
*/
static rsRetVal
-msgConsumer(void __attribute__((unused)) *notNeeded, void *pUsr)
+msgConsumer(void __attribute__((unused)) *notNeeded, aUsrp_t *paUsrp)
{
+ int i;
+ msg_t *pMsg;
DEFiRet;
- msg_t *pMsg = (msg_t*) pUsr;
- assert(pMsg != NULL);
+ assert(paUsrp != NULL);
- if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
- parseMsg(pMsg);
+ for(i = 0 ; i < paUsrp->nElem ; i++) {
+ pMsg = (msg_t*) paUsrp->pUsrp[i];
+dbgprintf("msgConsumer..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg);
+ if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
+ parseMsg(pMsg);
+ }
+ processMsg(pMsg);
+ msgDestruct(&pMsg);
}
- processMsg(pMsg);
- msgDestruct(&pMsg);
+dbgprintf("DONE msgConsumer..MULTIQUEUE:\n");
RETiRet;
}