diff options
-rw-r--r-- | ChangeLog | 2 | ||||
-rw-r--r-- | action.c | 139 | ||||
-rw-r--r-- | action.h | 19 | ||||
-rw-r--r-- | tests/Makefile.am | 14 | ||||
-rwxr-xr-x | tests/diag.sh | 4 | ||||
-rwxr-xr-x | tests/failover-async.sh | 12 | ||||
-rwxr-xr-x | tests/failover-basic.sh | 12 | ||||
-rwxr-xr-x | tests/failover-no-basic.sh | 19 | ||||
-rwxr-xr-x | tests/failover-no-rptd.sh | 19 | ||||
-rwxr-xr-x | tests/failover-rptd.sh | 12 | ||||
-rw-r--r-- | tests/testsuites/failover-async.conf | 9 | ||||
-rw-r--r-- | tests/testsuites/failover-basic.conf | 8 | ||||
-rw-r--r-- | tests/testsuites/failover-no-basic.conf | 9 | ||||
-rw-r--r-- | tests/testsuites/failover-no-rptd.conf | 9 | ||||
-rw-r--r-- | tests/testsuites/failover-rptd.conf | 10 | ||||
-rw-r--r-- | tools/syslogd.c | 2 |
16 files changed, 247 insertions, 52 deletions
@@ -130,6 +130,8 @@ Version 5.7.0 [V5-DEVEL] (rgerhards), 2010-09-16 * sd-systemd API added as part of rsyslog runtime library --------------------------------------------------------------------------- Version 5.6.5 [V5-STABLE] (rgerhards), 2011-03-?? +- bugfix: failover did not work correctly if repeated msg reduction was on + affected directive was: $ActionExecOnlyWhenPreviousIsSuspended on - bugfix: omlibdbi did not use password from rsyslog.con closes: http://bugzilla.adiscon.com/show_bug.cgi?id=203 - bugfix(kind of): tell users that config graph can currently not be @@ -4,7 +4,44 @@ * * File begun on 2007-08-06 by RGerhards (extracted from syslogd.c) * - * Copyright 2007-2010 Rainer Gerhards and Adiscon GmbH. + * Some notes on processing (this hopefully makes it easier to find + * the right code in question): For performance reasons, this module + * uses different methods of message submission based on the user-selected + * configuration. This code is similar, but can not be abstracted because + * of the performanse-affecting differences in it. As such, it is often + * necessary to triple-check that everything works well in *all* modes. + * The different modes (and calling sequence) are: + * + * if set iExecEveryNthOccur > 1 || f_ReduceRepeated || iSecsExecOnceInterval + * - doSubmitToActionQComplexBatch + * - helperSubmitToActionQComplexBatch + * - doActionCallAction + * handles duplicate message processing, but in essence calls + * - actionWriteToAction + * - qqueueEnqObj + * (now queue engine processing) + * if(pThis->bWriteAllMarkMsgs == FALSE) - this is the DEFAULT + * - doSubmitToActionQNotAllMarkBatch + * - doSubmitToActionQBatch (and from here like in the else case below!) + * else + * - doSubmitToActionQBatch + * - doSubmitToActionQ + * - qqueueEnqObj + * (now queue engine processing) + * + * Note that bWriteAllMakrMsgs on or off creates almost the same processing. + * The difference ist that if WriteAllMarkMsgs is not set, we need to + * preprocess the batch and drop mark messages which are not yet due for + * writing. + * + * After dequeue, processing is as follows: + * - processBatchMain + * - processAction + * - submitBatch + * - tryDoAction + * - + * + * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -899,7 +936,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) if(*(pBatch->pbShutdownImmediate)) ABORT_FINALIZE(RS_RET_FORCE_TERM); if( pBatch->pElem[i].bFilterOK - && pBatch->pElem[i].state != BATCH_STATE_DISC + && pBatch->pElem[i].state != BATCH_STATE_DISC//) { && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, @@ -1164,11 +1201,33 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT } +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. This is also utilized to submit messages in complex case once + * the complex logic has been applied ;) + * rgerhards, 2010-06-08 + */ +static inline rsRetVal +doSubmitToActionQ(action_t *pAction, msg_t *pMsg) +{ + DEFiRet; + + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); + else + iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + + RETiRet; +} + + /* This function builds up a batch of messages to be (later) * submitted to the action queue. + * Note: this function is also called from syslogd itself as part of its + * flush processing. If so, pBatch will be NULL and idxBtch undefined. */ rsRetVal -actionWriteToAction(action_t *pAction) +actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch) { msg_t *pMsgSave; /* to save current message pointer, necessary to restore it in case it needs to be updated (e.g. repeated msgs) */ @@ -1239,7 +1298,7 @@ actionWriteToAction(action_t *pAction) pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */ } - DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); + DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod)); /* now check if we need to drop the message because otherwise the action would be too * frequently called. -- rgerhards, 2008-04-08 @@ -1257,14 +1316,43 @@ actionWriteToAction(action_t *pAction) FINALIZE; } - /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */ + /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) + * rgerhards, 2008-09-17 */ pAction->tLastExec = getActNow(pAction); /* re-init time flags */ pAction->f_time = pAction->f_pMsg->ttGenTime; /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg)); + if( pBatch != NULL + && (pAction->bExecWhenPrevSusp == 1 && pBatch->pElem[idxBtch].bPrevWasSuspended)) { + /* in that case, we need to create a special batch which reflects the + * suspended state. Otherwise, that information would be dropped inside + * the queue engine. TODO: in later releases (v6?) create a better + * solution than what we do here. However, for v5 this sounds much too + * intrusive. -- rgerhardsm, 2011-03-16 + * (Code is copied over from queue.c and slightly modified) + */ + batch_t singleBatch; + batch_obj_t batchObj; + int i; + memset(&batchObj, 0, sizeof(batch_obj_t)); + memset(&singleBatch, 0, sizeof(batch_t)); + batchObj.state = BATCH_STATE_RDY; + batchObj.pUsrp = (obj_t*) pAction->f_pMsg; + batchObj.bPrevWasSuspended = 1; + batchObj.bFilterOK = 1; + singleBatch.nElem = 1; /* there always is only one in direct mode */ + singleBatch.pElem = &batchObj; + + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, &singleBatch); + + for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { + free(batchObj.staticActStrings[i]); + } + } else { /* standard case, just submit */ + iRet = doSubmitToActionQ(pAction, pAction->f_pMsg); + } if(iRet == RS_RET_OK) pAction->f_prevcount = 0; /* message processed, so we start a new cycle */ @@ -1292,10 +1380,12 @@ finalize_it: * pthread_cleanup_push() POSIX macro... */ static inline rsRetVal -doActionCallAction(action_t *pAction, msg_t *pMsg) +doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) { + msg_t *pMsg; DEFiRet; + pMsg = (msg_t*)(pBatch->pElem[idxBtch].pUsrp); pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ /* don't output marks to recently written outputs */ @@ -1322,7 +1412,7 @@ doActionCallAction(action_t *pAction, msg_t *pMsg) * isolated messages), but back off so we'll flush less often in the future. */ if(getActNow(pAction) > REPEATTIME(pAction)) { - iRet = actionWriteToAction(pAction); + iRet = actionWriteToAction(pAction, pBatch, idxBtch); BACKOFF(pAction); } } else {/* new message, save it */ @@ -1331,7 +1421,7 @@ doActionCallAction(action_t *pAction, msg_t *pMsg) */ if(pAction->f_pMsg != NULL) { if(pAction->f_prevcount > 0) - actionWriteToAction(pAction); + actionWriteToAction(pAction, pBatch, idxBtch); /* we do not care about iRet above - I think it's right but if we have * some troubles, you know where to look at ;) -- rgerhards, 2007-08-01 */ @@ -1339,33 +1429,21 @@ doActionCallAction(action_t *pAction, msg_t *pMsg) } pAction->f_pMsg = MsgAddRef(pMsg); /* call the output driver */ - iRet = actionWriteToAction(pAction); + iRet = actionWriteToAction(pAction, pBatch, idxBtch); } finalize_it: - RETiRet; -} - -/* This submits the message to the action queue in case we do NOT need to handle repeat - * message processing. That case permits us to gain lots of freedom during processing - * and thus speed. - * rgerhards, 2010-06-08 - */ -static inline rsRetVal -doSubmitToActionQ(action_t *pAction, msg_t *pMsg) -{ - DEFiRet; - - if(pAction->pQueue->qType == QUEUETYPE_DIRECT) - iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); - else - iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + /* we need to update the batch to handle failover processing correctly */ + if(iRet == RS_RET_OK) { + pBatch->pElem[idxBtch].bPrevWasSuspended = 0; + } else if(iRet == RS_RET_ACTION_FAILED) { + pBatch->pElem[idxBtch].bPrevWasSuspended = 1; + } RETiRet; } - /* This submits the message to the action queue in case where we need to handle * bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop * for the synchronization. Here, we just modify the filter condition to be false when @@ -1482,8 +1560,9 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod)); for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { if( pBatch->pElem[i].bFilterOK - && pBatch->pElem[i].state != BATCH_STATE_DISC) { - doActionCallAction(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); + && pBatch->pElem[i].state != BATCH_STATE_DISC + && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { + doActionCallAction(pAction, pBatch, i); } } @@ -100,26 +100,9 @@ rsRetVal actionDestruct(action_t *pThis); rsRetVal actionDbgPrint(action_t *pThis); rsRetVal actionSetGlobalResumeInterval(int iNewVal); rsRetVal actionDoAction(action_t *pAction); -rsRetVal actionWriteToAction(action_t *pAction); +rsRetVal actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch); rsRetVal actionCallHUPHdlr(action_t *pAction); rsRetVal actionClassInit(void); rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, int bSuspended); -#if 1 -#define actionIsSuspended(pThis) ((pThis)->bSuspended == 1) -#else -/* The function is a debugging aid */ -inline int actionIsSuspended(action_t *pThis) -{ - int i; - ASSERT(pThis != NULL); - i = pThis->bSuspended == 1; - dbgprintf("in IsSuspend(), returns %d\n", i); - return i; -} -#endif - #endif /* #ifndef ACTION_H_INCLUDED */ -/* - * vi:set ai: - */ diff --git a/tests/Makefile.am b/tests/Makefile.am index c64d44e9..c1818f36 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -52,6 +52,11 @@ TESTS += \ discard-rptdmsg.sh \ discard-allmark.sh \ discard.sh \ + failover-basic.sh \ + failover-rptd.sh \ + failover-no-rptd.sh \ + failover-no-basic.sh \ + queue-persist.sh queue-persist.sh \ linkedlistqueue.sh endif @@ -80,7 +85,6 @@ TESTS += \ endif endif - if ENABLE_IMPTCP TESTS += \ manyptcp.sh \ @@ -260,6 +264,14 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \ omod-if-array.sh \ discard.sh \ testsuites/discard.conf \ + failover-no-rptd.sh \ + testsuites/failover-no-rptd.conf \ + failover-no-basic.sh \ + testsuites/failover-no-basic.conf \ + failover-rptd.sh \ + testsuites/failover-rptd.conf \ + failover-basic.sh \ + testsuites/failover-basic.conf \ discard-rptdmsg.sh \ discard-rptdmsg-vg.sh \ testsuites/discard-rptdmsg.conf \ diff --git a/tests/diag.sh b/tests/diag.sh index 82ff4054..8fbe42ed 100755 --- a/tests/diag.sh +++ b/tests/diag.sh @@ -10,7 +10,7 @@ #valgrind="valgrind --tool=helgrind --log-fd=1" #valgrind="valgrind --tool=exp-ptrcheck --log-fd=1" #set -o xtrace -#export RSYSLOG_DEBUG="debug nostdout" +#export RSYSLOG_DEBUG="debug nologfuncflow nostdout" #export RSYSLOG_DEBUGLOG="log" case $1 in 'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason @@ -31,7 +31,7 @@ case $1 in rm -f work rsyslog.out.log rsyslog2.out.log rsyslog.out.log.save # common work files rm -rf test-spool test-logdir rm -f rsyslog.out.*.log rsyslog.random.data work-presort rsyslog.pipe - rm -f rsyslog.input + rm -f rsyslog.input stat-file1 echo ------------------------------------------------------------------------------- ;; 'startup') # start rsyslogd with default params. $2 is the config file name to use diff --git a/tests/failover-async.sh b/tests/failover-async.sh new file mode 100755 index 00000000..f17bc0ff --- /dev/null +++ b/tests/failover-async.sh @@ -0,0 +1,12 @@ +# This file is part of the rsyslog project, released under GPLv3 +echo =============================================================================== +echo \[failover-async.sh\]: async test for failover functionality +source $srcdir/diag.sh init +source $srcdir/diag.sh startup failover-async.conf +source $srcdir/diag.sh injectmsg 0 5 +echo doing shutdown +source $srcdir/diag.sh shutdown-when-empty +echo wait on shutdown +source $srcdir/diag.sh wait-shutdown +source $srcdir/diag.sh seq-check 0 4999 +source $srcdir/diag.sh exit diff --git a/tests/failover-basic.sh b/tests/failover-basic.sh new file mode 100755 index 00000000..031ea25b --- /dev/null +++ b/tests/failover-basic.sh @@ -0,0 +1,12 @@ +# This file is part of the rsyslog project, released under GPLv3 +echo =============================================================================== +echo \[failover-basic.sh\]: basic test for failover functionality +source $srcdir/diag.sh init +source $srcdir/diag.sh startup failover-basic.conf +source $srcdir/diag.sh injectmsg 0 5000 +echo doing shutdown +source $srcdir/diag.sh shutdown-when-empty +echo wait on shutdown +source $srcdir/diag.sh wait-shutdown +source $srcdir/diag.sh seq-check 0 4999 +source $srcdir/diag.sh exit diff --git a/tests/failover-no-basic.sh b/tests/failover-no-basic.sh new file mode 100755 index 00000000..6177e10d --- /dev/null +++ b/tests/failover-no-basic.sh @@ -0,0 +1,19 @@ +# This file is part of the rsyslog project, released under GPLv3 +echo =============================================================================== +echo \[failover-no-basic.sh\]: basic test for failover functionality - no failover +source $srcdir/diag.sh init +source $srcdir/diag.sh startup failover-no-basic.conf +source $srcdir/diag.sh injectmsg 0 5000 +echo doing shutdown +source $srcdir/diag.sh shutdown-when-empty +echo wait on shutdown +source $srcdir/diag.sh wait-shutdown +# now we need our custom logic to see if the result file is empty +# (what it should be!) +cmp rsyslog.out.log /dev/null +if [ $? -eq 1 ] +then + echo "ERROR, output file not empty" + exit 1 +fi +source $srcdir/diag.sh exit diff --git a/tests/failover-no-rptd.sh b/tests/failover-no-rptd.sh new file mode 100755 index 00000000..6abeba44 --- /dev/null +++ b/tests/failover-no-rptd.sh @@ -0,0 +1,19 @@ +# This file is part of the rsyslog project, released under GPLv3 +echo =============================================================================== +echo \[failover-no-rptd.sh\]: rptd test for failover functionality - no failover +source $srcdir/diag.sh init +source $srcdir/diag.sh startup failover-no-rptd.conf +source $srcdir/diag.sh injectmsg 0 5000 +echo doing shutdown +source $srcdir/diag.sh shutdown-when-empty +echo wait on shutdown +source $srcdir/diag.sh wait-shutdown +# now we need our custom logic to see if the result file is empty +# (what it should be!) +cmp rsyslog.out.log /dev/null +if [ $? -eq 1 ] +then + echo "ERROR, output file not empty" + exit 1 +fi +source $srcdir/diag.sh exit diff --git a/tests/failover-rptd.sh b/tests/failover-rptd.sh new file mode 100755 index 00000000..8a313e9e --- /dev/null +++ b/tests/failover-rptd.sh @@ -0,0 +1,12 @@ +# This file is part of the rsyslog project, released under GPLv3 +echo =============================================================================== +echo \[failover-rptd.sh\]: rptd test for failover functionality +source $srcdir/diag.sh init +source $srcdir/diag.sh startup failover-rptd.conf +source $srcdir/diag.sh injectmsg 0 5000 +echo doing shutdown +source $srcdir/diag.sh shutdown-when-empty +echo wait on shutdown +source $srcdir/diag.sh wait-shutdown +source $srcdir/diag.sh seq-check 0 4999 +source $srcdir/diag.sh exit diff --git a/tests/testsuites/failover-async.conf b/tests/testsuites/failover-async.conf new file mode 100644 index 00000000..76445de3 --- /dev/null +++ b/tests/testsuites/failover-async.conf @@ -0,0 +1,9 @@ +# see the equally-named .sh file for details +$IncludeConfig diag-common.conf + +$template outfmt,"%msg:F,58:2%\n" +# note: the target server shall not be available! + +$ActionQueueType LinkedList +:msg, contains, "msgnum:" @@127.0.0.1:13514 +& ./rsyslog.out.log;outfmt diff --git a/tests/testsuites/failover-basic.conf b/tests/testsuites/failover-basic.conf new file mode 100644 index 00000000..a858769c --- /dev/null +++ b/tests/testsuites/failover-basic.conf @@ -0,0 +1,8 @@ +# see the equally-named .sh file for details +$IncludeConfig diag-common.conf + +$template outfmt,"%msg:F,58:2%\n" +# note: the target server shall not be available! +:msg, contains, "msgnum:" @@127.0.0.1:13514 +$ActionExecOnlyWhenPreviousIsSuspended on +& ./rsyslog.out.log;outfmt diff --git a/tests/testsuites/failover-no-basic.conf b/tests/testsuites/failover-no-basic.conf new file mode 100644 index 00000000..b40ef7d7 --- /dev/null +++ b/tests/testsuites/failover-no-basic.conf @@ -0,0 +1,9 @@ +# see the equally-named .sh file for details +$IncludeConfig diag-common.conf + +$RepeatedMsgReduction off + +# second action should never execute +:msg, contains, "msgnum:" /dev/null +$ActionExecOnlyWhenPreviousIsSuspended on +& ./rsyslog.out.log diff --git a/tests/testsuites/failover-no-rptd.conf b/tests/testsuites/failover-no-rptd.conf new file mode 100644 index 00000000..a46ce116 --- /dev/null +++ b/tests/testsuites/failover-no-rptd.conf @@ -0,0 +1,9 @@ +# see the equally-named .sh file for details +$IncludeConfig diag-common.conf + +$RepeatedMsgReduction on + +# second action should never execute +:msg, contains, "msgnum:" /dev/null +$ActionExecOnlyWhenPreviousIsSuspended on +& ./rsyslog.out.log diff --git a/tests/testsuites/failover-rptd.conf b/tests/testsuites/failover-rptd.conf new file mode 100644 index 00000000..d3553dbb --- /dev/null +++ b/tests/testsuites/failover-rptd.conf @@ -0,0 +1,10 @@ +# see the equally-named .sh file for details +$IncludeConfig diag-common.conf + +$RepeatedMsgReduction on + +$template outfmt,"%msg:F,58:2%\n" +# note: the target server shall not be available! +:msg, contains, "msgnum:" @@127.0.0.1:13514 +$ActionExecOnlyWhenPreviousIsSuspended on +& ./rsyslog.out.log;outfmt diff --git a/tools/syslogd.c b/tools/syslogd.c index dbbdbfec..c9734d14 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -799,7 +799,7 @@ DEFFUNC_llExecFunc(flushRptdMsgsActions) DBGPRINTF("flush %s: repeated %d times, %d sec.\n", module.GetStateName(pAction->pMod), pAction->f_prevcount, repeatinterval[pAction->f_repeatcount]); - actionWriteToAction(pAction); + actionWriteToAction(pAction, NULL, 0); BACKOFF(pAction); } UnlockObj(pAction); |