diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 237 |
1 files changed, 199 insertions, 38 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 94cc1fcd..a2f80d29 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -93,6 +93,39 @@ static rsRetVal qDestructDisk(qqueue_t *pThis); #define QUEUE_CHECKPOINT 1 #define QUEUE_NO_CHECKPOINT 0 +/* tables for interfacing with the v6 config system */ +static struct cnfparamdescr cnfpdescr[] = { + { "queue.filename", eCmdHdlrGetWord, 0 }, + { "queue.size", eCmdHdlrSize, 0 }, + { "queue.dequeuebatchsize", eCmdHdlrInt, 0 }, + { "queue.maxdiskspace", eCmdHdlrSize, 0 }, + { "queue.highwatermark", eCmdHdlrInt, 0 }, + { "queue.lowwatermark", eCmdHdlrInt, 0 }, + { "queue.fulldelaymark", eCmdHdlrInt, 0 }, + { "queue.lightdelaymark", eCmdHdlrInt, 0 }, + { "queue.discardmark", eCmdHdlrInt, 0 }, + { "queue.discardseverity", eCmdHdlrFacility, 0 }, + { "queue.checkpointinterval", eCmdHdlrInt, 0 }, + { "queue.syncqueuefiles", eCmdHdlrBinary, 0 }, + { "queue.type", eCmdHdlrQueueType, 0 }, + { "queue.workerthreads", eCmdHdlrInt, 0 }, + { "queue.timeoutshutdown", eCmdHdlrInt, 0 }, + { "queue.timeoutactioncompletion", eCmdHdlrInt, 0 }, + { "queue.timeoutenqueue", eCmdHdlrInt, 0 }, + { "queue.timeoutworkerthreadshutdown", eCmdHdlrInt, 0 }, + { "queue.workerthreadminimummessages", eCmdHdlrInt, 0 }, + { "queue.maxfilesize", eCmdHdlrSize, 0 }, + { "queue.saveonshutdown", eCmdHdlrBinary, 0 }, + { "queue.dequeueslowdown", eCmdHdlrInt, 0 }, + { "queue.dequeuetimebegin", eCmdHdlrInt, 0 }, + { "queue.dequeuetimeend", eCmdHdlrInt, 0 }, +}; +static struct cnfparamblk pblk = + { CNFPARAMBLK_VERSION, + sizeof(cnfpdescr)/sizeof(struct cnfparamdescr), + cnfpdescr + }; + /* debug aid */ static void displayBatchState(batch_t *pBatch) { @@ -1267,8 +1300,8 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); /* set some water marks so that we have useful defaults if none are set specifically */ - pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */ - pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */ + pThis->iFullDlyMrk = -1; + pThis->iLightDlyMrk = -1; pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir); pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */ pThis->iQueueSize = 0; @@ -1282,42 +1315,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->pszFilePrefix = NULL; pThis->qType = qType; - /* set type-specific handlers and other very type-specific things (we can not totally hide it...) */ - switch(qType) { - case QUEUETYPE_FIXED_ARRAY: - pThis->qConstruct = qConstructFixedArray; - pThis->qDestruct = qDestructFixedArray; - pThis->qAdd = qAddFixedArray; - pThis->qDeq = qDeqFixedArray; - pThis->qDel = qDelFixedArray; - pThis->MultiEnq = qqueueMultiEnqObjNonDirect; - break; - case QUEUETYPE_LINKEDLIST: - pThis->qConstruct = qConstructLinkedList; - pThis->qDestruct = qDestructLinkedList; - pThis->qAdd = qAddLinkedList; - pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; - pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; - pThis->MultiEnq = qqueueMultiEnqObjNonDirect; - break; - case QUEUETYPE_DISK: - pThis->qConstruct = qConstructDisk; - pThis->qDestruct = qDestructDisk; - pThis->qAdd = qAddDisk; - pThis->qDeq = qDeqDisk; - pThis->qDel = qDelDisk; - pThis->MultiEnq = qqueueMultiEnqObjNonDirect; - /* special handling */ - pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ - break; - case QUEUETYPE_DIRECT: - pThis->qConstruct = qConstructDirect; - pThis->qDestruct = qDestructDirect; - pThis->qAdd = qAddDirect; - pThis->qDel = qDelDirect; - pThis->MultiEnq = qqueueMultiEnqObjDirect; - break; - } INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize); INIT_ATOMIC_HELPER_MUT(pThis->mutLogDeq); @@ -1328,6 +1325,40 @@ finalize_it: } +/* set default inisde queue object suitable for action queues. + * This shall be called directly after queue construction. This functions has + * been added in support of the new v6 config system. It expect properly pre-initialized + * objects, but we need to differentiate between ruleset main and action queues. + * In order to avoid unnecessary complexity, we provide the necessary defaults + * via specific function calls. + */ +void +qqueueSetDefaultsActionQueue(qqueue_t *pThis) +{ + pThis->qType = QUEUETYPE_DIRECT; /* type of the main message queue above */ + pThis->iMaxQueueSize = 1000; /* size of the main message queue above */ + pThis->iDeqBatchSize = 128; /* default batch size */ + pThis->iHighWtrMrk = 800; /* high water mark for disk-assisted queues */ + pThis->iLowWtrMrk = 200; /* low water mark for disk-assisted queues */ + pThis->iDiscardMrk = 9800; /* begin to discard messages */ + pThis->iDiscardSeverity = 8; /* turn off */ + pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ + pThis->iMaxFileSize = 1024*1024; + pThis->iPersistUpdCnt = 0; /* persist queue info every n updates */ + pThis->bSyncQueueFiles = 0; + pThis->toQShutdown = 0; /* queue shutdown */ + pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */ + pThis->toEnq = 2000; /* timeout for queue enque */ + pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */ + pThis->iMinMsgsPerWrkr = 100; /* minimum messages per worker needed to start a new one */ + pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ + pThis->sizeOnDiskMax = 0; /* unlimited */ + pThis->iDeqSlowdown = 0; + pThis->iDeqtWinFromHr = 0; + pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */ +} + + /* This function checks if the provided message shall be discarded and does so, if needed. * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to * provide real-time creation of spool files. @@ -1905,6 +1936,52 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ ASSERT(pThis != NULL); + /* set type-specific handlers and other very type-specific things + * (we can not totally hide it...) + */ + switch(pThis->qType) { + case QUEUETYPE_FIXED_ARRAY: + pThis->qConstruct = qConstructFixedArray; + pThis->qDestruct = qDestructFixedArray; + pThis->qAdd = qAddFixedArray; + pThis->qDeq = qDeqFixedArray; + pThis->qDel = qDelFixedArray; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; + break; + case QUEUETYPE_LINKEDLIST: + pThis->qConstruct = qConstructLinkedList; + pThis->qDestruct = qDestructLinkedList; + pThis->qAdd = qAddLinkedList; + pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; + pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; + break; + case QUEUETYPE_DISK: + pThis->qConstruct = qConstructDisk; + pThis->qDestruct = qDestructDisk; + pThis->qAdd = qAddDisk; + pThis->qDeq = qDeqDisk; + pThis->qDel = qDelDisk; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; + /* special handling */ + pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ + break; + case QUEUETYPE_DIRECT: + pThis->qConstruct = qConstructDirect; + pThis->qDestruct = qDestructDirect; + pThis->qAdd = qAddDirect; + pThis->qDel = qDelDirect; + pThis->MultiEnq = qqueueMultiEnqObjDirect; + break; + } + + if(pThis->iFullDlyMrk == -1) + pThis->iFullDlyMrk = pThis->iMaxQueueSize + - (pThis->iMaxQueueSize / 100) * 3; /* default 97% */ + if(pThis->iLightDlyMrk == -1) + pThis->iLightDlyMrk = pThis->iMaxQueueSize + - (pThis->iMaxQueueSize / 100) * 30; /* default 70% */ + /* we need to do a quick check if our water marks are set plausible. If not, * we correct the most important shortcomings. TODO: do that!!!! -- rgerhards, 2008-03-14 */ @@ -2528,6 +2605,90 @@ finalize_it: } +/* take v6 config list and extract the queue params out of it. Hand the + * param values back to the caller. Caller is responsible for destructing + * them when no longer needed. Caller can use this param block to configure + * all parameters for a newly created queue with one call to qqueueSetParams(). + * rgerhards, 2011-07-22 + */ +rsRetVal +qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals) +{ + *ppvals = nvlstGetParams(lst, &pblk, NULL); + return RS_RET_OK; +} + +/* apply all params from param block to queue. Must be called before + * finalizing. This supports the v6 config system. Defaults were already + * set during queue creation. The pvals object is destructed by this + * function. + */ +rsRetVal +qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals) +{ + int i; + for(i = 0 ; i < pblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(pblk.descr[i].name, "queue.filename")) { + pThis->pszFilePrefix = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL); + pThis->lenFilePrefix = es_strlen(pvals[i].val.d.estr); + } else if(!strcmp(pblk.descr[i].name, "queue.size")) { + pThis->iMaxQueueSize = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.dequeuebatchsize")) { + pThis->iDeqBatchSize = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.maxdiskspace")) { + pThis->iMaxFileSize = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.highwatermark")) { + pThis->iHighWtrMrk = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.lowwatermark")) { + pThis->iLowWtrMrk = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.fulldelaymark")) { + pThis->iFullDlyMrk = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.lightdelaymark")) { + pThis->iLightDlyMrk = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.discardmark")) { + pThis->iDiscardMrk = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.discardseverity")) { + pThis->iDiscardSeverity = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.checkpointinterval")) { + pThis->iPersistUpdCnt = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.syncqueuefiles")) { + pThis->bSyncQueueFiles = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.type")) { + pThis->qType = (queueType_t) pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.workerthreads")) { + pThis->iNumWorkerThreads = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.timeoutshutdown")) { + pThis->toQShutdown = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.timeoutactioncompletion")) { + pThis->toActShutdown = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.timeoutenqueue")) { + pThis->toEnq = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.timeoutworkerthreadshutdown")) { + pThis->toWrkShutdown = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.workerthreadminimummessages")) { + pThis->iMinMsgsPerWrkr = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.maxfilesize")) { + pThis->iMaxFileSize = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.saveonshutdown")) { + pThis->bSaveOnShutdown = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.dequeueslowdown")) { + pThis->iDeqSlowdown = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.dequeuetimebegin")) { + pThis->iDeqtWinFromHr = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queuedequeuetimend.")) { + pThis->iDeqtWinToHr = pvals[i].val.d.n; + } else { + dbgprintf("queue: program error, non-handled " + "param '%s'\n", pblk.descr[i].name); + } + } + cnfparamvalsDestruct(pvals, &pblk); + return RS_RET_OK; +} + + /* some simple object access methods */ DEFpropSetMeth(qqueue, bSyncQueueFiles, int) DEFpropSetMeth(qqueue, iPersistUpdCnt, int) |