summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog2
-rw-r--r--action.c4
-rw-r--r--doc/queues.html27
-rw-r--r--doc/rsyslog_conf_global.html2
-rw-r--r--runtime/queue.c22
-rw-r--r--runtime/queue.h3
-rw-r--r--runtime/wti.c6
-rw-r--r--runtime/wtp.c4
-rw-r--r--runtime/wtp.h4
-rw-r--r--tests/testsuites/da-mainmsg-q.conf2
-rw-r--r--tools/syslogd.c4
11 files changed, 60 insertions, 20 deletions
diff --git a/ChangeLog b/ChangeLog
index 1ad99be1..ccc8e2c5 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,5 @@
+- added $MainMsgQueueDequeueBatchSize and $ActionQueueDequeueBatchSize
+ configuration directives
---------------------------------------------------------------------------
Version 4.3.1 [DEVEL] (rgerhards), 2009-04-??
- improved doc
diff --git a/action.c b/action.c
index 8395642c..be3c7556 100644
--- a/action.c
+++ b/action.c
@@ -64,6 +64,7 @@ static int bActionRepMsgHasMsg = 0; /* last messsage repeated... has msg fragme
/* main message queue and its configuration parameters */
static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
static int iActionQueueSize = 1000; /* size of the main message queue above */
+static int iActionQueueDeqBatchSize = 16; /* batch size for action queues */
static int iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */
static int iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */
static int iActionQDiscardMark = 9800; /* begin to discard messages */
@@ -144,6 +145,7 @@ actionResetQueueParams(void)
ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
iActionQueueSize = 1000; /* size of the main message queue above */
+ iActionQueueDeqBatchSize = 16; /* default batch size */
iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */
iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */
iActionQDiscardMark = 9800; /* begin to discard messages */
@@ -272,6 +274,7 @@ actionConstructFinalize(action_t *pThis)
qqueueSetpUsr(pThis->pQueue, pThis);
setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace);
+ setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", iActionQueueDeqBatchSize);
setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
@@ -857,6 +860,7 @@ actionAddCfSysLineHdrl(void)
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &iActionQueueDeqBatchSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL));
diff --git a/doc/queues.html b/doc/queues.html
index 4a9509a0..f063e87c 100644
--- a/doc/queues.html
+++ b/doc/queues.html
@@ -332,6 +332,33 @@ in this regard - it was just not requested so far. So if you need more
fine-grained control, let us know and we'll probably implement it.
There are two configuration directives, both should be used together or
results are unpredictable:" <i>$&lt;object&gt;QueueDequeueTimeBegin &lt;hour&gt;</i>" and&nbsp;"<i>$&lt;object&gt;QueueDequeueTimeEnd &lt;hour&gt;</i>". The hour parameter must be specified in 24-hour format (so 10pm is 22). A use case for this parameter can be found in the <a href="http://wiki.rsyslog.com/index.php/OffPeakHours">rsyslog wiki</a>. </p>
+<h2>Performance</h2>
+<p>The locking involved with maintaining the queue has a potentially large
+performance impact. How large this is, and if it exists at all, depends much on
+the configuration and actual use case. However, the queue is able to work on
+so-called &quot;batches&quot; when dequeueing data elements. With batches,
+multiple data elements are dequeued at once (with a single locking call).
+The queue dequeues all available elements up to a configured upper
+limit (<i>&lt;object&gt;DequeueBatchSize &lt;number&gt;</i>). It is important
+to note that the actual upper limit is dictated by availability. The queue engine
+will never wait for a batch to fill. So even if a high upper limit is configured,
+batches may consist of fewer elements, even just one, if there are no more elements
+waiting in the queue.
+<p>Batching
+can improve performance considerably. Note, however, that it affects the
+order in which messages are passed to the queue worker threads, as each worker
+now receive as batch of messages. Also, the larger the batch size and the higher
+the maximum number of permitted worker threads, the more main memory is needed.
+For a busy server, large batch sizes (around 1,000 or even more elements) may be useful.
+Please note that with batching, the main memory must hold BatchSize * NumOfWorkers
+objects in memory (worst-case scenario), even if running in disk-only mode. So if you
+use the default 5 workers at the main message queue and set the batch size to 1,000, you need
+to be prepared that the main message queue holds up to 5,000 messages in main memory
+<b>in addition</b> to the configured queue size limits!
+<p>The queue object's default maximum batch size
+is eight, but there exists different defaults for the actual parts of
+rsyslog processing that utilize queues. So you need to check these object's
+defaults.
<h2>Terminating Queues</h2>
<p>Terminating a process sounds easy, but can be complex.
Terminating a running queue is in fact the most complex operation a queue
diff --git a/doc/rsyslog_conf_global.html b/doc/rsyslog_conf_global.html
index 3e33f0da..af05715d 100644
--- a/doc/rsyslog_conf_global.html
+++ b/doc/rsyslog_conf_global.html
@@ -56,6 +56,7 @@ default template for UDP and plain TCP forwarding action</li>
<li>$ActionGSSForwardDefaultTemplate [templateName] - sets a
new default template for GSS-API forwarding action</li>
<li>$ActionQueueCheckpointInterval &lt;number&gt;</li>
+<li>$ActionQueueDequeueBatchSize &lt;number&gt; [default 16]</li>
<li>$ActionQueueDequeueSlowdown &lt;number&gt; [number
is timeout in <i> micro</i>seconds (1000000us is 1sec!),
default 0 (no delay). Simple rate-limiting!]</li>
@@ -125,6 +126,7 @@ not recommended for use with rsyslog. To do a full restart, simply stop and star
compatibility reasons. If it is set to "off", a HUP will only close open files. This is a much quicker action and usually
the only one that is needed e.g. for log rotation. <b>It is recommended to set the setting to "off".</b></li>
<li><a href="rsconf1_includeconfig.html">$IncludeConfig</a></li><li>MainMsgQueueCheckpointInterval &lt;number&gt;</li>
+<li>$MainMsgQueueDequeueBatchSize &lt;number&gt; [default 32]</li>
<li>$MainMsgQueueDequeueSlowdown &lt;number&gt; [number
is timeout in <i> micro</i>seconds (1000000us is 1sec!),
default 0 (no delay). Simple rate-limiting!]</li>
diff --git a/runtime/queue.c b/runtime/queue.c
index c5f9df81..f3d3fe71 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -69,7 +69,7 @@ rsRetVal qqueueChkPersist(qqueue_t *pThis);
static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
-static rsRetVal GetDeqMaxAtOnce(qqueue_t *pThis, int *pVal);
+static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static int qqueueIsIdleDA(qqueue_t *pThis);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2);
@@ -375,7 +375,7 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
// MULTIQUEUE: TODO: this should be DA-specific!
- CHKiRet(wtpSetpfGetDeqMaxAtOnce (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqMaxAtOnce));
+ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA));
CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) ConsumerCancelCleanup));
@@ -1280,7 +1280,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->pConsumer = pConsumer;
pThis->iNumWorkerThreads = iWorkerThreads;
pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
- pThis->iDeqMaxAtOnce = 8; /* conservative default, should still provide good performance */
+ pThis->iDeqBatchSize = 8; /* conservative default, should still provide good performance */
pThis->pszFilePrefix = NULL;
pThis->qType = qType;
@@ -1421,7 +1421,7 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
/* all well, use this element */
pWti->paUsrp->pUsrp[nDequeued++] = pUsr;
- } while(iQueueSize > 0 && nDequeued < pThis->iDeqMaxAtOnce);
+ } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize);
//bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
pWti->paUsrp->nElem = nDequeued;
@@ -1691,14 +1691,11 @@ ChkStooWrkrReg(qqueue_t *pThis)
* rgerhards, 2009-04-22
*/
static rsRetVal
-GetDeqMaxAtOnce(qqueue_t *pThis, int *pVal)
+GetDeqBatchSize(qqueue_t *pThis, int *pVal)
{
DEFiRet;
assert(pVal != NULL);
-RUNLOG_VAR("%d", pThis->iDeqMaxAtOnce); // MULTIQUEUE: delete this when done
-
- *pVal = pThis->iDeqMaxAtOnce;
-
+ *pVal = pThis->iDeqBatchSize;
RETiRet;
}
@@ -1819,10 +1816,10 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d, "
- "full delay %d, light delay %d starting\n",
+ "full delay %d, light delay %d, deq batch size %d starting\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1,
- pThis->iFullDlyMrk, pThis->iLightDlyMrk);
+ pThis->iFullDlyMrk, pThis->iLightDlyMrk, pThis->iDeqBatchSize);
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
@@ -1835,7 +1832,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStooWrkrReg));
- CHKiRet(wtpSetpfGetDeqMaxAtOnce (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqMaxAtOnce));
+ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))ConsumerCancelCleanup));
@@ -2301,6 +2298,7 @@ DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int)
DEFpropSetMeth(qqueue, bSaveOnShutdown, int)
DEFpropSetMeth(qqueue, pUsr, void*)
DEFpropSetMeth(qqueue, iDeqSlowdown, int)
+DEFpropSetMeth(qqueue, iDeqBatchSize, int)
DEFpropSetMeth(qqueue, sizeOnDiskMax, int64)
diff --git a/runtime/queue.h b/runtime/queue.h
index 4fb57d07..8a60254b 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -85,7 +85,7 @@ typedef struct queue_s {
int toActShutdown; /* timeout for long-running action shutdown in ms */
int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */
int toEnq; /* enqueue timeout */
- int iDeqMaxAtOnce; /* max number of elements that shall be dequeued at once */
+ int iDeqBatchSize; /* max number of elements that shall be dequeued at once */
/* rate limiting settings (will be expanded) */
int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */
/* end rate limiting */
@@ -202,6 +202,7 @@ PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int);
PROTOTYPEpropSetMeth(qqueue, pUsr, void*);
PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int);
PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64);
+PROTOTYPEpropSetMeth(qqueue, iDeqBatchSize, int);
#define qqueueGetID(pThis) ((unsigned long) pThis)
#endif /* #ifndef QUEUE_H_INCLUDED */
diff --git a/runtime/wti.c b/runtime/wti.c
index f50b3894..df1ea0ed 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -226,7 +226,7 @@ rsRetVal
wtiConstructFinalize(wti_t *pThis)
{
DEFiRet;
- int iDeqMaxAtOnce;
+ int iDeqBatchSize;
ISOBJ_TYPE_assert(pThis, wti);
@@ -236,9 +236,9 @@ wtiConstructFinalize(wti_t *pThis)
pThis->tCurrCmd = eWRKTHRD_STOPPED;
/* we now alloc the array for user pointers. We obtain the max from the queue itself. */
- CHKiRet(pThis->pWtp->pfGetDeqMaxAtOnce(pThis->pWtp->pUsr, &iDeqMaxAtOnce));
+ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
CHKmalloc(pThis->paUsrp = calloc(1, sizeof(aUsrp_t)));
- CHKmalloc(pThis->paUsrp->pUsrp = calloc((size_t)iDeqMaxAtOnce, sizeof(void*)));
+ CHKmalloc(pThis->paUsrp->pUsrp = calloc((size_t)iDeqBatchSize, sizeof(void*)));
finalize_it:
RETiRet;
diff --git a/runtime/wtp.c b/runtime/wtp.c
index e1966099..8bb55cf7 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -88,7 +88,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pthread_cond_init(&pThis->condThrdTrm, NULL);
/* set all function pointers to "not implemented" dummy so that we can safely call them */
pThis->pfChkStopWrkr = NotImplementedDummy;
- pThis->pfGetDeqMaxAtOnce = NotImplementedDummy;
+ pThis->pfGetDeqBatchSize = NotImplementedDummy;
pThis->pfIsIdle = NotImplementedDummy;
pThis->pfDoWork = NotImplementedDummy;
pThis->pfOnIdle = NotImplementedDummy;
@@ -585,7 +585,7 @@ DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t)
DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t)
DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*))
-DEFpropSetMethFP(wtp, pfGetDeqMaxAtOnce, rsRetVal(*pVal)(void*, int*))
+DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*))
DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int))
DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int))
diff --git a/runtime/wtp.h b/runtime/wtp.h
index 5894000a..88bd9197 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -71,7 +71,7 @@ typedef struct wtp_s {
pthread_mutex_t *pmutUsr;
pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */
rsRetVal (*pfChkStopWrkr)(void *pUsr, int);
- rsRetVal (*pfGetDeqMaxAtOnce)(void *pUsr, int*); /* obtains max dequeue count from queue config */
+ rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */
rsRetVal (*pfRateLimiter)(void *pUsr);
rsRetVal (*pfIsIdle)(void *pUsr, int);
rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int);
@@ -105,7 +105,7 @@ int wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex);
PROTOTYPEObjClassInit(wtp);
PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));
-PROTOTYPEpropSetMethFP(wtp, pfGetDeqMaxAtOnce, rsRetVal(*pVal)(void*, int*));
+PROTOTYPEpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*));
PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int));
PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int));
diff --git a/tests/testsuites/da-mainmsg-q.conf b/tests/testsuites/da-mainmsg-q.conf
index 3465d13b..b55643c0 100644
--- a/tests/testsuites/da-mainmsg-q.conf
+++ b/tests/testsuites/da-mainmsg-q.conf
@@ -16,6 +16,8 @@ $MainMsgQueueLowWatermark 40
$MainMsgQueueFilename mainq
$MainMsgQueueType linkedlist
$MainMsgQueueDeqzezeSlowdown 1000
+# ucomment, as we now have an issue (finally the test case works ;))
+#$MainMsgQueueDequeueBatchSize 80
$template outfmt,"%msg:F,58:2%\n"
$template dynfile,"rsyslog.out.log" # trick to use relative path names!
diff --git a/tools/syslogd.c b/tools/syslogd.c
index f48fa759..5b795755 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -304,6 +304,7 @@ static int iMainMsgQtoWrkShutdown = 60000; /* timeout for worker thread shutdo
static int iMainMsgQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
static int iMainMsgQDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */
static int64 iMainMsgQueMaxDiskSpace = 0; /* max disk space allocated 0 ==> unlimited */
+static int iMainMsgQueDeqBatchSize = 32; /* dequeue batch size */
static int bMainMsgQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
static int iMainMsgQueueDeqtWinFromHr = 0; /* hour begin of time frame when queue is to be dequeued */
static int iMainMsgQueueDeqtWinToHr = 25; /* hour begin of time frame when queue is to be dequeued */
@@ -370,6 +371,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
bMainMsgQSaveOnShutdown = 1;
MainMsgQueType = QUEUETYPE_FIXED_ARRAY;
iMainMsgQueMaxDiskSpace = 0;
+ iMainMsgQueDeqBatchSize = 32;
glbliActionResumeRetryCount = 0;
return RS_RET_OK;
@@ -2508,6 +2510,7 @@ init(void)
setQPROP(qqueueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize);
setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace);
+ setQPROP(qqueueSetiDeqBatchSize, "$MainMsgQueueDequeueBatchSize", iMainMsgQueDeqBatchSize);
setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName);
setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt);
setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown );
@@ -2888,6 +2891,7 @@ static rsRetVal loadBuildInModules(void)
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iMainMsgQDeqSlowdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iMainMsgQWrkMinMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeuebatchsize", 0, eCmdHdlrSize, NULL, &iMainMsgQueDeqBatchSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxDiskSpace, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iMainMsgQueueDeqtWinFromHr, NULL));