summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--runtime/queue.c182
-rw-r--r--runtime/queue.h4
-rw-r--r--runtime/rsyslog.h1
-rw-r--r--runtime/wti.c17
-rw-r--r--runtime/wti.h16
-rw-r--r--runtime/wtp.c4
-rw-r--r--runtime/wtp.h4
7 files changed, 149 insertions, 79 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index e5db866c..c48eb724 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -8,7 +8,11 @@
* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
* if you are getting aquainted to the object.
*
- * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ * NOTE: as of 2009-04-22, I have begin to remove the qqueue* prefix from static
+ * function names - this makes it really hard to read and does not provide much
+ * benefit, at least I (now) think so...
+ *
+ * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -63,12 +67,13 @@ DEFobjCurrIf(glbl)
/* forward-definitions */
rsRetVal qqueueChkPersist(qqueue_t *pThis);
static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
-static rsRetVal qqueueRateLimiter(qqueue_t *pThis);
+static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
+static rsRetVal GetDeqMaxAtOnce(qqueue_t *pThis, int *pVal);
static int qqueueIsIdleDA(qqueue_t *pThis);
-static rsRetVal qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
-static rsRetVal qqueueConsumerCancelCleanup(void *arg1, void *arg2);
-static rsRetVal qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
+static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
+static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2);
+static rsRetVal UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -369,9 +374,11 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
+ // MULTIQUEUE: TODO: this should be DA-specific!
+ CHKiRet(wtpSetpfGetDeqMaxAtOnce (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqMaxAtOnce));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerDA));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) qqueueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) ConsumerCancelCleanup));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
@@ -721,7 +728,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
while(iUngottenObjs > 0) {
/* fill the queue from disk */
CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL));
- qqueueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
+ UngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
--iUngottenObjs; /* one less */
}
@@ -922,7 +929,7 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute_
* rgerhards, 2008-01-20
*/
static rsRetVal
-qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
+UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
@@ -1266,6 +1273,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->pConsumer = pConsumer;
pThis->iNumWorkerThreads = iWorkerThreads;
pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
+ pThis->iDeqMaxAtOnce = 8; /* conservative default, should still provide good performance */
pThis->pszFilePrefix = NULL;
pThis->qType = qType;
@@ -1315,7 +1323,7 @@ finalize_it:
* rgerhards, 2008-01-16
*/
static rsRetVal
-qqueueConsumerCancelCleanup(void *arg1, void *arg2)
+ConsumerCancelCleanup(void *arg1, void *arg2)
{
DEFiRet;
@@ -1327,7 +1335,7 @@ qqueueConsumerCancelCleanup(void *arg1, void *arg2)
if(pUsr != NULL) {
/* make sure the data element is not lost */
dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
- CHKiRet(qqueueUngetObj(pThis, pUsr, LOCK_MUTEX));
+ CHKiRet(UngetObj(pThis, pUsr, LOCK_MUTEX));
}
finalize_it:
@@ -1376,38 +1384,68 @@ finalize_it:
}
+/* dequeue as many user points as are available, until we hit the configured
+ * upper limit of pointers.
+ * This must only be called when the queue mutex is LOOKED, otherwise serious
+ * malfunction will happen.
+ */
+static inline rsRetVal
+DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize)
+{
+ int nDequeued;
+ int iQueueSize;
+ void *pUsr;
+ rsRetVal localRet;
+ DEFiRet;
+
+ nDequeued = 0;
+ do {
+dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
+ CHKiRet(qqueueDel(pThis, &pUsr));
+ qqueueChkPersist(pThis); /* is is questionable if we should really need to call this every time... */
+ iQueueSize = qqueueGetOverallQueueSize(pThis);
+
+ /* check if we should discard this element */
+ localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr);
+ if(localRet == RS_RET_QUEUE_FULL)
+ continue;
+ else if(localRet != RS_RET_OK)
+ ABORT_FINALIZE(localRet);
+
+ /* all well, use this element */
+ pWti->paUsrp->pUsrp[nDequeued++] = pUsr;
+ } while(iQueueSize > 0 && nDequeued < pThis->iDeqMaxAtOnce);
+
+ //bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
+ pWti->paUsrp->nElem = nDequeued;
+ *iRemainingQueueSize = iQueueSize;
+
+finalize_it:
+ RETiRet;
+}
+
+
/* dequeue the queued object for the queue consumers.
* rgerhards, 2008-10-21
+ * I made a radical change - we now dequeue multiple elements, and store these objects in
+ * an array of user pointers. We expect that this increases performance.
+ * rgerhards, 2009-04-22
*/
static rsRetVal
-qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
- void *pUsr;
- int iQueueSize;
- int bRunsDA; /* cache for early mutex release */
+ int iQueueSize = 0; /* keep the compiler happy... */
- /* dequeue element (still protected from mutex) */
- iRet = qqueueDel(pThis, &pUsr);
- qqueueChkPersist(pThis);
- iQueueSize = qqueueGetOverallQueueSize(pThis); /* cache this for after mutex release */
- bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
-
- /* We now need to save the user pointer for the cancel cleanup handler, BUT ONLY
- * if we could successfully obtain a user pointer. Otherwise, we would bring the
- * cancel cleanup handler into big troubles (and we did ;)). Note that we can
- * NOT set the variable further below, as this may lead to an object leak. We
- * may get cancelled before we reach that part of the code, so the only
- * solution is to do it here. -- rgerhards, 2008-02-27
- */
- if(iRet == RS_RET_OK) {
- pWti->pUsrp = pUsr;
- }
+ /* dequeue element batch (still protected from mutex) */
+ iRet = DequeueConsumableElements(pThis, pWti, &iQueueSize);
/* awake some flow-controlled sources if we can do this right now */
/* TODO: this could be done better from a performance point of view -- do it only if
* we have someone waiting for the condition (or only when we hit the watermark right
* on the nail [exact value]) -- rgerhards, 2008-03-14
+ * now that we dequeue batches of pointers, this is much less an issue...
+ * rgerhards, 2009-04-22
*/
if(iQueueSize < pThis->iFullDlyMrk) {
pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk);
@@ -1417,37 +1455,16 @@ qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
}
- /* rgerhards, 2008-09-30: I reversed the order of cond_signal und mutex_unlock
- * as of the pthreads recommendation on predictable scheduling behaviour. I don't see
- * any problems caused by this, but I add this comment in case some will be seen
- * in the next time.
- */
pthread_cond_signal(&pThis->notFull);
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
- /* do actual processing (the lengthy part, runs in parallel)
- * If we had a problem while dequeing, we do not call the consumer,
- * but we otherwise ignore it. This is in the hopes that it will be
- * self-healing. However, this is really not a good thing.
- * rgerhards, 2008-01-03
- */
- if(iRet != RS_RET_OK)
- FINALIZE;
-
- /* we are running in normal, non-disk-assisted mode do a quick check if we need to drain the queue.
- * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
- * provide real-time creation of spool files.
- * Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong.
- */
- CHKiRet(qqueueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr));
-
-finalize_it:
if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
dbgoprint((obj_t*) pThis, "error %d dequeueing element - ignoring, but strange things "
"may happen\n", iRet);
}
+
RETiRet;
}
@@ -1490,7 +1507,7 @@ finalize_it:
* but you get the idea from the code above.
*/
static rsRetVal
-qqueueRateLimiter(qqueue_t *pThis)
+RateLimiter(qqueue_t *pThis)
{
DEFiRet;
int iDelay;
@@ -1553,15 +1570,18 @@ qqueueRateLimiter(qqueue_t *pThis)
* rgerhards, 2008-01-21
*/
static rsRetVal
-qqueueConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp));
+ CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+// MULTIQUEUE: here we need to iterate through array! - or better pass it as whole? ... probably
+ int i;
+ for(i = 0 ; i < pWti->paUsrp->nElem ; i++)
+ CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp->pUsrp[i]));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
@@ -1587,15 +1607,19 @@ finalize_it:
* rgerhards, 2008-01-14
*/
static rsRetVal
-qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
+ CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+// MULTIQUEUE:
+ int i;
+ for(i = 0 ; i < pWti->paUsrp->nElem ; i++)
+ CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i]));
+
finalize_it:
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
@@ -1653,12 +1677,27 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
* the DA queue
*/
static int
-qqueueChkStopWrkrReg(qqueue_t *pThis)
+ChkStooWrkrReg(qqueue_t *pThis)
{
return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0);
}
+/* return the configured "deq max at once" interval
+ * rgerhards, 2009-04-22
+ */
+static rsRetVal
+GetDeqMaxAtOnce(qqueue_t *pThis, int *pVal)
+{
+ DEFiRet;
+ assert(pVal != NULL);
+RUNLOG_VAR("%d", pThis->iDeqMaxAtOnce); // MULTIQUEUE: delete this when done
+
+ *pVal = pThis->iDeqMaxAtOnce;
+
+ RETiRet;
+}
+
/* must only be called when the queue mutex is locked, else results
* are not stable! DA queue version
*/
@@ -1673,7 +1712,7 @@ qqueueIsIdleDA(qqueue_t *pThis)
* are not stable! Regular queue version
*/
static int
-qqueueIsIdleReg(qqueue_t *pThis)
+IsIdleReg(qqueue_t *pThis)
{
#if 0 /* enable for performance testing */
int ret;
@@ -1701,7 +1740,7 @@ qqueueIsIdleReg(qqueue_t *pThis)
* I am telling this, because I, too, always get confused by those...
*/
static rsRetVal
-qqueueRegOnWrkrShutdown(qqueue_t *pThis)
+RegOnWrkrShutdown(qqueue_t *pThis)
{
DEFiRet;
@@ -1722,7 +1761,7 @@ qqueueRegOnWrkrShutdown(qqueue_t *pThis)
* hook to indicate in the parent queue (if we are a child) that we are not done yet.
*/
static rsRetVal
-qqueueRegOnWrkrStartup(qqueue_t *pThis)
+RegOnWrkrStartup(qqueue_t *pThis)
{
DEFiRet;
@@ -1788,13 +1827,14 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
- CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRateLimiter));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrReg));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleReg));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerReg));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))qqueueConsumerCancelCleanup));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrStartup));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrShutdown));
+ CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStooWrkrReg));
+ CHKiRet(wtpSetpfGetDeqMaxAtOnce (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqMaxAtOnce));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))ConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
diff --git a/runtime/queue.h b/runtime/queue.h
index a267862d..7ecb9294 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -54,6 +54,7 @@ typedef struct qWrkThrd_s {
pthread_mutex_t mut;
} qWrkThrd_t; /* type for queue worker threads */
+
/* the queue object */
typedef struct queue_s {
BEGINobjInstance;
@@ -84,6 +85,7 @@ typedef struct queue_s {
int toActShutdown; /* timeout for long-running action shutdown in ms */
int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */
int toEnq; /* enqueue timeout */
+ int iDeqMaxAtOnce; /* max number of elements that shall be dequeued at once */
/* rate limiting settings (will be expanded) */
int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */
/* end rate limiting */
@@ -97,7 +99,7 @@ typedef struct queue_s {
* applied to detect user configuration errors (and tell me how should we detect what
* the user really wanted...). -- rgerhards, 2008-04-02
*/
- /* ane dequeue time window */
+ /* end dequeue time window */
rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
* user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 026fbbed..74ea5270 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -60,6 +60,7 @@
/* define some base data types */
typedef unsigned char uchar;/* get rid of the unhandy "unsigned char" */
+typedef struct aUsrp_s aUsrp_t;
typedef struct thrdInfo thrdInfo_t;
typedef struct obj_s obj_t;
typedef struct filed selector_t;/* TODO: this so far resides in syslogd.c, think about modularization */
diff --git a/runtime/wti.c b/runtime/wti.c
index 544bffa7..f50b3894 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -201,6 +201,9 @@ CODESTARTobjDestruct(wti)
pthread_cond_destroy(&pThis->condExitDone);
pthread_mutex_destroy(&pThis->mut);
+ if(pThis->paUsrp != NULL)
+ free(pThis->paUsrp);
+
if(pThis->pszDbgHdr != NULL)
free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
@@ -209,6 +212,7 @@ ENDobjDestruct(wti)
/* Standard-Constructor for the wti object
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
+
pThis->bOptimizeUniProc = glbl.GetOptimizeUniProc();
pthread_cond_init(&pThis->condExitDone, NULL);
pthread_mutex_init(&pThis->mut, NULL);
@@ -222,15 +226,21 @@ rsRetVal
wtiConstructFinalize(wti_t *pThis)
{
DEFiRet;
+ int iDeqMaxAtOnce;
ISOBJ_TYPE_assert(pThis, wti);
dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis));
/* initialize our thread instance descriptor */
- pThis->pUsrp = NULL;
pThis->tCurrCmd = eWRKTHRD_STOPPED;
+ /* we now alloc the array for user pointers. We obtain the max from the queue itself. */
+ CHKiRet(pThis->pWtp->pfGetDeqMaxAtOnce(pThis->pWtp->pUsr, &iDeqMaxAtOnce));
+ CHKmalloc(pThis->paUsrp = calloc(1, sizeof(aUsrp_t)));
+ CHKmalloc(pThis->paUsrp->pUsrp = calloc((size_t)iDeqMaxAtOnce, sizeof(void*)));
+
+finalize_it:
RETiRet;
}
@@ -314,7 +324,8 @@ wtiWorkerCancelCleanup(void *arg)
DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
/* call user supplied handler (that one e.g. requeues the element) */
- pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp);
+// MULTIQUEUE: need to change here!
+ pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->paUsrp->pUsrp[0]);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pWtp->mut);
@@ -366,7 +377,7 @@ wtiWorker(wti_t *pThis)
ISOBJ_TYPE_assert(pWtp, wtp);
dbgSetThrdName(pThis->pszDbgHdr);
- pThis->pUsrp = NULL;
+ pThis->paUsrp->nElem = 0; /* flag no elements present */ // MULTIQUEUE: do we really need this any longer (cnacel handeler)?
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
diff --git a/runtime/wti.h b/runtime/wti.h
index 6b60b833..85c98fe6 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -1,6 +1,6 @@
/* Definition of the worker thread instance (wti) class.
*
- * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2008, 2009 by Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -28,13 +28,25 @@
#include "wtp.h"
#include "obj.h"
+/* the user pointer array object
+ * This object is used to dequeue multiple user pointers which are than handed over
+ * to processing. The size of elements is fixed after queue creation, but may be
+ * modified by config variables (better said: queue properties).
+ * rgerhards, 2009-04-22
+ */
+struct aUsrp_s {
+ int nElem; /* actual number of element in this entry */
+ obj_t **pUsrp; /* actual elements (array!) */
+};
+
+
/* the worker thread instance class */
typedef struct wti_s {
BEGINobjInstance;
int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */
pthread_t thrdID; /* thread ID */
qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
- obj_t *pUsrp; /* pointer to an object meaningful for current user pointer (e.g. queue pUsr data elemt) */
+ aUsrp_t *paUsrp; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
pthread_cond_t condExitDone; /* signaled when the thread exit is done (once per thread existance) */
pthread_mutex_t mut;
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 04eb974f..e1966099 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -88,6 +88,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pthread_cond_init(&pThis->condThrdTrm, NULL);
/* set all function pointers to "not implemented" dummy so that we can safely call them */
pThis->pfChkStopWrkr = NotImplementedDummy;
+ pThis->pfGetDeqMaxAtOnce = NotImplementedDummy;
pThis->pfIsIdle = NotImplementedDummy;
pThis->pfDoWork = NotImplementedDummy;
pThis->pfOnIdle = NotImplementedDummy;
@@ -117,7 +118,7 @@ wtpConstructFinalize(wtp_t *pThis)
*/
if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
-
+
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
CHKiRet(wtiConstruct(&pThis->pWrkr[i]));
pWti = pThis->pWrkr[i];
@@ -584,6 +585,7 @@ DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t)
DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t)
DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*))
+DEFpropSetMethFP(wtp, pfGetDeqMaxAtOnce, rsRetVal(*pVal)(void*, int*))
DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int))
DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int))
diff --git a/runtime/wtp.h b/runtime/wtp.h
index b9cb07c5..5894000a 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -67,10 +67,11 @@ typedef struct wtp_s {
int bThrdStateChanged; /* at least one thread state has changed if 1 */
/* end sync variables */
/* user objects */
- void *pUsr; /* pointer to user object */
+ void *pUsr; /* pointer to user object (in this case, the queue the wtp belongs to) */
pthread_mutex_t *pmutUsr;
pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */
rsRetVal (*pfChkStopWrkr)(void *pUsr, int);
+ rsRetVal (*pfGetDeqMaxAtOnce)(void *pUsr, int*); /* obtains max dequeue count from queue config */
rsRetVal (*pfRateLimiter)(void *pUsr);
rsRetVal (*pfIsIdle)(void *pUsr, int);
rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int);
@@ -104,6 +105,7 @@ int wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex);
PROTOTYPEObjClassInit(wtp);
PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));
+PROTOTYPEpropSetMethFP(wtp, pfGetDeqMaxAtOnce, rsRetVal(*pVal)(void*, int*));
PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int));
PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int));