summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-07-22 18:03:43 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2011-07-22 18:03:43 +0200
commit9757aeb56445eee3aca2b43e6b3efa1f1cb59ba3 (patch)
treeb44dad3ab3f3477c4b6c45b615b60f5e51f58e35 /runtime
parent6b8b7ba0091a4e59b9a45057756fc7f754576242 (diff)
downloadrsyslog-9757aeb56445eee3aca2b43e6b3efa1f1cb59ba3.tar.gz
rsyslog-9757aeb56445eee3aca2b43e6b3efa1f1cb59ba3.tar.xz
rsyslog-9757aeb56445eee3aca2b43e6b3efa1f1cb59ba3.zip
milestone: queue object now has a param handler for new conf interface
... and action queue defs use this new interface (but not yet the main queues)
Diffstat (limited to 'runtime')
-rw-r--r--runtime/conf.c2
-rw-r--r--runtime/queue.c207
-rw-r--r--runtime/queue.h3
-rw-r--r--runtime/rsconf.c6
-rw-r--r--runtime/typedefs.h1
5 files changed, 178 insertions, 41 deletions
diff --git a/runtime/conf.c b/runtime/conf.c
index 6136262e..d98a147b 100644
--- a/runtime/conf.c
+++ b/runtime/conf.c
@@ -741,7 +741,7 @@ rsRetVal cflineDoAction(rsconf_t *conf, uchar **p, action_t **ppAction)
*/
if(currConfObj == eConfObjAction)
currConfObj = eConfObjActionWaitEnd;
- if((iRet = addAction(&pAction, pMod, pModData, pOMSR, (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
+ if((iRet = addAction(&pAction, pMod, pModData, pOMSR, NULL, (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
/* now check if the module is compatible with select features */
if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK)
pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs;
diff --git a/runtime/queue.c b/runtime/queue.c
index c831836d..1ceec5da 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -12,7 +12,7 @@
* function names - this makes it really hard to read and does not provide much
* benefit, at least I (now) think so...
*
- * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2008-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -93,6 +93,41 @@ static rsRetVal qDestructDisk(qqueue_t *pThis);
#define QUEUE_CHECKPOINT 1
#define QUEUE_NO_CHECKPOINT 0
+
+/* tables for interfacing with the v6 config system */
+static struct cnfparamdescr cnfpdescr[] = {
+ { "queue.filename", eCmdHdlrGetWord, 0 },
+ { "queue.size", eCmdHdlrSize, 0 },
+ { "queue.dequeuebatchsize", eCmdHdlrInt, 0 },
+ { "queue.maxdiskspace", eCmdHdlrSize, 0 },
+ { "queue.highwatermark", eCmdHdlrInt, 0 },
+ { "queue.lowwatermark", eCmdHdlrInt, 0 },
+ { "queue.fulldelaymark", eCmdHdlrInt, 0 },
+ { "queue.lightdelaymark", eCmdHdlrInt, 0 },
+ { "queue.discardmark", eCmdHdlrInt, 0 },
+ { "queue.discardseverity", eCmdHdlrFacility, 0 },
+ { "queue.checkpointinterval", eCmdHdlrInt, 0 },
+ { "queue.syncqueuefiles", eCmdHdlrBinary, 0 },
+ { "queue.type", eCmdHdlrQueueType, 0 },
+ { "queue.workerthreads", eCmdHdlrInt, 0 },
+ { "queue.timeoutshutdown", eCmdHdlrInt, 0 },
+ { "queue.timeoutactioncompletion", eCmdHdlrInt, 0 },
+ { "queue.timeoutenqueue", eCmdHdlrInt, 0 },
+ { "queue.timeoutworkerthreadshutdown", eCmdHdlrInt, 0 },
+ { "queue.workerthreadminimummessages", eCmdHdlrInt, 0 },
+ { "queue.maxfilesize", eCmdHdlrSize, 0 },
+ { "queue.saveonshutdown", eCmdHdlrBinary, 0 },
+ { "queue.dequeueslowdown", eCmdHdlrInt, 0 },
+ { "queue.dequeuetimebegin", eCmdHdlrInt, 0 },
+ { "queue.dequeuetimeend", eCmdHdlrInt, 0 },
+};
+static struct cnfparamblk pblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(cnfpdescr)/sizeof(struct cnfparamdescr),
+ cnfpdescr
+ };
+
+
/***********************************************************************
* we need a private data structure, the "to-delete" list. As C does
* not provide any partly private data structures, we implement this
@@ -1258,8 +1293,8 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
/* set some water marks so that we have useful defaults if none are set specifically */
- pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */
- pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */
+ pThis->iFullDlyMrk = -1;
+ pThis->iLightDlyMrk = -1;
pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir);
pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
pThis->iQueueSize = 0;
@@ -1273,42 +1308,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->pszFilePrefix = NULL;
pThis->qType = qType;
- /* set type-specific handlers and other very type-specific things (we can not totally hide it...) */
- switch(qType) {
- case QUEUETYPE_FIXED_ARRAY:
- pThis->qConstruct = qConstructFixedArray;
- pThis->qDestruct = qDestructFixedArray;
- pThis->qAdd = qAddFixedArray;
- pThis->qDeq = qDeqFixedArray;
- pThis->qDel = qDelFixedArray;
- pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
- break;
- case QUEUETYPE_LINKEDLIST:
- pThis->qConstruct = qConstructLinkedList;
- pThis->qDestruct = qDestructLinkedList;
- pThis->qAdd = qAddLinkedList;
- pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
- pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
- pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
- break;
- case QUEUETYPE_DISK:
- pThis->qConstruct = qConstructDisk;
- pThis->qDestruct = qDestructDisk;
- pThis->qAdd = qAddDisk;
- pThis->qDeq = qDeqDisk;
- pThis->qDel = qDelDisk;
- pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
- /* special handling */
- pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
- break;
- case QUEUETYPE_DIRECT:
- pThis->qConstruct = qConstructDirect;
- pThis->qDestruct = qDestructDirect;
- pThis->qAdd = qAddDirect;
- pThis->qDel = qDelDirect;
- pThis->MultiEnq = qqueueMultiEnqObjDirect;
- break;
- }
INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
INIT_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
@@ -1891,6 +1890,52 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
ASSERT(pThis != NULL);
+ /* set type-specific handlers and other very type-specific things
+ * (we can not totally hide it...)
+ */
+ switch(pThis->qType) {
+ case QUEUETYPE_FIXED_ARRAY:
+ pThis->qConstruct = qConstructFixedArray;
+ pThis->qDestruct = qDestructFixedArray;
+ pThis->qAdd = qAddFixedArray;
+ pThis->qDeq = qDeqFixedArray;
+ pThis->qDel = qDelFixedArray;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
+ break;
+ case QUEUETYPE_LINKEDLIST:
+ pThis->qConstruct = qConstructLinkedList;
+ pThis->qDestruct = qDestructLinkedList;
+ pThis->qAdd = qAddLinkedList;
+ pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
+ pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
+ break;
+ case QUEUETYPE_DISK:
+ pThis->qConstruct = qConstructDisk;
+ pThis->qDestruct = qDestructDisk;
+ pThis->qAdd = qAddDisk;
+ pThis->qDeq = qDeqDisk;
+ pThis->qDel = qDelDisk;
+ pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
+ /* special handling */
+ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
+ break;
+ case QUEUETYPE_DIRECT:
+ pThis->qConstruct = qConstructDirect;
+ pThis->qDestruct = qDestructDirect;
+ pThis->qAdd = qAddDirect;
+ pThis->qDel = qDelDirect;
+ pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ break;
+ }
+
+ if(pThis->iFullDlyMrk == -1)
+ pThis->iFullDlyMrk = pThis->iMaxQueueSize
+ - (pThis->iMaxQueueSize / 100) * 3; /* default 97% */
+ if(pThis->iLightDlyMrk == -1)
+ pThis->iLightDlyMrk = pThis->iMaxQueueSize
+ - (pThis->iMaxQueueSize / 100) * 30; /* default 70% */
+
/* we need to do a quick check if our water marks are set plausible. If not,
* we correct the most important shortcomings. TODO: do that!!!! -- rgerhards, 2008-03-14
*/
@@ -2459,6 +2504,90 @@ finalize_it:
}
+/* take v6 config list and extract the queue params out of it. Hand the
+ * param values back to the caler. Caller is responsible for destructing
+ * them when no longer needed. Caller can use this param block to configure
+ * all parameters for a newly created queue with one call to qqueueSetParams().
+ * rgerhards, 2011-07-22
+ */
+rsRetVal
+qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals)
+{
+ *ppvals = nvlstGetParams(lst, &pblk, NULL);
+ return RS_RET_OK;
+}
+
+/* apply all params from param block to queue. Must be called before
+ * finalizing. This supports the v6 config system. Defaults were already
+ * set during queue creation. The pvals object is destructed by this
+ * function.
+ */
+rsRetVal
+qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals)
+{
+ int i;
+ for(i = 0 ; i < pblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(pblk.descr[i].name, "queue.filename")) {
+ pThis->pszFilePrefix = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
+ pThis->lenFilePrefix = es_strlen(pvals[i].val.d.estr);
+ } else if(!strcmp(pblk.descr[i].name, "queue.size")) {
+ pThis->iMaxQueueSize = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.dequeuebatchsize")) {
+ pThis->iDeqBatchSize = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.maxdiskspace")) {
+ pThis->iMaxFileSize = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.highwatermark")) {
+ pThis->iHighWtrMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.lowwatermark")) {
+ pThis->iLowWtrMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.fulldelaymark")) {
+ pThis->iFullDlyMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.lightdelaymark")) {
+ pThis->iLightDlyMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.discardmark")) {
+ pThis->iDiscardMrk = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.discardseverity")) {
+ pThis->iDiscardSeverity = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.checkpointinterval")) {
+ pThis->iPersistUpdCnt = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.syncqueuefiles")) {
+ pThis->bSyncQueueFiles = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.type")) {
+ pThis->qType = (queueType_t) pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.workerthreads")) {
+ pThis->iNumWorkerThreads = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.timeoutshutdown")) {
+ pThis->toQShutdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.timeoutactioncompletion")) {
+ pThis->toActShutdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.timeoutenqueue")) {
+ pThis->toEnq = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.timeoutworkerthreadshutdown")) {
+ pThis->toWrkShutdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.workerthreadminimummessages")) {
+ pThis->iMinMsgsPerWrkr = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.maxfilesize")) {
+ pThis->iMaxFileSize = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.saveonshutdown")) {
+ pThis->bSaveOnShutdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.dequeueslowdown")) {
+ pThis->iDeqSlowdown = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queue.dequeuetimebegin")) {
+ pThis->iDeqtWinFromHr = pvals[i].val.d.n;
+ } else if(!strcmp(pblk.descr[i].name, "queuedequeuetimend.")) {
+ pThis->iDeqtWinToHr = pvals[i].val.d.n;
+ } else {
+ dbgprintf("queue: program error, non-handled "
+ "param '%s'\n", pblk.descr[i].name);
+ }
+ }
+ cnfparamvalsDestruct(pvals, &pblk);
+ return RS_RET_OK;
+}
+
+
/* some simple object access methods */
DEFpropSetMeth(qqueue, bSyncQueueFiles, int)
DEFpropSetMeth(qqueue, iPersistUpdCnt, int)
diff --git a/runtime/queue.h b/runtime/queue.h
index 97057180..c18b9f47 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -190,6 +190,9 @@ rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefi
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*));
rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch);
+rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals);
+rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals);
+
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int);
diff --git a/runtime/rsconf.c b/runtime/rsconf.c
index 20c3b4f0..61e8ca96 100644
--- a/runtime/rsconf.c
+++ b/runtime/rsconf.c
@@ -345,7 +345,10 @@ parser_errmsg(char *fmt, ...)
va_start(ap, fmt);
if(vsnprintf(errBuf, sizeof(errBuf), fmt, ap) == sizeof(errBuf))
- errBuf[1023] = '\0';
+ errBuf[sizeof(errBuf)-1] = '\0';
+dbgprintf("XXXX: msg: %s\n", errBuf);
+dbgprintf("XXXX: cnfcurrfn: %s\n", cnfcurrfn);
+dbgprintf("XXXX: yylineno: %d\n", yylineno);
errmsg.LogError(0, RS_RET_CONF_PARSE_ERROR,
"error during parsing file %s, on or before line %d: %s",
cnfcurrfn, yylineno, errBuf);
@@ -1258,6 +1261,7 @@ ourConf = loadConf; // TODO: remove, once ourConf is gone!
"run, but no output whatsoever is created.");
ABORT_FINALIZE(RS_RET_NO_ACTIONS);
}
+ tellLexEndParsing();
tellCoreConfigLoadDone();
tellModulesConfigLoadDone();
diff --git a/runtime/typedefs.h b/runtime/typedefs.h
index d46851f6..f994cbc4 100644
--- a/runtime/typedefs.h
+++ b/runtime/typedefs.h
@@ -160,6 +160,7 @@ typedef enum cslCmdHdlrType {
eCmdHdlrSeverity,
eCmdHdlrGetWord,
eCmdHdlrString,
+ eCmdHdlrQueueType,
eCmdHdlrGoneAway /* statment existed, but is no longer supported */
} ecslCmdHdrlType;