summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog16
-rw-r--r--action.c149
-rw-r--r--action.h6
-rw-r--r--queue.c7
-rw-r--r--queue.h11
-rw-r--r--syslogd.c30
6 files changed, 206 insertions, 13 deletions
diff --git a/ChangeLog b/ChangeLog
index 0d9cf9ec..8e3047d3 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,6 +1,22 @@
---------------------------------------------------------------------------
Version 3.10.4 (rgerhards), 2008-01-??
- implemented the $ActionResumeRetryCount config directive
+- added $ActionQueueFilename config directive
+- added $ActionQueueSize config directive
+- added $ActionQueueHighWaterMark config directive
+- added $ActionQueueLowWaterMark config directive
+- added $ActionQueueDiscardMark config directive
+- added $ActionQueueDiscardSeverity config directive
+- added $ActionQueueCheckpointInterval config directive
+- added $ActionQueueType config directive
+- added $ActionQueueWorkerThreads config directive
+- added $ActionQueueTimeoutshutdown config directive
+- added $ActionQueueTimeoutActionCompletion config directive
+- added $ActionQueueTimeoutenQueue config directive
+- added $ActionQueueTimeoutworkerThreadShutdown config directive
+- added $ActionQueueWorkerThreadMinimumMessages config directive
+- added $ActionQueueMaxFileSize config directive
+- added $ActionQueueSaveonShutdown config directive
---------------------------------------------------------------------------
Version 3.10.3 (rgerhards), 2008-01-28
- fixed a bug with standard template definitions (not a big deal) - thanks
diff --git a/action.c b/action.c
index 2154fba2..8a269e23 100644
--- a/action.c
+++ b/action.c
@@ -29,6 +29,8 @@
#include <assert.h>
#include <stdarg.h>
#include <stdlib.h>
+#include <string.h>
+#include <strings.h>
#include <time.h>
#include "syslogd.h"
@@ -36,13 +38,35 @@
#include "action.h"
#include "modules.h"
#include "sync.h"
+#include "cfsysline.h"
#include "srUtils.h"
+/* forward definitions */
+rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg);
+
/* object static data (once for all instances) */
static int glbliActionResumeInterval = 30;
int glbliActionResumeRetryCount = 0; /* how often should suspended actions be retried? */
+/* main message queue and its configuration parameters */
+static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
+static int iActionQueueSize = 10000; /* size of the main message queue above */
+static int iActionQHighWtrMark = 8000; /* high water mark for disk-assisted queues */
+static int iActionQLowWtrMark = 2000; /* low water mark for disk-assisted queues */
+static int iActionQDiscardMark = 9800; /* begin to discard messages */
+static int iActionQDiscardSeverity = 4; /* discard warning and above */
+static int iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */
+static uchar *pszActionQFName = NULL; /* prefix for the main message queue file */
+static size_t iActionQueMaxFileSize = 1024*1024;
+static int iActionQPersistUpdCnt = 0; /* persist queue info every n updates */
+static int iActionQtoQShutdown = 0; /* queue shutdown */
+static int iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
+static int iActionQtoEnq = 2000; /* timeout for queue enque */
+static int iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
+static int iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
+static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
+
/* destructs an action descriptor object
* rgerhards, 2007-08-01
*/
@@ -91,6 +115,55 @@ finalize_it:
}
+/* action construction finalizer
+ */
+rsRetVal
+actionConstructFinalize(action_t *pThis)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+
+ /* create queue */
+ CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, 10, (rsRetVal (*)(void*,void*))actionCallDoAction));
+
+
+ /* ... set some properties ... */
+# define setQPROP(func, directive, data) \
+ CHKiRet_Hdlr(func(pThis->pQueue, data)) { \
+ logerrorInt("Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
+ }
+# define setQPROPstr(func, directive, data) \
+ CHKiRet_Hdlr(func(pThis->pQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \
+ logerrorInt("Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
+ }
+
+ queueSetpUsr(pThis->pQueue, pThis);
+ setQPROP(queueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
+ setQPROPstr(queueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
+ setQPROP(queueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
+ setQPROP(queueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown );
+ setQPROP(queueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown);
+ setQPROP(queueSettoWrkShutdown, "$ActionQueueTimeoutWorkerThreadShutdown", iActionQtoWrkShutdown);
+ setQPROP(queueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq);
+ setQPROP(queueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark);
+ setQPROP(queueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark);
+ setQPROP(queueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark);
+ setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity);
+ setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs);
+ setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown);
+
+# undef setQPROP
+# undef setQPROPstr
+
+ CHKiRet(queueStart(pThis->pQueue));
+ dbgprintf("Action %p: queue %p created\n", pThis, pThis->pQueue);
+
+finalize_it:
+ RETiRet;
+}
+
+
/* set an action back to active state -- rgerhards, 2007-08-02
*/
static rsRetVal actionResume(action_t *pThis)
@@ -194,11 +267,23 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
+/* schedule the message for processing
+ * rgerhards, 2008-01-28
+ */
+rsRetVal
+actionDoAction(action_t *pAction)
+{
+ DEFiRet;
+ iRet = queueEnqObj(pAction->pQueue, (void*) pAction->f_pMsg);
+ RETiRet;
+}
+
+
/* call the DoAction output plugin entry point
* rgerhards, 2008-01-28
*/
rsRetVal
-actionCallDoAction(action_t *pAction)
+actionCallDoAction(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
int iRetries;
@@ -209,7 +294,7 @@ actionCallDoAction(action_t *pAction)
/* here we must loop to process all requested strings */
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
- CHKiRet(tplToString(pAction->ppTpl[i], pAction->f_pMsg, &pAction->ppMsgs[i]));
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &pAction->ppMsgs[i]));
}
iRetries = 0;
@@ -256,6 +341,66 @@ finalize_it:
RETiRet;
}
+/* set the action message queue mode
+ * TODO: probably move this into queue object, merge with MainMsgQueue!
+ * rgerhards, 2008-01-28
+ */
+static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszType)
+{
+ DEFiRet;
+
+ if (!strcasecmp((char *) pszType, "fixedarray")) {
+ ActionQueType = QUEUETYPE_FIXED_ARRAY;
+ dbgprintf("action queue type set to FIXED_ARRAY\n");
+ } else if (!strcasecmp((char *) pszType, "linkedlist")) {
+ ActionQueType = QUEUETYPE_LINKEDLIST;
+ dbgprintf("action queue type set to LINKEDLIST\n");
+ } else if (!strcasecmp((char *) pszType, "disk")) {
+ ActionQueType = QUEUETYPE_DISK;
+ dbgprintf("action queue type set to DISK\n");
+ } else if (!strcasecmp((char *) pszType, "direct")) {
+ ActionQueType = QUEUETYPE_DIRECT;
+ dbgprintf("action queue type set to DIRECT (no queueing at all)\n");
+ } else {
+ logerrorSz("unknown actionqueue parameter: %s", (char *) pszType);
+ iRet = RS_RET_INVALID_PARAMS;
+ }
+ free(pszType); /* no longer needed */
+
+ RETiRet;
+}
+
+
+
+/* add our cfsysline handlers
+ * rgerhards, 2008-01-28
+ */
+rsRetVal
+actionAddCfSysLineHdrl(void)
+{
+ DEFiRet;
+
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iActionQtoActShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iActionQtoEnq, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutworkerthreadshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoWrkShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL));
+
+finalize_it:
+ RETiRet;
+}
+
/*
* vi:set ai:
diff --git a/action.h b/action.h
index 0355b834..e725ae19 100644
--- a/action.h
+++ b/action.h
@@ -27,6 +27,7 @@
#include "syslogd-types.h"
#include "sync.h"
+#include "queue.h"
/* external data - this is to be removed when we change the action
* object interface (will happen some time..., at latest when the
@@ -60,6 +61,7 @@ struct action_s {
* content later). This is preserved after the message has been
* processed - it is also used to detect duplicates.
*/
+ queue_t *pQueue; /* action queue */
SYNC_OBJ_TOOL; /* required for mutex support */
};
typedef struct action_s action_t;
@@ -68,12 +70,14 @@ typedef struct action_s action_t;
/* function prototypes
*/
rsRetVal actionConstruct(action_t **ppThis);
+rsRetVal actionConstructFinalize(action_t *pThis);
rsRetVal actionDestruct(action_t *pThis);
+rsRetVal actionAddCfSysLineHdrl(void);
rsRetVal actionTryResume(action_t *pThis);
rsRetVal actionSuspend(action_t *pThis);
rsRetVal actionDbgPrint(action_t *pThis);
rsRetVal actionSetGlobalResumeInterval(int iNewVal);
-rsRetVal actionCallDoAction(action_t *pAction);
+rsRetVal actionDoAction(action_t *pAction);
#if 1
#define actionIsSuspended(pThis) ((pThis)->bSuspended == 1)
diff --git a/queue.c b/queue.c
index a89c7de9..7651957a 100644
--- a/queue.c
+++ b/queue.c
@@ -753,7 +753,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
assert(pThis != NULL);
/* calling the consumer is quite different here than it is from a worker thread */
- iRetLocal = pThis->pConsumer(pUsr);
+ iRetLocal = pThis->pConsumer(pThis->pUsr, pUsr);
if(iRetLocal != RS_RET_OK)
dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n",
queueGetID(pThis), iRetLocal);
@@ -1047,7 +1047,7 @@ RUNLOG_VAR("%d", pThis->toQShutdown);
* to modify some parameters before the queue is actually started.
*/
rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*))
{
DEFiRet;
queue_t *pThis;
@@ -1240,7 +1240,7 @@ queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(pThis->pConsumer(pWti->pUsrp));
+ CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp));
finalize_it:
dbgprintf("Queue %p: regular consumer returns %d\n", pThis, iRet);
@@ -1877,6 +1877,7 @@ DEFpropSetMeth(queue, iDiscardSeverity, int);
DEFpropSetMeth(queue, bIsDA, int);
DEFpropSetMeth(queue, iMinMsgsPerWrkr, int);
DEFpropSetMeth(queue, bSaveOnShutdown, int);
+DEFpropSetMeth(queue, pUsr, void*);
/* This function can be used as a generic way to set properties. Only the subset
diff --git a/queue.h b/queue.h
index a05c6341..e48d9796 100644
--- a/queue.h
+++ b/queue.h
@@ -68,6 +68,7 @@ typedef struct queue_s {
int iMinMsgsPerWrkr;/* minimum nbr of msgs per worker thread, if more, a new worker is started until max wrkrs */
wtp_t *pWtpDA;
wtp_t *pWtpReg;
+ void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */
int iUpdsSincePersist;/* nbr of queue updates since the last persist call */
int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */
int iHighWtrMrk; /* high water mark for disk-assisted memory queues */
@@ -79,7 +80,12 @@ typedef struct queue_s {
int toActShutdown; /* timeout for long-running action shutdown in ms */
int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */
int toEnq; /* enqueue timeout */
- rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dequeued messages */
+ rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */
+ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
+ * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer
+ * to message)
+ * rgerhards, 2008-01-28
+ */
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
rsRetVal (*qDestruct)(struct queue_s *pThis);
@@ -148,7 +154,7 @@ rsRetVal queueStart(queue_t *pThis);
rsRetVal queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize);
rsRetVal queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*));
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*));
PROTOTYPEObjClassInit(queue);
PROTOTYPEpropSetMeth(queue, iPersistUpdCnt, int);
PROTOTYPEpropSetMeth(queue, toQShutdown, long);
@@ -161,6 +167,7 @@ PROTOTYPEpropSetMeth(queue, iDiscardMrk, int);
PROTOTYPEpropSetMeth(queue, iDiscardSeverity, int);
PROTOTYPEpropSetMeth(queue, iMinMsgsPerWrkr, int);
PROTOTYPEpropSetMeth(queue, bSaveOnShutdown, int);
+PROTOTYPEpropSetMeth(queue, pUsr, void*);
#define queueGetID(pThis) ((unsigned long) pThis)
#endif /* #ifndef QUEUE_H_INCLUDED */
diff --git a/syslogd.c b/syslogd.c
index d60abade..c9ba1c53 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -1848,7 +1848,7 @@ processMsg(msg_t *pMsg)
* Please note: the message object is destructed by the queue itself!
*/
static rsRetVal
-msgConsumer(void *pUsr)
+msgConsumer(void __attribute__((unused)) *notNeeded, void *pUsr)
{
DEFiRet;
msg_t *pMsg = (msg_t*) pUsr;
@@ -2394,7 +2394,7 @@ fprintlog(action_t *pAction)
/* When we reach this point, we have a valid, non-disabled action.
* So let's execute it. -- rgerhards, 2007-07-24
*/
- iRet = actionCallDoAction(pAction);
+ iRet = actionDoAction(pAction);
finalize_it:
if(pMsgSave != NULL) {
@@ -3993,6 +3993,10 @@ rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStr
if(bSuspended)
actionSuspend(pAction);
+ CHKiRet(actionConstructFinalize(pAction));
+
+ /* TODO: if we exit here, we have a memory leak... */
+
*ppAction = pAction; /* finally store the action pointer */
finalize_it:
@@ -4558,11 +4562,11 @@ static rsRetVal loadBuildInModules(void)
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutworkerthreadshutdown", 0, eCmdHdlrInt, NULL, &iMainMsgQtoWrkShutdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iMainMsgQWrkMinMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeinterval", 0, eCmdHdlrInt, setActionResumeInterval, NULL, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"controlcharacterescapeprefix", 0, eCmdHdlrGetChar, NULL, &cCCEscapeChar, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"escapecontrolcharactersonreceive", 0, eCmdHdlrBinary, NULL, &bEscapeCCOnRcv, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"dropmsgswithmaliciousdnsptrrecords", 0, eCmdHdlrBinary, NULL, &bDropMalPTRMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"droptrailinglfonreception", 0, eCmdHdlrBinary, NULL, &bDropTrailingLF, NULL));
@@ -4579,6 +4583,11 @@ static rsRetVal loadBuildInModules(void)
CHKiRet(regCfSysLineHdlr((uchar *)"moddir", 0, eCmdHdlrGetWord, NULL, &pModDir, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL));
+ /* now add other modules handlers (we should work on that to be able to do it in ClassInit(), but so far
+ * that is not possible). -- rgerhards, 2008-01-28
+ */
+ CHKiRet(actionAddCfSysLineHdrl());
+
finalize_it:
RETiRet;
}
@@ -4713,9 +4722,8 @@ finalize_it:
/* This is the main entry point into rsyslogd. Over time, we should try to
* modularize it a bit more...
*/
-int main(int argc, char **argv)
+int realMain(int argc, char **argv)
{
-dbgClassInit();
DEFiRet;
register int i;
@@ -5000,6 +5008,18 @@ finalize_it:
}
+/* This is the main entry point into rsyslogd. This must be a function in its own
+ * right in order to intialize the debug system in a portable way (otherwise we would
+ * need to have a statement before variable definitions.
+ * rgerhards, 20080-01-28
+ */
+int main(int argc, char **argv)
+{
+ dbgClassInit();
+ return realMain(argc, argv);
+}
+
+
/*
* vi:set ai:
*/