summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-08-01 15:56:34 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2011-08-01 15:56:34 +0200
commita789813b14de79c5ecac8b0d7f7c222ae7f47412 (patch)
tree7e850b1085fd1bdb2223154a3cc96480320ca334
parentd0bab973d2ddf93af71c859f3ed47dd40d561c76 (diff)
downloadrsyslog-a789813b14de79c5ecac8b0d7f7c222ae7f47412.tar.gz
rsyslog-a789813b14de79c5ecac8b0d7f7c222ae7f47412.tar.xz
rsyslog-a789813b14de79c5ecac8b0d7f7c222ae7f47412.zip
milestone: queue-params are properly initialized for action queues
-rw-r--r--action.c29
-rw-r--r--doc/v6compatibility.html4
-rw-r--r--runtime/queue.c34
-rw-r--r--runtime/queue.h1
4 files changed, 54 insertions, 14 deletions
diff --git a/action.c b/action.c
index 7be5841c..477051ca 100644
--- a/action.c
+++ b/action.c
@@ -363,7 +363,7 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr);
} else {
ustrncpy(pszQName, pThis->pszName, sizeof(pszQName));
- pszQName[63] = '\0'; /* to be on the save side */
+ pszQName[sizeof(pszQName)-1] = '\0'; /* to be on the save side */
}
/* now check if we can run the action in "firehose mode" during stage one of
@@ -409,20 +409,20 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize,
(rsRetVal (*)(void*, batch_t*, int*))processBatchMain));
obj.SetName((obj_t*) pThis->pQueue, pszQName);
-
- /* ... set some properties ... */
-# define setQPROP(func, directive, data) \
- CHKiRet_Hdlr(func(pThis->pQueue, data)) { \
- errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
- }
-# define setQPROPstr(func, directive, data) \
- CHKiRet_Hdlr(func(pThis->pQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \
- errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
- }
-
qqueueSetpUsr(pThis->pQueue, pThis);
- if(queueParams == NULL) {
- /* use legacy params */
+
+ if(queueParams == NULL) { /* use legacy params? */
+ /* ... set some properties ... */
+# define setQPROP(func, directive, data) \
+ CHKiRet_Hdlr(func(pThis->pQueue, data)) { \
+ errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", \
+ error %d. Ignored, running with default setting", iRet); \
+ }
+# define setQPROPstr(func, directive, data) \
+ CHKiRet_Hdlr(func(pThis->pQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \
+ errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", \
+ error %d. Ignored, running with default setting", iRet); \
+ }
setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", cs.iActionQueMaxDiskSpace);
setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", cs.iActionQueueDeqBatchSize);
setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", cs.iActionQueMaxFileSize);
@@ -444,6 +444,7 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", cs.iActionQueueDeqtWinToHr);
} else {
/* we have v6-style config params */
+ qqueueSetDefaultsActionQueue(pThis->pQueue);
qqueueApplyCnfParam(pThis->pQueue, queueParams);
}
diff --git a/doc/v6compatibility.html b/doc/v6compatibility.html
index 38f1e622..7c99ab63 100644
--- a/doc/v6compatibility.html
+++ b/doc/v6compatibility.html
@@ -76,6 +76,10 @@ to check for those cases as this means log data is potentially lost.
Please note that
the root problem is the same for earlier versions as well. With them, it was just harder
to spot why things went wrong (and if at all).
+<h2>Default Batch Sizes</h2>
+<p>Due to their positive effect on performance and comparatively low overhead,
+default batch sizes have been increased. Starting with 6.3.4, the action queues
+have a default batch size of 128 messages.
<h2>outchannels</h2>
<p>Outchannels are a to-be-removed feature of rsyslog, at least as far as the config
syntax is concerned. Nevertheless, v6 still supports it, but a new syntax is required
diff --git a/runtime/queue.c b/runtime/queue.c
index 5f224e7e..f390d987 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1318,6 +1318,40 @@ finalize_it:
}
+/* set default inisde queue object suitable for action queues.
+ * This shall be called directly after queue construction. This functions has
+ * been added in support of the new v6 config system. It expect properly pre-initialized
+ * objects, but we need to differentiate between ruleset main and action queues.
+ * In order to avoid unnecessary complexity, we provide the necessary defaults
+ * via specific function calls.
+ */
+void
+qqueueSetDefaultsActionQueue(qqueue_t *pThis)
+{
+ pThis->qType = QUEUETYPE_DIRECT; /* type of the main message queue above */
+ pThis->iMaxQueueSize = 1000; /* size of the main message queue above */
+ pThis->iDeqBatchSize = 128; /* default batch size */
+ pThis->iHighWtrMrk = 800; /* high water mark for disk-assisted queues */
+ pThis->iLowWtrMrk = 200; /* low water mark for disk-assisted queues */
+ pThis->iDiscardMrk = 9800; /* begin to discard messages */
+ pThis->iDiscardSeverity = 8; /* turn off */
+ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */
+ pThis->iMaxFileSize = 1024*1024;
+ pThis->iPersistUpdCnt = 0; /* persist queue info every n updates */
+ pThis->bSyncQueueFiles = 0;
+ pThis->toQShutdown = 0; /* queue shutdown */
+ pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */
+ pThis->toEnq = 2000; /* timeout for queue enque */
+ pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */
+ pThis->iMinMsgsPerWrkr = 100; /* minimum messages per worker needed to start a new one */
+ pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
+ pThis->sizeOnDiskMax = 0; /* unlimited */
+ pThis->iDeqSlowdown = 0;
+ pThis->iDeqtWinFromHr = 0;
+ pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
+}
+
+
/* This function checks if the provided message shall be discarded and does so, if needed.
* In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
* provide real-time creation of spool files.
diff --git a/runtime/queue.h b/runtime/queue.h
index c18b9f47..2b1fcfa8 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -192,6 +192,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch);
rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals);
rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals);
+void qqueueSetDefaultsActionQueue(qqueue_t *pThis);
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);