diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2011-07-22 18:03:43 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2011-07-22 18:03:43 +0200 |
commit | 9757aeb56445eee3aca2b43e6b3efa1f1cb59ba3 (patch) | |
tree | b44dad3ab3f3477c4b6c45b615b60f5e51f58e35 | |
parent | 6b8b7ba0091a4e59b9a45057756fc7f754576242 (diff) | |
download | rsyslog-9757aeb56445eee3aca2b43e6b3efa1f1cb59ba3.tar.gz rsyslog-9757aeb56445eee3aca2b43e6b3efa1f1cb59ba3.tar.xz rsyslog-9757aeb56445eee3aca2b43e6b3efa1f1cb59ba3.zip |
milestone: queue object now has a param handler for new conf interface
... and action queue defs use this new interface (but not yet the main queues)
-rw-r--r-- | action.c | 88 | ||||
-rw-r--r-- | action.h | 6 | ||||
-rw-r--r-- | doc/v6compatibility.html | 2 | ||||
-rw-r--r-- | grammar/lexer.l | 21 | ||||
-rw-r--r-- | grammar/parserif.h | 1 | ||||
-rw-r--r-- | grammar/rainerscript.c | 29 | ||||
-rw-r--r-- | runtime/conf.c | 2 | ||||
-rw-r--r-- | runtime/queue.c | 207 | ||||
-rw-r--r-- | runtime/queue.h | 3 | ||||
-rw-r--r-- | runtime/rsconf.c | 6 | ||||
-rw-r--r-- | runtime/typedefs.h | 1 |
11 files changed, 267 insertions, 99 deletions
@@ -8,7 +8,7 @@ * the right code in question): For performance reasons, this module * uses different methods of message submission based on the user-selected * configuration. This code is similar, but can not be abstracted because - * of the performanse-affecting differences in it. As such, it is often + * of the performance-affecting differences in it. As such, it is often * necessary to triple-check that everything works well in *all* modes. * The different modes (and calling sequence) are: * @@ -197,32 +197,6 @@ static struct cnfparamblk paramblk = cnfparamdescr }; -/* Still TODO: */ -#if 0 - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &cs.pszActionQFName, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueSize, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqBatchSize, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxDiskSpace, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQHighWtrMark, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQLowWtrMark, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardMark, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardSeverity, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &cs.iActionQPersistUpdCnt, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &cs.bActionQSyncQeueFiles, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &cs.iActionQueueNumWorkers, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &cs.iActionQtoQShutdown, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &cs.iActionQtoActShutdown, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &cs.iActionQtoEnq, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, &cs.iActionQtoWrkShutdown, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &cs.iActionQWrkMinMsgs, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxFileSize, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &cs.bActionQSaveOnShutdown, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqSlowdown, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqtWinFromHr, NULL, eConfObjAction)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqtWinToHr, NULL, eConfObjAction)); -#endif - /* ------------------------------ methods ------------------------------ */ /* This function returns the "current" time for this action. Current time @@ -364,7 +338,7 @@ finalize_it: /* action construction finalizer */ rsRetVal -actionConstructFinalize(action_t *pThis) +actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) { DEFiRet; uchar pszQName[64]; /* friendly name of our queue */ @@ -434,25 +408,31 @@ actionConstructFinalize(action_t *pThis) } qqueueSetpUsr(pThis->pQueue, pThis); - setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", cs.iActionQueMaxDiskSpace); - setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", cs.iActionQueueDeqBatchSize); - setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", cs.iActionQueMaxFileSize); - setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", cs.pszActionQFName); - setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", cs.iActionQPersistUpdCnt); - setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", cs.bActionQSyncQeueFiles); - setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", cs.iActionQtoQShutdown ); - setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", cs.iActionQtoActShutdown); - setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", cs.iActionQtoWrkShutdown); - setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", cs.iActionQtoEnq); - setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", cs.iActionQHighWtrMark); - setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", cs.iActionQLowWtrMark); - setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark); - setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity); - setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs); - setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown); - setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown); - setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr); - setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", cs.iActionQueueDeqtWinToHr); + if(queueParams == NULL) { + /* use legacy params */ + setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", cs.iActionQueMaxDiskSpace); + setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", cs.iActionQueueDeqBatchSize); + setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", cs.iActionQueMaxFileSize); + setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", cs.pszActionQFName); + setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", cs.iActionQPersistUpdCnt); + setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", cs.bActionQSyncQeueFiles); + setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", cs.iActionQtoQShutdown ); + setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", cs.iActionQtoActShutdown); + setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", cs.iActionQtoWrkShutdown); + setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", cs.iActionQtoEnq); + setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", cs.iActionQHighWtrMark); + setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", cs.iActionQLowWtrMark); + setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark); + setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity); + setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs); + setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown); + setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown); + setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr); + setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", cs.iActionQueueDeqtWinToHr); + } else { + /* we have v6-style config params */ + qqueueApplyCnfParam(pThis->pQueue, queueParams); + } # undef setQPROP # undef setQPROPstr @@ -1762,7 +1742,8 @@ doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) * rgerhards, 2007-07-27 */ rsRetVal -addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, int bSuspended) +addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, + omodStringRequest_t *pOMSR, struct cnfparamvals *queueParams, int bSuspended) { DEFiRet; int i; @@ -1774,7 +1755,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques assert(ppAction != NULL); assert(pMod != NULL); assert(pOMSR != NULL); - DBGPRINTF("Module %s processed this config line.\n", module.GetName(pMod)); + DBGPRINTF("Module %s processes this action.\n", module.GetName(pMod)); CHKiRet(actionConstruct(&pAction)); /* create action object first */ pAction->pMod = pMod; @@ -1851,7 +1832,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques if(bSuspended) actionSuspend(pAction, datetime.GetTime(NULL)); /* "good" time call, only during init and unavoidable */ - CHKiRet(actionConstructFinalize(pAction)); + CHKiRet(actionConstructFinalize(pAction, queueParams)); /* TODO: if we exit here, we have a memory leak... */ @@ -1938,6 +1919,7 @@ rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction) { struct cnfparamvals *paramvals; + struct cnfparamvals *queueParams; modInfo_t *pMod; uchar *cnfModName = NULL; omodStringRequest_t *pOMSR; @@ -1966,7 +1948,10 @@ actionNewInst(struct nvlst *lst, action_t **ppAction) FINALIZE; /* iRet is already set to error state */ } - if((iRet = addAction(&pAction, pMod, pModData, pOMSR, (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { + qqueueDoCnfParams(lst, &queueParams); + + if((iRet = addAction(&pAction, pMod, pModData, pOMSR, queueParams, + (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { /* now check if the module is compatible with select features */ if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK) pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs; @@ -1994,6 +1979,7 @@ actionProcessCnf(struct cnfobj *o) { DEFiRet; #if 0 /* we need to check if we actually need this functionality -- later! */ +// This is for STAND-ALONE actions at the conf file TOP level struct cnfparamvals *paramvals; paramvals = nvlstGetParams(o->nvlst, ¶mblk, NULL); @@ -3,7 +3,7 @@ * * File begun on 2007-08-06 by RGerhards (extracted from syslogd.c) * - * Copyright 2007 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -95,7 +95,7 @@ struct action_s { /* function prototypes */ rsRetVal actionConstruct(action_t **ppThis); -rsRetVal actionConstructFinalize(action_t *pThis); +rsRetVal actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams); rsRetVal actionDestruct(action_t *pThis); rsRetVal actionDbgPrint(action_t *pThis); rsRetVal actionSetGlobalResumeInterval(int iNewVal); @@ -103,7 +103,7 @@ rsRetVal actionDoAction(action_t *pAction); rsRetVal actionWriteToAction(action_t *pAction); rsRetVal actionCallHUPHdlr(action_t *pAction); rsRetVal actionClassInit(void); -rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, int bSuspended); +rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, struct cnfparamvals *queueParams, int bSuspended); rsRetVal actionNewScope(void); rsRetVal actionRestoreScope(void); rsRetVal activateActions(void); diff --git a/doc/v6compatibility.html b/doc/v6compatibility.html index cf621943..38f1e622 100644 --- a/doc/v6compatibility.html +++ b/doc/v6compatibility.html @@ -113,7 +113,7 @@ longer in v6: <blockquote><code> *.* rgerhards, bgerhards </code> </blockquote> -To fix it in a way that is compatible with pre-v6, use (note the removed space!): +To fix it in a way that is compatible with pre-v4, use (note the removed space!): <blockquote><code> *.* rgerhards,bgerhards </code> </blockquote> diff --git a/grammar/lexer.l b/grammar/lexer.l index cf912db3..f7691c4a 100644 --- a/grammar/lexer.l +++ b/grammar/lexer.l @@ -182,7 +182,7 @@ int fileno(FILE *stream); <INOBJ>#.*$ /* skip comments in input */ <INOBJ>[ \n\t] <INOBJ>. { dbgprintf("INOBJ: invalid char '%s'\n", yytext); } -\$[a-z]+.*$ { /* see common on $IncludeConfig above */ +\$[a-z]+.*$ { /* see comment on $IncludeConfig above */ if(!strncasecmp(yytext, "$includeconfig ", 14)) { yyless(14); BEGIN INCL; @@ -282,9 +282,17 @@ popfile(void) if(bs == NULL) return 1; - /* delte current entry */ + /* delete current entry. But we must not free the file name if + * this is the top-level file, because then it may still be used + * in error messages for other processing steps. + * TODO: change this to another method which stores the file + * name inside the config objects. In the longer term, this is + * necessary, as otherwise we may provide wrong file name information + * at the end of include files as well. -- rgerhards, 2011-07-22 + */ yy_delete_buffer(bs->bs); - free(bs->fn); + if(bs->prev != NULL) + free(bs->fn); free(bs->estr); /* switch back to previous */ @@ -299,3 +307,10 @@ popfile(void) cnfcurrfn = currbs->fn; return 0; } + +void +tellLexEndParsing(void) +{ + free(cnfcurrfn); + cnfcurrfn= NULL; +} diff --git a/grammar/parserif.h b/grammar/parserif.h index adb0f42f..597cfe40 100644 --- a/grammar/parserif.h +++ b/grammar/parserif.h @@ -6,6 +6,7 @@ int yyparse(); char *cnfcurrfn; void dbgprintf(char *fmt, ...) __attribute__((format(printf, 1, 2))); void parser_errmsg(char *fmt, ...) __attribute__((format(printf, 1, 2))); +void tellLexEndParsing(void); extern int yydebug; extern int yylineno; diff --git a/grammar/rainerscript.c b/grammar/rainerscript.c index fe99e4aa..1b44974c 100644 --- a/grammar/rainerscript.c +++ b/grammar/rainerscript.c @@ -35,9 +35,11 @@ #include <sys/stat.h> #include <sys/types.h> #include <libestr.h> +#include "rsyslog.h" #include "rainerscript.h" #include "parserif.h" #include "grammar.h" +#include "queue.h" #include "srUtils.h" void @@ -271,6 +273,30 @@ doGetBinary(struct nvlst *valnode, struct cnfparamdescr *param, } } +static inline void +doGetQueueType(struct nvlst *valnode, struct cnfparamdescr *param, + struct cnfparamvals *val) +{ + char *cstr; + if(!es_strcasebufcmp(valnode->val.d.estr, (uchar*)"fixedarray", 10)) { + val->val.d.n = QUEUETYPE_FIXED_ARRAY; + } else if(!es_strcasebufcmp(valnode->val.d.estr, (uchar*)"linkedlist", 10)) { + val->val.d.n = QUEUETYPE_LINKEDLIST; + } else if(!es_strcasebufcmp(valnode->val.d.estr, (uchar*)"disk", 4)) { + val->val.d.n = QUEUETYPE_DISK; + } else if(!es_strcasebufcmp(valnode->val.d.estr, (uchar*)"direct", 6)) { + val->val.d.n = QUEUETYPE_DIRECT; + } else { + cstr = es_str2cstr(valnode->val.d.estr, NULL); + parser_errmsg("param '%s': unknown queue type: '%s'", + param->name, cstr); + free(cstr); + } +dbgprintf("XXXXX: queue type: %d\n", (int)val->val.d.n); + val->val.datatype = 'N'; +} + + /* A file create-mode must be a four-digit octal number * starting with '0'. */ @@ -416,6 +442,9 @@ nvlstGetParam(struct nvlst *valnode, struct cnfparamdescr *param, valnode->bUsed = 1; val->bUsed = 1; switch(param->type) { + case eCmdHdlrQueueType: + doGetQueueType(valnode, param, val); + break; case eCmdHdlrUID: doGetUID(valnode, param, val); break; diff --git a/runtime/conf.c b/runtime/conf.c index 6136262e..d98a147b 100644 --- a/runtime/conf.c +++ b/runtime/conf.c @@ -741,7 +741,7 @@ rsRetVal cflineDoAction(rsconf_t *conf, uchar **p, action_t **ppAction) */ if(currConfObj == eConfObjAction) currConfObj = eConfObjActionWaitEnd; - if((iRet = addAction(&pAction, pMod, pModData, pOMSR, (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { + if((iRet = addAction(&pAction, pMod, pModData, pOMSR, NULL, (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { /* now check if the module is compatible with select features */ if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK) pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs; diff --git a/runtime/queue.c b/runtime/queue.c index c831836d..1ceec5da 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -12,7 +12,7 @@ * function names - this makes it really hard to read and does not provide much * benefit, at least I (now) think so... * - * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2008-2011 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -93,6 +93,41 @@ static rsRetVal qDestructDisk(qqueue_t *pThis); #define QUEUE_CHECKPOINT 1 #define QUEUE_NO_CHECKPOINT 0 + +/* tables for interfacing with the v6 config system */ +static struct cnfparamdescr cnfpdescr[] = { + { "queue.filename", eCmdHdlrGetWord, 0 }, + { "queue.size", eCmdHdlrSize, 0 }, + { "queue.dequeuebatchsize", eCmdHdlrInt, 0 }, + { "queue.maxdiskspace", eCmdHdlrSize, 0 }, + { "queue.highwatermark", eCmdHdlrInt, 0 }, + { "queue.lowwatermark", eCmdHdlrInt, 0 }, + { "queue.fulldelaymark", eCmdHdlrInt, 0 }, + { "queue.lightdelaymark", eCmdHdlrInt, 0 }, + { "queue.discardmark", eCmdHdlrInt, 0 }, + { "queue.discardseverity", eCmdHdlrFacility, 0 }, + { "queue.checkpointinterval", eCmdHdlrInt, 0 }, + { "queue.syncqueuefiles", eCmdHdlrBinary, 0 }, + { "queue.type", eCmdHdlrQueueType, 0 }, + { "queue.workerthreads", eCmdHdlrInt, 0 }, + { "queue.timeoutshutdown", eCmdHdlrInt, 0 }, + { "queue.timeoutactioncompletion", eCmdHdlrInt, 0 }, + { "queue.timeoutenqueue", eCmdHdlrInt, 0 }, + { "queue.timeoutworkerthreadshutdown", eCmdHdlrInt, 0 }, + { "queue.workerthreadminimummessages", eCmdHdlrInt, 0 }, + { "queue.maxfilesize", eCmdHdlrSize, 0 }, + { "queue.saveonshutdown", eCmdHdlrBinary, 0 }, + { "queue.dequeueslowdown", eCmdHdlrInt, 0 }, + { "queue.dequeuetimebegin", eCmdHdlrInt, 0 }, + { "queue.dequeuetimeend", eCmdHdlrInt, 0 }, +}; +static struct cnfparamblk pblk = + { CNFPARAMBLK_VERSION, + sizeof(cnfpdescr)/sizeof(struct cnfparamdescr), + cnfpdescr + }; + + /*********************************************************************** * we need a private data structure, the "to-delete" list. As C does * not provide any partly private data structures, we implement this @@ -1258,8 +1293,8 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); /* set some water marks so that we have useful defaults if none are set specifically */ - pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */ - pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */ + pThis->iFullDlyMrk = -1; + pThis->iLightDlyMrk = -1; pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir); pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */ pThis->iQueueSize = 0; @@ -1273,42 +1308,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->pszFilePrefix = NULL; pThis->qType = qType; - /* set type-specific handlers and other very type-specific things (we can not totally hide it...) */ - switch(qType) { - case QUEUETYPE_FIXED_ARRAY: - pThis->qConstruct = qConstructFixedArray; - pThis->qDestruct = qDestructFixedArray; - pThis->qAdd = qAddFixedArray; - pThis->qDeq = qDeqFixedArray; - pThis->qDel = qDelFixedArray; - pThis->MultiEnq = qqueueMultiEnqObjNonDirect; - break; - case QUEUETYPE_LINKEDLIST: - pThis->qConstruct = qConstructLinkedList; - pThis->qDestruct = qDestructLinkedList; - pThis->qAdd = qAddLinkedList; - pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; - pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; - pThis->MultiEnq = qqueueMultiEnqObjNonDirect; - break; - case QUEUETYPE_DISK: - pThis->qConstruct = qConstructDisk; - pThis->qDestruct = qDestructDisk; - pThis->qAdd = qAddDisk; - pThis->qDeq = qDeqDisk; - pThis->qDel = qDelDisk; - pThis->MultiEnq = qqueueMultiEnqObjNonDirect; - /* special handling */ - pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ - break; - case QUEUETYPE_DIRECT: - pThis->qConstruct = qConstructDirect; - pThis->qDestruct = qDestructDirect; - pThis->qAdd = qAddDirect; - pThis->qDel = qDelDirect; - pThis->MultiEnq = qqueueMultiEnqObjDirect; - break; - } INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize); INIT_ATOMIC_HELPER_MUT(pThis->mutLogDeq); @@ -1891,6 +1890,52 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ ASSERT(pThis != NULL); + /* set type-specific handlers and other very type-specific things + * (we can not totally hide it...) + */ + switch(pThis->qType) { + case QUEUETYPE_FIXED_ARRAY: + pThis->qConstruct = qConstructFixedArray; + pThis->qDestruct = qDestructFixedArray; + pThis->qAdd = qAddFixedArray; + pThis->qDeq = qDeqFixedArray; + pThis->qDel = qDelFixedArray; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; + break; + case QUEUETYPE_LINKEDLIST: + pThis->qConstruct = qConstructLinkedList; + pThis->qDestruct = qDestructLinkedList; + pThis->qAdd = qAddLinkedList; + pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; + pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; + break; + case QUEUETYPE_DISK: + pThis->qConstruct = qConstructDisk; + pThis->qDestruct = qDestructDisk; + pThis->qAdd = qAddDisk; + pThis->qDeq = qDeqDisk; + pThis->qDel = qDelDisk; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; + /* special handling */ + pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ + break; + case QUEUETYPE_DIRECT: + pThis->qConstruct = qConstructDirect; + pThis->qDestruct = qDestructDirect; + pThis->qAdd = qAddDirect; + pThis->qDel = qDelDirect; + pThis->MultiEnq = qqueueMultiEnqObjDirect; + break; + } + + if(pThis->iFullDlyMrk == -1) + pThis->iFullDlyMrk = pThis->iMaxQueueSize + - (pThis->iMaxQueueSize / 100) * 3; /* default 97% */ + if(pThis->iLightDlyMrk == -1) + pThis->iLightDlyMrk = pThis->iMaxQueueSize + - (pThis->iMaxQueueSize / 100) * 30; /* default 70% */ + /* we need to do a quick check if our water marks are set plausible. If not, * we correct the most important shortcomings. TODO: do that!!!! -- rgerhards, 2008-03-14 */ @@ -2459,6 +2504,90 @@ finalize_it: } +/* take v6 config list and extract the queue params out of it. Hand the + * param values back to the caler. Caller is responsible for destructing + * them when no longer needed. Caller can use this param block to configure + * all parameters for a newly created queue with one call to qqueueSetParams(). + * rgerhards, 2011-07-22 + */ +rsRetVal +qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals) +{ + *ppvals = nvlstGetParams(lst, &pblk, NULL); + return RS_RET_OK; +} + +/* apply all params from param block to queue. Must be called before + * finalizing. This supports the v6 config system. Defaults were already + * set during queue creation. The pvals object is destructed by this + * function. + */ +rsRetVal +qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals) +{ + int i; + for(i = 0 ; i < pblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(pblk.descr[i].name, "queue.filename")) { + pThis->pszFilePrefix = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL); + pThis->lenFilePrefix = es_strlen(pvals[i].val.d.estr); + } else if(!strcmp(pblk.descr[i].name, "queue.size")) { + pThis->iMaxQueueSize = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.dequeuebatchsize")) { + pThis->iDeqBatchSize = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.maxdiskspace")) { + pThis->iMaxFileSize = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.highwatermark")) { + pThis->iHighWtrMrk = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.lowwatermark")) { + pThis->iLowWtrMrk = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.fulldelaymark")) { + pThis->iFullDlyMrk = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.lightdelaymark")) { + pThis->iLightDlyMrk = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.discardmark")) { + pThis->iDiscardMrk = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.discardseverity")) { + pThis->iDiscardSeverity = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.checkpointinterval")) { + pThis->iPersistUpdCnt = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.syncqueuefiles")) { + pThis->bSyncQueueFiles = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.type")) { + pThis->qType = (queueType_t) pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.workerthreads")) { + pThis->iNumWorkerThreads = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.timeoutshutdown")) { + pThis->toQShutdown = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.timeoutactioncompletion")) { + pThis->toActShutdown = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.timeoutenqueue")) { + pThis->toEnq = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.timeoutworkerthreadshutdown")) { + pThis->toWrkShutdown = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.workerthreadminimummessages")) { + pThis->iMinMsgsPerWrkr = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.maxfilesize")) { + pThis->iMaxFileSize = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.saveonshutdown")) { + pThis->bSaveOnShutdown = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.dequeueslowdown")) { + pThis->iDeqSlowdown = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queue.dequeuetimebegin")) { + pThis->iDeqtWinFromHr = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "queuedequeuetimend.")) { + pThis->iDeqtWinToHr = pvals[i].val.d.n; + } else { + dbgprintf("queue: program error, non-handled " + "param '%s'\n", pblk.descr[i].name); + } + } + cnfparamvalsDestruct(pvals, &pblk); + return RS_RET_OK; +} + + /* some simple object access methods */ DEFpropSetMeth(qqueue, bSyncQueueFiles, int) DEFpropSetMeth(qqueue, iPersistUpdCnt, int) diff --git a/runtime/queue.h b/runtime/queue.h index 97057180..c18b9f47 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -190,6 +190,9 @@ rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefi rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*)); rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch); +rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals); +rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals); + PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int); diff --git a/runtime/rsconf.c b/runtime/rsconf.c index 20c3b4f0..61e8ca96 100644 --- a/runtime/rsconf.c +++ b/runtime/rsconf.c @@ -345,7 +345,10 @@ parser_errmsg(char *fmt, ...) va_start(ap, fmt); if(vsnprintf(errBuf, sizeof(errBuf), fmt, ap) == sizeof(errBuf)) - errBuf[1023] = '\0'; + errBuf[sizeof(errBuf)-1] = '\0'; +dbgprintf("XXXX: msg: %s\n", errBuf); +dbgprintf("XXXX: cnfcurrfn: %s\n", cnfcurrfn); +dbgprintf("XXXX: yylineno: %d\n", yylineno); errmsg.LogError(0, RS_RET_CONF_PARSE_ERROR, "error during parsing file %s, on or before line %d: %s", cnfcurrfn, yylineno, errBuf); @@ -1258,6 +1261,7 @@ ourConf = loadConf; // TODO: remove, once ourConf is gone! "run, but no output whatsoever is created."); ABORT_FINALIZE(RS_RET_NO_ACTIONS); } + tellLexEndParsing(); tellCoreConfigLoadDone(); tellModulesConfigLoadDone(); diff --git a/runtime/typedefs.h b/runtime/typedefs.h index d46851f6..f994cbc4 100644 --- a/runtime/typedefs.h +++ b/runtime/typedefs.h @@ -160,6 +160,7 @@ typedef enum cslCmdHdlrType { eCmdHdlrSeverity, eCmdHdlrGetWord, eCmdHdlrString, + eCmdHdlrQueueType, eCmdHdlrGoneAway /* statment existed, but is no longer supported */ } ecslCmdHdrlType; |