summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-07-22 18:03:43 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2011-07-22 18:03:43 +0200
commit9757aeb56445eee3aca2b43e6b3efa1f1cb59ba3 (patch)
treeb44dad3ab3f3477c4b6c45b615b60f5e51f58e35
parent6b8b7ba0091a4e59b9a45057756fc7f754576242 (diff)
downloadrsyslog-9757aeb56445eee3aca2b43e6b3efa1f1cb59ba3.tar.gz
rsyslog-9757aeb56445eee3aca2b43e6b3efa1f1cb59ba3.tar.xz
rsyslog-9757aeb56445eee3aca2b43e6b3efa1f1cb59ba3.zip
milestone: queue object now has a param handler for new conf interface
... and action queue defs use this new interface (but not yet the main queues)
-rw-r--r--action.c88
-rw-r--r--action.h6
-rw-r--r--doc/v6compatibility.html2
-rw-r--r--grammar/lexer.l21
-rw-r--r--grammar/parserif.h1
-rw-r--r--grammar/rainerscript.c29
-rw-r--r--runtime/conf.c2
-rw-r--r--runtime/queue.c207
-rw-r--r--runtime/queue.h3
-rw-r--r--runtime/rsconf.c6
-rw-r--r--runtime/typedefs.h1
11 files changed, 267 insertions, 99 deletions
diff --git a/action.c b/action.c
index c459a738..74f6f7f7 100644
--- a/action.c
+++ b/action.c
@@ -8,7 +8,7 @@
* the right code in question): For performance reasons, this module
* uses different methods of message submission based on the user-selected
* configuration. This code is similar, but can not be abstracted because
- * of the performanse-affecting differences in it. As such, it is often
+ * of the performance-affecting differences in it. As such, it is often
* necessary to triple-check that everything works well in *all* modes.
* The different modes (and calling sequence) are:
*
@@ -197,32 +197,6 @@ static struct cnfparamblk paramblk =
cnfparamdescr
};
-/* Still TODO: */
-#if 0
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &cs.pszActionQFName, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueSize, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqBatchSize, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxDiskSpace, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQHighWtrMark, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQLowWtrMark, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardMark, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardSeverity, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &cs.iActionQPersistUpdCnt, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &cs.bActionQSyncQeueFiles, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &cs.iActionQueueNumWorkers, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &cs.iActionQtoQShutdown, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &cs.iActionQtoActShutdown, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &cs.iActionQtoEnq, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, &cs.iActionQtoWrkShutdown, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &cs.iActionQWrkMinMsgs, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxFileSize, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &cs.bActionQSaveOnShutdown, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqSlowdown, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqtWinFromHr, NULL, eConfObjAction));
- CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqtWinToHr, NULL, eConfObjAction));
-#endif
-
/* ------------------------------ methods ------------------------------ */
/* This function returns the "current" time for this action. Current time
@@ -364,7 +338,7 @@ finalize_it:
/* action construction finalizer
*/
rsRetVal
-actionConstructFinalize(action_t *pThis)
+actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
{
DEFiRet;
uchar pszQName[64]; /* friendly name of our queue */
@@ -434,25 +408,31 @@ actionConstructFinalize(action_t *pThis)
}
qqueueSetpUsr(pThis->pQueue, pThis);
- setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", cs.iActionQueMaxDiskSpace);
- setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", cs.iActionQueueDeqBatchSize);
- setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", cs.iActionQueMaxFileSize);
- setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", cs.pszActionQFName);
- setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", cs.iActionQPersistUpdCnt);
- setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", cs.bActionQSyncQeueFiles);
- setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", cs.iActionQtoQShutdown );
- setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", cs.iActionQtoActShutdown);
- setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", cs.iActionQtoWrkShutdown);
- setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", cs.iActionQtoEnq);
- setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", cs.iActionQHighWtrMark);
- setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", cs.iActionQLowWtrMark);
- setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark);
- setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity);
- setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs);
- setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown);
- setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown);
- setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr);
- setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", cs.iActionQueueDeqtWinToHr);
+ if(queueParams == NULL) {
+ /* use legacy params */
+ setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", cs.iActionQueMaxDiskSpace);
+ setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", cs.iActionQueueDeqBatchSize);
+ setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", cs.iActionQueMaxFileSize);
+ setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", cs.pszActionQFName);
+ setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", cs.iActionQPersistUpdCnt);
+ setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", cs.bActionQSyncQeueFiles);
+ setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", cs.iActionQtoQShutdown );
+ setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", cs.iActionQtoActShutdown);
+ setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", cs.iActionQtoWrkShutdown);
+ setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", cs.iActionQtoEnq);
+ setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", cs.iActionQHighWtrMark);
+ setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", cs.iActionQLowWtrMark);
+ setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark);
+ setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity);
+ setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs);
+ setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown);
+ setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown);
+ setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr);
+ setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", cs.iActionQueueDeqtWinToHr);
+ } else {
+ /* we have v6-style config params */
+ qqueueApplyCnfParam(pThis->pQueue, queueParams);
+ }
# undef setQPROP
# undef setQPROPstr
@@ -1762,7 +1742,8 @@ doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
* rgerhards, 2007-07-27
*/
rsRetVal
-addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, int bSuspended)
+addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
+ omodStringRequest_t *pOMSR, struct cnfparamvals *queueParams, int bSuspended)
{
DEFiRet;
int i;
@@ -1774,7 +1755,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
assert(ppAction != NULL);
assert(pMod != NULL);
assert(pOMSR != NULL);
- DBGPRINTF("Module %s processed this config line.\n", module.GetName(pMod));
+ DBGPRINTF("Module %s processes this action.\n", module.GetName(pMod));
CHKiRet(actionConstruct(&pAction)); /* create action object first */
pAction->pMod = pMod;
@@ -1851,7 +1832,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
if(bSuspended)
actionSuspend(pAction, datetime.GetTime(NULL)); /* "good" time call, only during init and unavoidable */
- CHKiRet(actionConstructFinalize(pAction));
+ CHKiRet(actionConstructFinalize(pAction, queueParams));
/* TODO: if we exit here, we have a memory leak... */
@@ -1938,6 +1919,7 @@ rsRetVal
actionNewInst(struct nvlst *lst, action_t **ppAction)
{
struct cnfparamvals *paramvals;
+ struct cnfparamvals *queueParams;
modInfo_t *pMod;
uchar *cnfModName = NULL;
omodStringRequest_t *pOMSR;
@@ -1966,7 +1948,10 @@ actionNewInst(struct nvlst *lst, action_t **ppAction)
FINALIZE; /* iRet is already set to error state */
}
- if((iRet = addAction(&pAction, pMod, pModData, pOMSR, (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
+ qqueueDoCnfParams(lst, &queueParams);
+
+ if((iRet = addAction(&pAction, pMod, pModData, pOMSR, queueParams,
+ (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
/* now check if the module is compatible with select features */
if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK)
pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs;
@@ -1994,6 +1979,7 @@ actionProcessCnf(struct cnfobj *o)
{
DEFiRet;
#if 0 /* we need to check if we actually need this functionality -- later! */
+// This is for STAND-ALONE actions at the conf file TOP level
struct cnfparamvals *paramvals;
paramvals = nvlstGetParams(o->nvlst, &paramblk, NULL);
diff --git a/action.h b/action.h
index 4d56106b..5d1f387b 100644
--- a/action.h
+++ b/action.h
@@ -3,7 +3,7 @@
*
* File begun on 2007-08-06 by RGerhards (extracted from syslogd.c)
*
- * Copyright 2007 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -95,7 +95,7 @@ struct action_s {
/* function prototypes
*/
rsRetVal actionConstruct(action_t **ppThis);
-rsRetVal actionConstructFinalize(action_t *pThis);
+rsRetVal actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams);
rsRetVal actionDestruct(action_t *pThis);
rsRetVal actionDbgPrint(action_t *pThis);
rsRetVal actionSetGlobalResumeInterval(int iNewVal);
@@ -103,7 +103,7 @@ rsRetVal actionDoAction(action_t *pAction);
rsRetVal actionWriteToAction(action_t *pAction);
rsRetVal actionCallHUPHdlr(action_t *pAction);
rsRetVal actionClassInit(void);
-rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, int bSuspended);
+rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, struct cnfparamvals *queueParams, int bSuspended);
rsRetVal actionNewScope(void);
rsRetVal actionRestoreScope(void);
rsRetVal activateActions(void);
diff --git a/doc/v6compatibility.html b/doc/v6compatibility.html
index cf621943..38f1e622 100644
--- a/doc/v6compatibility.html
+++ b/doc/v6compatibility.html
@@ -113,7 +113,7 @@ longer in v6:
<blockquote><code>
*.* rgerhards, bgerhards
</code> </blockquote>
-To fix it in a way that is compatible with pre-v6, use (note the removed space!):
+To fix it in a way that is compatible with pre-v4, use (note the removed space!):
<blockquote><code>
*.* rgerhards,bgerhards
</code> </blockquote>
diff --git a/grammar/lexer.l b/grammar/lexer.l
index cf912db3..f7691c4a 100644
--- a/grammar/lexer.l
+++ b/grammar/lexer.l
@@ -182,7 +182,7 @@ int fileno(FILE *stream);
<INOBJ>#.*$ /* skip comments in input */
<INOBJ>[ \n\t]
<INOBJ>. { dbgprintf("INOBJ: invalid char '%s'\n", yytext); }
-\$[a-z]+.*$ { /* see common on $IncludeConfig above */
+\$[a-z]+.*$ { /* see comment on $IncludeConfig above */
if(!strncasecmp(yytext, "$includeconfig ", 14)) {
yyless(14);
BEGIN INCL;
@@ -282,9 +282,17 @@ popfile(void)
if(bs == NULL)
return 1;
- /* delte current entry */
+ /* delete current entry. But we must not free the file name if
+ * this is the top-level file, because then it may still be used
+ * in error messages for other processing steps.
+ * TODO: change this to another method which stores the file
+ * name inside the config objects. In the longer term, this is
+ * necessary, as otherwise we may provide wrong file name information
+ * at the end of include files as well. -- rgerhards, 2011-07-22
+ */
yy_delete_buffer(bs->bs);
- free(bs->fn);
+ if(bs->prev != NULL)
+ free(bs->fn);
free(bs->estr);
/* switch back to previous */
@@ -299,3 +307,10 @@ popfile(void)
cnfcurrfn = currbs->fn;
return 0;
}
+
+void
+tellLexEndParsing(void)
+{
+ free(cnfcurrfn);
+ cnfcurrfn= NULL;
+}
diff --git a/grammar/parserif.h b/grammar/parserif.h
index adb0f42f..597cfe40 100644
--- a/grammar/parserif.h
+++ b/grammar/parserif.h
@@ -6,6 +6,7 @@ int yyparse();
char *cnfcurrfn;
void dbgprintf(char *fmt, ...) __attribute__((format(printf, 1, 2)));
void parser_errmsg(char *fmt, ...) __attribute__((format(printf, 1, 2)));
+void tellLexEndParsing(void);
extern int yydebug;
extern int yylineno;
diff --git a/grammar/rainerscript.c b/grammar/rainerscript.c
index fe99e4aa..1b44974c 100644
--- a/grammar/rainerscript.c
+++ b/grammar/rainerscript.c
@@ -35,9 +35,11 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <libestr.h>
+#include "rsyslog.h"
#include "rainerscript.h"
#include "parserif.h"
#include "grammar.h"
+#include "queue.h"
#include "srUtils.h"
void
@@ -271,6 +273,30 @@ doGetBinary(struct nvlst *valnode, struct cnfparamdescr *param,
}
}
+static inline void
+doGetQueueType(struct nvlst *valnode, struct cnfparamdescr *param,
+ struct cnfparamvals *val)
+{
+ char *cstr;
+ if(!es_strcasebufcmp(valnode->val.d.estr, (uchar*)"fixedarray", 10)) {
+ val->val.d.n = QUEUETYPE_FIXED_ARRAY;
+ } else if(!es_strcasebufcmp(valnode->val.d.estr, (uchar*)"linkedlist", 10)) {
+ val->val.d.n = QUEUETYPE_LINKEDLIST;
+ } else if(!es_strcasebufcmp(valnode->val.d.estr, (uchar*)"disk", 4)) {
+ val->val.d.n = QUEUETYPE_DISK;
+ } else if(!es_strcasebufcmp(valnode->val.d.estr, (uchar*)"direct", 6)) {
+ val->val.d.n = QUEUETYPE_DIRECT;
+ } else {
+ cstr = es_str2cstr(valnode->val.d.estr, NULL);
+ parser_errmsg("param '%s': unknown queue type: '%s'",
+ param->name, cstr);
+ free(cstr);
+ }
+dbgprintf("XXXXX: queue type: %d\n", (int)val->val.d.n);
+ val->val.datatype = 'N';
+}
+
+
/* A file create-mode must be a four-digit octal number
* starting with '0'.
*/
@@ -416,6 +442,9 @@ nvlstGetParam(struct nvlst *valnode, struct cnfparamdescr *param,
valnode->bUsed = 1;
val->bUsed = 1;
switch(param->type) {
+ case eCmdHdlrQueueType:
+ doGetQueueType(valnode, param, val);
+ break;
case eCmdHdlrUID:
doGetUID(valnode, param, val);
break;
diff --git a/runtime/conf.c b/runtime/conf.c
index 6136262e..d98a147b 100644
--- a/runtime/conf.c
+++ b/runtime/conf.c
@@ -741,7 +741,7 @@ rsRetVal cflineDoAction(rsconf_t *conf, uchar **p, action_t **ppAction)
*/
if(currConfObj == eConfObjAction)
currConfObj = eConfObjActionWaitEnd;
- if((iRet = addAction(&pAction, pMod, pModData, pOMSR, (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
+ if((iRet = addAction(&pAction, pMod, pModData, pOMSR, NULL, (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
/* now check if the module is compatible with select features */
if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK)
pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs;
diff --git a/runtime/queue.c b/runtime/queue.c
index c831836d..1ceec5da 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -12,7 +12,7 @@
* function names - this makes it really hard to read and does not provide much
* benefit, at least I (now) think so...
*
- * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2008-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -93,6 +93,41 @@ static rsRetVal qDestructDisk(qqueue_t *pThis);
#define QUEUE_CHECKPOINT 1
#define QUEUE_NO_CHECKPOINT 0
+
+/* tables for interfacing with the v6 config system */
+static struct cnfparamdescr cnfpdescr[] = {
+ { "queue.filename", eCmdHdlrGetWord, 0 },
+ { "queue.size", eCmdHdlrSize, 0 },
+ { "queue.dequeuebatchsize", eCmdHdlrInt, 0 },
+ { "queue.maxdiskspace", eCmdHdlrSize, 0 },
+ { "queue.highwatermark", eCmdHdlrInt, 0 },
+ { "queue.lowwatermark", eCmdHdlrInt, 0 },
+ { "queue.fulldelaymark", eCmdHdlrInt, 0 },
+ { "queue.lightdelaymark", eCmdHdlrInt, 0 },
+ { "queue.discardmark", eCmdHdlrInt, 0 },
+ { "queue.discardseverity", eCmdHdlrFacility, 0 },
+ { "queue.checkpointinterval", eCmdHdlrInt, 0 },
+ { "queue.syncqueuefiles", eCmdHdlrBinary, 0 },
+ { "queue.type", eCmdHdlrQueueType, 0 },
+ { "queue.workerthreads", eCmdHdlrInt, 0 },
+ { "queue.timeoutshutdown", eCmdHdlrInt, 0 },
+ { "queue.timeoutactioncompletion", eCmdHdlrInt, 0 },
+ { "queue.timeoutenqueue", eCmdHdlrInt, 0 },
+ { "queue.timeoutworkerthreadshutdown", eCmdHdlrInt, 0 },
+ { "queue.workerthreadminimummessages", eCmdHdlrInt, 0 },
+ { "queue.maxfilesize", eCmdHdlrSize, 0 },
+ { "queue.saveonshutdown", eCmdHdlrBinary, 0 },
+ { "queue.dequeueslowdown", eCmdHdlrInt, 0 },
+ { "queue.dequeuetimebegin", eCmdHdlrInt, 0 },
+ { "queue.dequeuetimeend", eCmdHdlrInt, 0 },
+};
+static struct cnfparamblk pblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(cnfpdescr)/sizeof(struct cnfparamdescr),
+ cnfpdescr
+ };
+
+
/***********************************************************************
* we need a private data structure, the "to-delete" list. As C does
* not provide any partly private data structures, we implement this
@@ -1258,8 +1293,8 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
/* set some water marks so that we have useful defaults if none are set specifically */
- pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */
- pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */
+ pThis->iFullDlyMrk = -1;
+ pThis->iLightDlyMrk = -1;
pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir);
pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
pThis->iQueueSize = 0;
@@ -1273,42 +1308,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->pszFilePrefix = NULL;
pThis->qType = qType;
- /* set type-specific handlers and other very type-specific things (we can not totally hide it...) */
- switch(qType) {
- case QUEUETYPE_FIXED_ARRAY:
- pThis->qConstruct = qConstructFixedArray;
- pThis->qDestruct = qDestructFixedArray;
- pThis->qAdd = qAddFixedArray;
- pThis->qDeq = qDeqFixedArray;
- pThis->qDel = qDelFixedArray;
- pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
- break;
- case QUEUETYPE_LINKEDLIST:
- pThis->qConstruct = qConstructLinkedList;
- pThis->qDestruct = qDestructLinkedList;
- pThis->qAdd = qAddLinkedList;
- pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
- pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
- pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
- break;
- case QUEUETYPE_DISK:
- pThis->qConstruct = qConstructDisk;
- pThis->qDestruct = qDestructDisk;
- pThis->qAdd = qAddDisk;
- pThis->qDeq = qDeqDisk;
- pThis->qDel = qDelDisk;
- pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
- /* special handling */
- pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
- break;
- case QUEUETYPE_DIRECT:
- pThis->qConstruct = qConstructDirect;
- pThis->qDestruct = qDestructDirect;
- pThis->qAdd = qAddDirect;
- pThis->qDel = qDelDirect;
- pThis->MultiEnq = qqueueMultiEnqObjDirect;
- break;
- }
INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
INIT_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
@@ -1891,6 +1890,52 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
ASSERT(pThis != NULL);
+ /* set type-specific handlers and other very type-specific things
+ * (we can not totally hide it...)
+ */
+ switch(pThis->qType) {
+ case QUEUETYPE_FIXED_ARRAY:
+ pThis->qConstruct = qConstructFixedArray;
+ pThis->qDestruct = qDestructFixedArray;
+ pThis->qAdd = qAddFixedArray;
+ pThis->qDeq = qDeqFixedArray;
+ pThis->qDel = qDelFixedArray;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
+ break;
+ case QUEUETYPE_LINKEDLIST:
+ pThis->qConstruct = qConstructLinkedList;
+ pThis->qDestruct = qDestructLinkedList;
+ pThis->qAdd = qAddLinkedList;
+ pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
+ pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
+ break;
+ case QUEUETYPE_DISK:
+ pThis->qConstruct = qConstructDisk;
+ pThis->qDestruct = qDestructDisk;
+ pThis->qAdd = qAddDisk;
+ pThis->qDeq = qDeqDisk;
+ pThis->qDel = qDelDisk;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
+ /* special handling */
+ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
+ break;
+ case QUEUETYPE_DIRECT:
+ pThis->qConstruct = qConstructDirect;
+ pThis->qDestruct = qDestructDirect;
+ pThis->qAdd = qAddDirect;
+ pThis->qDel = qDelDirect;
+ pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ break;
+ }
+
+ if(pThis->iFullDlyMrk == -1)
+ pThis->iFullDlyMrk = pThis->iMaxQueueSize
+ - (pThis->iMaxQueueSize / 100) * 3; /* default 97% */
+ if(pThis->iLightDlyMrk == -1)
+ pThis->iLightDlyMrk = pThis->iMaxQueueSize
+ - (pThis->iMaxQueueSize / 100) * 30; /* default 70% */
+
/* we need to do a quick check if our water marks are set plausible. If not,
* we correct the most important shortcomings. TODO: do that!!!! -- rgerhards, 2008-03-14
*/
@@ -2459,6 +2504,90 @@ finalize_it:
}
+/* take v6 config list and extract the queue params out of it. Hand the
+ * param values back to the caler. Caller is responsible for destructing
+ * them when no longer needed. Caller can use this param block to configure
+ * all parameters for a newly created queue with one call to qqueueSetParams().
+ * rgerhards, 2011-07-22
+ */
+rsRetVal
+qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals)
+{
+ *ppvals = nvlstGetParams(lst, &pblk, NULL);
+ return RS_RET_OK;
+}
+
+/* apply all params from param block to queue. Must be called before
+ * finalizing. This supports the v6 config system. Defaults were already
+ * set during queue creation. The pvals object is destructed by this
+ * function.
+ */
+rsRetVal
+qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals)
+{
+ int i;
+ for(i = 0 ; i < pblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(pblk.descr[i].name, "queue.filename")) {
+ pThis->pszFilePrefix = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
+ pThis->lenFilePrefix = es_strlen(pvals[i].val.d.estr);
+ } else if(!strcmp(pblk.descr[i].name, "queue.size")) {
+ pThis->iMaxQueueSize = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.dequeuebatchsize")) {
+ pThis->iDeqBatchSize = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.maxdiskspace")) {
+ pThis->iMaxFileSize = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.highwatermark")) {
+ pThis->iHighWtrMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.lowwatermark")) {
+ pThis->iLowWtrMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.fulldelaymark")) {
+ pThis->iFullDlyMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.lightdelaymark")) {
+ pThis->iLightDlyMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.discardmark")) {
+ pThis->iDiscardMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.discardseverity")) {
+ pThis->iDiscardSeverity = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.checkpointinterval")) {
+ pThis->iPersistUpdCnt = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.syncqueuefiles")) {
+ pThis->bSyncQueueFiles = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.type")) {
+ pThis->qType = (queueType_t) pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.workerthreads")) {
+ pThis->iNumWorkerThreads = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.timeoutshutdown")) {
+ pThis->toQShutdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.timeoutactioncompletion")) {
+ pThis->toActShutdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.timeoutenqueue")) {
+ pThis->toEnq = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.timeoutworkerthreadshutdown")) {
+ pThis->toWrkShutdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.workerthreadminimummessages")) {
+ pThis->iMinMsgsPerWrkr = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.maxfilesize")) {
+ pThis->iMaxFileSize = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.saveonshutdown")) {
+ pThis->bSaveOnShutdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.dequeueslowdown")) {
+ pThis->iDeqSlowdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.dequeuetimebegin")) {
+ pThis->iDeqtWinFromHr = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queuedequeuetimend.")) {
+ pThis->iDeqtWinToHr = pvals[i].val.d.n;
+ } else {
+ dbgprintf("queue: program error, non-handled "
+ "param '%s'\n", pblk.descr[i].name);
+ }
+ }
+ cnfparamvalsDestruct(pvals, &pblk);
+ return RS_RET_OK;
+}
+
+
/* some simple object access methods */
DEFpropSetMeth(qqueue, bSyncQueueFiles, int)
DEFpropSetMeth(qqueue, iPersistUpdCnt, int)
diff --git a/runtime/queue.h b/runtime/queue.h
index 97057180..c18b9f47 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -190,6 +190,9 @@ rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefi
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*));
rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch);
+rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals);
+rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals);
+
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int);
diff --git a/runtime/rsconf.c b/runtime/rsconf.c
index 20c3b4f0..61e8ca96 100644
--- a/runtime/rsconf.c
+++ b/runtime/rsconf.c
@@ -345,7 +345,10 @@ parser_errmsg(char *fmt, ...)
va_start(ap, fmt);
if(vsnprintf(errBuf, sizeof(errBuf), fmt, ap) == sizeof(errBuf))
- errBuf[1023] = '\0';
+ errBuf[sizeof(errBuf)-1] = '\0';
+dbgprintf("XXXX: msg: %s\n", errBuf);
+dbgprintf("XXXX: cnfcurrfn: %s\n", cnfcurrfn);
+dbgprintf("XXXX: yylineno: %d\n", yylineno);
errmsg.LogError(0, RS_RET_CONF_PARSE_ERROR,
"error during parsing file %s, on or before line %d: %s",
cnfcurrfn, yylineno, errBuf);
@@ -1258,6 +1261,7 @@ ourConf = loadConf; // TODO: remove, once ourConf is gone!
"run, but no output whatsoever is created.");
ABORT_FINALIZE(RS_RET_NO_ACTIONS);
}
+ tellLexEndParsing();
tellCoreConfigLoadDone();
tellModulesConfigLoadDone();
diff --git a/runtime/typedefs.h b/runtime/typedefs.h
index d46851f6..f994cbc4 100644
--- a/runtime/typedefs.h
+++ b/runtime/typedefs.h
@@ -160,6 +160,7 @@ typedef enum cslCmdHdlrType {
eCmdHdlrSeverity,
eCmdHdlrGetWord,
eCmdHdlrString,
+ eCmdHdlrQueueType,
eCmdHdlrGoneAway /* statment existed, but is no longer supported */
} ecslCmdHdrlType;