summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-05-12 15:27:40 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-05-12 15:27:40 +0200
commitbb79e96dc300fa5a2182e7c047afb3b15c5dc870 (patch)
tree85680d43b503648da48fafe1178b1cb1ce62cc86
parent21b7f7e603639fa8f354c954b0e467abb72b6c12 (diff)
downloadrsyslog-bb79e96dc300fa5a2182e7c047afb3b15c5dc870.tar.gz
rsyslog-bb79e96dc300fa5a2182e7c047afb3b15c5dc870.tar.xz
rsyslog-bb79e96dc300fa5a2182e7c047afb3b15c5dc870.zip
moving to a cleaner implementation of batches
... now that we know what we need from a theoretical POV.
-rw-r--r--action.c25
-rw-r--r--runtime/Makefile.am1
-rw-r--r--runtime/batch.h63
-rw-r--r--runtime/debug.c14
-rw-r--r--runtime/queue.c49
-rw-r--r--runtime/queue.h5
-rw-r--r--runtime/rsyslog.h2
-rw-r--r--runtime/wti.c10
-rw-r--r--runtime/wti.h14
-rw-r--r--tools/syslogd.c9
10 files changed, 132 insertions, 60 deletions
diff --git a/action.c b/action.c
index 509ad749..b12eda6e 100644
--- a/action.c
+++ b/action.c
@@ -42,13 +42,14 @@
#include "cfsysline.h"
#include "srUtils.h"
#include "errmsg.h"
+#include "batch.h"
#include "wti.h"
#include "datetime.h"
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
/* forward definitions */
-rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t*);
+static rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -261,7 +262,7 @@ actionConstructFinalize(action_t *pThis)
* spec. -- rgerhards, 2008-01-30
*/
CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize,
- (rsRetVal (*)(void*,aUsrp_t*))actionCallDoActionMULTIQUEUE));
+ (rsRetVal (*)(void*, batch_t*))actionCallDoActionMULTIQUEUE));
obj.SetName((obj_t*) pThis->pQueue, pszQName);
/* ... set some properties ... */
@@ -782,19 +783,19 @@ finalize_it:
* for processing.
* rgerhards, 2009-04-22
*/
-rsRetVal
-actionCallDoActionMULTIQUEUEprocessing(action_t *pAction, aUsrp_t *paUsrp)
+static rsRetVal
+actionCallDoActionMULTIQUEUEprocessing(action_t *pAction, batch_t *pBatch)
{
int i;
msg_t *pMsg;
rsRetVal localRet;
DEFiRet;
- assert(paUsrp != NULL);
+ assert(pBatch != NULL);
- for(i = 0 ; i < paUsrp->nElem ; i++) {
- pMsg = (msg_t*) paUsrp->pUsrp[i];
-dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg);
+ for(i = 0 ; i < pBatch->nElem ; i++) {
+ pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
+dbgprintf("actionCall..MULTIQUEUE: i: %d/%d, pMsg: %p\n", i, pBatch->nElem, pMsg);
localRet = actionProcessMessage(pAction, pMsg);
dbgprintf("action call returned %d\n", localRet);
msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */
@@ -810,13 +811,13 @@ finalize_it:
* for processing.
* rgerhards, 2009-04-22
*/
-rsRetVal
-actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp)
+static rsRetVal
+actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch)
{
int iCancelStateSave;
DEFiRet;
- assert(paUsrp != NULL);
+ assert(pBatch != NULL);
/* We now must guard the output module against execution by multiple threads. The
* plugin interface specifies that output modules must not be thread-safe (except
@@ -828,7 +829,7 @@ actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp)
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
pthread_setcancelstate(iCancelStateSave, NULL);
- iRet = actionCallDoActionMULTIQUEUEprocessing(pAction, paUsrp);
+ iRet = actionCallDoActionMULTIQUEUEprocessing(pAction, pBatch);
pthread_cleanup_pop(1); /* unlock mutex */
diff --git a/runtime/Makefile.am b/runtime/Makefile.am
index 2f0a1aa0..f0fb1cdd 100644
--- a/runtime/Makefile.am
+++ b/runtime/Makefile.am
@@ -8,6 +8,7 @@ librsyslog_la_SOURCES = \
rsyslog.c \
rsyslog.h \
atomic.h \
+ batch.h \
syslogd-types.h \
module-template.h \
obj-types.h \
diff --git a/runtime/batch.h b/runtime/batch.h
new file mode 100644
index 00000000..cb40cf42
--- /dev/null
+++ b/runtime/batch.h
@@ -0,0 +1,63 @@
+/* Definition of the batch_t data structure.
+ * I am not sure yet if this will become a full-blown object. For now, this header just
+ * includes the object definition and is not accompanied by code.
+ *
+ * Copyright 2009 by Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of the rsyslog runtime library.
+ *
+ * The rsyslog runtime library is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The rsyslog runtime library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
+ */
+
+#ifndef BATCH_H_INCLUDED
+#define BATCH_H_INCLUDED
+
+/* enum for batch states. Actually, we violate a layer here, in that we assume that a batch is used
+ * for action processing. So far, this seems acceptable, the status is simply ignored inside the
+ * main message queue. But over time, it could potentially be useful to split the two.
+ * rgerhad, 2009-05-12
+ */
+typedef enum {
+ BATCH_STATE_RDY = 0, /* object ready for processing */
+ BATCH_STATE_BAD = 1, /* unrecoverable failure while processing, do NOT resubmit to same action */
+ BATCH_STATE_SUB = 2, /* message submitted for processing, outcome yet unkonwn */
+ BATCH_STATE_DISC = 3, /* discarded - processed OK, but do not submit to any other action */
+} batch_state_t;
+
+
+/* an object inside a batch, including any information (state!) needed for it to "life".
+ */
+struct batch_obj_s {
+ obj_t *pUsrp; /* pointer to user object (most often message) */
+ batch_state_t state; /* associated state */
+};
+
+/* the batch
+ * This object is used to dequeue multiple user pointers which are than handed over
+ * to processing. The size of elements is fixed after queue creation, but may be
+ * modified by config variables (better said: queue properties).
+ * Note that a "user pointer" in rsyslog context so far always is a message
+ * object. We stick to the more generic term because queues may potentially hold
+ * other types of objects, too.
+ * rgerhards, 2009-05-12
+ */
+struct batch_s {
+ int nElem; /* actual number of element in this entry */
+ batch_obj_t *pElem; /* batch elements */
+};
+
+#endif /* #ifndef BATCH_H_INCLUDED */
diff --git a/runtime/debug.c b/runtime/debug.c
index 4ee90226..4f45a1e3 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -1050,7 +1050,9 @@ int dbgEntrFunc(dbgFuncDB_t **ppFuncDB, const char *file, const char *func, int
/* when we reach this point, we have a fully-initialized FuncDB! */
ATOMIC_INC(pFuncDB->nTimesCalled);
if(bLogFuncFlow && dbgPrintNameIsInList((const uchar*)pFuncDB->file, printNameFileRoot))
- dbgprintf("%s:%d: %s: enter\n", pFuncDB->file, pFuncDB->line, pFuncDB->func);
+ if(strcmp(pFuncDB->file, "stringbuf.c")) { /* TODO: make configurable */
+ dbgprintf("%s:%d: %s: enter\n", pFuncDB->file, pFuncDB->line, pFuncDB->func);
+ }
if(pThrd->stackPtr >= (int) (sizeof(pThrd->callStack) / sizeof(dbgFuncDB_t*))) {
dbgprintf("%s:%d: %s: debug module: call stack for this thread full, suspending call tracking\n",
pFuncDB->file, pFuncDB->line, pFuncDB->func);
@@ -1080,10 +1082,12 @@ void dbgExitFunc(dbgFuncDB_t *pFuncDB, int iStackPtrRestore, int iRet)
dbgFuncDBPrintActiveMutexes(pFuncDB, "WARNING: mutex still owned by us as we exit function, mutex: ", pthread_self());
if(bLogFuncFlow && dbgPrintNameIsInList((const uchar*)pFuncDB->file, printNameFileRoot)) {
- if(iRet == RS_RET_NO_IRET)
- dbgprintf("%s:%d: %s: exit: (no iRet)\n", pFuncDB->file, pFuncDB->line, pFuncDB->func);
- else
- dbgprintf("%s:%d: %s: exit: %d\n", pFuncDB->file, pFuncDB->line, pFuncDB->func, iRet);
+ if(strcmp(pFuncDB->file, "stringbuf.c")) { /* TODO: make configurable */
+ if(iRet == RS_RET_NO_IRET)
+ dbgprintf("%s:%d: %s: exit: (no iRet)\n", pFuncDB->file, pFuncDB->line, pFuncDB->func);
+ else
+ dbgprintf("%s:%d: %s: exit: %d\n", pFuncDB->file, pFuncDB->line, pFuncDB->func, iRet);
+ }
}
pThrd->stackPtr = iStackPtrRestore;
if(pThrd->stackPtr < 0) {
diff --git a/runtime/queue.c b/runtime/queue.c
index c2df928b..c3a8e9d4 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -65,7 +65,7 @@ DEFobjStaticHelpers
DEFobjCurrIf(glbl)
/* forward-definitions */
-rsRetVal qqueueChkPersist(qqueue_t *pThis);
+static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
@@ -896,8 +896,8 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
- aUsrp_t aUsrp;
- obj_t *pMsgp;
+ batch_t singleBatch;
+ batch_obj_t batchObj;
DEFiRet;
ASSERT(pThis != NULL);
@@ -907,17 +907,19 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
* mode the consumer probably has a lot to convey (which get's lost in the other modes
* because they are asynchronous. But direct mode is deliberately synchronous.
* rgerhards, 2008-02-12
- * We use our knowledge about the aUsrp_t structure below, but without that, we
+ * We use our knowledge about the batch_t structure below, but without that, we
* pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
- pMsgp = (obj_t*) pUsr;
- aUsrp.nElem = 1; /* there always is only one in direct mode */
- aUsrp.pUsrp = &pMsgp;
- iRet = pThis->pConsumer(pThis->pUsr, &aUsrp);
+ batchObj.state = BATCH_STATE_RDY;
+ batchObj.pUsrp = (obj_t*) pUsr;
+ singleBatch.nElem = 1; /* there always is only one in direct mode */
+ singleBatch.pElem = &batchObj;
+ iRet = pThis->pConsumer(pThis->pUsr, &singleBatch);
RETiRet;
}
+
static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
{
return RS_RET_OK;
@@ -1247,7 +1249,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1402,11 +1404,12 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize
rsRetVal localRet;
DEFiRet;
+ /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */
+
nDequeued = 0;
do {
dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
CHKiRet(qqueueDel(pThis, &pUsr));
- qqueueChkPersist(pThis); /* is is questionable if we should really need to call this every time... */
iQueueSize = qqueueGetOverallQueueSize(pThis);
/* check if we should discard this element */
@@ -1417,11 +1420,15 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
ABORT_FINALIZE(localRet);
/* all well, use this element */
- pWti->paUsrp->pUsrp[nDequeued++] = pUsr;
+ pWti->batch.pElem[nDequeued].pUsrp = pUsr;
+ pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
+ ++nDequeued;
} while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize);
+ qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */
+
//bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
- pWti->paUsrp->nElem = nDequeued;
+ pWti->batch.nElem = nDequeued;
*iRemainingQueueSize = iQueueSize;
finalize_it:
@@ -1582,7 +1589,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp));
+ CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
@@ -1619,8 +1626,8 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
/* iterate over returned results and enqueue them in DA queue */
- for(i = 0 ; i < pWti->paUsrp->nElem ; i++)
- CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i]));
+ for(i = 0 ; i < pWti->batch.nElem ; i++)
+ CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->batch.pElem[i].pUsrp));
finalize_it:
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
@@ -1974,17 +1981,21 @@ finalize_it:
/* check if we need to persist the current queue info. If an
- * error occurs, thus should be ignored by caller (but we still
+ * error occurs, this should be ignored by caller (but we still
* abide to our regular call interface)...
* rgerhards, 2008-01-13
+ * nUpdates is the number of updates since the last call to this function.
+ * It may be > 1 due to batches. -- rgerhards, 2009-05-12
*/
-rsRetVal qqueueChkPersist(qqueue_t *pThis)
+static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
+ assert(nUpdates > 0);
- if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
+ pThis->iUpdsSincePersist += nUpdates;
+ if(pThis->iPersistUpdCnt && pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
qqueuePersist(pThis, QUEUE_CHECKPOINT);
pThis->iUpdsSincePersist = 0;
}
@@ -2199,7 +2210,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
/* and finally enqueue the message */
CHKiRet(qqueueAdd(pThis, pUsr));
- qqueueChkPersist(pThis);
+ qqueueChkPersist(pThis, 1);
finalize_it:
if(pThis->qType != QUEUETYPE_DIRECT) {
diff --git a/runtime/queue.h b/runtime/queue.h
index 8a60254b..4a5f16a1 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -27,6 +27,7 @@
#include <pthread.h>
#include "obj.h"
#include "wtp.h"
+#include "batch.h"
#include "stream.h"
/* queue types */
@@ -100,7 +101,7 @@ typedef struct queue_s {
* the user really wanted...). -- rgerhards, 2008-04-02
*/
/* end dequeue time window */
- rsRetVal (*pConsumer)(void *,aUsrp_t*); /* user-supplied consumer function for dequeued messages */
+ rsRetVal (*pConsumer)(void *,batch_t*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
* user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2
* is pointer to an array of message message pointers)
@@ -184,7 +185,7 @@ rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*));
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*));
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int);
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index ee941b2b..53a510b3 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -95,6 +95,8 @@ typedef struct permittedPeers_s permittedPeers_t; /* this should go away in the
typedef struct permittedPeerWildcard_s permittedPeerWildcard_t; /* this should go away in the long term -- rgerhards, 2008-05-19 */
typedef struct tcpsrv_s tcpsrv_t;
typedef struct vmstk_s vmstk_t;
+typedef struct batch_obj_s batch_obj_t;
+typedef struct batch_s batch_t;
typedef rsRetVal (*prsf_t)(struct vmstk_s*, int); /* pointer to a RainerScript function */
/* some universal 64 bit define... */
diff --git a/runtime/wti.c b/runtime/wti.c
index 346ef7aa..c3fa127e 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -201,8 +201,7 @@ CODESTARTobjDestruct(wti)
pthread_cond_destroy(&pThis->condExitDone);
pthread_mutex_destroy(&pThis->mut);
- free(pThis->paUsrp->pUsrp);
- free(pThis->paUsrp);
+ free(pThis->batch.pElem);
free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
@@ -234,8 +233,7 @@ wtiConstructFinalize(wti_t *pThis)
/* we now alloc the array for user pointers. We obtain the max from the queue itself. */
CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
- CHKmalloc(pThis->paUsrp = calloc(1, sizeof(aUsrp_t)));
- CHKmalloc(pThis->paUsrp->pUsrp = calloc((size_t)iDeqBatchSize, sizeof(void*)));
+ CHKmalloc(pThis->batch.pElem = calloc((size_t)iDeqBatchSize, sizeof(batch_obj_t*)));
finalize_it:
RETiRet;
@@ -322,7 +320,7 @@ wtiWorkerCancelCleanup(void *arg)
/* call user supplied handler (that one e.g. requeues the element) */
// MULTIQUEUE: need to change here!
- pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->paUsrp->pUsrp[0]);
+ pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pWtp->mut);
@@ -374,7 +372,7 @@ wtiWorker(wti_t *pThis)
ISOBJ_TYPE_assert(pWtp, wtp);
dbgSetThrdName(pThis->pszDbgHdr);
- pThis->paUsrp->nElem = 0; /* flag no elements present */ // MULTIQUEUE: do we really need this any longer (cnacel handeler)?
+ pThis->batch.nElem = 0; /* flag no elements present */ // MULTIQUEUE: do we really need this any longer (cnacel handeler)?
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
diff --git a/runtime/wti.h b/runtime/wti.h
index 85c98fe6..0990941e 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -27,17 +27,7 @@
#include <pthread.h>
#include "wtp.h"
#include "obj.h"
-
-/* the user pointer array object
- * This object is used to dequeue multiple user pointers which are than handed over
- * to processing. The size of elements is fixed after queue creation, but may be
- * modified by config variables (better said: queue properties).
- * rgerhards, 2009-04-22
- */
-struct aUsrp_s {
- int nElem; /* actual number of element in this entry */
- obj_t **pUsrp; /* actual elements (array!) */
-};
+#include "batch.h"
/* the worker thread instance class */
@@ -46,11 +36,11 @@ typedef struct wti_s {
int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */
pthread_t thrdID; /* thread ID */
qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
- aUsrp_t *paUsrp; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
pthread_cond_t condExitDone; /* signaled when the thread exit is done (once per thread existance) */
pthread_mutex_t mut;
int bShutdownRqtd; /* shutdown for this thread requested? 0 - no , 1 - yes */
+ batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
uchar *pszDbgHdr; /* header string for debug messages */
} wti_t;
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 866c0173..7ee5dbd7 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -138,6 +138,7 @@
#include "datetime.h"
#include "parser.h"
#include "sysvar.h"
+#include "batch.h"
/* definitions for objects we access */
DEFobjCurrIf(obj)
@@ -1211,16 +1212,16 @@ processMsg(msg_t *pMsg)
* for the main queue.
*/
static rsRetVal
-msgConsumer(void __attribute__((unused)) *notNeeded, aUsrp_t *paUsrp)
+msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch)
{
int i;
msg_t *pMsg;
DEFiRet;
- assert(paUsrp != NULL);
+ assert(pBatch != NULL);
- for(i = 0 ; i < paUsrp->nElem ; i++) {
- pMsg = (msg_t*) paUsrp->pUsrp[i];
+ for(i = 0 ; i < pBatch->nElem ; i++) {
+ pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
dbgprintf("msgConsumer..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg);
if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
parseMsg(pMsg);