summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog6
-rw-r--r--action.c549
-rw-r--r--action.h15
-rw-r--r--doc/action-call.dot33
-rw-r--r--doc/action_state.dot33
-rw-r--r--doc/batch_state.dot28
-rw-r--r--doc/design.tex494
-rw-r--r--doc/dev_oplugins.html179
-rw-r--r--doc/queues.html27
-rw-r--r--doc/rsyslog_conf_global.html2
-rw-r--r--plugins/imtcp/imtcp.c12
-rw-r--r--plugins/ompgsql/ompgsql.c22
-rw-r--r--plugins/omtesting/omtesting.c156
-rw-r--r--runtime/conf.c2
-rw-r--r--runtime/module-template.h72
-rw-r--r--runtime/modules.c50
-rw-r--r--runtime/modules.h2
-rw-r--r--runtime/queue.c253
-rw-r--r--runtime/queue.h14
-rw-r--r--runtime/rsyslog.h12
-rw-r--r--runtime/wti.c18
-rw-r--r--runtime/wti.h16
-rw-r--r--runtime/wtp.c9
-rw-r--r--runtime/wtp.h4
-rw-r--r--tests/Makefile.am4
-rwxr-xr-xtests/da-mainmsg-q.sh62
-rw-r--r--tests/tcpflood.c15
-rw-r--r--tests/testsuites/da-mainmsg-q.conf21
-rw-r--r--tools/syslogd.c30
29 files changed, 1822 insertions, 318 deletions
diff --git a/ChangeLog b/ChangeLog
index d37b8e0b..fbe7f236 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,9 @@
+- added $MainMsgQueueDequeueBatchSize and $ActionQueueDequeueBatchSize
+ configuration directives
+- implemented a new transactional output module interface which provides
+ superior performance (for databases potentially far superior performance)
+- increased ompgsql performance by adapting to new transactional
+ output module interface
---------------------------------------------------------------------------
Version 4.3.1 [DEVEL] (rgerhards), 2009-04-??
- performance enhancemnt: imtcp calls parser no longer on input thread
diff --git a/action.c b/action.c
index 51620fce..509ad749 100644
--- a/action.c
+++ b/action.c
@@ -42,10 +42,13 @@
#include "cfsysline.h"
#include "srUtils.h"
#include "errmsg.h"
+#include "wti.h"
#include "datetime.h"
+#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
+
/* forward definitions */
-rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg);
+rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t*);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -64,6 +67,7 @@ static uchar *pszActionName; /* short name for the action */
/* main message queue and its configuration parameters */
static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
static int iActionQueueSize = 1000; /* size of the main message queue above */
+static int iActionQueueDeqBatchSize = 16; /* batch size for action queues */
static int iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */
static int iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */
static int iActionQDiscardMark = 9800; /* begin to discard messages */
@@ -144,6 +148,7 @@ actionResetQueueParams(void)
ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
iActionQueueSize = 1000; /* size of the main message queue above */
+ iActionQueueDeqBatchSize = 16; /* default batch size */
iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */
iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */
iActionQDiscardMark = 9800; /* begin to discard messages */
@@ -255,7 +260,8 @@ actionConstructFinalize(action_t *pThis)
* to be run on multiple threads. So far, this is forbidden by the interface
* spec. -- rgerhards, 2008-01-30
*/
- CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction));
+ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize,
+ (rsRetVal (*)(void*,aUsrp_t*))actionCallDoActionMULTIQUEUE));
obj.SetName((obj_t*) pThis->pQueue, pszQName);
/* ... set some properties ... */
@@ -270,6 +276,7 @@ actionConstructFinalize(action_t *pThis)
qqueueSetpUsr(pThis->pQueue, pThis);
setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace);
+ setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", iActionQueueDeqBatchSize);
setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
@@ -305,87 +312,246 @@ finalize_it:
}
-/* set an action back to active state -- rgerhards, 2007-08-02
+
+/* set the global resume interval
+ */
+rsRetVal actionSetGlobalResumeInterval(int iNewVal)
+{
+ glbliActionResumeInterval = iNewVal;
+ return RS_RET_OK;
+}
+
+
+/* returns the action state name in human-readable form
+ * returned string must not be modified.
+ * rgerhards, 2009-05-07
+ */
+static uchar *getActStateName(action_t *pThis)
+{
+ switch(pThis->eState) {
+ case ACT_STATE_RDY:
+ return (uchar*) "rdy";
+ case ACT_STATE_ITX:
+ return (uchar*) "itx";
+ case ACT_STATE_RTRY:
+ return (uchar*) "rtry";
+ case ACT_STATE_SUSP:
+ return (uchar*) "susp";
+ case ACT_STATE_DIED:
+ return (uchar*) "died";
+ case ACT_STATE_COMM:
+ return (uchar*) "comm";
+ default:
+ return (uchar*) "ERROR/UNKNWON";
+ }
+}
+
+
+/* returns a suitable return code based on action state
+ * rgerhards, 2009-05-07
*/
-static rsRetVal actionResume(action_t *pThis)
+static rsRetVal getReturnCode(action_t *pThis)
{
DEFiRet;
ASSERT(pThis != NULL);
- pThis->bSuspended = 0;
+ switch(pThis->eState) {
+ case ACT_STATE_RDY:
+ iRet = RS_RET_OK;
+ break;
+ case ACT_STATE_ITX:
+ if(pThis->bHadAutoCommit) {
+ pThis->bHadAutoCommit = 0; /* auto-reset */
+ iRet = RS_RET_PREVIOUS_COMMITTED;
+ } else {
+ iRet = RS_RET_DEFER_COMMIT;
+ }
+ break;
+ case ACT_STATE_RTRY:
+ iRet = RS_RET_SUSPENDED;
+ break;
+ case ACT_STATE_SUSP:
+ iRet = RS_RET_SUSPENDED;
+ break;
+ case ACT_STATE_DIED:
+ iRet = RS_RET_DISABLE_ACTION;
+ break;
+ default:
+ DBGPRINTF("Invalid action engine state %d, program error\n",
+ (int) pThis->eState);
+ iRet = RS_RET_ERR;
+ break;
+ }
RETiRet;
}
-/* set the global resume interval
+/* set the action to a new state
+ * rgerhards, 2007-08-02
*/
-rsRetVal actionSetGlobalResumeInterval(int iNewVal)
+static inline void actionSetState(action_t *pThis, action_state_t newState)
{
- glbliActionResumeInterval = iNewVal;
- return RS_RET_OK;
+ pThis->eState = newState;
+ DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis));
}
+/* Handles the transient commit state. So far, this is
+ * mostly a dummy...
+ * rgerhards, 2007-08-02
+ */
+static void actionCommitted(action_t *pThis)
+{
+ actionSetState(pThis, ACT_STATE_RDY);
+}
-/* suspend an action -- rgerhards, 2007-08-02
+
+/* set action to "rtry" state.
+ * rgerhards, 2007-08-02
+ */
+static void actionRetry(action_t *pThis)
+{
+ actionSetState(pThis, ACT_STATE_RTRY);
+}
+
+
+/* Disable action, this means it will never again be usable
+ * until rsyslog is reloaded. Use only as a last resort, but
+ * depends on output module.
+ * rgerhards, 2007-08-02
+ */
+static void actionDisable(action_t *pThis)
+{
+ actionSetState(pThis, ACT_STATE_DIED);
+}
+
+
+/* Suspend action, this involves changing the acton state as well
+ * as setting the next retry time.
+ * if we have more than 10 retries, we prolong the
+ * retry interval. If something is really stalled, it will
+ * get re-tried only very, very seldom - but that saves
+ * CPU time. TODO: maybe a config option for that?
+ * rgerhards, 2007-08-02
*/
-static rsRetVal actionSuspend(action_t *pThis, time_t tNow)
+static inline void actionSuspend(action_t *pThis, time_t ttNow)
{
+ if(ttNow == NO_TIME_PROVIDED)
+ time(&ttNow);
+ pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1);
+ actionSetState(pThis, ACT_STATE_SUSP);
+ DBGPRINTF("earliest retry=%d\n", (int) pThis->ttResumeRtry);
+}
+
+
+/* actually do retry processing. Note that the function receives a timestamp so
+ * that we do not need to call the (expensive) time() API.
+ * Note that we do the full retry processing here, doing the configured number of
+ * iterations.
+ * rgerhards, 2009-05-07
+ */
+static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
+{
+ int iRetries;
+ int iSleepPeriod;
DEFiRet;
ASSERT(pThis != NULL);
- pThis->bSuspended = 1;
- pThis->ttResumeRtry = tNow + pThis->iResumeInterval;
- pThis->iNbrResRtry = 0; /* tell that we did not yet retry to resume */
+
+RUNLOG_STR("actionDoRetry():");
+ iRetries = 0;
+ while(pThis->eState == ACT_STATE_RTRY) {
+ iRet = pThis->pMod->tryResume(pThis->pModData);
+ if(iRet == RS_RET_OK) {
+ actionSetState(pThis, ACT_STATE_RDY);
+RUNLOG_STR("tryResume succeeded");
+ } else if(iRet == RS_RET_SUSPENDED) {
+RUNLOG_STR("still suspended");;
+ /* max retries reached? */
+ if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) {
+ actionSuspend(pThis, ttNow);
+ } else {
+ ++pThis->iNbrResRtry;
+ ++iRetries;
+ iSleepPeriod = pThis->iResumeInterval;
+ ttNow += iSleepPeriod; /* not truly exact, but sufficiently... */
+ srSleep(iSleepPeriod, 0);
+ }
+ } else if(iRet == RS_RET_DISABLE_ACTION) {
+ actionDisable(pThis);
+ }
+ }
+
+ if(pThis->eState == ACT_STATE_RDY) {
+ pThis->iNbrResRtry = 0;
+ }
RETiRet;
}
/* try to resume an action -- rgerhards, 2007-08-02
- * returns RS_RET_OK if resumption worked, RS_RET_SUSPEND if the
- * action is still suspended.
+ * changed to new action state engine -- rgerhards, 2009-05-07
*/
static rsRetVal actionTryResume(action_t *pThis)
{
DEFiRet;
- time_t ttNow;
+ time_t ttNow = NO_TIME_PROVIDED;
ASSERT(pThis != NULL);
- /* for resume handling, we must always obtain a fresh timestamp. We used
- * to use the action timestamp, but in this case we will never reach a
- * point where a resumption is actually tried, because the action timestamp
- * is always in the past. So we can not avoid doing a fresh time() call
- * here. -- rgerhards, 2009-03-18
- */
- time(&ttNow); /* cache "now" */
-
- /* first check if it is time for a re-try */
- if(ttNow > pThis->ttResumeRtry) {
- iRet = pThis->pMod->tryResume(pThis->pModData);
- if(iRet == RS_RET_SUSPENDED) {
- /* set new tryResume time */
- ++pThis->iNbrResRtry;
- /* if we have more than 10 retries, we prolong the
- * retry interval. If something is really stalled, it will
- * get re-tried only very, very seldom - but that saves
- * CPU time. TODO: maybe a config option for that?
- * rgerhards, 2007-08-02
- */
- pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1);
+RUNLOG_STR("actionTryResume()");
+ if(pThis->eState == ACT_STATE_SUSP) {
+ /* if we are suspended, we need to check if the timeout expired.
+ * for this handling, we must always obtain a fresh timestamp. We used
+ * to use the action timestamp, but in this case we will never reach a
+ * point where a resumption is actually tried, because the action timestamp
+ * is always in the past. So we can not avoid doing a fresh time() call
+ * here. -- rgerhards, 2009-03-18
+ */
+ time(&ttNow); /* cache "now" */
+ if(ttNow > pThis->ttResumeRtry) {
+ actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */
}
- } else {
- /* it's too early, we are still suspended --> indicate this */
- iRet = RS_RET_SUSPENDED;
}
- if(iRet == RS_RET_OK)
- actionResume(pThis);
+ if(pThis->eState == ACT_STATE_RTRY) {
+ if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */
+ time(&ttNow);
+ CHKiRet(actionDoRetry(pThis, ttNow));
+ }
+
+ DBGPRINTF("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n",
+ getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* prepare an action for performing work. This involves trying to recover it,
+ * depending on its current state.
+ * rgerhards, 2009-05-07
+ */
+static rsRetVal actionPrepare(action_t *pThis)
+{
+ DEFiRet;
+
+RUNLOG_STR("actionPrepare()");
+ assert(pThis != NULL);
+ if(pThis->eState == ACT_STATE_RTRY) {
+ CHKiRet(actionTryResume(pThis));
+ }
- dbgprintf("actionTryResume: iRet: %d, next retry (if applicable): %u [now %u]\n",
- iRet, (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
+ /* if we are now ready, we initialize the transaction and advance
+ * action state accordingly
+ */
+ if(pThis->eState == ACT_STATE_RDY) {
+ CHKiRet(pThis->pMod->mod.om.beginTransaction(pThis->pModData));
+ actionSetState(pThis, ACT_STATE_ITX);
+ }
+finalize_it:
RETiRet;
}
@@ -402,12 +568,11 @@ rsRetVal actionDbgPrint(action_t *pThis)
dbgprintf("\n\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData);
dbgprintf("\tRepeatedMsgReduction: %d\n", pThis->f_ReduceRepeated);
dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval);
- dbgprintf("\tSuspended: %d", pThis->bSuspended);
- if(pThis->bSuspended) {
- dbgprintf(" next retry: %u, number retries: %d", (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry);
+ if(pThis->eState == ACT_STATE_SUSP) {
+ dbgprintf("\tresume next retry: %u, number retries: %d",
+ (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry);
}
- dbgprintf("\n");
- dbgprintf("\tDisabled: %d\n", !pThis->bEnabled);
+ dbgprintf("\tState: %s\n", getActStateName(pThis));
dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
dbgprintf("\n");
@@ -415,24 +580,16 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
-/* call the DoAction output plugin entry point
- * rgerhards, 2008-01-28
+/* prepare the calling parameters for doAction()
+ * rgerhards, 2009-05-07
*/
-#pragma GCC diagnostic ignored "-Wempty-body"
-rsRetVal
-actionCallDoAction(action_t *pAction, msg_t *pMsg)
+static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar ***pppMsgs)
{
- DEFiRet;
- int iRetries;
+ uchar **ppMsgs = *pppMsgs;
int i;
- int iArr;
- int iSleepPeriod;
- int bCallAction;
- int iCancelStateSave;
- uchar **ppMsgs; /* array of message pointers for doAction */
+ DEFiRet;
ASSERT(pAction != NULL);
-
/* create the array for doAction() message pointers */
if((ppMsgs = calloc(pAction->iNumTpls, sizeof(uchar *))) == NULL) {
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -450,77 +607,230 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg)
default:assert(0); /* software bug if this happens! */
}
}
- iRetries = 0;
- /* We now must guard the output module against execution by multiple threads. The
- * plugin interface specifies that output modules must not be thread-safe (except
- * if they notify us they are - functionality not yet implemented...).
- * rgerhards, 2008-01-30
- */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pAction->mutActExec);
- pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
- pthread_setcancelstate(iCancelStateSave, NULL);
- do {
- /* on first invocation, this if should never be true. We just put it at the top
- * of the loop so that processing (and code) is simplified. This code is actually
- * triggered on the 2nd+ invocation. -- rgerhards, 2008-01-30
- */
- if(iRet == RS_RET_SUSPENDED) {
- /* ok, this calls for our retry logic... */
- ++iRetries;
- iSleepPeriod = pAction->iResumeInterval;
- srSleep(iSleepPeriod, 0);
- }
- /* first check if we are suspended and, if so, retry */
- if(actionIsSuspended(pAction)) {
- iRet = actionTryResume(pAction);
- if(iRet == RS_RET_OK)
- bCallAction = 1;
- else
- bCallAction = 0;
- } else {
- bCallAction = 1;
- }
-
- if(bCallAction) {
- /* call configured action */
- iRet = pAction->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pAction->pModData);
- if(iRet == RS_RET_SUSPENDED) {
- dbgprintf("Action requested to be suspended, done that.\n");
- actionSuspend(pAction, getActNow(pAction));
- }
- }
- } while(iRet == RS_RET_SUSPENDED && (pAction->iResumeRetryCount == -1 || iRetries < pAction->iResumeRetryCount)); /* do...while! */
+finalize_it:
+ *pppMsgs = ppMsgs;
+ RETiRet;
+}
- if(iRet == RS_RET_DISABLE_ACTION) {
- dbgprintf("Action requested to be disabled, done that.\n");
- pAction->bEnabled = 0; /* that's it... */
- }
- pthread_cleanup_pop(1); /* unlock mutex */
+/* cleanup doAction calling parameters
+ * rgerhards, 2009-05-07
+ */
+static rsRetVal cleanupDoActionParams(action_t *pAction, uchar ***pppMsgs)
+{
+ uchar **ppMsgs = *pppMsgs;
+ int i;
+ int iArr;
+ DEFiRet;
-finalize_it:
- /* cleanup */
+ ASSERT(pAction != NULL);
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
if(ppMsgs[i] != NULL) {
switch(pAction->eParamPassing) {
case ACT_ARRAY_PASSING:
iArr = 0;
while(((char **)ppMsgs[i])[iArr] != NULL)
- d_free(((char **)ppMsgs[i])[iArr++]);
- d_free(ppMsgs[i]);
+ free(((char **)ppMsgs[i])[iArr++]);
+ free(ppMsgs[i]);
break;
case ACT_STRING_PASSING:
- d_free(ppMsgs[i]);
+ free(ppMsgs[i]);
break;
default:
assert(0);
}
}
}
- d_free(ppMsgs);
- msgDestruct(&pMsg); /* we are now finished with the message */
+ free(ppMsgs);
+ *pppMsgs = NULL;
+
+ RETiRet;
+}
+
+
+/* call the DoAction output plugin entry point
+ * Performance note: we build the action parameters here in this function. That
+ * means we do it while we hold the action look, potentially reducing concurrency
+ * (especially if the action queue is run in DIRECT mode). As an alternative, we
+ * may generate all params for the batch as whole before aquiring the action. However,
+ * that requires more memory, for large batches potentially a lot of memory. So for the
+ * time being, I am doing it here - the performance hit should be very minor and may even
+ * not be a hit because we may gain CPU cache locality gains with the "fewer memory"
+ * approach (I'd say that is rater likely).
+ * rgerhards, 2008-01-28
+ */
+rsRetVal
+actionCallDoAction(action_t *pThis, msg_t *pMsg)
+{
+ uchar **ppMsgs; /* array of message pointers for doAction */
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+ ISOBJ_TYPE_assert(pMsg, msg);
+
+ DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis));
+ CHKiRet(prepareDoActionParams(pThis, pMsg, &ppMsgs));
+
+ pThis->bHadAutoCommit = 0;
+ iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData);
+ switch(iRet) {
+ case RS_RET_OK:
+ actionCommitted(pThis);
+ break;
+ case RS_RET_DEFER_COMMIT:
+ /* we are done, action state remains the same */
+ break;
+ case RS_RET_PREVIOUS_COMMITTED:
+ /* action state remains the same, but we had a commit. */
+ pThis->bHadAutoCommit = 1;
+ break;
+ case RS_RET_SUSPENDED:
+ actionRetry(pThis);
+ break;
+ case RS_RET_DISABLE_ACTION:
+ actionDisable(pThis);
+ break;
+ default:/* permanent failure of this message - no sense in retrying. This is
+ * not yet handled (but easy TODO)
+ */
+ FINALIZE;
+ }
+ iRet = getReturnCode(pThis);
+
+finalize_it:
+ cleanupDoActionParams(pThis, &ppMsgs); /* iRet ignored! */
+
+ RETiRet;
+}
+
+
+/* process a message
+ * this readies the action and then calls doAction()
+ * rgerhards, 2008-01-28
+ */
+rsRetVal
+actionProcessMessage(action_t *pThis, msg_t *pMsg)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+ ISOBJ_TYPE_assert(pMsg, msg);
+
+RUNLOG_STR("inside actionProcessMsg()");
+ CHKiRet(actionPrepare(pThis));
+ if(pThis->eState == ACT_STATE_ITX)
+ CHKiRet(actionCallDoAction(pThis, pMsg));
+
+ iRet = getReturnCode(pThis);
+finalize_it:
+ RETiRet;
+}
+
+
+/* finish processing a batch. Most importantly, that means we commit if we
+ * need to do so.
+ * rgerhards, 2008-01-28
+ */
+static rsRetVal
+finishBatch(action_t *pThis)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ if(pThis->eState == ACT_STATE_RDY)
+ FINALIZE; /* nothing to do */
+
+ CHKiRet(actionPrepare(pThis));
+ if(pThis->eState == ACT_STATE_ITX) {
+ iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData);
+ switch(iRet) {
+ case RS_RET_OK:
+ actionCommitted(pThis);
+ break;
+ case RS_RET_SUSPENDED:
+ actionRetry(pThis);
+ break;
+ case RS_RET_DISABLE_ACTION:
+ actionDisable(pThis);
+ break;
+ case RS_RET_DEFER_COMMIT:
+ DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT "
+ "- ignored\n");
+ actionCommitted(pThis);
+ break;
+ case RS_RET_PREVIOUS_COMMITTED:
+ DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED "
+ "- ignored\n");
+ actionCommitted(pThis);
+ break;
+ default:/* permanent failure of this message - no sense in retrying. This is
+ * not yet handled (but easy TODO)
+ */
+ FINALIZE;
+ }
+ }
+ iRet = getReturnCode(pThis);
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* receive an array of to-process user pointers and submit them
+ * for processing.
+ * rgerhards, 2009-04-22
+ */
+rsRetVal
+actionCallDoActionMULTIQUEUEprocessing(action_t *pAction, aUsrp_t *paUsrp)
+{
+ int i;
+ msg_t *pMsg;
+ rsRetVal localRet;
+ DEFiRet;
+
+ assert(paUsrp != NULL);
+
+ for(i = 0 ; i < paUsrp->nElem ; i++) {
+ pMsg = (msg_t*) paUsrp->pUsrp[i];
+dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg);
+ localRet = actionProcessMessage(pAction, pMsg);
+ dbgprintf("action call returned %d\n", localRet);
+ msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */
+ CHKiRet(localRet);
+ }
+ iRet = finishBatch(pAction);
+
+finalize_it:
+ RETiRet;
+}
+#pragma GCC diagnostic ignored "-Wempty-body"
+/* receive an array of to-process user pointers and submit them
+ * for processing.
+ * rgerhards, 2009-04-22
+ */
+rsRetVal
+actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp)
+{
+ int iCancelStateSave;
+ DEFiRet;
+
+ assert(paUsrp != NULL);
+
+ /* We now must guard the output module against execution by multiple threads. The
+ * plugin interface specifies that output modules must not be thread-safe (except
+ * if they notify us they are - functionality not yet implemented...).
+ * rgerhards, 2008-01-30
+ */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(&pAction->mutActExec);
+ pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+
+ iRet = actionCallDoActionMULTIQUEUEprocessing(pAction, paUsrp);
+
+ pthread_cleanup_pop(1); /* unlock mutex */
RETiRet;
}
@@ -762,8 +1072,8 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
* should check from time to time if affairs have improved.
* rgerhards, 2007-07-24
*/
- if(pAction->bEnabled == 0) {
- ABORT_FINALIZE(RS_RET_OK);
+ if(pAction->eState == ACT_STATE_DIED) {
+ ABORT_FINALIZE(RS_RET_DISABLE_ACTION);
}
pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */
@@ -832,6 +1142,7 @@ actionAddCfSysLineHdrl(void)
CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &iActionQueueDeqBatchSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL));
@@ -948,7 +1259,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
dbgprintf("module is incompatible with RepeatedMsgReduction - turned off\n");
pAction->f_ReduceRepeated = 0;
}
- pAction->bEnabled = 1; /* action is enabled */
+ pAction->eState = ACT_STATE_RDY; /* action is enabled */
if(bSuspended)
actionSuspend(pAction, time(NULL)); /* "good" time call, only during init and unavoidable */
diff --git a/action.h b/action.h
index 2a1487a5..0a4ff15b 100644
--- a/action.h
+++ b/action.h
@@ -36,6 +36,15 @@
extern int glbliActionResumeRetryCount;
+typedef enum {
+ ACT_STATE_DIED = 0, /* action permanently failed and now disabled - MUST BE ZEO! */
+ ACT_STATE_RDY = 1, /* action ready, waiting for new transaction */
+ ACT_STATE_ITX = 2, /* transaction active, waiting for new data or commit */
+ ACT_STATE_COMM = 3, /* transaction finished (a transient state) */
+ ACT_STATE_RTRY = 4, /* failure occured, trying to restablish ready state */
+ ACT_STATE_SUSP = 5 /* suspended due to failure (return fail until timeout expired) */
+} action_state_t;
+
/* the following struct defines the action object data structure
*/
struct action_s {
@@ -45,8 +54,10 @@ struct action_s {
time_t tLastExec; /* time this action was last executed */
int bExecWhenPrevSusp;/* execute only when previous action is suspended? */
int iSecsExecOnceInterval; /* if non-zero, minimum seconds to wait until action is executed again */
- short bEnabled; /* is the related action enabled (1) or disabled (0)? */
- short bSuspended; /* is the related action temporarily suspended? */
+ action_state_t eState; /* current state of action */
+ int bHadAutoCommit; /* did an auto-commit happen during doAction()? */
+ //short bEnabled; /* is the related action enabled (1) or disabled (0)? */
+ //short bSuspended; /* is the related action temporarily suspended? */
time_t ttResumeRtry; /* when is it time to retry the resume? */
int iResumeInterval;/* resume interval for this action */
int iResumeRetryCount;/* how often shall we retry a suspended action? (-1 --> eternal) */
diff --git a/doc/action-call.dot b/doc/action-call.dot
new file mode 100644
index 00000000..86c6834d
--- /dev/null
+++ b/doc/action-call.dot
@@ -0,0 +1,33 @@
+// This file is part of rsyslog.
+//
+// rsyslog action call state diagram
+//
+// see http://www.graphviz.org for how to obtain the graphviz processor
+// which is used to build the actual graph.
+//
+// generate the graph with
+// $ dot action-call.dot -Tpng >action-call.png
+
+digraph G {
+ label="\n\nrsyslog message states during action processing\nhttp://www.rsyslog.com";
+ //fontsize=20;
+
+ ok [label="ready for processing" color="green"];
+ mpf [label="message permanent failure" color="red"];
+ tf [label="temporary failure"]
+ cPen [label="commit pending"];
+ com [label="committed" color="red"];
+
+ tf -> tf [label="retry fails, i < n"];
+ tf -> mpf [label="retry fails, i = n"];
+ tf -> ok [label="retry succeeds"];
+ ok -> com [label="doAction RS_RET_OK"];
+ ok -> cPen [label="doAction COMMIT_PENDING"];
+ ok -> tf [label="doAction RS_RET_SUSPENDED"];
+ ok -> mpf [label="doAction RS_RET_DISABLED"];
+ cPen -> com [label="endTransaction RS_RET_OK"];
+ cPen -> tf [label="endTransaction _SUSPENDED"];
+
+ //{rank=same; tf cPen}
+ {rank=same; com mpf}
+}
diff --git a/doc/action_state.dot b/doc/action_state.dot
new file mode 100644
index 00000000..d56d9da0
--- /dev/null
+++ b/doc/action_state.dot
@@ -0,0 +1,33 @@
+// This file is part of rsyslog.
+//
+// rsyslog message state diagram
+//
+// see http://www.graphviz.org for how to obtain the graphviz processor
+// which is used to build the actual graph.
+//
+// generate the graph with
+// $ dot file.dot -Tpng >file.png
+
+digraph msgState {
+ compound=true; nodesep=1.0
+ //label="\n\nrsyslog action transaction states\nhttp://www.rsyslog.com";
+ //fontsize=20;
+
+ rdy [label="ready" group="main"];
+ itx [label="in Tx" group="main"];
+ comm [label="commit"]
+ rtry [label="retry"]
+ susp [label="suspended"]
+
+ rdy -> itx [label="transaction begins"]
+ itx -> itx [label="success"]
+ itx -> comm [label="commit\n(caller or auto)"]
+ itx -> rtry [label="error"]
+ comm -> rdy [label="success"]
+ comm -> rtry [label="error"]
+ rtry -> rdy [label="recovered"]
+ rtry -> susp [label="could not\nrecover"]
+ susp -> rtry [label="timeout expired"]
+
+ {rank=same; comm rtry}
+}
diff --git a/doc/batch_state.dot b/doc/batch_state.dot
new file mode 100644
index 00000000..0dd48b47
--- /dev/null
+++ b/doc/batch_state.dot
@@ -0,0 +1,28 @@
+// This file is part of rsyslog.
+//
+// rsyslog batch state diagram
+//
+// see http://www.graphviz.org for how to obtain the graphviz processor
+// which is used to build the actual graph.
+//
+// generate the graph with
+// $ dot file.dot -Tpng >file.png
+
+digraph msgState {
+ compound=true; nodesep=1.0
+ //label="\n\nrsyslog batch states\nhttp://www.rsyslog.com";
+ rankdir=LR
+
+ rdy [label="ready"];
+ bad [label="message-caused\nfailure"];
+ sub [label="submitted"]
+ disc [label="discarded" color="red"]
+
+ rdy -> sub [label="submitted to action"]
+ rdy -> bad [label="permanent fail"]
+ rdy -> disc [label="action requests discarding"]
+ sub -> rdy [label="next action or\naction-caused failure"]
+ bad -> rdy [label="next action"]
+
+ //{rank=same; comm rtry }
+}
diff --git a/doc/design.tex b/doc/design.tex
new file mode 100644
index 00000000..c9bfcdfc
--- /dev/null
+++ b/doc/design.tex
@@ -0,0 +1,494 @@
+\documentclass[a4paper,10pt]{article}
+\usepackage{amsmath}
+\usepackage{amsfonts}
+\usepackage{amssymb}
+\usepackage{graphicx}
+\usepackage{listings}
+\usepackage{algorithm,algorithmic}
+
+\newcommand{\IN}{\mathbb{N}}
+\newcommand{\MM}{\mathcal{M}}
+\newcommand{\QQ}{\mathcal{Q}}
+\newcommand{\AAA}{\mathcal{A}}
+\title{Rsyslog Design and Internals}
+\author{Rainer Gerhards}
+
+\begin{document}
+
+\maketitle
+
+\begin{abstract}
+This paper describes rsyslog design and internals. It is created to facilitate a discussion about the implementation of "batched queue processing". As such, it does not describe the full design of rsyslog but rather those elements that are relevant to queues. However, the document may be expanded in the future.
+\end{abstract}
+
+\tableofcontents
+
+\section{Preliminaries}
+\subsection{Notational Conventions}
+In general, in rsyslog there exists single objects $o$, which are used to build larger sets $O$, which form a superset $\mathcal{O}$ of all those objects that exist at a given time inside a running instance of rsyslog. As seen above, single objects are always described by lower case letters ($o$), larger sets by upper case letters ($O$) and the ``all-sets'' in caligraphic letters ($\mathcal{O}$). Often, objects $O_i, i \in \IN, i \le |\mathcal{O}|$ partition $\mathcal{O}$, but this is not necessarily the case.
+
+\section{Overall Design}
+From a high-level prespective, rsyslogd is ``just'' a high-performance message router. It accepts messages from various sources, applies user-configured filters to them, and routes potentially transformed messages to destinations based on these filters.
+\section{Objects}
+\subsection{Plugins}
+Plugins provide code potentially written by a third party to extend rsyslog.
+
+Conceptually, a plugin is a tupel of callable functions $(\phi_1, \phi_2, \ldots)$ which implement an interface. There are three different types of plugins: input, output and libraray. The plugin type denotes the primary interface implemented by the plugin. Additional interfaces may be implemented\footnote{This is not yet done in plugins, but is possible and assumed to be done at a later point in time}.
+
+In the context of this paper, the output plugin interface is most important. It implements three entry points:
+
+\paragraph{doAction()}
+is used to submit messages to the output plugin. The entry point may or may not commit the messages to their ultimate destination.
+
+\paragraph{beginTransaction()}
+is used to inform the plugin that a new transaction begins. It must prepare for processing.
+
+\paragraph{endTransaction()}
+is indicated that the upper layer \emph{needs} to close the transaction. If there is any uncommited data left, it must be commited or rolled back.
+
+Every instance of an output plugin is guaranteed \emph{not} to be called concurrently by multiple threads. Further, no context switch will happen between calls to $doAction()$ and $endTransaction()$.
+
+\subsection{State Sets}
+Several object have associated state based on a specific state set. These state sets are described together with the objects.
+
+As a general rule, individual state is associated with all intances $o$ of a class of objects. This state is called the object's \marginpar{state component} \emph{state component} $s$. If we want to obtain an object's state, we write $S(o)$. Please note that $S(o)$ is only defined for those objects that have a state component.
+
+\subsection{Messages}
+A message $m$ represents a a single syslog message inside the system. It is a tuple of attributes. Some of these attributes directly orginate from the message content, some others are meta-information taken from the context. For example, there is an meta-attribute ``time of reception'' which conveys when the message was received by rsyslog's input subsystem. We do not list attributes here, as there are many and it is not of importance which exactly they are.
+
+The set $\MM$ is composed of all messages that exist at a given time inside rsyslog.
+
+\subsection{Queue}
+A queue
+$$Q = (C, \Phi, M)$$
+is a triplet of a set of configuration parameters $C$, a set of callbacks $\Phi$ and a set of messages $M \subseteq \MM$.
+
+If we need to obtain the set of message from a queue, we write $M(Q)$. The elements of the set of configuration parameters are written as $C_{param}$ where $param$ is an abbreviation of the parameter's meaning. To obtain a specific parameter from a queue, we write $C_{param}(Q)$. The most important elements of $C$ are:
+
+\paragraph{$C_{type}$} which denotes the queue implementation type. Most importantly, this selects from a set of queue drivers (for example disk-only or in-memory driver), which affects the basic operation of the queue instance.
+
+\paragraph{$C_{mMsg}$} which denotes the upper bound on the cardinality of $M$.
+
+\paragraph{$C_{mBatch}$} which denotes the upper bound of the cardinality of message batches created for this queue.
+
+Be $\QQ = \{Q_m, Q_1, Q_2, \ldots, Q_{|\AAA|}\}$ the set of all queues that exist inside rsyslog after the configuration file has been processed, with $|\QQ| = |\AAA| + 1$.
+
+Then
+$$M_0 = \MM \setminus \bigcup_{i=1}^{|\QQ|} Q_i(M)$$
+\marginpar{at-risk-set}is the set of non-queued messages. The messages have either never been enqueued or have been dequeued but not finally been processed. This set represents the messages that may potentially be lost during an unclean shutdown of rsyslogd. This is why I call this set the ``\emph{at-risk-set}''.
+
+
+\subsection{Batches}
+A batch represents multiple processable messages. It is a unit of processing inside rsyslog's output system. Batches are used to dequeue a number of messages from a queue and then submit them to the lower action layer. Batches are natural \emph{transaction boundaries}, in the sense that multiple output transactions may be done on the messages inside a batch, but each transaction must end at the end of the batch. A batch is always associated to a specific queue $Q$.
+
+A batch
+$$B = (b_1, b_2, \ldots, b_n )$$
+is a $n$-tuple of \marginpar{processable message}processable messages
+$$b = (m, s)$$
+which are an ordered pair of a message $m$ and an associated processing state $s$. To denote the $n$-th message inside the batch, we write $m(b_n)$, to denote the status component of the $n$-th message, we write $S(b_n)$.
+
+\begin{figure}
+\begin{center}
+\includegraphics[scale=0.4]{batch_state.jpeg}
+\end{center}
+\caption{batch message processing states}
+\label{fig_batchmsg_states}
+\end{figure}
+
+The state set for the processing states is defined as follows:
+$$
+S_B = \{ rdy, bad, sub, disc \}
+$$
+
+With the semantics of the various states being the following:
+
+\begin{center}
+\begin{tabular}{|l|l|} \hline
+ State & Semantics \\\hline
+ rdy & ready for processing\\
+ bad & this message triggered an unrecoverable failure in action\\
+ & processing and must not be resubmitted to this action\\
+ sub & message submitted for processsing, result yet unknown \\
+ disc & action sucessfully processed, but must not be submitted \\
+ & to any further action in action unit \\\hline
+\end{tabular}
+\end{center}
+The associated state diagram is shown in figure \ref{fig_batchmsg_states} on page \pageref{fig_batchmsg_states}.
+
+Batch sizes vary. The actual cardinality is a function of the cardinality of $M(Q)$ at the time of batch creation and the queue configuration:
+
+$$1 \leq |B| \leq \max(C_{mBatch}(Q), |M(Q)|)$$
+
+\subsection{Action Unit}
+An action unit
+$$u = (f, a_1, \ldots, a_n), a_i \in \AAA \text{ for } i \in \IN, i \le n$$
+is a tuple consisting of a filter function $f$ and $n \in \IN$ actions. \emph{Does rsyslog still support nonsense action units with $n=0$? - check!}
+
+\subsection{Action}
+An action
+$$a = (a_C, a_\psi)$$
+is an ordered pair of a tuple of configuration attributes $a_C$, and a tuple of processing functions $a_\psi$. Be the set $\AAA$ composed of all actions that exist in rsyslog after the configuration file has been processed.
+
+
+\section{Processing}
+\subsection{Object States}
+Various objects keep state. Some of these objects, like messages, batches and actions seem to share state. However, thinking about shared state leads to very complex setup. As such, state is modelled for each object $o$ individually. Instead, the state function $S_O(o)$ can be used to obtain an obtain an individual objects state. That state can be used to modify the state diagrams of the other objects with which relationships exist.
+
+\subsubsection{Actions}
+Actions are provided by output plugins. An action enables the engine to write messages to some destination. It is important to note that ``destination'' is a very broad abstraction. A destination may be a file inside a local or remote file system, a database table or a remote syslog server in another network.
+
+Actions are transactional in the following sense: more than one message can be submitted to an action. The action does not necessarily process the submitted messages unless the caller ends the transaction. However, the action itself may also end the transaction and notify the caller. This is \emph{not} considered an error condition and \emph{must} be handeled gracefully by the caller. If an transaction aborts, the caller \emph{must} assume that none of the elements submitted since the begin of transaction have been processed. The action will try to backout anything that was already processed at the time the transaction failed. However, not all outputs work on actually transactional destination. As such, an action is permitted not to backout incomplete interim results. As such, after a transaction abort, some message duplication may occur. We call this the \emph{relaxed integrity condition} for actions.
+
+An output transaction is started by calling \emph{beginTransaction()} either explicitely or implicitely by a call to \emph{doAction()} without calling \emph{beginTransaction()} before. Then, one or more calls to \emph{doAction()} follow. When the caller intends to finish the transaction, it calls \emph{endTransaction()}. However, the transaction may also be terminated from the action itself in response to a \emph{doAction()} call.
+
+Mathematical, an action transaction builds a totally orderred set of uncommitted messages $M_u$. The order relation is defined over the sequence in which messages are being provided to \emph{doAction()}. At any time a commit is attempted, the full set $M_u$ is committed and may either succeeed completely or not at all (in the sense of the relaxed integrity condition described above).
+
+A commit is attempted when
+\begin{enumerate}
+\item the caller decides to call \emph{endTransaction()}
+\item or earlier if the action decides it needs to commit now (e.g. because of buffers filling up).
+\end{enumerate}
+
+In the seconds case, the action may decide to commit all message but the current one or all (this is depending on action logic). So if the action decideds to commit a transaction before the caller calls \emph{endTransaction()}, a set of commited messages $M_c$ is build and $M_u$ is modified. Be $n$ the $n$-th iterated \emph{doAction()} call and $m_n$ the current message of this call, then the sets are build as follows:
+
+\begin{algorithm}
+%\caption{}
+\begin{algorithmic}
+\IF{action commits $m_n$}
+ \STATE $M_c = M_u \cup m_n$
+ \STATE $M_u = \emptyset$
+\ELSE
+ \STATE $M_c = M_u$
+ \STATE $M_u = \{ m_n\}$
+\ENDIF
+\end{algorithmic}
+\end{algorithm}
+
+In other words, if anything is committed early, it is always the full set $M_u$, with or without the current message. The caller needs to know which messages are already commited. As \emph{doAction()} finishes one transaction and starts a new one in a single call, we can not use action state the let the caller know this happened. So we use our above finding and just convey back if the transacton is still continuing or the current message or all others before it were committed. The caller must then act accordingly. Please note that when an error happens, the whole transaction must still be considered failed. As such, ``partial commit'' states need not to be mixed with failure states.
+
+Please note that the above method leaves a small potential issue unaddressed: if the action does an early commit of $M_u \setminus m_n$, an error happens when adding $m_n$ to the new $M_u$ (like running out of resources), the action would need to convey both the successful transaction as well as the failure state. This is not possible with the current interface. We could use callbacks to provide such notification, but this complicates the code. So, if that situaton arises, the action must temporarily buffer the error condition and convey it as part of either the next \emph{doAction()} call or during \emph{endTransation()} processing. This can be done, for example, by advancing its internal state accordingly.
+
+The state set for a actions is defined as follows:
+$$
+S_A = \{ rdy, itx, comm, rtry, susp, died \}
+$$
+
+With the semantics of the various states being the following:
+
+\begin{center}
+\begin{tabular}{|l|l|} \hline
+ State & Semantics \\\hline
+ rdy & ready, waiting for transaction begin\\
+ itx & in transaction, accept more data \\
+ comm & transaction finished \\
+ rtry & action failed but may be able to recover \\
+ susp & action currently defunctional until timeout expires \\
+ died & unrecoverable error condition occured, no longer usable \\\hline
+\end{tabular}
+\end{center}
+
+In the associated state diagram in figure \ref{fig_action_states}, we do not include the \emph{died} state, because it is entered whenever a totally unrecoverable error state may occur. This is a very exceptional incident (which most output plugins do not even support), so we have kept the diagram simple.
+
+\begin{figure}
+\begin{center}
+\includegraphics[scale=0.5]{action_state.jpeg}
+\end{center}
+\caption{Action State Diagram}
+\label{fig_action_states}
+\end{figure}
+
+\emph{Note well} that the state diagram describes the action state. It does \emph{not} describe the transaction state. While action- and transaction state are closely related to each other, they are different entities.
+
+The return code of \emph{doAction()} and \emph{endTransaction()} is used to convey the transaction state. As such, it is a function of the actions's current state after processing the request. The mapping is as shown below:
+
+\begin{center}
+\begin{tabular}{|l|l|} \hline
+ State & Return Code (RS\_RET\_\ldots)\\\hline
+ rdy & OK \\
+ itx & COMMITTED (if there was an auto-commit without $m_n$)\\
+ & DEFER\_COMMIT (if there was no auto-commit)\\
+ comm & internal state, not to be exposed to upper layer \\
+ rtry & SUSPENDED \emph{(new code needed)} \\
+ susp & SUSPENDED \\
+ died & DISABLED \\\hline
+\end{tabular}
+\end{center}
+
+For the rest of this document, let's assume there is a function \emph{getReturnCode()} that implements this mapping.
+
+It is important to think about how retries are handled. There is a user-configured per-action upper number of retries $C_r$ and retry interval $C_i$. In \emph{rsyslog v3}, there is no concept of output transactions. As such, only single messages are processed. When a temporary action failure occurs, the action is re-tried $C_r$ times, where the action processing thread is waiting in a \emph{sleep()} $C_i$ operating system API call\footnote{a suitable API is used, not \emph{sleep()} itself}. If the action succeeds during the retry processing, everything continues as usual. If it does not succeed, two things happen:
+\begin{itemize}
+\item the message is flagged as ``action permanent failure'' (what may trigger backup processing)
+\item the action is actually suspended for $C_i$ seconds
+\end{itemize}
+If then a new message is sent to the action, and $C_i$ seconds have not yet elapsed, the action is flagged as having failed without being re-tried again\footnote{During the analysis for this paper, it was seen that actually $C_r$ retries are attempted in v3, but each of them will never actually re-try the action. This is a software bug, which does not cause any harm and thus will not be fixed in v3. The new implementation in v4 will obviously not inherit this problem}. This is done in an effort to reduce resource utilization and prevent the system from slowing down e.g. by too-many retries to a remote server that went offline.
+
+With transactional output mode in \emph{rsyslog v4}, the logic above can no longer work. First of all, retrying single actions does not help, because all of the current transaction needs to be resubmitted. As such, the upper layers need to be notified of failure. Then, they need to resubmit the batch. In that design, the lower layer needs to return immediately after detecting the failure. Recovery handling is now to be done when the next transaction is started. However, we must make sure that we do not do excessive retries. So retry processing is only to be carried out if it was not tried less than $C_i$ seconds ago.
+
+The required functionality can be implemeted by a \emph{prepareAction} function that readies the action for processing if there is need to do so. That function is then called in all entry points before anything else is done. Then, actual processing is carried out and the resulting action state be used to generate the return code for the upper-layer caller. Find below a rough pseudocode to do so:
+
+\lstset{language=python}
+\begin{lstlisting}
+def prepareAction():
+ if state == rtry:
+ try recovery (adjust state accordingly)
+ if state == rdy:
+ beginTransaction() [output plugin]
+
+def processMessage(message):
+ prepareAction()
+ if state == itx
+ doAction(message) [output plugin]
+ return getReturnCode()
+
+def doEndTransaction():
+ prepareAction()
+ if state == itx
+ endTransaction(); [output plugin]
+ return getReturnCode()
+\end{lstlisting}
+
+\subsection{Output Subsystem Layers}
+The rsyslog engine is organized in layers, where each layer is represented by the dominating object:
+
+\begin{figure}
+\includegraphics[scale=0.75]{rsyslog_output_layers.jpeg}
+\label{rsyslog output layers}
+\end{figure}
+
+If looking at the data flow, a queue dequeues batches of messages, which are than run through a generic action system and put into output plugins. Note that on the batch layer, only batches are supported as units of work, whereas the action layer is message-oriented but supports transactions of multiple messages. This is done by indicating when a transaction necessarily needs to end (that point being the end of batch from the batch layer).
+
+The plugins can be written by third parties and are roughly comparable to minidrivers. The generic action system provides all complexity of action processing wheras the output plugin provides a limited set of callbacks that enable the generic framework to talk to the actual destination system. As such, writing outputs is a very simple task. However, rsyslog does not limit the creation of very complex outputs, which may be able to offer superior performance for some destinations.
+
+\subsection{Output Failure}
+\subsubsection{Cases}
+When an output action is called, it may encounter a failure condition. In general, there are two different cases:
+\begin{enumerate}
+\item action caused failures
+\item message-content caused failures
+\end{enumerate}.
+
+Failures rooted in the action are things like broken network connections, file systems run out of space or database servers that are down. Most importantly, the failure is not related to message content. As such, it is appropriate to retry the action with the same message until it finally succeeds (assuming that someone restores the system in question to proper operation). We can not expect that the problem is cleared just by discarding the current message and re-trying with the next one.
+
+In my view, action caused failures are the far majority of all failures. For rsyslog versions 3 and below, all rsyslog-provided plugins consider failures to be action-caused and thus potentially recoverable by simple retry. With the only exception being fatal error conditions that render the whole action unusable.
+
+David Lang pointed out, that there may also exist error conditions that are not caused by the action (or the subsystem it talks to) itself, but rather by message data. He provided the following samples where message content can cause permanent issues with action execution:
+
+\begin{itemize}
+\item unicode text causing grief
+\item dynafile hits a read-only file
+\item basicly data-driven things that trigger bugs in the message delivery
+mechanism in some form.
+\end{itemize}
+
+As David Lang said ``In an ideal world these would never happen, but for most output types I can think of some form of corrupt input that could cause that message to fail.''.
+So this class of failure conditions actually exists. No matter how often the action retry mechanism is called, it will never succeeds (one may argue that the read-only dynafile is fixable, but we could replace that sample with an invalidly generated filename). The proper cure for these actions is to find the offending one and discard it.
+
+In conclusion, actions need to return different error states for these two different types of failures. Traditionally, RS\_RET\_SUSPENDED is returned when an action specific failure is hit. Most existing plugins also do this if a message-related failure occured, simply because they did not yet know that this situation exists. However, plugins also return different error codes, and at least these can be treated to mean message-permanent failures. To support this, a change to plugins is still required, because many simple return SUSPENDED state if anything went wrong (replacing the real error condition with SUSPENDED). A dedicated PROBABLE\_INVALID\_MSG return state is probably useful so that an output plugin can convey back that it consideres the message to be bad. On the other hand, this implies that the plugin must try to detect those, what means that the developer must think about all potential message-causes problems. That approach can be considered unreliable and as such it may be better not to provide such a dedicted state.
+
+\subsubsection{Handling of Failures}
+In spite of the two different failure cases, different handling is needed for them. The action-based failure cases can and must be handled on the action level. As transactions abort when a failure occurs, support from the upper ``batch layer'' is necessary in order to handle resending batches of messages.
+
+For message-caused failure cases, the offending message must be found and then be discarded. A complexity here is that while a failure-causing message is being searched for, an action-based failure might occur. In that case, first the action-based failure condition must be solved, before the search for the problem message can continue.
+
+One approach might be that when the action-layer conveys back an action-caused failure (SUSPENDED), the batch layer knows that it simply needs to restart the full transaction (but not start an ``invalid message search''). If a message-based error condition is conveyed back, the batch system can not restart the full batch. Instead, it needs to enter search mode, where it creates partitions of the original batch, and calls itself recursively (at least in theory) on each of the subsets.
+
+Then, the same handling applies until either a failing message has been found or all messages have been successfully processed. Note that in the recursive step, action-based failures are recovered by full batch resubmits. This solves the above-mentioned complexity in a consistent way.
+
+If a binary-search-like method is used to detect failing records\footnote{This was originally suggested by David Lang.}, recursion may not really be an issue, as the recursion depth is limited to $\log_2 |B|$ where $B$ is the message batch.
+
+A message-caused failure can be rooted in one or more messages. One important question is if it is expected that the failure is caused by a single or multiple messages. Both is possible, so it is a question of probability. If we assume that it is more probable that a single messages causes the problems, it is useful to immediately return back to full batch submission of transactions once a problem-causing message has been identified. But then, if there are multiple problem-causing messages inside the batch, we may need many more iterations.
+
+If, on the other hand, we assume that it is more probable that multiple messages cause problems, it may make sense to keep resubmitting only subsets of the batch. However, then the performance is suboptimal if actually only one message was problematic. A solution might be to pick a compromise, e.g. first assume that a single message is problematic, but assume the opposite as soon as a second message with problems has been found.
+
+A potential algorithm for processing $n \le |B|$ messages from batch $B$ is described below. In the pseudocode, a ``processable'' message is one that neither is already committed nor had a permanent failure with this action. The term ``mpf'' means ``message permanent failure'' for this action (this will later be described in a batch state set).
+
+\begin{small}
+\lstset{language=python}
+\begin{lstlisting}
+def submitBatch(B, n):
+ foreach processable message in
+ (first [at most] n messages of batch):
+ call processMessage
+ if action-caused failure:
+ retry full batch
+ if action-caused permanent failure:
+ mark all n messages as mpf
+ return
+ if auto-commit:
+ mark commited messages in batch as committed
+ if message-caused failure:
+ if n == 1:
+ mark message as mpf
+ return
+ else:
+ call submitBatch(B, n/2)
+ call submitBatch(B, n/2)
+\end{lstlisting}
+\end{small}
+
+After submitBatch() has completed, all messages are either committed or in mpf state.
+
+Note that an action-caused permanent failure occurs if an action-caused failure can not be resolved with the operator-configured number of retries. It will never occur if the user configured infinite retries. While an action is suspended, all calls will result in an action-caused permanent failure. Please keep in mind that these will be resubmitted to any backup actions inside the action unit, so the action's ability to cause permanent failure states is vital for a number of use cases (backup syslog server, to name just one).
+
+Batch processing inside an action unit thus can follow these strucuture:
+
+\begin{algorithm}
+\caption{processBatch(B)}
+\begin{algorithmic}
+\FORALL{action $a$ in action unit}
+ \IF{execute action only on messages that failed before}
+ \STATE $n = |\text{messages in batch in mpf state}|$
+ \STATE change mpf state back to ready
+ \ELSE
+ \STATE $n = |B \setminus \text{msgs with state discard}|$
+ \STATE change all message states $\ne$ discard to ready
+ \ENDIF
+ \IF{$n >0$ }
+ \STATE call submitBatch(B, n) for action $a$
+ \ENDIF
+\ENDFOR
+\end{algorithmic}
+\end{algorithm}
+
+\paragraph{Why is it Important to differentiate the failure cases?}
+This text originates from the mailing list and must be merged in. I provide it in the form it is, so it will not be forgotten (plus, it conveys the information).
+
+One may think that it is not necessary to differentiate between action-caused and message-caused failures. However, not doing so introduces subtle issues, because
+then you either
+
+A) do not need the batch logic at all (because the action is configured for
+infinite retries)
+
+Or
+
+B) you loose many messages if the action is not configured for infinite
+retries and you have a longer-duration outage e.g. on a database server.
+Let's say it is offline for a couple of hours, then you lose almost
+everything in that period
+
+To prevent this, you need two different retry methods.
+
+One may argue that it is hard to differentiate between the two failure cases. This is correct. Buit I think it mostly depends on the quality of the output module.
+
+First of all, ``mostly'' implies that there may be some other cases, where it
+really is impossible to differentiate between the two. In that case, I would
+treat the issue as an action-caused failure. There are two reasons for this:
+
+1) rsyslog v3 currently does this always and not even a single person
+complained about that so far. This is an empiric argument, and it does not
+mean it caused problems. But it carries the co-notation that this seems not
+to be too bad.
+
+2) If we would treat it as message-caused failure, we would no longer be able
+to handle extended outages of destination systems, which I consider a vitally
+important feature.
+
+When weighing the two, I know of lots of people who rely on 2), in sharp
+contrast to knowig noone having problems with 1). So my conclusion is that it is
+less problematic to define an otherwise undefinable failure reason to be
+action-caused. Even more so as I assume this problem only exists in the
+minority of cases.
+
+Now back to the quality of the output module: thinking about databases, their
+API is usually very good at conveying back if there was a SQL error or a
+connection abort. So while a SQL error may also be an indication of a
+configuration problem, I would strongly tend to treat it is a being
+message-caused. This is under the assumption that any reasonable responsive
+admin will hopefully test his configuration at least once before turning it
+into production. And config SQL errors should manifest immediately, so I
+expect these to be fixed before a configuration runs in production. So it is
+the chore of the output module to interpret the return code it received from
+its API and decide whether this is more likely action-caused or
+message-caused. For database outputs, I would assume that it is always easy
+to classify failures that can only be action-caused, especially in the
+dominating case of a failed network connection or a failed server.
+
+For other outputs it may not be as easy. But, for example, all stream network
+outputs can detect a broken connection, so this also is a sure fit.
+
+For dynafiles, it really depends on how hard the output module is tries to differentiate
+between the two failure cases. But I think you can go great length here, too.
+Especially if you do not only look at the create() return code, but, iff a
+failure occurs, you do more API calls to find out the cause.
+
+So I think the remaining problem is small enough to cause not too much issues
+(and if so, they are unavoidable in any case). In conclusion, the two failure states are not only necessary, but can sufficiently sure enough be detected.
+
+\subsection{Random Topics}
+I have begun to gather material from the mailing list in this section, because I feel it may be useful for others as well. Right now, the information is well hidden in the mailing list archives and there may be value in combining it all in one place.
+
+Due to the nature of this material, there is no specific organization between the subchapters and also formatting and language doesn't deny its rooting in the mailing list.
+
+\subsection{Reliability of Message Dequeueing}
+A batch is actually dequeued when it is taken off a queue. So if at that point we
+have a system power failure (for whatever reason), the messages are lost.
+While the rsyslog engine intends to be very reliable, it is not a complete
+transactional system. A slight risk remains. For this, you need to understand
+what happens when the batch is processed. I assume that we have no sudden,
+untrappable process termination. Then, if a batch cannot be processed, it is
+returned back to the top of queue. This is not yet implemented, but is how
+single messages (which you can think of an abstraction of a batch in the
+current code) are handled. If, for example, the engine shuts down, but an
+action takes longer than the configured shutdown timeout, the action is
+cancelled and the queue engine reclaims the unprocessed messages. They go
+into a special area inside the .qi file and are placed on top of the queue
+once the engine restarts.
+
+The only case where this not work is sudden process termination. I see two
+cases:
+
+a) a fatal software bug
+We cannot really address this. Even if the messages were remaining in the
+queue until finally processed, a software bug (maybe an invalid pointer) may
+affect the queue structures at large, possibly even at the risk of total loss
+of all data inside that queue. So this is an inevitable risk.
+
+b) sudden power fail
+... which can and should be mitigated at another level
+
+One may argue that there also is
+
+c) admin error
+e.g, kill -9 rsyslogd
+Here a fully transactional queue will probably help.
+
+However, I do not think that the risk involved justifies a far more complex
+fully transactional implementation of the queue object. Some risk always
+remains (what in the disaster case, even with a fully transactional queue?).
+
+And it is so complex to let the messages stay in queue because it is complex
+to work with such messages and disk queues. It would also cost a lot of
+performance, especially when done reliably (need to sync). We would then need
+to touch each element at least four times, twice as much as currently. Also,
+the hybrid disk/memory queues become very, very complex. There are more
+complexities around this, I just wanted to tell the most obvious.
+
+So, all in all, the idea is that messages are dequeued, processed and put
+back to the queue (think: ungetc()) when something goes wrong. Reasonable
+(but not more) effort is made to prevent message loss while the messages are
+in unprocessed state outside of the queue.
+
+\paragraph{More reliable can actually be less reliable}
+On the rsyslog mailing list, we had a discussion about how reliable rsyslog should be. It circles about a small potential window of message loss in the case of sudden power failure. Rsyslog can be configured to put all messages into a disk queue (instead of main memory), so these messages survive such a powerfail condition. However, messages dequeued and scheduled for processing during the power outage may be lost.
+
+I now consider a case where we have bursty UDP traffic and rsyslog is configured to use a disk-only queue (which obviously is much slower than an in-memory queue). Looking at processing speeds, the max burst rate is limited by using an ultra-reliable queue. To avoid using UDP messages, a second instance could be run that uses an in-memory queue and forwards received messages to the one in ultra-reliable mode (that is with the disk-only queue). So that second instance queues in memory until the (slower) reliable rsyslogd can now accept the message and put it into the reliable queue. Let's say that you have a burst of $r$ messages and that from these burst only $r/2$ can be enqueued (because the ultra reliable queue is so slow). So you lose $r/2$ messages.
+
+Now consider the case that you run rsyslog with just a reliable queue, one that is kept in memory but not able to cover the power failure scenario. Obviously, all messages in that queue are lost when power fails (or almost all to be precise). However, that system has a much broader bandwidth. So with it, there would never have been r messages inside the queue, because that system has a much higher sustained message rate (and thus the burst causes much less of trouble). Let's say the system is just twice as fast in this setup (I guess it usually would be *much* faster). Than, it would be able to process all r records.
+
+In that scenario, the ultra-reliable system loses $r/2$ messages, whereas the somewhat more "unreliable" system loses none - by virtue of being able to process messages as they arrive.
+
+Now extend that picture to messages residing inside the OS buffers or even those that are still queued in their sources because a stream transport blocked sending them.
+
+I know that each detail of this picture can be argued at length about.
+
+However, my opinion is that there is no "ultra-reliable" system in life, only various probabilities in losing messages. These probabilities often depend on each other, what makes calculating them very hard to impossible. Still, the probability of message loss in the system at large is just the product of the probabilities in each of its components. And reliability is just the inverse of that probability.
+
+This is where *I* conclude that it can make sense to permit a system to lose some messages under certain circumstances, if that influences the overall probability calculation towards the desired end result. In that sense, I tend to think that a fast, memory-queuing rsyslogd instance can be much more reliable compared to one that is configured as being ultra-reliable, where the rest of the system at large is badly influenced by this (the scenario above).
+
+However, I also know that for regulatory requirements, you often seem to need to prove that a system may not lose messages once it has received them, even at the cost of an overall increased probability of message loss.
+
+My view of reliability is much the same as my view of security: there is no such thing as "being totally secure", you can just reduce the probability that something bad happens. The worst thing in security is someone who thinks he is "totally secure" and as such is no longer actively looking at potential issues.
+
+The same I see for reliability. There is no thing like "being totally reliable" and it is a really bad idea to think you could ever be. Knowing this, one may begin to think about how to decrease the overall probability of message loss AND think about what rate is acceptable (and what to do with these cases, e.g. "how can they hurt").
+\end{document}
diff --git a/doc/dev_oplugins.html b/doc/dev_oplugins.html
index 5bfc974c..2e195028 100644
--- a/doc/dev_oplugins.html
+++ b/doc/dev_oplugins.html
@@ -144,19 +144,172 @@ array-passing capability not blindly be used.</b> In such cases, we can not guar
plugin from segfaulting and if the plugin (as currently always) is run within
rsyslog's process space, that results in a segfault for rsyslog. So do not do this.
<h3>Batching of Messages</h3>
-<p>With the current plugin interface, each message is passed via a separate call to the plugin.
-This is annoying and costs performance in some uses cases (primarily for database outputs).
-However, that's the way it (currently) is, no easy way around it. There are some ideas
-to implement batching capabilities inside the rsyslog core, but without that the only
-resort is to do it inside your plugin yourself. You are not prohibited from doing so.
-There are some consequences, though: most importantly, the rsyslog core is no longer
-intersted in messages that it passed to a plugin. As such, it will not try to make sure
-the message is not lost before it was ultimately processed (because rsyslog, due to
-doAction() returning successfully, thinks the message *was* ultimately processed).
-<p>When the rsyslog core receives batching capabilities, this will be implemented in
-a way that is fully compatible to the existing plugin interface. While we have not yet
-thought about the implementation, that will probably mean that some new interfaces
-or options be used to turn on batching capabilities.
+<p>Starting with rsyslog 4.3.x, batching of output messages is supported. Previously, only
+a single-message interface was supported.
+<p>With the <b>single message</b> plugin interface, each message is passed via a separate call to the plugin.
+Most importantly, the rsyslog engine assumes that each call to the plugin is a complete transaction
+and as such assumes that messages be properly commited after the plugin returns to the engine.
+<p>With the <b>batching</b> interface, rsyslog employs something along the line of
+&quot;transactions&quot;. Obviously, the rsyslog core can not make non-transactional outputs
+to be fully transactional. But what it can is support that the output tells the core which
+messages have been commited by the output and which not yet. The core can than take care
+of those uncommited messages when problems occur. For example, if a plugin has received
+50 messages but not yet told the core that it commited them, and then returns an error state, the
+core assumes that all these 50 messages were <b>not</b> written to the output. The core then
+requeues all 50 messages and does the usual retry processing. Once the output plugin tells the
+core that it is ready again to accept messages, the rsyslog core will provide it with these 50
+not yet commited messages again (actually, at this point, the rsyslog core no longer knows that
+it is re-submiting the messages). If, in contrary, the plugin had told rsyslog that 40 of these 50
+messages were commited (before it failed), then only 10 would have been requeued and resubmitted.
+<p>In order to provide an efficient implementation, there are some (mild) constraints in that
+transactional model: first of all, rsyslog itself specifies the ultimate transaction boundaries.
+That is, it tells the plugin when a transaction begins and when it must finish. The plugin
+is free to commit messages in between, but it <b>must</b> commit all work done when the core
+tells it that the transaction ends. All messages passed in between a begin and end transaction
+notification are called a batch of messages. They are passed in one by one, just as without
+transaction support. Note that batch sizes are variable within the range of 1 to a user configured
+maximum limit. Most importantly, that means that plugins may receive batches of single messages,
+so they are required to commit each message individually. If the plugin tries to be &quot;smarter&quot;
+than the rsyslog engine and does not commit messages in those cases (for example), the plugin
+puts message stream integrity at risk: once rsyslog has notified the plugin of transacton end,
+it discards all messages as it considers them committed and save. If now something goes wrong,
+the rsyslog core does not try to recover lost messages (and keep in mind that &quot;goes wrong&quot;
+includes such uncontrollable things like connection loss to a database server). So it is
+highly recommended to fully abide to the plugin interface details, even though you may
+think you can do it better. The second reason for that is that the core engine will
+have configuration settings that enable the user to tune commit rate to their use-case
+specific needs. And, as a relief: why would rsyslog ever decide to use batches of one?
+There is a trivial case and that is when we have very low activity so that no queue of
+messages builds up, in which case it makes sense to commit work as it arrives.
+(As a side-note, there are some valid cases where a timeout-based commit feature makes sense.
+This is also under evaluation and, once decided, the core will offer an interface plus a way
+to preserve message stream integrity for properly-crafted plugins).
+<p>The second restriction is that if a plugin makes commits in between (what is perfectly
+legal) those commits must be in-order. So if a commit is made for message ten out of 50,
+this means that messages one to nine are also commited. It would be possible to remove
+this restriction, but we have decided to deliberately introduce it to simpify things.
+<h3>Output Plugin Transaction Interface</h3>
+<p>In order to keep compatible with existing output plugins (and because it introduces
+no complexity), the transactional plugin interface is build on the traditional
+non-transactional one. Well... actually the traditional interface was transactional
+since its introduction, in the sense that each message was processed in its own
+transaction.
+<p>So the current <code>doAction()</b> entry point can be considered to have this
+structure (from the transactional interface point of view):
+<p><pre><code>
+doAction()
+ {
+ beginTransaction()
+ ProcessMessage()
+ endTransaction()
+ }
+ </code></pre>
+<p>For the <b>transactional interface</b>, we now move these implicit <code>beginTransaction()</code>
+and <code>endTransaction(()</code> call out of the message processing body, resulting is such
+a structure:
+<p><pre><code>
+beginTransaction()
+ {
+ /* prepare for transaction */
+ }
+
+doAction()
+ {
+ ProcessMessage()
+ /* maybe do partial commits */
+ }
+
+endTransaction()
+ {
+ /* commit (rest of) batch */
+ }
+</code></pre>
+<p>And this calling structure actually is the transactional interface! It is as simple as this.
+For the new interface, the core calls a <code>beginTransaction()</code> entry point inside the
+plugin at the start of the batch. Similarly, the core call <code>endTransaction()</code> at the
+end of the batch. The plugin must implement these entry points according to its needs.
+<p>But how does the core know when to use the old or the new calling interface? This is rather
+easy: when loading a plugin, the core queries the plugin for the <code>beginTransaction()</code>
+and <code>endTransaction()</code> entry points. If the plugin supports these, the new interface is
+used. If the plugin does not support them, the old interface is used and rsyslog implies that
+a commit is done after each message. Note that there is no special "downlevel" handling
+necessary to support this. In the case of the non-transactional interface, rsyslog considers
+each completed call to <code>doAction</code> as partial commit up to the current message.
+So implementation inside the core is very straightforward.
+<p>Actually, <b>we recommend that the transactional entry points only be defined by those
+plugins that actually need them</b>. All others should not define them in which case
+the default commit behaviour inside rsyslog will apply (thus removing complexity from the
+plugin).
+<p>In order to support partial commits, special return codes must be defined for
+<code>doAction</code>. All those return codes mean that processing completed successfully.
+But they convey additional information about the commit status as follows:
+<p>
+<table border="0">
+<tr>
+<td valign="top"><i>RS_RET_OK</i></td>
+<td>The record and all previous inside the batch has been commited.
+<i>Note:</i> this definition is what makes integrating plugins without the
+transaction being/end calls so easy - this is the traditional "success" return
+state and if every call returns it, there is no need for actually calling
+<code>endTransaction()</code>, because there is no transaction open).</td>
+</tr>
+<tr>
+<td valign="top"><i>RS_RET_DEFER_COMMIT</i></td>
+<td>The record has been processed, but is not yet commited. This is the
+expected state for transactional-aware plugins.</td>
+</tr>
+<tr>
+<td valign="top"><i>RS_RET_PREVIOUS_COMMITTED</i></td>
+<td>The <b>previous</b> record inside the batch has been committed, but the
+current one not yet. This state is introduced to support sources that fill up
+buffers and commit once a buffer is completely filled. That may occur halfway
+in the next record, so it may be important to be able to tell the
+engine the everything up to the previouos record is commited</td>
+</tr>
+</table>
+<p>Note that the typical <b>calling cycle</b> is <code>beginTransaction()</code>,
+followed by <i>n</i> times
+<code>doAction()</code></n> followed by <code>endTransaction()</code>. However, if either
+<code>beginTransaction()</code> or <code>doAction()</code> return back an error state
+(including RS_RET_SUSPENDED), then the transaction is considered aborted. In result, the
+remaining calls in this cycle (e.g. <code>endTransaction()</code>) are never made and a
+new cycle (starting with <code>beginTransaction()</code> is begun when processing resumes.
+So an output plugin must expect and handle those partial cycles gracefully.
+<p><b>The question remains how can a plugin know if the core supports batching?</b>
+First of all, even if the engine would not know it, the plugin would return with RS_RET_DEFER_COMMIT,
+what then would be treated as an error by the engine. This would effectively disable the
+output, but cause no further harm (but may be harm enough in itself).
+<p>The real solution is to enable the plugin to query the rsyslog core if this feature is
+supported or not. At the time of the introduction of batching, no such query-interface
+exists. So we introduce it with that release. What the means is if a rsyslog core can
+not provide this query interface, it is a core that was build before batching support
+was available. So the absence of a query interface indicates that the transactional
+interface is not available. One might now be tempted the think there is no need to do
+the actual check, but is is recommended to ask the rsyslog engine explicitely if
+the transactional interface is present and will be honored. This enables us to
+create versions in the future which have, for whatever reason we do not yet know, no
+support for this interface.
+<p>The logic to do these checks is contained in the <code>INITChkCoreFeature</code> macro,
+which can be used as follows:
+<p><pre><code>
+INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
+</code></pre>
+<p>Here, bCoreSupportsBatching is a plugin-defined integer which after execution is
+1 if batches (and thus the transational interface) is supported and 0 otherwise.
+CORE_FEATURE_BATCHING is the feature we are interested in. Future versions of rsyslog
+may contain additional feature-test-macros (you can see all of them in
+./runtime/rsyslog.h).
+<p>Note that the ompsql output plugin supports transactional mode in a hybrid way and
+thus can be considered good example code.
+
+<h2>Open Issues</h2>
+<ul>
+<li>Processing errors handling
+<li>reliable re-queue during error handling and queue termination
+</ul>
+
+
+
<h3>Licensing</h3>
<p>From the rsyslog point of view, plugins constitute separate projects. As such,
we think plugins are not required to be compatible with GPLv3. However, this is
diff --git a/doc/queues.html b/doc/queues.html
index 4a9509a0..f063e87c 100644
--- a/doc/queues.html
+++ b/doc/queues.html
@@ -332,6 +332,33 @@ in this regard - it was just not requested so far. So if you need more
fine-grained control, let us know and we'll probably implement it.
There are two configuration directives, both should be used together or
results are unpredictable:" <i>$&lt;object&gt;QueueDequeueTimeBegin &lt;hour&gt;</i>" and&nbsp;"<i>$&lt;object&gt;QueueDequeueTimeEnd &lt;hour&gt;</i>". The hour parameter must be specified in 24-hour format (so 10pm is 22). A use case for this parameter can be found in the <a href="http://wiki.rsyslog.com/index.php/OffPeakHours">rsyslog wiki</a>. </p>
+<h2>Performance</h2>
+<p>The locking involved with maintaining the queue has a potentially large
+performance impact. How large this is, and if it exists at all, depends much on
+the configuration and actual use case. However, the queue is able to work on
+so-called &quot;batches&quot; when dequeueing data elements. With batches,
+multiple data elements are dequeued at once (with a single locking call).
+The queue dequeues all available elements up to a configured upper
+limit (<i>&lt;object&gt;DequeueBatchSize &lt;number&gt;</i>). It is important
+to note that the actual upper limit is dictated by availability. The queue engine
+will never wait for a batch to fill. So even if a high upper limit is configured,
+batches may consist of fewer elements, even just one, if there are no more elements
+waiting in the queue.
+<p>Batching
+can improve performance considerably. Note, however, that it affects the
+order in which messages are passed to the queue worker threads, as each worker
+now receive as batch of messages. Also, the larger the batch size and the higher
+the maximum number of permitted worker threads, the more main memory is needed.
+For a busy server, large batch sizes (around 1,000 or even more elements) may be useful.
+Please note that with batching, the main memory must hold BatchSize * NumOfWorkers
+objects in memory (worst-case scenario), even if running in disk-only mode. So if you
+use the default 5 workers at the main message queue and set the batch size to 1,000, you need
+to be prepared that the main message queue holds up to 5,000 messages in main memory
+<b>in addition</b> to the configured queue size limits!
+<p>The queue object's default maximum batch size
+is eight, but there exists different defaults for the actual parts of
+rsyslog processing that utilize queues. So you need to check these object's
+defaults.
<h2>Terminating Queues</h2>
<p>Terminating a process sounds easy, but can be complex.
Terminating a running queue is in fact the most complex operation a queue
diff --git a/doc/rsyslog_conf_global.html b/doc/rsyslog_conf_global.html
index 43eacc43..c1f1bcac 100644
--- a/doc/rsyslog_conf_global.html
+++ b/doc/rsyslog_conf_global.html
@@ -58,6 +58,7 @@ default template for UDP and plain TCP forwarding action</li>
<li>$ActionGSSForwardDefaultTemplate [templateName] - sets a
new default template for GSS-API forwarding action</li>
<li>$ActionQueueCheckpointInterval &lt;number&gt;</li>
+<li>$ActionQueueDequeueBatchSize &lt;number&gt; [default 16]</li>
<li>$ActionQueueDequeueSlowdown &lt;number&gt; [number
is timeout in <i> micro</i>seconds (1000000us is 1sec!),
default 0 (no delay). Simple rate-limiting!]</li>
@@ -128,6 +129,7 @@ not recommended for use with rsyslog. To do a full restart, simply stop and star
compatibility reasons. If it is set to "off", a HUP will only close open files. This is a much quicker action and usually
the only one that is needed e.g. for log rotation. <b>It is recommended to set the setting to "off".</b></li>
<li><a href="rsconf1_includeconfig.html">$IncludeConfig</a></li><li>MainMsgQueueCheckpointInterval &lt;number&gt;</li>
+<li>$MainMsgQueueDequeueBatchSize &lt;number&gt; [default 32]</li>
<li>$MainMsgQueueDequeueSlowdown &lt;number&gt; [number
is timeout in <i> micro</i>seconds (1000000us is 1sec!),
default 0 (no delay). Simple rate-limiting!]</li>
diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c
index 5a8a62f6..9883fa89 100644
--- a/plugins/imtcp/imtcp.c
+++ b/plugins/imtcp/imtcp.c
@@ -245,14 +245,10 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus
iTCPSessMax = 200;
iStrmDrvrMode = 0;
iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
- if(pszInputName != NULL) {
- free(pszInputName);
- pszInputName = NULL;
- }
- if(pszStrmDrvrAuthMode != NULL) {
- free(pszStrmDrvrAuthMode);
- pszStrmDrvrAuthMode = NULL;
- }
+ free(pszInputName);
+ pszInputName = NULL;
+ free(pszStrmDrvrAuthMode);
+ pszStrmDrvrAuthMode = NULL;
return RS_RET_OK;
}
diff --git a/plugins/ompgsql/ompgsql.c b/plugins/ompgsql/ompgsql.c
index 6daac1c7..003cf6a8 100644
--- a/plugins/ompgsql/ompgsql.c
+++ b/plugins/ompgsql/ompgsql.c
@@ -170,6 +170,9 @@ tryExec(uchar *pszCmd, instanceData *pData)
int bHadError = 0;
/* try insert */
+BEGINfunc
+RUNLOG_VAR("%p", pData->f_hpgsql);
+RUNLOG_VAR("%s", pszCmd);
pgRet = PQexec(pData->f_hpgsql, (char*)pszCmd);
execState = PQresultStatus(pgRet);
if(execState != PGRES_COMMAND_OK && execState != PGRES_TUPLES_OK) {
@@ -178,6 +181,7 @@ tryExec(uchar *pszCmd, instanceData *pData)
}
PQclear(pgRet);
+ENDfunc
return(bHadError);
}
@@ -230,6 +234,14 @@ CODESTARTtryResume
}
ENDtryResume
+
+BEGINbeginTransaction
+CODESTARTbeginTransaction
+dbgprintf("ompgsql: beginTransaction\n");
+ iRet = writePgSQL((uchar*) "begin", pData); /* TODO: make user-configurable */
+ENDbeginTransaction
+
+
BEGINdoAction
CODESTARTdoAction
dbgprintf("\n");
@@ -237,6 +249,13 @@ CODESTARTdoAction
ENDdoAction
+BEGINendTransaction
+CODESTARTendTransaction
+ iRet = writePgSQL((uchar*) "commit;", pData); /* TODO: make user-configurable */
+dbgprintf("ompgsql: endTransaction\n");
+ENDendTransaction
+
+
BEGINparseSelectorAct
int iPgSQLPropErr = 0;
CODESTARTparseSelectorAct
@@ -314,6 +333,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
ENDqueryEtryPt
@@ -322,6 +342,8 @@ CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
+ DBGPRINTF("ompgsql: %susing transactional output interface.\n", bCoreSupportsBatching ? "" : "not ");
ENDmodInit
/* vi:set ai:
*/
diff --git a/plugins/omtesting/omtesting.c b/plugins/omtesting/omtesting.c
index 411bcf88..8f6cdbe5 100644
--- a/plugins/omtesting/omtesting.c
+++ b/plugins/omtesting/omtesting.c
@@ -22,7 +22,7 @@
* NOTE: read comments in module-template.h to understand how this file
* works!
*
- * Copyright 2007 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007, 2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -46,12 +46,14 @@
#include <stdio.h>
#include <stdarg.h>
#include <stdlib.h>
+#include <time.h>
#include <string.h>
#include <ctype.h>
#include <assert.h>
#include "dirty.h"
#include "syslogd-types.h"
#include "module-template.h"
+#include "cfsysline.h"
MODULE_TYPE_OUTPUT
@@ -59,9 +61,18 @@ MODULE_TYPE_OUTPUT
*/
DEF_OMOD_STATIC_DATA
+static int bEchoStdout = 0; /* echo non-failed messages to stdout */
+
typedef struct _instanceData {
+ enum { MD_SLEEP, MD_FAIL, MD_RANDFAIL, MD_ALWAYS_SUSPEND }
+ mode;
+ int bEchoStdout;
int iWaitSeconds;
int iWaitUSeconds; /* milli-seconds (one million of a second, just to make sure...) */
+ int iCurrCallNbr;
+ int iFailFrequency;
+ int iResumeAfter;
+ int iCurrRetries;
} instanceData;
BEGINcreateInstance
@@ -85,19 +96,106 @@ CODESTARTisCompatibleWithFeature
ENDisCompatibleWithFeature
-BEGINtryResume
-CODESTARTtryResume
-ENDtryResume
+/* implement "fail" command in retry processing */
+static rsRetVal doFailOnResume(instanceData *pData)
+{
+ DEFiRet;
-BEGINdoAction
-CODESTARTdoAction
+ dbgprintf("fail retry curr %d, max %d\n", pData->iCurrRetries, pData->iResumeAfter);
+ if(++pData->iCurrRetries == pData->iResumeAfter) {
+ iRet = RS_RET_OK;
+ } else {
+ iRet = RS_RET_SUSPENDED;
+ }
+
+ RETiRet;
+}
+
+
+/* implement "fail" command */
+static rsRetVal doFail(instanceData *pData)
+{
+ DEFiRet;
+
+ dbgprintf("fail curr %d, frquency %d\n", pData->iCurrCallNbr, pData->iFailFrequency);
+ if(pData->iCurrCallNbr++ % pData->iFailFrequency == 0) {
+ pData->iCurrRetries = 0;
+ iRet = RS_RET_SUSPENDED;
+ }
+
+ RETiRet;
+}
+
+
+/* implement "sleep" command */
+static rsRetVal doSleep(instanceData *pData)
+{
+ DEFiRet;
struct timeval tvSelectTimeout;
dbgprintf("sleep(%d, %d)\n", pData->iWaitSeconds, pData->iWaitUSeconds);
tvSelectTimeout.tv_sec = pData->iWaitSeconds;
tvSelectTimeout.tv_usec = pData->iWaitUSeconds; /* milli seconds */
select(0, NULL, NULL, NULL, &tvSelectTimeout);
- //dbgprintf(":omtesting: end doAction(), iRet %d\n", iRet);
+ RETiRet;
+}
+
+
+/* implement "randomfail" command */
+static rsRetVal doRandFail(void)
+{
+ DEFiRet;
+ if((rand() >> 4) < (RAND_MAX >> 5)) { /* rougly same probability */
+ iRet = RS_RET_OK;
+ dbgprintf("omtesting randfail: succeeded this time\n");
+ } else {
+ iRet = RS_RET_SUSPENDED;
+ dbgprintf("omtesting randfail: failed this time\n");
+ }
+ RETiRet;
+}
+
+
+BEGINtryResume
+CODESTARTtryResume
+ dbgprintf("omtesting tryResume() called\n");
+ switch(pData->mode) {
+ case MD_SLEEP:
+ break;
+ case MD_FAIL:
+ iRet = doFailOnResume(pData);
+ break;
+ case MD_RANDFAIL:
+ iRet = doRandFail();
+ break;
+ case MD_ALWAYS_SUSPEND:
+ iRet = RS_RET_SUSPENDED;
+ }
+ dbgprintf("omtesting tryResume() returns iRet %d\n", iRet);
+ENDtryResume
+
+
+BEGINdoAction
+CODESTARTdoAction
+ dbgprintf("omtesting received msg '%s'\n", ppString[0]);
+ switch(pData->mode) {
+ case MD_SLEEP:
+ iRet = doSleep(pData);
+ break;
+ case MD_FAIL:
+ iRet = doFail(pData);
+ break;
+ case MD_RANDFAIL:
+ iRet = doRandFail();
+ case MD_ALWAYS_SUSPEND:
+ iRet = RS_RET_SUSPENDED;
+ }
+
+ if(iRet == RS_RET_OK && pData->bEchoStdout) {
+ fprintf(stdout, "%s", ppString[0]);
+ fflush(stdout);
+ }
+ dbgprintf(":omtesting: end doAction(), iRet %d\n", iRet);
ENDdoAction
@@ -113,7 +211,7 @@ BEGINparseSelectorAct
int i;
uchar szBuf[1024];
CODESTARTparseSelectorAct
-CODE_STD_STRING_REQUESTparseSelectorAct(0)
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
/* code here is quick and dirty - if you like, clean it up. But keep
* in mind it is just a testing aid ;) -- rgerhards, 2007-12-31
*/
@@ -135,6 +233,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(0)
if(isspace(*p))
++p;
+ dbgprintf("omtesting command: '%s'\n", szBuf);
if(!strcmp((char*) szBuf, "sleep")) {
/* parse seconds */
for(i = 0 ; *p && !isspace(*p) && ((unsigned) i < sizeof(szBuf) - 1) ; ++i) {
@@ -152,12 +251,43 @@ CODE_STD_STRING_REQUESTparseSelectorAct(0)
if(isspace(*p))
++p;
pData->iWaitUSeconds = atoi((char*) szBuf);
- }
- /* once there are other modes, here is the spot to add it! */
- else {
+ pData->mode = MD_SLEEP;
+ } else if(!strcmp((char*) szBuf, "fail")) {
+ /* "fail fail-freqency resume-after"
+ * fail-frequency specifies how often doAction() fails
+ * resume-after speicifes how fast tryResume() should come back with success
+ * all numbers being "times called"
+ */
+ /* parse fail-frequence */
+ for(i = 0 ; *p && !isspace(*p) && ((unsigned) i < sizeof(szBuf) - 1) ; ++i) {
+ szBuf[i] = *p++;
+ }
+ szBuf[i] = '\0';
+ if(isspace(*p))
+ ++p;
+ pData->iFailFrequency = atoi((char*) szBuf);
+ /* parse resume-after */
+ for(i = 0 ; *p && !isspace(*p) && ((unsigned) i < sizeof(szBuf) - 1) ; ++i) {
+ szBuf[i] = *p++;
+ }
+ szBuf[i] = '\0';
+ if(isspace(*p))
+ ++p;
+ pData->iResumeAfter = atoi((char*) szBuf);
+ pData->iCurrCallNbr = 1;
+ pData->mode = MD_FAIL;
+ } else if(!strcmp((char*) szBuf, "randfail")) {
+ pData->mode = MD_RANDFAIL;
+ } else if(!strcmp((char*) szBuf, "always_suspend")) {
+ pData->mode = MD_ALWAYS_SUSPEND;
+ } else {
dbgprintf("invalid mode '%s', doing 'sleep 1 0' - fix your config\n", szBuf);
}
+ pData->bEchoStdout = bEchoStdout;
+ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS,
+ (uchar*)"RSYSLOG_TraditionalForwardFormat"));
+
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -177,6 +307,10 @@ BEGINmodInit()
CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionomtestingechostdout", 0, eCmdHdlrBinary, NULL,
+ &bEchoStdout, STD_LOADABLE_MODULE_ID));
+ /* we seed the random-number generator in any case... */
+ srand(time(NULL));
ENDmodInit
/*
* vi:set ai:
diff --git a/runtime/conf.c b/runtime/conf.c
index 27ab8bb4..c776ef46 100644
--- a/runtime/conf.c
+++ b/runtime/conf.c
@@ -1080,7 +1080,7 @@ static rsRetVal cflineDoAction(uchar **p, action_t **ppAction)
dbgprintf("module is incompatible with RepeatedMsgReduction - turned off\n");
pAction->f_ReduceRepeated = 0;
}
- pAction->bEnabled = 1; /* action is enabled */
+ pAction->eState = ACT_STATE_RDY; /* action is enabled */
iNbrActions++; /* one more active action! */
}
break;
diff --git a/runtime/module-template.h b/runtime/module-template.h
index 6f7d877c..3e963199 100644
--- a/runtime/module-template.h
+++ b/runtime/module-template.h
@@ -39,7 +39,8 @@
#define DEF_OMOD_STATIC_DATA \
DEF_MOD_STATIC_DATA \
- DEFobjCurrIf(obj)
+ DEFobjCurrIf(obj) \
+ static __attribute__((unused)) int bCoreSupportsBatching;
#define DEF_IMOD_STATIC_DATA \
DEF_MOD_STATIC_DATA \
DEFobjCurrIf(obj)
@@ -160,6 +161,37 @@ static rsRetVal isCompatibleWithFeature(syslogFeature __attribute__((unused)) eF
RETiRet;\
}
+
+/* beginTransaction()
+ * introduced in v4.3.3 -- rgerhards, 2009-04-27
+ */
+#define BEGINbeginTransaction \
+static rsRetVal beginTransaction(instanceData __attribute__((unused)) *pData)\
+{\
+ DEFiRet;
+
+#define CODESTARTbeginTransaction /* currently empty, but may be extended */
+
+#define ENDbeginTransaction \
+ RETiRet;\
+}
+
+
+/* endTransaction()
+ * introduced in v4.3.3 -- rgerhards, 2009-04-27
+ */
+#define BEGINendTransaction \
+static rsRetVal endTransaction(instanceData __attribute__((unused)) *pData)\
+{\
+ DEFiRet;
+
+#define CODESTARTendTransaction /* currently empty, but may be extended */
+
+#define ENDendTransaction \
+ RETiRet;\
+}
+
+
/* doAction()
*/
#define BEGINdoAction \
@@ -324,6 +356,18 @@ static rsRetVal queryEtryPt(uchar *name, rsRetVal (**pEtryPoint)())\
*pEtryPoint = tryResume;\
}
+
+/* the following definition is queryEtryPt block that must be added
+ * if an output module supports the transactional interface.
+ * rgerhards, 2009-04-27
+ */
+#define CODEqueryEtryPt_TXIF_OMOD_QUERIES \
+ else if(!strcmp((char*) name, "beginTransaction")) {\
+ *pEtryPoint = beginTransaction;\
+ } else if(!strcmp((char*) name, "endTransaction")) {\
+ *pEtryPoint = endTransaction;\
+ }
+
/* the following definition is the standard block for queryEtryPt for INPUT
* modules. This can be used if no specific handling (e.g. to cover version
* differences) is needed.
@@ -393,6 +437,32 @@ finalize_it:\
}
+/* now come some check functions, which enable a standard way of obtaining feature
+ * information from the core. feat is the to-be-tested feature and featVar is a
+ * variable that receives the result (0-not support, 1-supported).
+ * This must be a macro, so that it is put into the output's code. Otherwise, we
+ * would need to rely on a library entry point, which is what we intend to avoid ;)
+ * rgerhards, 2009-04-27
+ */
+#define INITChkCoreFeature(featVar, feat) \
+{ \
+ rsRetVal MACRO_Ret; \
+ rsRetVal (*pQueryCoreFeatureSupport)(int*, unsigned); \
+ int bSupportsIt; \
+ featVar = 0; \
+ MACRO_Ret = pHostQueryEtryPt((uchar*)"queryCoreFeatureSupport", &pQueryCoreFeatureSupport); \
+ if(MACRO_Ret == RS_RET_OK) { \
+ /* found entry point, so let's see if core supports it */ \
+ CHKiRet((*pQueryCoreFeatureSupport)(&bSupportsIt, feat)); \
+ if(bSupportsIt) \
+ featVar = 1; \
+ } else if(MACRO_Ret != RS_RET_ENTRY_POINT_NOT_FOUND) { \
+ ABORT_FINALIZE(MACRO_Ret); /* Something else went wrong, what is not acceptable */ \
+ } \
+}
+
+
+
/* definitions for host API queries */
#define CODEmodInit_QueryRegCFSLineHdlr \
CHKiRet(pHostQueryEtryPt((uchar*)"regCfSysLineHdlr", &omsdRegCFSLineHdlr));
diff --git a/runtime/modules.c b/runtime/modules.c
index 9fdb48e7..bfd87a71 100644
--- a/runtime/modules.c
+++ b/runtime/modules.c
@@ -68,6 +68,21 @@ static modInfo_t *pLoadedModulesLast = NULL; /* tail-pointer */
uchar *pModDir = NULL; /* read-only after startup */
+/* we provide a set of dummy functions for output modules that do not support the
+ * transactional interface. As they do not do this, they commit each message they
+ * receive, and as such the dummies can always return RS_RET_OK without causing
+ * harm. This simplifies things as in action processing we do not need to check
+ * if the transactional entry points exist.
+ */
+static rsRetVal dummyBeginTransaction()
+{
+ return RS_RET_OK;
+}
+static rsRetVal dummyEndTransaction()
+{
+ return RS_RET_OK;
+}
+
#ifdef DEBUG
/* we add some home-grown support to track our users (and detect who does not free us). In
* the long term, this should probably be migrated into debug.c (TODO). -- rgerhards, 2008-03-11
@@ -207,19 +222,38 @@ static void moduleDestruct(modInfo_t *pThis)
}
+/* This enables a module to query the core for specific features.
+ * rgerhards, 2009-04-22
+ */
+static rsRetVal queryCoreFeatureSupport(int *pBool, unsigned uFeat)
+{
+ DEFiRet;
+
+ if((pBool == NULL))
+ ABORT_FINALIZE(RS_RET_PARAM_ERROR);
+
+ *pBool = (uFeat & CORE_FEATURE_BATCHING) ? 1 : 0;
+
+finalize_it:
+ RETiRet;
+}
+
+
/* The following function is the queryEntryPoint for host-based entry points.
* Modules may call it to get access to core interface functions. Please note
* that utility functions can be accessed via shared libraries - at least this
* is my current shool of thinking.
* Please note that the implementation as a query interface allows to take
* care of plug-in interface version differences. -- rgerhards, 2007-07-31
+ * ... but often it better not to use a new interface. So we now add core
+ * functions here that a plugin may request. -- rgerhards, 2009-04-22
*/
static rsRetVal queryHostEtryPt(uchar *name, rsRetVal (**pEtryPoint)())
{
DEFiRet;
if((name == NULL) || (pEtryPoint == NULL))
- return RS_RET_PARAM_ERROR;
+ ABORT_FINALIZE(RS_RET_PARAM_ERROR);
if(!strcmp((char*) name, "regCfSysLineHdlr")) {
*pEtryPoint = regCfSysLineHdlr;
@@ -227,6 +261,8 @@ static rsRetVal queryHostEtryPt(uchar *name, rsRetVal (**pEtryPoint)())
*pEtryPoint = objGetObjInterface;
} else if(!strcmp((char*) name, "OMSRgetSupportedTplOpts")) {
*pEtryPoint = OMSRgetSupportedTplOpts;
+ } else if(!strcmp((char*) name, "queryCoreFeatureSupport")) {
+ *pEtryPoint = queryCoreFeatureSupport;
} else {
*pEtryPoint = NULL; /* to be on the safe side */
ABORT_FINALIZE(RS_RET_ENTRY_POINT_NOT_FOUND);
@@ -402,6 +438,18 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_
localRet = (*pNew->modQueryEtryPt)((uchar*)"doHUP", &pNew->doHUP);
if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND)
ABORT_FINALIZE(localRet);
+
+ localRet = (*pNew->modQueryEtryPt)((uchar*)"beginTransaction", &pNew->mod.om.beginTransaction);
+ if(localRet == RS_RET_MODULE_ENTRY_POINT_NOT_FOUND)
+ pNew->mod.om.beginTransaction = dummyBeginTransaction;
+ else if(localRet != RS_RET_OK)
+ ABORT_FINALIZE(localRet);
+
+ localRet = (*pNew->modQueryEtryPt)((uchar*)"endTransaction", &pNew->mod.om.endTransaction);
+ if(localRet == RS_RET_MODULE_ENTRY_POINT_NOT_FOUND)
+ pNew->mod.om.beginTransaction = dummyEndTransaction;
+ else if(localRet != RS_RET_OK)
+ ABORT_FINALIZE(localRet);
break;
case eMOD_LIB:
break;
diff --git a/runtime/modules.h b/runtime/modules.h
index 372529ee..e33bbbe1 100644
--- a/runtime/modules.h
+++ b/runtime/modules.h
@@ -110,7 +110,9 @@ typedef struct modInfo_s {
struct {/* data for output modules */
/* below: perform the configured action
*/
+ rsRetVal (*beginTransaction)(void*);
rsRetVal (*doAction)(uchar**, unsigned, void*);
+ rsRetVal (*endTransaction)(void*);
rsRetVal (*parseSelectorAct)(uchar**, void**,omodStringRequest_t**);
} om;
struct { /* data for library modules */
diff --git a/runtime/queue.c b/runtime/queue.c
index 4e017e84..c2df928b 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -8,7 +8,11 @@
* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
* if you are getting aquainted to the object.
*
- * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ * NOTE: as of 2009-04-22, I have begin to remove the qqueue* prefix from static
+ * function names - this makes it really hard to read and does not provide much
+ * benefit, at least I (now) think so...
+ *
+ * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -63,12 +67,13 @@ DEFobjCurrIf(glbl)
/* forward-definitions */
rsRetVal qqueueChkPersist(qqueue_t *pThis);
static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
-static rsRetVal qqueueRateLimiter(qqueue_t *pThis);
+static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
+static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static int qqueueIsIdleDA(qqueue_t *pThis);
-static rsRetVal qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
-static rsRetVal qqueueConsumerCancelCleanup(void *arg1, void *arg2);
-static rsRetVal qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
+static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
+static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2);
+static rsRetVal UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -369,9 +374,10 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
+ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerDA));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) qqueueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) ConsumerCancelCleanup));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
@@ -483,9 +489,7 @@ static rsRetVal qDestructFixedArray(qqueue_t *pThis)
ASSERT(pThis != NULL);
queueDrain(pThis); /* discard any remaining queue entries */
-
- if(pThis->tVars.farray.pBuf != NULL)
- free(pThis->tVars.farray.pBuf);
+ free(pThis->tVars.farray.pBuf);
RETiRet;
}
@@ -607,28 +611,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis)
static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
{
DEFiRet;
-
iRet = qqueueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr);
-#if 0
- qLinkedList_t *pEntry;
-
- ASSERT(pThis != NULL);
- if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
-
- pEntry->pNext = NULL;
- pEntry->pUsr = pUsr;
-
- if(pThis->tVars.linklist.pRoot == NULL) {
- pThis->tVars.linklist.pRoot = pThis->tVars.linklist.pLast = pEntry;
- } else {
- pThis->tVars.linklist.pLast->pNext = pEntry;
- pThis->tVars.linklist.pLast = pEntry;
- }
-
-finalize_it:
-#endif
RETiRet;
}
@@ -636,24 +619,6 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis, obj_t **ppUsr)
{
DEFiRet;
iRet = qqueueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr);
-#if 0
- qLinkedList_t *pEntry;
-
- ASSERT(pThis != NULL);
- ASSERT(pThis->tVars.linklist.pRoot != NULL);
-
- pEntry = pThis->tVars.linklist.pRoot;
- *ppUsr = pEntry->pUsr;
-
- if(pThis->tVars.linklist.pRoot == pThis->tVars.linklist.pLast) {
- pThis->tVars.linklist.pRoot = NULL;
- pThis->tVars.linklist.pLast = NULL;
- } else {
- pThis->tVars.linklist.pRoot = pEntry->pNext;
- }
- free(pEntry);
-
-#endif
RETiRet;
}
@@ -760,7 +725,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
while(iUngottenObjs > 0) {
/* fill the queue from disk */
CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL));
- qqueueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
+ UngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
--iUngottenObjs; /* one less */
}
@@ -931,6 +896,8 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
+ aUsrp_t aUsrp;
+ obj_t *pMsgp;
DEFiRet;
ASSERT(pThis != NULL);
@@ -940,8 +907,13 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
* mode the consumer probably has a lot to convey (which get's lost in the other modes
* because they are asynchronous. But direct mode is deliberately synchronous.
* rgerhards, 2008-02-12
+ * We use our knowledge about the aUsrp_t structure below, but without that, we
+ * pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
- iRet = pThis->pConsumer(pThis->pUsr, pUsr);
+ pMsgp = (obj_t*) pUsr;
+ aUsrp.nElem = 1; /* there always is only one in direct mode */
+ aUsrp.pUsrp = &pMsgp;
+ iRet = pThis->pConsumer(pThis->pUsr, &aUsrp);
RETiRet;
}
@@ -961,7 +933,7 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute_
* rgerhards, 2008-01-20
*/
static rsRetVal
-qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
+UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
@@ -991,7 +963,7 @@ qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
* rgerhards, 2008-01-29
*/
static rsRetVal
-qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
+GetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
{
DEFiRet;
@@ -1047,7 +1019,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr)
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
if(pThis->iUngottenObjs > 0) {
- iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr);
+ iRet = GetUngottenObj(pThis, (obj_t**) pUsr);
} else {
iRet = pThis->qDel(pThis, pUsr);
ATOMIC_DEC(pThis->iQueueSize);
@@ -1275,7 +1247,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1305,6 +1277,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->pConsumer = pConsumer;
pThis->iNumWorkerThreads = iWorkerThreads;
pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
+ pThis->iDeqBatchSize = 8; /* conservative default, should still provide good performance */
pThis->pszFilePrefix = NULL;
pThis->qType = qType;
@@ -1354,7 +1327,7 @@ finalize_it:
* rgerhards, 2008-01-16
*/
static rsRetVal
-qqueueConsumerCancelCleanup(void *arg1, void *arg2)
+ConsumerCancelCleanup(void *arg1, void *arg2)
{
DEFiRet;
@@ -1366,7 +1339,7 @@ qqueueConsumerCancelCleanup(void *arg1, void *arg2)
if(pUsr != NULL) {
/* make sure the data element is not lost */
dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
- CHKiRet(qqueueUngetObj(pThis, pUsr, LOCK_MUTEX));
+ CHKiRet(UngetObj(pThis, pUsr, LOCK_MUTEX));
}
finalize_it:
@@ -1415,38 +1388,68 @@ finalize_it:
}
+/* dequeue as many user points as are available, until we hit the configured
+ * upper limit of pointers.
+ * This must only be called when the queue mutex is LOOKED, otherwise serious
+ * malfunction will happen.
+ */
+static inline rsRetVal
+DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize)
+{
+ int nDequeued;
+ int iQueueSize;
+ void *pUsr;
+ rsRetVal localRet;
+ DEFiRet;
+
+ nDequeued = 0;
+ do {
+dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
+ CHKiRet(qqueueDel(pThis, &pUsr));
+ qqueueChkPersist(pThis); /* is is questionable if we should really need to call this every time... */
+ iQueueSize = qqueueGetOverallQueueSize(pThis);
+
+ /* check if we should discard this element */
+ localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr);
+ if(localRet == RS_RET_QUEUE_FULL)
+ continue;
+ else if(localRet != RS_RET_OK)
+ ABORT_FINALIZE(localRet);
+
+ /* all well, use this element */
+ pWti->paUsrp->pUsrp[nDequeued++] = pUsr;
+ } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize);
+
+ //bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
+ pWti->paUsrp->nElem = nDequeued;
+ *iRemainingQueueSize = iQueueSize;
+
+finalize_it:
+ RETiRet;
+}
+
+
/* dequeue the queued object for the queue consumers.
* rgerhards, 2008-10-21
+ * I made a radical change - we now dequeue multiple elements, and store these objects in
+ * an array of user pointers. We expect that this increases performance.
+ * rgerhards, 2009-04-22
*/
static rsRetVal
-qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
- void *pUsr;
- int iQueueSize;
- int bRunsDA; /* cache for early mutex release */
+ int iQueueSize = 0; /* keep the compiler happy... */
- /* dequeue element (still protected from mutex) */
- iRet = qqueueDel(pThis, &pUsr);
- qqueueChkPersist(pThis);
- iQueueSize = qqueueGetOverallQueueSize(pThis); /* cache this for after mutex release */
- bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
-
- /* We now need to save the user pointer for the cancel cleanup handler, BUT ONLY
- * if we could successfully obtain a user pointer. Otherwise, we would bring the
- * cancel cleanup handler into big troubles (and we did ;)). Note that we can
- * NOT set the variable further below, as this may lead to an object leak. We
- * may get cancelled before we reach that part of the code, so the only
- * solution is to do it here. -- rgerhards, 2008-02-27
- */
- if(iRet == RS_RET_OK) {
- pWti->pUsrp = pUsr;
- }
+ /* dequeue element batch (still protected from mutex) */
+ iRet = DequeueConsumableElements(pThis, pWti, &iQueueSize);
/* awake some flow-controlled sources if we can do this right now */
/* TODO: this could be done better from a performance point of view -- do it only if
* we have someone waiting for the condition (or only when we hit the watermark right
* on the nail [exact value]) -- rgerhards, 2008-03-14
+ * now that we dequeue batches of pointers, this is much less an issue...
+ * rgerhards, 2009-04-22
*/
if(iQueueSize < pThis->iFullDlyMrk) {
pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk);
@@ -1456,37 +1459,16 @@ qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
}
- /* rgerhards, 2008-09-30: I reversed the order of cond_signal und mutex_unlock
- * as of the pthreads recommendation on predictable scheduling behaviour. I don't see
- * any problems caused by this, but I add this comment in case some will be seen
- * in the next time.
- */
pthread_cond_signal(&pThis->notFull);
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
- /* do actual processing (the lengthy part, runs in parallel)
- * If we had a problem while dequeing, we do not call the consumer,
- * but we otherwise ignore it. This is in the hopes that it will be
- * self-healing. However, this is really not a good thing.
- * rgerhards, 2008-01-03
- */
- if(iRet != RS_RET_OK)
- FINALIZE;
-
- /* we are running in normal, non-disk-assisted mode do a quick check if we need to drain the queue.
- * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
- * provide real-time creation of spool files.
- * Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong.
- */
- CHKiRet(qqueueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr));
-
-finalize_it:
if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
dbgoprint((obj_t*) pThis, "error %d dequeueing element - ignoring, but strange things "
"may happen\n", iRet);
}
+
RETiRet;
}
@@ -1529,7 +1511,7 @@ finalize_it:
* but you get the idea from the code above.
*/
static rsRetVal
-qqueueRateLimiter(qqueue_t *pThis)
+RateLimiter(qqueue_t *pThis)
{
DEFiRet;
int iDelay;
@@ -1592,19 +1574,20 @@ qqueueRateLimiter(qqueue_t *pThis)
* rgerhards, 2008-01-21
*/
static rsRetVal
-qqueueConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp));
+ CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
*/
+//TODO: MULTIQUEUE: the following setting is no longer correct - need to think about how to do that...
if(pThis->iDeqSlowdown) {
dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n",
pThis->iDeqSlowdown);
@@ -1616,7 +1599,7 @@ finalize_it:
}
-/* This is a special consumer to feed the disk-queue in disk-assited mode.
+/* This is a special consumer to feed the disk-queue in disk-assisted mode.
* When active, our own queue more or less acts as a memory buffer to the disk.
* So this consumer just needs to drain the memory queue and submit entries
* to the disk queue. The disk queue will then call the actual consumer from
@@ -1626,15 +1609,18 @@ finalize_it:
* rgerhards, 2008-01-14
*/
static rsRetVal
-qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
+ int i;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
+ CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+ /* iterate over returned results and enqueue them in DA queue */
+ for(i = 0 ; i < pWti->paUsrp->nElem ; i++)
+ CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i]));
finalize_it:
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
@@ -1692,12 +1678,24 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
* the DA queue
*/
static int
-qqueueChkStopWrkrReg(qqueue_t *pThis)
+ChkStooWrkrReg(qqueue_t *pThis)
{
return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0);
}
+/* return the configured "deq max at once" interval
+ * rgerhards, 2009-04-22
+ */
+static rsRetVal
+GetDeqBatchSize(qqueue_t *pThis, int *pVal)
+{
+ DEFiRet;
+ assert(pVal != NULL);
+ *pVal = pThis->iDeqBatchSize;
+ RETiRet;
+}
+
/* must only be called when the queue mutex is locked, else results
* are not stable! DA queue version
*/
@@ -1712,7 +1710,7 @@ qqueueIsIdleDA(qqueue_t *pThis)
* are not stable! Regular queue version
*/
static int
-qqueueIsIdleReg(qqueue_t *pThis)
+IsIdleReg(qqueue_t *pThis)
{
#if 0 /* enable for performance testing */
int ret;
@@ -1740,7 +1738,7 @@ qqueueIsIdleReg(qqueue_t *pThis)
* I am telling this, because I, too, always get confused by those...
*/
static rsRetVal
-qqueueRegOnWrkrShutdown(qqueue_t *pThis)
+RegOnWrkrShutdown(qqueue_t *pThis)
{
DEFiRet;
@@ -1761,7 +1759,7 @@ qqueueRegOnWrkrShutdown(qqueue_t *pThis)
* hook to indicate in the parent queue (if we are a child) that we are not done yet.
*/
static rsRetVal
-qqueueRegOnWrkrStartup(qqueue_t *pThis)
+RegOnWrkrStartup(qqueue_t *pThis)
{
DEFiRet;
@@ -1815,10 +1813,10 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d, "
- "full delay %d, light delay %d starting\n",
+ "full delay %d, light delay %d, deq batch size %d starting\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1,
- pThis->iFullDlyMrk, pThis->iLightDlyMrk);
+ pThis->iFullDlyMrk, pThis->iLightDlyMrk, pThis->iDeqBatchSize);
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
@@ -1829,13 +1827,14 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
- CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRateLimiter));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrReg));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleReg));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerReg));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))qqueueConsumerCancelCleanup));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrStartup));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrShutdown));
+ CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStooWrkrReg));
+ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))ConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
@@ -1945,7 +1944,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
* to the regular files. -- rgerhards, 2008-01-29
*/
while(pThis->iUngottenObjs > 0) {
- CHKiRet(qqueueGetUngottenObj(pThis, &pUsr));
+ CHKiRet(GetUngottenObj(pThis, &pUsr));
CHKiRet((objSerialize(pUsr))(pUsr, psQIF));
objDestruct(pUsr);
}
@@ -2061,11 +2060,8 @@ CODESTARTobjDestruct(qqueue)
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
- if(pThis->pszFilePrefix != NULL)
- free(pThis->pszFilePrefix);
-
- if(pThis->pszSpoolDir != NULL)
- free(pThis->pszSpoolDir);
+ free(pThis->pszFilePrefix);
+ free(pThis->pszSpoolDir);
ENDobjDestruct(qqueue)
@@ -2079,8 +2075,8 @@ qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
{
DEFiRet;
- if(pThis->pszFilePrefix != NULL)
- free(pThis->pszFilePrefix);
+ free(pThis->pszFilePrefix);
+ pThis->pszFilePrefix = NULL;
if(pszPrefix == NULL) /* just unset the prefix! */
ABORT_FINALIZE(RS_RET_OK);
@@ -2296,6 +2292,7 @@ DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int)
DEFpropSetMeth(qqueue, bSaveOnShutdown, int)
DEFpropSetMeth(qqueue, pUsr, void*)
DEFpropSetMeth(qqueue, iDeqSlowdown, int)
+DEFpropSetMeth(qqueue, iDeqBatchSize, int)
DEFpropSetMeth(qqueue, sizeOnDiskMax, int64)
diff --git a/runtime/queue.h b/runtime/queue.h
index a267862d..8a60254b 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -54,6 +54,7 @@ typedef struct qWrkThrd_s {
pthread_mutex_t mut;
} qWrkThrd_t; /* type for queue worker threads */
+
/* the queue object */
typedef struct queue_s {
BEGINobjInstance;
@@ -84,6 +85,7 @@ typedef struct queue_s {
int toActShutdown; /* timeout for long-running action shutdown in ms */
int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */
int toEnq; /* enqueue timeout */
+ int iDeqBatchSize; /* max number of elements that shall be dequeued at once */
/* rate limiting settings (will be expanded) */
int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */
/* end rate limiting */
@@ -97,12 +99,11 @@ typedef struct queue_s {
* applied to detect user configuration errors (and tell me how should we detect what
* the user really wanted...). -- rgerhards, 2008-04-02
*/
- /* ane dequeue time window */
- rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */
+ /* end dequeue time window */
+ rsRetVal (*pConsumer)(void *,aUsrp_t*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
- * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer
- * to message)
- * rgerhards, 2008-01-28
+ * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2
+ * is pointer to an array of message message pointers)
*/
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
@@ -183,7 +184,7 @@ rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*));
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*));
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int);
@@ -201,6 +202,7 @@ PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int);
PROTOTYPEpropSetMeth(qqueue, pUsr, void*);
PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int);
PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64);
+PROTOTYPEpropSetMeth(qqueue, iDeqBatchSize, int);
#define qqueueGetID(pThis) ((unsigned long) pThis)
#endif /* #ifndef QUEUE_H_INCLUDED */
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index fd5a5371..ee941b2b 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -58,8 +58,18 @@
#endif
+/* the rsyslog core provides information about present feature to plugins
+ * asking it. Below are feature-test macros which must be used to query
+ * features. Note that this must be powers of two, so that multiple queries
+ * can be combined. -- rgerhards, 2009-04-27
+ */
+#define CORE_FEATURE_BATCHING 1
+/*#define CORE_FEATURE_whatever 2 ... and so on ... */
+
+
/* define some base data types */
typedef unsigned char uchar;/* get rid of the unhandy "unsigned char" */
+typedef struct aUsrp_s aUsrp_t;
typedef struct thrdInfo thrdInfo_t;
typedef struct obj_s obj_t;
typedef struct filed selector_t;/* TODO: this so far resides in syslogd.c, think about modularization */
@@ -267,6 +277,8 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_ERR_FORK = -2118, /**< error during fork() */
RS_RET_ERR_WRITE_PIPE = -2119, /**< error writing to pipe */
RS_RET_RSCORE_TOO_OLD = -2120, /**< rsyslog core is too old for ... (eg this plugin) */
+ RS_RET_DEFER_COMMIT = -2121, /**< output plugin status: not yet committed (an OK state!) */
+ RS_RET_PREVIOUS_COMMITTED = -2122, /**< output plugin status: previous record was committed (an OK state!) */
RS_RET_FILENAME_INVALID = -2140, /**< filename invalid, not found, no access, ... */
/* RainerScript error messages (range 1000.. 1999) */
diff --git a/runtime/wti.c b/runtime/wti.c
index 544bffa7..346ef7aa 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -201,8 +201,9 @@ CODESTARTobjDestruct(wti)
pthread_cond_destroy(&pThis->condExitDone);
pthread_mutex_destroy(&pThis->mut);
- if(pThis->pszDbgHdr != NULL)
- free(pThis->pszDbgHdr);
+ free(pThis->paUsrp->pUsrp);
+ free(pThis->paUsrp);
+ free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
@@ -222,15 +223,21 @@ rsRetVal
wtiConstructFinalize(wti_t *pThis)
{
DEFiRet;
+ int iDeqBatchSize;
ISOBJ_TYPE_assert(pThis, wti);
dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis));
/* initialize our thread instance descriptor */
- pThis->pUsrp = NULL;
pThis->tCurrCmd = eWRKTHRD_STOPPED;
+ /* we now alloc the array for user pointers. We obtain the max from the queue itself. */
+ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
+ CHKmalloc(pThis->paUsrp = calloc(1, sizeof(aUsrp_t)));
+ CHKmalloc(pThis->paUsrp->pUsrp = calloc((size_t)iDeqBatchSize, sizeof(void*)));
+
+finalize_it:
RETiRet;
}
@@ -314,7 +321,8 @@ wtiWorkerCancelCleanup(void *arg)
DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
/* call user supplied handler (that one e.g. requeues the element) */
- pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp);
+// MULTIQUEUE: need to change here!
+ pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->paUsrp->pUsrp[0]);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pWtp->mut);
@@ -366,7 +374,7 @@ wtiWorker(wti_t *pThis)
ISOBJ_TYPE_assert(pWtp, wtp);
dbgSetThrdName(pThis->pszDbgHdr);
- pThis->pUsrp = NULL;
+ pThis->paUsrp->nElem = 0; /* flag no elements present */ // MULTIQUEUE: do we really need this any longer (cnacel handeler)?
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
diff --git a/runtime/wti.h b/runtime/wti.h
index 6b60b833..85c98fe6 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -1,6 +1,6 @@
/* Definition of the worker thread instance (wti) class.
*
- * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2008, 2009 by Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -28,13 +28,25 @@
#include "wtp.h"
#include "obj.h"
+/* the user pointer array object
+ * This object is used to dequeue multiple user pointers which are than handed over
+ * to processing. The size of elements is fixed after queue creation, but may be
+ * modified by config variables (better said: queue properties).
+ * rgerhards, 2009-04-22
+ */
+struct aUsrp_s {
+ int nElem; /* actual number of element in this entry */
+ obj_t **pUsrp; /* actual elements (array!) */
+};
+
+
/* the worker thread instance class */
typedef struct wti_s {
BEGINobjInstance;
int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */
pthread_t thrdID; /* thread ID */
qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
- obj_t *pUsrp; /* pointer to an object meaningful for current user pointer (e.g. queue pUsr data elemt) */
+ aUsrp_t *paUsrp; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
pthread_cond_t condExitDone; /* signaled when the thread exit is done (once per thread existance) */
pthread_mutex_t mut;
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 04eb974f..9891a55c 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -78,7 +78,7 @@ wtpGetDbgHdr(wtp_t *pThis)
/* Not implemented dummy function for constructor */
-static rsRetVal NotImplementedDummy() { return RS_RET_OK; }
+static rsRetVal NotImplementedDummy() { return RS_RET_NOT_IMPLEMENTED; }
/* Standard-Constructor for the wtp object
*/
BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */
@@ -88,6 +88,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pthread_cond_init(&pThis->condThrdTrm, NULL);
/* set all function pointers to "not implemented" dummy so that we can safely call them */
pThis->pfChkStopWrkr = NotImplementedDummy;
+ pThis->pfGetDeqBatchSize = NotImplementedDummy;
pThis->pfIsIdle = NotImplementedDummy;
pThis->pfDoWork = NotImplementedDummy;
pThis->pfOnIdle = NotImplementedDummy;
@@ -117,7 +118,7 @@ wtpConstructFinalize(wtp_t *pThis)
*/
if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
-
+
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
CHKiRet(wtiConstruct(&pThis->pWrkr[i]));
pWti = pThis->pWrkr[i];
@@ -151,8 +152,7 @@ CODESTARTobjDestruct(wtp)
pthread_mutex_destroy(&pThis->mut);
pthread_mutex_destroy(&pThis->mutThrdShutdwn);
- if(pThis->pszDbgHdr != NULL)
- free(pThis->pszDbgHdr);
+ free(pThis->pszDbgHdr);
ENDobjDestruct(wtp)
@@ -584,6 +584,7 @@ DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t)
DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t)
DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*))
+DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*))
DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int))
DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int))
diff --git a/runtime/wtp.h b/runtime/wtp.h
index b9cb07c5..88bd9197 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -67,10 +67,11 @@ typedef struct wtp_s {
int bThrdStateChanged; /* at least one thread state has changed if 1 */
/* end sync variables */
/* user objects */
- void *pUsr; /* pointer to user object */
+ void *pUsr; /* pointer to user object (in this case, the queue the wtp belongs to) */
pthread_mutex_t *pmutUsr;
pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */
rsRetVal (*pfChkStopWrkr)(void *pUsr, int);
+ rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */
rsRetVal (*pfRateLimiter)(void *pUsr);
rsRetVal (*pfIsIdle)(void *pUsr, int);
rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int);
@@ -104,6 +105,7 @@ int wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex);
PROTOTYPEObjClassInit(wtp);
PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));
+PROTOTYPEpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*));
PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int));
PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int));
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 0f4cbce1..b4509dee 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -1,6 +1,6 @@
TESTRUNS = rt_init rscript
check_PROGRAMS = $(TESTRUNS) ourtail nettester tcpflood chkseq
-TESTS = $(TESTRUNS) cfg.sh parsertest.sh omod-if-array.sh manytcp.sh diskqueue.sh
+TESTS = $(TESTRUNS) cfg.sh parsertest.sh omod-if-array.sh da-mainmsg-q.sh diskqueue.sh manytcp.sh
TESTS_ENVIRONMENT = RSYSLOG_MODDIR='$(abs_top_builddir)'/runtime/.libs/
DISTCLEANFILES=rsyslog.pid
test_files = testbench.h runtime-dummy.c
@@ -29,6 +29,8 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \
parsertest.sh \
diskqueue.sh \
testsuites/diskqueue.conf \
+ da-mainmsg-q.sh \
+ testsuites/da-mainmsg-q.conf \
manytcp.sh \
testsuites/manytcp.conf \
omod-if-array.sh \
diff --git a/tests/da-mainmsg-q.sh b/tests/da-mainmsg-q.sh
new file mode 100755
index 00000000..2ea6278e
--- /dev/null
+++ b/tests/da-mainmsg-q.sh
@@ -0,0 +1,62 @@
+# Test for DA mode on the main message queue
+# This test checks if DA mode operates correctly. To do so,
+# it uses a small in-memory queue size, so that DA mode is initiated
+# rather soon, and disk spooling used. There is some uncertainty (based
+# on machine speeds), but in general the test should work rather well.
+# We add a few messages after the initial run, just so that we can
+# check everything recovers from DA mode correctly.
+# added 2009-04-22 by Rgerhards
+# This file is part of the rsyslog project, released under GPLv3
+echo "testing main message queue in DA mode (going to disk)"
+rm -f work rsyslog.out.log
+rm -rf test-spool
+mkdir test-spool
+rm -f work rsyslog.out.log rsyslog.out.log.save # work files
+../tools/rsyslogd -c4 -u2 -n -irsyslog.pid -M../runtime/.libs:../.libs -f$srcdir/testsuites/da-mainmsg-q.conf &
+sleep 1
+echo "rsyslogd started with pid " `cat rsyslog.pid`
+#
+# part1: send first 50 messages (in memory, only)
+#
+./tcpflood 127.0.0.1 13514 1 50
+if [ "$?" -ne "0" ]; then
+ echo "error during tcpflood! see rsyslog.out.log.save for what was written"
+ cp rsyslog.out.log rsyslog.out.log.save
+fi
+ls -l test-spool
+sleep 1 # we need this so that rsyslogd can receive all outstanding messages
+#
+# part 2: send bunch of messages. This should trigger DA mode
+#
+# 20000 messages should be enough - the disk test is slow enough ;)
+./tcpflood 127.0.0.1 13514 2 20000 50
+if [ "$?" -ne "0" ]; then
+ echo "error during tcpflood! see rsyslog.out.log.save for what was written"
+ cp rsyslog.out.log rsyslog.out.log.save
+fi
+ls -l test-spool
+sleep 5 # we need this so that rsyslogd can receive all outstanding messages
+#
+# send another handful
+#
+ls -l test-spool
+./tcpflood 127.0.0.1 13514 1 50 20050
+if [ "$?" -ne "0" ]; then
+ echo "error during tcpflood! see rsyslog.out.log.save for what was written"
+ cp rsyslog.out.log rsyslog.out.log.save
+fi
+sleep 1 # we need this so that rsyslogd can receive all outstanding messages
+#
+# clean up and check test result
+#
+kill `cat rsyslog.pid`
+rm -f work
+sort < rsyslog.out.log > work
+./chkseq work 0 20099
+if [ "$?" -ne "0" ]; then
+ # rm -f work rsyslog.out.log
+ echo "sequence error detected"
+ exit 1
+fi
+rm -f work rsyslog.out.log
+rm -rf test-spool
diff --git a/tests/tcpflood.c b/tests/tcpflood.c
index 8dbc201b..2ca796ca 100644
--- a/tests/tcpflood.c
+++ b/tests/tcpflood.c
@@ -6,6 +6,7 @@
* argv[2] target port
* argv[3] number of connections
* argv[4] number of messages to send (connection is random)
+ * argv[5] initial message number (optional)
*
* Part of the testbench for rsyslog.
*
@@ -51,6 +52,7 @@ static int targetPort;
static int numMsgsToSend; /* number of messages to send */
static int numConnections; /* number of connections to create */
static int *sockArray; /* array of sockets to use */
+static int msgNum = 0; /* initial message number to start with */
/* open a single tcp connection
@@ -154,8 +156,6 @@ int sendMessages(void)
int lenBuf;
int lenSend;
char buf[2048];
- char msgBuf[128];
- size_t lenMsg;
srand(time(NULL)); /* seed is good enough for our needs */
@@ -168,19 +168,20 @@ int sendMessages(void)
socknum = i - (numMsgsToSend - numConnections);
else
socknum = rand() % numConnections;
- lenBuf = sprintf(buf, "<167>Mar 1 01:00:00 172.20.245.8 tag msgnum:%8.8d:\n", i);
+ lenBuf = sprintf(buf, "<167>Mar 1 01:00:00 172.20.245.8 tag msgnum:%8.8d:\n", msgNum);
lenSend = send(sockArray[socknum], buf, lenBuf, 0);
if(lenSend != lenBuf) {
printf("\r%5.5d\n", i);
fflush(stdout);
perror("send test data");
- printf("send() failed at socket %d, index %d\n", socknum, i);
+ printf("send() failed at socket %d, index %d, msgNum %d\n", socknum, i, msgNum);
fflush(stderr);
return(1);
}
if(i % 100 == 0) {
printf("\r%5.5d", i);
}
+ ++msgNum;
}
printf("\r%5.5d messages sent\n", i);
@@ -262,9 +263,9 @@ int main(int argc, char *argv[])
setvbuf(stdout, buf, _IONBF, 48);
- if(argc != 5) {
+ if(argc != 5 && argc != 6) {
printf("Invalid call of tcpflood\n");
- printf("Usage: tcpflood target-host target-port num-connections num-messages\n");
+ printf("Usage: tcpflood target-host target-port num-connections num-messages [initial msgnum]\n");
exit(1);
}
@@ -272,6 +273,8 @@ int main(int argc, char *argv[])
targetPort = atoi(argv[2]);
numConnections = atoi(argv[3]);
numMsgsToSend = atoi(argv[4]);
+ if(argc == 6)
+ msgNum = atoi(argv[5]);
if(openConnections() != 0) {
printf("error opening connections\n");
diff --git a/tests/testsuites/da-mainmsg-q.conf b/tests/testsuites/da-mainmsg-q.conf
new file mode 100644
index 00000000..fbda27d4
--- /dev/null
+++ b/tests/testsuites/da-mainmsg-q.conf
@@ -0,0 +1,21 @@
+# Test for DA mode in main message queue (see .sh file for details)
+# rgerhards, 2009-04-22
+$ModLoad ../plugins/imtcp/.libs/imtcp
+$MainMsgQueueTimeoutShutdown 10000
+$InputTCPServerRun 13514
+
+$ErrorMessagesToStderr off
+
+# set spool locations and switch queue to disk assisted mode
+$WorkDirectory test-spool
+$MainMsgQueueSize 200 # this *should* trigger moving on to DA mode...
+# note: we must set QueueSize sufficiently high, so that 70% (light delay mark)
+# is high enough above HighWatermark!
+$MainMsgQueueHighWatermark 80
+$MainMsgQueueLowWatermark 40
+$MainMsgQueueFilename mainq
+$MainMsgQueueType linkedlist
+
+$template outfmt,"%msg:F,58:2%\n"
+$template dynfile,"rsyslog.out.log" # trick to use relative path names!
+:msg, contains, "msgnum:" ?dynfile;outfmt
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 3a751a30..866c0173 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -129,6 +129,7 @@
#include "omfile.h"
#include "omdiscard.h"
#include "threads.h"
+#include "wti.h"
#include "queue.h"
#include "stream.h"
#include "conf.h"
@@ -304,6 +305,7 @@ static int iMainMsgQtoWrkShutdown = 60000; /* timeout for worker thread shutdo
static int iMainMsgQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
static int iMainMsgQDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */
static int64 iMainMsgQueMaxDiskSpace = 0; /* max disk space allocated 0 ==> unlimited */
+static int iMainMsgQueDeqBatchSize = 32; /* dequeue batch size */
static int bMainMsgQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
static int iMainMsgQueueDeqtWinFromHr = 0; /* hour begin of time frame when queue is to be dequeued */
static int iMainMsgQueueDeqtWinToHr = 25; /* hour begin of time frame when queue is to be dequeued */
@@ -370,6 +372,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
bMainMsgQSaveOnShutdown = 1;
MainMsgQueType = QUEUETYPE_FIXED_ARRAY;
iMainMsgQueMaxDiskSpace = 0;
+ iMainMsgQueDeqBatchSize = 32;
glbliActionResumeRetryCount = 0;
return RS_RET_OK;
@@ -1203,22 +1206,29 @@ processMsg(msg_t *pMsg)
/* The consumer of dequeued messages. This function is called by the
* queue engine on dequeueing of a message. It runs on a SEPARATE
- * THREAD.
- * Please note: the message object is destructed by the queue itself!
+ * THREAD. It receives an array of pointers, which it must iterate
+ * over. We do not do any further batching, as this is of no benefit
+ * for the main queue.
*/
static rsRetVal
-msgConsumer(void __attribute__((unused)) *notNeeded, void *pUsr)
+msgConsumer(void __attribute__((unused)) *notNeeded, aUsrp_t *paUsrp)
{
+ int i;
+ msg_t *pMsg;
DEFiRet;
- msg_t *pMsg = (msg_t*) pUsr;
- assert(pMsg != NULL);
+ assert(paUsrp != NULL);
- if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
- parseMsg(pMsg);
+ for(i = 0 ; i < paUsrp->nElem ; i++) {
+ pMsg = (msg_t*) paUsrp->pUsrp[i];
+dbgprintf("msgConsumer..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg);
+ if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
+ parseMsg(pMsg);
+ }
+ processMsg(pMsg);
+ msgDestruct(&pMsg);
}
- processMsg(pMsg);
- msgDestruct(&pMsg);
+dbgprintf("DONE msgConsumer..MULTIQUEUE:\n");
RETiRet;
}
@@ -2683,6 +2693,7 @@ init(void)
setQPROP(qqueueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize);
setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace);
+ setQPROP(qqueueSetiDeqBatchSize, "$MainMsgQueueDequeueBatchSize", iMainMsgQueDeqBatchSize);
setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName);
setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt);
setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown );
@@ -3063,6 +3074,7 @@ static rsRetVal loadBuildInModules(void)
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iMainMsgQDeqSlowdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iMainMsgQWrkMinMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeuebatchsize", 0, eCmdHdlrSize, NULL, &iMainMsgQueDeqBatchSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxDiskSpace, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iMainMsgQueueDeqtWinFromHr, NULL));