summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c363
1 files changed, 319 insertions, 44 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 280ebd94..3c91fd02 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -12,7 +12,7 @@
* function names - this makes it really hard to read and does not provide much
* benefit, at least I (now) think so...
*
- * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2008-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -83,11 +83,49 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
+static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr);
+static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
+static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
+static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis);
+static rsRetVal qDestructDisk(qqueue_t *pThis);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
#define QUEUE_NO_CHECKPOINT 0
+/* tables for interfacing with the v6 config system */
+static struct cnfparamdescr cnfpdescr[] = {
+ { "queue.filename", eCmdHdlrGetWord, 0 },
+ { "queue.size", eCmdHdlrSize, 0 },
+ { "queue.dequeuebatchsize", eCmdHdlrInt, 0 },
+ { "queue.maxdiskspace", eCmdHdlrSize, 0 },
+ { "queue.highwatermark", eCmdHdlrInt, 0 },
+ { "queue.lowwatermark", eCmdHdlrInt, 0 },
+ { "queue.fulldelaymark", eCmdHdlrInt, 0 },
+ { "queue.lightdelaymark", eCmdHdlrInt, 0 },
+ { "queue.discardmark", eCmdHdlrInt, 0 },
+ { "queue.discardseverity", eCmdHdlrFacility, 0 },
+ { "queue.checkpointinterval", eCmdHdlrInt, 0 },
+ { "queue.syncqueuefiles", eCmdHdlrBinary, 0 },
+ { "queue.type", eCmdHdlrQueueType, 0 },
+ { "queue.workerthreads", eCmdHdlrInt, 0 },
+ { "queue.timeoutshutdown", eCmdHdlrInt, 0 },
+ { "queue.timeoutactioncompletion", eCmdHdlrInt, 0 },
+ { "queue.timeoutenqueue", eCmdHdlrInt, 0 },
+ { "queue.timeoutworkerthreadshutdown", eCmdHdlrInt, 0 },
+ { "queue.workerthreadminimummessages", eCmdHdlrInt, 0 },
+ { "queue.maxfilesize", eCmdHdlrSize, 0 },
+ { "queue.saveonshutdown", eCmdHdlrBinary, 0 },
+ { "queue.dequeueslowdown", eCmdHdlrInt, 0 },
+ { "queue.dequeuetimebegin", eCmdHdlrInt, 0 },
+ { "queue.dequeuetimeend", eCmdHdlrInt, 0 },
+};
+static struct cnfparamblk pblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(cnfpdescr)/sizeof(struct cnfparamdescr),
+ cnfpdescr
+ };
+
/* debug aid */
static void displayBatchState(batch_t *pBatch)
{
@@ -186,6 +224,55 @@ finalize_it:
/* methods */
+static inline char *
+getQueueTypeName(queueType_t t)
+{
+ char *r;
+
+ switch(t) {
+ case QUEUETYPE_FIXED_ARRAY:
+ r = "FixedArray";
+ case QUEUETYPE_LINKEDLIST:
+ r = "LinkedList";
+ case QUEUETYPE_DISK:
+ r = "Disk";
+ case QUEUETYPE_DIRECT:
+ r = "Direct";
+ }
+ return r;
+}
+
+void
+qqueueDbgPrint(qqueue_t *pThis)
+{
+ dbgoprint((obj_t*) pThis, "parameter dump:\n");
+ dbgoprint((obj_t*) pThis, "queue.filename '%s'\n",
+ (pThis->pszFilePrefix == NULL) ? "[NONE]" : (char*)pThis->pszFilePrefix);
+ dbgoprint((obj_t*) pThis, "queue.size: %d\n", pThis->iMaxQueueSize);
+ dbgoprint((obj_t*) pThis, "queue.dequeuebatchsize: %d\n", pThis->iDeqBatchSize);
+ dbgoprint((obj_t*) pThis, "queue.maxdiskspace: %lld\n", pThis->iMaxFileSize);
+ dbgoprint((obj_t*) pThis, "queue.highwatermark: %d\n", pThis->iHighWtrMrk);
+ dbgoprint((obj_t*) pThis, "queue.lowwatermark: %d\n", pThis->iLowWtrMrk);
+ dbgoprint((obj_t*) pThis, "queue.fulldelaymark: %d\n", pThis->iFullDlyMrk);
+ dbgoprint((obj_t*) pThis, "queue.lightdelaymark: %d\n", pThis->iLightDlyMrk);
+ dbgoprint((obj_t*) pThis, "queue.discardmark: %d\n", pThis->iDiscardMrk);
+ dbgoprint((obj_t*) pThis, "queue.discardseverity: %d\n", pThis->iDiscardSeverity);
+ dbgoprint((obj_t*) pThis, "queue.checkpointinterval: %d\n", pThis->iPersistUpdCnt);
+ dbgoprint((obj_t*) pThis, "queue.syncqueuefiles: %d\n", pThis->bSyncQueueFiles);
+ dbgoprint((obj_t*) pThis, "queue.type: %d [%s]\n", pThis->qType, getQueueTypeName(pThis->qType));
+ dbgoprint((obj_t*) pThis, "queue.workerthreads: %d\n", pThis->iNumWorkerThreads);
+ dbgoprint((obj_t*) pThis, "queue.timeoutshutdown: %d\n", pThis->toQShutdown);
+ dbgoprint((obj_t*) pThis, "queue.timeoutactioncompletion: %d\n", pThis->toActShutdown);
+ dbgoprint((obj_t*) pThis, "queue.timeoutenqueue: %d\n", pThis->toEnq);
+ dbgoprint((obj_t*) pThis, "queue.timeoutworkerthreadshutdown: %d\n", pThis->toWrkShutdown);
+ dbgoprint((obj_t*) pThis, "queue.workerthreadminimummessages: %d\n", pThis->iMinMsgsPerWrkr);
+ dbgoprint((obj_t*) pThis, "queue.maxfilesize: %lld\n", pThis->iMaxFileSize);
+ dbgoprint((obj_t*) pThis, "queue.saveonshutdown: %d\n", pThis->bSaveOnShutdown);
+ dbgoprint((obj_t*) pThis, "queue.dequeueslowdown: %d\n", pThis->iDeqSlowdown);
+ dbgoprint((obj_t*) pThis, "queue.dequeuetimebegin: %d\n", pThis->iDeqtWinFromHr);
+ dbgoprint((obj_t*) pThis, "queuedequeuetimend.: %d\n", pThis->iDeqtWinToHr);
+}
+
/* get the physical queue size. Must only be called
* while mutex is locked!
@@ -592,6 +679,47 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis)
/* -------------------- disk -------------------- */
+/* The following function is used to "save" ourself from being killed by
+ * a fatally failed disk queue. A fatal failure is, for example, if no
+ * data can be read or written. In that case, the disk support is disabled,
+ * with all on-disk structures kept as-is as much as possible. Instead, the
+ * queue is switched to direct mode, so that at least
+ * some processing can happen. Of course, this may still have lots of
+ * undesired side-effects, but is probably better than aborting the
+ * syslogd. Note that this function *must* succeed in one way or another, as
+ * we can not recover from failure here. But it may emit different return
+ * states, which can trigger different processing in the higher layers.
+ * rgerhards, 2011-05-03
+ */
+static inline rsRetVal
+queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError)
+{
+ pThis->iQueueSize = 0;
+ pThis->nLogDeq = 0;
+ qDestructDisk(pThis); /* free disk structures */
+
+ pThis->qType = QUEUETYPE_DIRECT;
+ pThis->qConstruct = qConstructDirect;
+ pThis->qDestruct = qDestructDirect;
+ pThis->qAdd = qAddDirect;
+ pThis->qDel = qDelDirect;
+ pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ if(pThis->pqParent != NULL) {
+ DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n");
+ pThis->pqParent->bIsDA = 0;
+ pThis->pqParent->pqDA = NULL;
+ /* This may have undesired side effects, not sure if I really evaluated
+ * all. So you know where to look at if you come to this point during
+ * troubleshooting ;) -- rgerhards, 2011-05-03
+ */
+ }
+
+ errmsg.LogError(0, initiatingError, "fatal error on disk queue '%s', emergency switch to direct mode",
+ obj.GetName((obj_t*) pThis));
+ return RS_RET_ERR_QUEUE_EMERGENCY;
+}
+
+
static rsRetVal
qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pThis)
{
@@ -794,10 +922,7 @@ finalize_it:
static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr)
{
DEFiRet;
-
- CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL));
-
-finalize_it:
+ iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL);
RETiRet;
}
@@ -1224,8 +1349,8 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
/* set some water marks so that we have useful defaults if none are set specifically */
- pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */
- pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */
+ pThis->iFullDlyMrk = -1;
+ pThis->iLightDlyMrk = -1;
pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir);
pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
pThis->iQueueSize = 0;
@@ -1239,42 +1364,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->pszFilePrefix = NULL;
pThis->qType = qType;
- /* set type-specific handlers and other very type-specific things (we can not totally hide it...) */
- switch(qType) {
- case QUEUETYPE_FIXED_ARRAY:
- pThis->qConstruct = qConstructFixedArray;
- pThis->qDestruct = qDestructFixedArray;
- pThis->qAdd = qAddFixedArray;
- pThis->qDeq = qDeqFixedArray;
- pThis->qDel = qDelFixedArray;
- pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
- break;
- case QUEUETYPE_LINKEDLIST:
- pThis->qConstruct = qConstructLinkedList;
- pThis->qDestruct = qDestructLinkedList;
- pThis->qAdd = qAddLinkedList;
- pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
- pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
- pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
- break;
- case QUEUETYPE_DISK:
- pThis->qConstruct = qConstructDisk;
- pThis->qDestruct = qDestructDisk;
- pThis->qAdd = qAddDisk;
- pThis->qDeq = qDeqDisk;
- pThis->qDel = qDelDisk;
- pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
- /* special handling */
- pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
- break;
- case QUEUETYPE_DIRECT:
- pThis->qConstruct = qConstructDirect;
- pThis->qDestruct = qDestructDirect;
- pThis->qAdd = qAddDirect;
- pThis->qDel = qDelDirect;
- pThis->MultiEnq = qqueueMultiEnqObjDirect;
- break;
- }
INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
INIT_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
@@ -1285,6 +1374,40 @@ finalize_it:
}
+/* set default inisde queue object suitable for action queues.
+ * This shall be called directly after queue construction. This functions has
+ * been added in support of the new v6 config system. It expect properly pre-initialized
+ * objects, but we need to differentiate between ruleset main and action queues.
+ * In order to avoid unnecessary complexity, we provide the necessary defaults
+ * via specific function calls.
+ */
+void
+qqueueSetDefaultsActionQueue(qqueue_t *pThis)
+{
+ pThis->qType = QUEUETYPE_DIRECT; /* type of the main message queue above */
+ pThis->iMaxQueueSize = 1000; /* size of the main message queue above */
+ pThis->iDeqBatchSize = 128; /* default batch size */
+ pThis->iHighWtrMrk = 800; /* high water mark for disk-assisted queues */
+ pThis->iLowWtrMrk = 200; /* low water mark for disk-assisted queues */
+ pThis->iDiscardMrk = 9800; /* begin to discard messages */
+ pThis->iDiscardSeverity = 8; /* turn off */
+ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */
+ pThis->iMaxFileSize = 1024*1024;
+ pThis->iPersistUpdCnt = 0; /* persist queue info every n updates */
+ pThis->bSyncQueueFiles = 0;
+ pThis->toQShutdown = 0; /* queue shutdown */
+ pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */
+ pThis->toEnq = 2000; /* timeout for queue enque */
+ pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */
+ pThis->iMinMsgsPerWrkr = 100; /* minimum messages per worker needed to start a new one */
+ pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
+ pThis->sizeOnDiskMax = 0; /* unlimited */
+ pThis->iDeqSlowdown = 0;
+ pThis->iDeqtWinFromHr = 0;
+ pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
+}
+
+
/* This function checks if the provided message shall be discarded and does so, if needed.
* In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
* provide real-time creation of spool files.
@@ -1312,6 +1435,7 @@ static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, void *pUsr)
if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n",
iQueueSize, iSeverity);
+ STATSCOUNTER_INC(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd);
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else {
@@ -1693,7 +1817,18 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueForConsumer(pThis, pWti));
+ iRet = DequeueForConsumer(pThis, pWti);
+ if(iRet == RS_RET_FILE_NOT_FOUND) {
+ /* This is a fatal condition and means the queue is almost unusable */
+ d_pthread_mutex_unlock(pThis->mut);
+ DBGOPRINT((obj_t*) pThis, "got 'file not found' error %d, queue defunct\n", iRet);
+ iRet = queueSwitchToEmergencyMode(pThis, iRet);
+ // TODO: think about what to return as iRet -- keep RS_RET_FILE_NOT_FOUND?
+ d_pthread_mutex_lock(pThis->mut);
+ }
+ if (iRet != RS_RET_OK) {
+ FINALIZE;
+ }
/* we now have a non-idle batch of work, so we can release the queue mutex and process it */
d_pthread_mutex_unlock(pThis->mut);
@@ -1852,6 +1987,52 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
ASSERT(pThis != NULL);
+ /* set type-specific handlers and other very type-specific things
+ * (we can not totally hide it...)
+ */
+ switch(pThis->qType) {
+ case QUEUETYPE_FIXED_ARRAY:
+ pThis->qConstruct = qConstructFixedArray;
+ pThis->qDestruct = qDestructFixedArray;
+ pThis->qAdd = qAddFixedArray;
+ pThis->qDeq = qDeqFixedArray;
+ pThis->qDel = qDelFixedArray;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
+ break;
+ case QUEUETYPE_LINKEDLIST:
+ pThis->qConstruct = qConstructLinkedList;
+ pThis->qDestruct = qDestructLinkedList;
+ pThis->qAdd = qAddLinkedList;
+ pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
+ pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
+ break;
+ case QUEUETYPE_DISK:
+ pThis->qConstruct = qConstructDisk;
+ pThis->qDestruct = qDestructDisk;
+ pThis->qAdd = qAddDisk;
+ pThis->qDeq = qDeqDisk;
+ pThis->qDel = qDelDisk;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
+ /* special handling */
+ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
+ break;
+ case QUEUETYPE_DIRECT:
+ pThis->qConstruct = qConstructDirect;
+ pThis->qDestruct = qDestructDirect;
+ pThis->qAdd = qAddDirect;
+ pThis->qDel = qDelDirect;
+ pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ break;
+ }
+
+ if(pThis->iFullDlyMrk == -1)
+ pThis->iFullDlyMrk = pThis->iMaxQueueSize
+ - (pThis->iMaxQueueSize / 100) * 3; /* default 97% */
+ if(pThis->iLightDlyMrk == -1)
+ pThis->iLightDlyMrk = pThis->iMaxQueueSize
+ - (pThis->iMaxQueueSize / 100) * 30; /* default 70% */
+
/* we need to do a quick check if our water marks are set plausible. If not,
* we correct the most important shortcomings. TODO: do that!!!! -- rgerhards, 2008-03-14
*/
@@ -1943,6 +2124,13 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("full"),
ctrType_IntCtr, &pThis->ctrFull));
+ STATSCOUNTER_INIT(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("discarded.full"),
+ ctrType_IntCtr, &pThis->ctrFDscrd));
+ STATSCOUNTER_INIT(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd);
+ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("discarded.nf"),
+ ctrType_IntCtr, &pThis->ctrNFDscrd));
+
pThis->ctrMaxqsize = 0; /* no mutex needed, thus no init call */
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("maxqsize"),
ctrType_Int, &pThis->ctrMaxqsize));
@@ -2328,6 +2516,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
STATSCOUNTER_INC(pThis->ctrFull, pThis->mutCtrFull);
if(pThis->toEnq == 0 || pThis->bEnqOnly) {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - configured for immediate discarding.\n");
+ STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else {
@@ -2339,6 +2528,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
timeoutComp(&t, pThis->toEnq);
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
+ STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
@@ -2466,6 +2656,90 @@ finalize_it:
}
+/* take v6 config list and extract the queue params out of it. Hand the
+ * param values back to the caller. Caller is responsible for destructing
+ * them when no longer needed. Caller can use this param block to configure
+ * all parameters for a newly created queue with one call to qqueueSetParams().
+ * rgerhards, 2011-07-22
+ */
+rsRetVal
+qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals)
+{
+ *ppvals = nvlstGetParams(lst, &pblk, NULL);
+ return RS_RET_OK;
+}
+
+/* apply all params from param block to queue. Must be called before
+ * finalizing. This supports the v6 config system. Defaults were already
+ * set during queue creation. The pvals object is destructed by this
+ * function.
+ */
+rsRetVal
+qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals)
+{
+ int i;
+ for(i = 0 ; i < pblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(pblk.descr[i].name, "queue.filename")) {
+ pThis->pszFilePrefix = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
+ pThis->lenFilePrefix = es_strlen(pvals[i].val.d.estr);
+ } else if(!strcmp(pblk.descr[i].name, "queue.size")) {
+ pThis->iMaxQueueSize = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.dequeuebatchsize")) {
+ pThis->iDeqBatchSize = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.maxdiskspace")) {
+ pThis->iMaxFileSize = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.highwatermark")) {
+ pThis->iHighWtrMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.lowwatermark")) {
+ pThis->iLowWtrMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.fulldelaymark")) {
+ pThis->iFullDlyMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.lightdelaymark")) {
+ pThis->iLightDlyMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.discardmark")) {
+ pThis->iDiscardMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.discardseverity")) {
+ pThis->iDiscardSeverity = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.checkpointinterval")) {
+ pThis->iPersistUpdCnt = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.syncqueuefiles")) {
+ pThis->bSyncQueueFiles = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.type")) {
+ pThis->qType = (queueType_t) pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.workerthreads")) {
+ pThis->iNumWorkerThreads = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.timeoutshutdown")) {
+ pThis->toQShutdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.timeoutactioncompletion")) {
+ pThis->toActShutdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.timeoutenqueue")) {
+ pThis->toEnq = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.timeoutworkerthreadshutdown")) {
+ pThis->toWrkShutdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.workerthreadminimummessages")) {
+ pThis->iMinMsgsPerWrkr = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.maxfilesize")) {
+ pThis->iMaxFileSize = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.saveonshutdown")) {
+ pThis->bSaveOnShutdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.dequeueslowdown")) {
+ pThis->iDeqSlowdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.dequeuetimebegin")) {
+ pThis->iDeqtWinFromHr = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queuedequeuetimend.")) {
+ pThis->iDeqtWinToHr = pvals[i].val.d.n;
+ } else {
+ dbgprintf("queue: program error, non-handled "
+ "param '%s'\n", pblk.descr[i].name);
+ }
+ }
+ cnfparamvalsDestruct(pvals, &pblk);
+ return RS_RET_OK;
+}
+
+
/* some simple object access methods */
DEFpropSetMeth(qqueue, bSyncQueueFiles, int)
DEFpropSetMeth(qqueue, iPersistUpdCnt, int)
@@ -2480,6 +2754,7 @@ DEFpropSetMeth(qqueue, iLowWtrMrk, int)
DEFpropSetMeth(qqueue, iDiscardMrk, int)
DEFpropSetMeth(qqueue, iFullDlyMrk, int)
DEFpropSetMeth(qqueue, iDiscardSeverity, int)
+DEFpropSetMeth(qqueue, iLightDlyMrk, int)
DEFpropSetMeth(qqueue, bIsDA, int)
DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int)
DEFpropSetMeth(qqueue, bSaveOnShutdown, int)