summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c237
1 files changed, 199 insertions, 38 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 56d05571..a1ac3eff 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -93,6 +93,39 @@ static rsRetVal qDestructDisk(qqueue_t *pThis);
#define QUEUE_CHECKPOINT 1
#define QUEUE_NO_CHECKPOINT 0
+/* tables for interfacing with the v6 config system */
+static struct cnfparamdescr cnfpdescr[] = {
+ { "queue.filename", eCmdHdlrGetWord, 0 },
+ { "queue.size", eCmdHdlrSize, 0 },
+ { "queue.dequeuebatchsize", eCmdHdlrInt, 0 },
+ { "queue.maxdiskspace", eCmdHdlrSize, 0 },
+ { "queue.highwatermark", eCmdHdlrInt, 0 },
+ { "queue.lowwatermark", eCmdHdlrInt, 0 },
+ { "queue.fulldelaymark", eCmdHdlrInt, 0 },
+ { "queue.lightdelaymark", eCmdHdlrInt, 0 },
+ { "queue.discardmark", eCmdHdlrInt, 0 },
+ { "queue.discardseverity", eCmdHdlrFacility, 0 },
+ { "queue.checkpointinterval", eCmdHdlrInt, 0 },
+ { "queue.syncqueuefiles", eCmdHdlrBinary, 0 },
+ { "queue.type", eCmdHdlrQueueType, 0 },
+ { "queue.workerthreads", eCmdHdlrInt, 0 },
+ { "queue.timeoutshutdown", eCmdHdlrInt, 0 },
+ { "queue.timeoutactioncompletion", eCmdHdlrInt, 0 },
+ { "queue.timeoutenqueue", eCmdHdlrInt, 0 },
+ { "queue.timeoutworkerthreadshutdown", eCmdHdlrInt, 0 },
+ { "queue.workerthreadminimummessages", eCmdHdlrInt, 0 },
+ { "queue.maxfilesize", eCmdHdlrSize, 0 },
+ { "queue.saveonshutdown", eCmdHdlrBinary, 0 },
+ { "queue.dequeueslowdown", eCmdHdlrInt, 0 },
+ { "queue.dequeuetimebegin", eCmdHdlrInt, 0 },
+ { "queue.dequeuetimeend", eCmdHdlrInt, 0 },
+};
+static struct cnfparamblk pblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(cnfpdescr)/sizeof(struct cnfparamdescr),
+ cnfpdescr
+ };
+
/* debug aid */
static void displayBatchState(batch_t *pBatch)
{
@@ -1267,8 +1300,8 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
/* set some water marks so that we have useful defaults if none are set specifically */
- pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */
- pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */
+ pThis->iFullDlyMrk = -1;
+ pThis->iLightDlyMrk = -1;
pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir);
pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
pThis->iQueueSize = 0;
@@ -1282,42 +1315,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->pszFilePrefix = NULL;
pThis->qType = qType;
- /* set type-specific handlers and other very type-specific things (we can not totally hide it...) */
- switch(qType) {
- case QUEUETYPE_FIXED_ARRAY:
- pThis->qConstruct = qConstructFixedArray;
- pThis->qDestruct = qDestructFixedArray;
- pThis->qAdd = qAddFixedArray;
- pThis->qDeq = qDeqFixedArray;
- pThis->qDel = qDelFixedArray;
- pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
- break;
- case QUEUETYPE_LINKEDLIST:
- pThis->qConstruct = qConstructLinkedList;
- pThis->qDestruct = qDestructLinkedList;
- pThis->qAdd = qAddLinkedList;
- pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
- pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
- pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
- break;
- case QUEUETYPE_DISK:
- pThis->qConstruct = qConstructDisk;
- pThis->qDestruct = qDestructDisk;
- pThis->qAdd = qAddDisk;
- pThis->qDeq = qDeqDisk;
- pThis->qDel = qDelDisk;
- pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
- /* special handling */
- pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
- break;
- case QUEUETYPE_DIRECT:
- pThis->qConstruct = qConstructDirect;
- pThis->qDestruct = qDestructDirect;
- pThis->qAdd = qAddDirect;
- pThis->qDel = qDelDirect;
- pThis->MultiEnq = qqueueMultiEnqObjDirect;
- break;
- }
INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
INIT_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
@@ -1328,6 +1325,40 @@ finalize_it:
}
+/* set default inisde queue object suitable for action queues.
+ * This shall be called directly after queue construction. This functions has
+ * been added in support of the new v6 config system. It expect properly pre-initialized
+ * objects, but we need to differentiate between ruleset main and action queues.
+ * In order to avoid unnecessary complexity, we provide the necessary defaults
+ * via specific function calls.
+ */
+void
+qqueueSetDefaultsActionQueue(qqueue_t *pThis)
+{
+ pThis->qType = QUEUETYPE_DIRECT; /* type of the main message queue above */
+ pThis->iMaxQueueSize = 1000; /* size of the main message queue above */
+ pThis->iDeqBatchSize = 128; /* default batch size */
+ pThis->iHighWtrMrk = 800; /* high water mark for disk-assisted queues */
+ pThis->iLowWtrMrk = 200; /* low water mark for disk-assisted queues */
+ pThis->iDiscardMrk = 9800; /* begin to discard messages */
+ pThis->iDiscardSeverity = 8; /* turn off */
+ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */
+ pThis->iMaxFileSize = 1024*1024;
+ pThis->iPersistUpdCnt = 0; /* persist queue info every n updates */
+ pThis->bSyncQueueFiles = 0;
+ pThis->toQShutdown = 0; /* queue shutdown */
+ pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */
+ pThis->toEnq = 2000; /* timeout for queue enque */
+ pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */
+ pThis->iMinMsgsPerWrkr = 100; /* minimum messages per worker needed to start a new one */
+ pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
+ pThis->sizeOnDiskMax = 0; /* unlimited */
+ pThis->iDeqSlowdown = 0;
+ pThis->iDeqtWinFromHr = 0;
+ pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
+}
+
+
/* This function checks if the provided message shall be discarded and does so, if needed.
* In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
* provide real-time creation of spool files.
@@ -1901,6 +1932,52 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
ASSERT(pThis != NULL);
+ /* set type-specific handlers and other very type-specific things
+ * (we can not totally hide it...)
+ */
+ switch(pThis->qType) {
+ case QUEUETYPE_FIXED_ARRAY:
+ pThis->qConstruct = qConstructFixedArray;
+ pThis->qDestruct = qDestructFixedArray;
+ pThis->qAdd = qAddFixedArray;
+ pThis->qDeq = qDeqFixedArray;
+ pThis->qDel = qDelFixedArray;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
+ break;
+ case QUEUETYPE_LINKEDLIST:
+ pThis->qConstruct = qConstructLinkedList;
+ pThis->qDestruct = qDestructLinkedList;
+ pThis->qAdd = qAddLinkedList;
+ pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
+ pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
+ break;
+ case QUEUETYPE_DISK:
+ pThis->qConstruct = qConstructDisk;
+ pThis->qDestruct = qDestructDisk;
+ pThis->qAdd = qAddDisk;
+ pThis->qDeq = qDeqDisk;
+ pThis->qDel = qDelDisk;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
+ /* special handling */
+ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
+ break;
+ case QUEUETYPE_DIRECT:
+ pThis->qConstruct = qConstructDirect;
+ pThis->qDestruct = qDestructDirect;
+ pThis->qAdd = qAddDirect;
+ pThis->qDel = qDelDirect;
+ pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ break;
+ }
+
+ if(pThis->iFullDlyMrk == -1)
+ pThis->iFullDlyMrk = pThis->iMaxQueueSize
+ - (pThis->iMaxQueueSize / 100) * 3; /* default 97% */
+ if(pThis->iLightDlyMrk == -1)
+ pThis->iLightDlyMrk = pThis->iMaxQueueSize
+ - (pThis->iMaxQueueSize / 100) * 30; /* default 70% */
+
/* we need to do a quick check if our water marks are set plausible. If not,
* we correct the most important shortcomings. TODO: do that!!!! -- rgerhards, 2008-03-14
*/
@@ -2477,6 +2554,90 @@ finalize_it:
}
+/* take v6 config list and extract the queue params out of it. Hand the
+ * param values back to the caller. Caller is responsible for destructing
+ * them when no longer needed. Caller can use this param block to configure
+ * all parameters for a newly created queue with one call to qqueueSetParams().
+ * rgerhards, 2011-07-22
+ */
+rsRetVal
+qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals)
+{
+ *ppvals = nvlstGetParams(lst, &pblk, NULL);
+ return RS_RET_OK;
+}
+
+/* apply all params from param block to queue. Must be called before
+ * finalizing. This supports the v6 config system. Defaults were already
+ * set during queue creation. The pvals object is destructed by this
+ * function.
+ */
+rsRetVal
+qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals)
+{
+ int i;
+ for(i = 0 ; i < pblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(pblk.descr[i].name, "queue.filename")) {
+ pThis->pszFilePrefix = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
+ pThis->lenFilePrefix = es_strlen(pvals[i].val.d.estr);
+ } else if(!strcmp(pblk.descr[i].name, "queue.size")) {
+ pThis->iMaxQueueSize = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.dequeuebatchsize")) {
+ pThis->iDeqBatchSize = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.maxdiskspace")) {
+ pThis->iMaxFileSize = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.highwatermark")) {
+ pThis->iHighWtrMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.lowwatermark")) {
+ pThis->iLowWtrMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.fulldelaymark")) {
+ pThis->iFullDlyMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.lightdelaymark")) {
+ pThis->iLightDlyMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.discardmark")) {
+ pThis->iDiscardMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.discardseverity")) {
+ pThis->iDiscardSeverity = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.checkpointinterval")) {
+ pThis->iPersistUpdCnt = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.syncqueuefiles")) {
+ pThis->bSyncQueueFiles = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.type")) {
+ pThis->qType = (queueType_t) pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.workerthreads")) {
+ pThis->iNumWorkerThreads = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.timeoutshutdown")) {
+ pThis->toQShutdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.timeoutactioncompletion")) {
+ pThis->toActShutdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.timeoutenqueue")) {
+ pThis->toEnq = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.timeoutworkerthreadshutdown")) {
+ pThis->toWrkShutdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.workerthreadminimummessages")) {
+ pThis->iMinMsgsPerWrkr = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.maxfilesize")) {
+ pThis->iMaxFileSize = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.saveonshutdown")) {
+ pThis->bSaveOnShutdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.dequeueslowdown")) {
+ pThis->iDeqSlowdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.dequeuetimebegin")) {
+ pThis->iDeqtWinFromHr = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queuedequeuetimend.")) {
+ pThis->iDeqtWinToHr = pvals[i].val.d.n;
+ } else {
+ dbgprintf("queue: program error, non-handled "
+ "param '%s'\n", pblk.descr[i].name);
+ }
+ }
+ cnfparamvalsDestruct(pvals, &pblk);
+ return RS_RET_OK;
+}
+
+
/* some simple object access methods */
DEFpropSetMeth(qqueue, bSyncQueueFiles, int)
DEFpropSetMeth(qqueue, iPersistUpdCnt, int)