summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c200
-rw-r--r--action.h2
-rw-r--r--debug.h2
-rw-r--r--obj-types.h25
-rw-r--r--plugins/imklog/imklog.c6
-rw-r--r--plugins/imudp/imudp.c2
-rw-r--r--queue.c51
-rw-r--r--sync.c8
-rw-r--r--sync.h4
-rw-r--r--syslogd.c195
-rw-r--r--syslogd.h15
11 files changed, 270 insertions, 240 deletions
diff --git a/action.c b/action.c
index 8a269e23..a3cd9370 100644
--- a/action.c
+++ b/action.c
@@ -72,7 +72,7 @@ static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA ena
*/
rsRetVal actionDestruct(action_t *pThis)
{
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
if(pThis->pMod != NULL)
pThis->pMod->freeInstance(pThis->pModData);
@@ -99,7 +99,7 @@ rsRetVal actionConstruct(action_t **ppThis)
DEFiRet;
action_t *pThis;
- assert(ppThis != NULL);
+ ASSERT(ppThis != NULL);
if((pThis = (action_t*) calloc(1, sizeof(action_t))) == NULL) {
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -122,10 +122,13 @@ actionConstructFinalize(action_t *pThis)
{
DEFiRet;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
/* create queue */
+RUNLOG_VAR("%d", ActionQueType);
CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, 10, (rsRetVal (*)(void*,void*))actionCallDoAction));
+RUNLOG_VAR("%p", pThis->pQueue);
+RUNLOG_VAR("%x", pThis->pQueue->iObjCooCKiE );
/* ... set some properties ... */
@@ -170,7 +173,7 @@ static rsRetVal actionResume(action_t *pThis)
{
DEFiRet;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
pThis->bSuspended = 0;
RETiRet;
@@ -192,7 +195,7 @@ rsRetVal actionSuspend(action_t *pThis)
{
DEFiRet;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
pThis->bSuspended = 1;
pThis->ttResumeRtry = time(NULL) + pThis->iResumeInterval;
pThis->iNbrResRtry = 0; /* tell that we did not yet retry to resume */
@@ -209,7 +212,7 @@ rsRetVal actionTryResume(action_t *pThis)
DEFiRet;
time_t ttNow;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
ttNow = time(NULL); /* do the system call just once */
@@ -274,7 +277,9 @@ rsRetVal
actionDoAction(action_t *pAction)
{
DEFiRet;
- iRet = queueEnqObj(pAction->pQueue, (void*) pAction->f_pMsg);
+RUNLOG_VAR("%p", pAction->f_pMsg);
+ ISOBJ_TYPE_assert(pAction->f_pMsg, Msg);
+ iRet = queueEnqObj(pAction->pQueue, (void*) MsgAddRef(pAction->f_pMsg));
RETiRet;
}
@@ -290,7 +295,7 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg)
int i;
int iSleepPeriod;
- assert(pAction != NULL);
+ ASSERT(pAction != NULL);
/* here we must loop to process all requested strings */
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
@@ -371,6 +376,185 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT
}
+/* rgerhards 2004-11-09: fprintlog() is the actual driver for
+ * the output channel. It receives the channel description (f) as
+ * well as the message and outputs them according to the channel
+ * semantics. The message is typically already contained in the
+ * channel save buffer (f->f_prevline). This is not only the case
+ * when a message was already repeated but also when a new message
+ * arrived.
+ * rgerhards 2007-08-01: interface changed to use action_t
+ * rgerhards, 2007-12-11: please note: THIS METHOD MUST ONLY BE
+ * CALLED AFTER THE CALLER HAS LOCKED THE pAction OBJECT! We do
+ * not do this here. Failing to do so results in all kinds of
+ * "interesting" problems!
+ * RGERHARDS, 2008-01-29:
+ * This is now the action caller and has been renamed.
+ */
+rsRetVal
+actionWriteToAction(action_t *pAction)
+{
+ msg_t *pMsgSave; /* to save current message pointer, necessary to restore
+ it in case it needs to be updated (e.g. repeated msgs) */
+ DEFiRet;
+
+ pMsgSave = NULL; /* indicate message poiner not saved */
+ /* first check if this is a regular message or the repeation of
+ * a previous message. If so, we need to change the message text
+ * to "last message repeated n times" and then go ahead and write
+ * it. Please note that we can not modify the message object, because
+ * that would update it in other selectors as well. As such, we first
+ * need to create a local copy of the message, which we than can update.
+ * rgerhards, 2007-07-10
+ */
+ if(pAction->f_prevcount > 1) {
+ msg_t *pMsg;
+ uchar szRepMsg[64];
+ snprintf((char*)szRepMsg, sizeof(szRepMsg), "last message repeated %d times",
+ pAction->f_prevcount);
+
+ if((pMsg = MsgDup(pAction->f_pMsg)) == NULL) {
+ /* it failed - nothing we can do against it... */
+ dbgprintf("Message duplication failed, dropping repeat message.\n");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ /* We now need to update the other message properties.
+ * ... RAWMSG is a problem ... Please note that digital
+ * signatures inside the message are also invalidated.
+ */
+ getCurrTime(&(pMsg->tRcvdAt));
+ getCurrTime(&(pMsg->tTIMESTAMP));
+ MsgSetMSG(pMsg, (char*)szRepMsg);
+ MsgSetRawMsg(pMsg, (char*)szRepMsg);
+
+ pMsgSave = pAction->f_pMsg; /* save message pointer for later restoration */
+ pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */
+ }
+
+ dbgprintf("Called action, logging to %s", modGetStateName(pAction->pMod));
+
+ time(&pAction->f_time); /* we need this for message repeation processing */
+
+ /* When we reach this point, we have a valid, non-disabled action.
+ * So let's execute it. -- rgerhards, 2007-07-24
+ */
+ iRet = actionDoAction(pAction);
+
+finalize_it:
+ if(pMsgSave != NULL) {
+ /* we had saved the original message pointer. That was
+ * done because we needed to create a temporary one
+ * (most often for "message repeated n time" handling). If so,
+ * we need to restore the original one now, so that procesing
+ * can continue as normal. We also need to discard the temporary
+ * one, as we do not like memory leaks ;) Please note that the original
+ * message object will be discarded by our callers, so this is nothing
+ * of our business. rgerhards, 2007-07-10
+ */
+ MsgDestruct(&pAction->f_pMsg);
+ pAction->f_pMsg = pMsgSave; /* restore it */
+ }
+
+ RETiRet;
+}
+
+
+
+
+
+/* call the configured action. Does all necessary housekeeping.
+ * rgerhards, 2007-08-01
+ */
+rsRetVal
+actionCallAction(action_t *pAction, msg_t *pMsg)
+{
+ DEFiRet;
+ int iCancelStateSave;
+
+ ISOBJ_TYPE_assert(pMsg, Msg);
+ ASSERT(pAction != NULL);
+
+ /* Make sure nodbody else modifies/uses this action object. Right now, this
+ * is important because of "message repeated n times" processing and potentially
+ * multiple worker threads. -- rgerhards, 2007-12-11
+ */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ LockObj(pAction);
+ pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+
+ /* first, we need to check if this is a disabled
+ * entry. If so, we must not further process it.
+ * rgerhards 2005-09-26
+ * In the future, disabled modules may be re-probed from time
+ * to time. They are in a perfectly legal state, except that the
+ * doAction method indicated that it wanted to be disabled - but
+ * we do not consider this is a solution for eternity... So we
+ * should check from time to time if affairs have improved.
+ * rgerhards, 2007-07-24
+ */
+ if(pAction->bEnabled == 0) {
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+
+ if(actionIsSuspended(pAction)) {
+ CHKiRet(actionTryResume(pAction));
+ }
+
+ /* don't output marks to recently written files */
+ if ((pMsg->msgFlags & MARK) && (time(NULL) - pAction->f_time) < MarkInterval / 2) {
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+
+ /* suppress duplicate messages
+ */
+ if ((pAction->f_ReduceRepeated == 1) && pAction->f_pMsg != NULL &&
+ (pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) &&
+ !strcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) &&
+ !strcmp(getHOSTNAME(pMsg), getHOSTNAME(pAction->f_pMsg)) &&
+ !strcmp(getPROCID(pMsg), getPROCID(pAction->f_pMsg)) &&
+ !strcmp(getAPPNAME(pMsg), getAPPNAME(pAction->f_pMsg))) {
+ pAction->f_prevcount++;
+ dbgprintf("msg repeated %d times, %ld sec of %d.\n",
+ pAction->f_prevcount, time(NULL) - pAction->f_time,
+ repeatinterval[pAction->f_repeatcount]);
+ /* use current message, so we have the new timestamp (means we need to discard previous one) */
+ MsgDestruct(&pAction->f_pMsg);
+ pAction->f_pMsg = MsgAddRef(pMsg);
+ /* If domark would have logged this by now, flush it now (so we don't hold
+ * isolated messages), but back off so we'll flush less often in the future.
+ */
+ if(time(NULL) > REPEATTIME(pAction)) {
+ iRet = actionWriteToAction(pAction);
+ BACKOFF(pAction);
+ }
+ } else {
+ /* new message, save it */
+ /* first check if we have a previous message stored
+ * if so, emit and then discard it first
+ */
+ if(pAction->f_pMsg != NULL) {
+ if(pAction->f_prevcount > 0)
+ actionWriteToAction(pAction);
+ /* we do not care about iRet above - I think it's right but if we have
+ * some troubles, you know where to look at ;) -- rgerhards, 2007-08-01
+ */
+ MsgDestruct(&pAction->f_pMsg);
+ }
+ pAction->f_pMsg = MsgAddRef(pMsg);
+ /* call the output driver */
+ iRet = actionWriteToAction(pAction);
+ }
+
+finalize_it:
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ UnlockObj(pAction);
+ pthread_cleanup_pop(0); /* remove mutex cleanup handler */
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ RETiRet;
+}
+
/* add our cfsysline handlers
* rgerhards, 2008-01-28
diff --git a/action.h b/action.h
index e725ae19..4f1ba825 100644
--- a/action.h
+++ b/action.h
@@ -78,6 +78,8 @@ rsRetVal actionSuspend(action_t *pThis);
rsRetVal actionDbgPrint(action_t *pThis);
rsRetVal actionSetGlobalResumeInterval(int iNewVal);
rsRetVal actionDoAction(action_t *pAction);
+rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg);
+rsRetVal actionWriteToAction(action_t *pAction);
#if 1
#define actionIsSuspended(pThis) ((pThis)->bSuspended == 1)
diff --git a/debug.h b/debug.h
index 19e91350..9c28eca1 100644
--- a/debug.h
+++ b/debug.h
@@ -99,9 +99,11 @@ void dbgPrintAllDebugInfo(void);
#ifdef RTINST
# define BEGINfunc static dbgFuncDB_t dbgFuncDB=dbgFuncDB_t_INITIALIZER; int dbgCALLStaCK_POP_POINT = dbgEntrFunc(&dbgFuncDB,__LINE__);
# define ENDfunc dbgExitFunc(&dbgFuncDB, dbgCALLStaCK_POP_POINT);
+# define ASSERT(x) do { if(!(x)) dbgPrintAllDebugInfo(); assert(x); } while(0);
#else
# define BEGINfunc
# define ENDfunc
+# define ASSERT(x)
#endif
#ifdef RTINST
# define RUNLOG dbgSetExecLocation(dbgCALLStaCK_POP_POINT, __LINE__); dbgprintf("%s:%d: %s: log point\n", __FILE__, __LINE__, __func__)
diff --git a/obj-types.h b/obj-types.h
index 1e607d0c..01842a83 100644
--- a/obj-types.h
+++ b/obj-types.h
@@ -30,6 +30,11 @@
#include "stringbuf.h"
+/* base object data, present in all objects */
+typedef struct objData_s {
+ uchar *pName;
+} objData_t;
+
/* property types */
typedef enum { /* do NOT start at 0 to detect uninitialized types after calloc() */
PROPTYPE_PSZ = 1,
@@ -85,9 +90,11 @@ typedef struct objInfo_s {
rsRetVal (*objMethods[OBJ_NUM_METHODS])();
} objInfo_t;
+/* TODO: move obj_t at front of struct */
typedef struct obj { /* the dummy struct that each derived class can be casted to */
objInfo_t *pObjInfo;
#ifndef NDEBUG /* this means if debug... */
+ objData_t objData;
unsigned int iObjCooCKiE; /* must always be 0xBADEFEE for a valid object */
#endif
} obj_t;
@@ -98,25 +105,21 @@ typedef struct obj { /* the dummy struct that each derived class can be casted t
#ifndef NDEBUG /* this means if debug... */
# define BEGINobjInstance \
objInfo_t *pObjInfo; \
+ objData_t objData; \
unsigned int iObjCooCKiE; /* prevent name conflict, thus the strange name */
# define ISOBJ_assert(pObj) \
do { \
- if(pObj == NULL) dbgPrintAllDebugInfo(); \
- assert((pObj) != NULL); \
- if(((obj_t*)(pObj))->iObjCooCKiE != (unsigned) 0xBADEFEE) dbgPrintAllDebugInfo(); \
- assert((unsigned) ((obj_t*)(pObj))->iObjCooCKiE == (unsigned) 0xBADEFEE); \
+ ASSERT((pObj) != NULL); \
+ ASSERT((unsigned) ((obj_t*)(pObj))->iObjCooCKiE == (unsigned) 0xBADEFEE); \
} while(0);
# define ISOBJ_TYPE_assert(pObj, objType) \
do { \
- if(pObj == NULL) dbgPrintAllDebugInfo(); \
- assert(pObj != NULL); \
- if(((obj_t*)(pObj))->iObjCooCKiE != (unsigned) 0xBADEFEE) dbgPrintAllDebugInfo(); \
- assert((unsigned) pObj->iObjCooCKiE == (unsigned) 0xBADEFEE); \
- if(objGetObjID(pObj) != OBJ##objType) dbgPrintAllDebugInfo(); \
- assert(objGetObjID(pObj) == OBJ##objType); \
+ ASSERT(pObj != NULL); \
+ ASSERT((unsigned) pObj->iObjCooCKiE == (unsigned) 0xBADEFEE); \
+ ASSERT(objGetObjID(pObj) == OBJ##objType); \
} while(0);
#else /* non-debug mode, no checks but much faster */
-# define BEGINobjInstance objInfo_t *pObjInfo;
+# define BEGINobjInstance objInfo_t *pObjInfo; objData_t objData;
# define ISOBJ_TYPE_assert(pObj, objType)
# define ISOBJ_assert(pObj)
#endif
diff --git a/plugins/imklog/imklog.c b/plugins/imklog/imklog.c
index 427d1b5e..d4007899 100644
--- a/plugins/imklog/imklog.c
+++ b/plugins/imklog/imklog.c
@@ -145,7 +145,7 @@ static rsRetVal writeSyslogV(int iPRI, const char *szFmt, va_list va)
logmsg(iPRI, pMsg, INTERNAL_MSG);
finalize_it:
- return iRet;
+ RETiRet;
}
/* And now the same with variable arguments */
@@ -211,7 +211,7 @@ rsRetVal Syslog(int priority, char *fmt, ...)
va_end(ap);
}
- return iRet;
+ RETiRet;
}
@@ -623,7 +623,7 @@ CODESTARTrunInput
break;
}
}
- return iRet;
+ RETiRet;
ENDrunInput
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index baa193d5..d68e3772 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -117,7 +117,7 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal)
finalize_it:
free(pNewVal); /* in any case, this is no longer needed */
- return iRet;
+ RETiRet;
}
diff --git a/queue.c b/queue.c
index 7651957a..39c9639e 100644
--- a/queue.c
+++ b/queue.c
@@ -115,7 +115,7 @@ queueTurnOffDAMode(queue_t *pThis)
DEFiRet;
ISOBJ_TYPE_assert(pThis, queue);
- assert(pThis->bRunsDA);
+ ASSERT(pThis->bRunsDA);
/* at this point, we need a fully initialized DA queue. So if it isn't, we finally need
* to wait for its startup... -- rgerhards, 2008-01-25
@@ -386,7 +386,7 @@ static rsRetVal qConstructFixedArray(queue_t *pThis)
{
DEFiRet;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
if(pThis->iMaxQueueSize == 0)
ABORT_FINALIZE(RS_RET_QSIZE_ZERO);
@@ -409,7 +409,7 @@ static rsRetVal qDestructFixedArray(queue_t *pThis)
{
DEFiRet;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
if(pThis->tVars.farray.pBuf != NULL)
free(pThis->tVars.farray.pBuf);
@@ -421,7 +421,7 @@ static rsRetVal qAddFixedArray(queue_t *pThis, void* in)
{
DEFiRet;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
pThis->tVars.farray.pBuf[pThis->tVars.farray.tail] = in;
pThis->tVars.farray.tail++;
if (pThis->tVars.farray.tail == pThis->iMaxQueueSize)
@@ -434,7 +434,7 @@ static rsRetVal qDelFixedArray(queue_t *pThis, void **out)
{
DEFiRet;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
*out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.head];
pThis->tVars.farray.head++;
@@ -450,7 +450,7 @@ static rsRetVal qConstructLinkedList(queue_t *pThis)
{
DEFiRet;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
pThis->tVars.linklist.pRoot = 0;
pThis->tVars.linklist.pLast = 0;
@@ -479,7 +479,7 @@ static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr)
DEFiRet;
qLinkedList_t *pEntry;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) {
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
@@ -503,8 +503,8 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
DEFiRet;
qLinkedList_t *pEntry;
- assert(pThis != NULL);
- assert(pThis->tVars.linklist.pRoot != NULL);
+ ASSERT(pThis != NULL);
+ ASSERT(pThis->tVars.linklist.pRoot != NULL);
pEntry = pThis->tVars.linklist.pRoot;
*ppUsr = pEntry->pUsr;
@@ -654,7 +654,7 @@ static rsRetVal qConstructDisk(queue_t *pThis)
DEFiRet;
int bRestarted = 0;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
/* and now check if there is some persistent information that needs to be read in */
iRet = queueTryLoadPersistedInfo(pThis);
@@ -704,7 +704,7 @@ static rsRetVal qDestructDisk(queue_t *pThis)
{
DEFiRet;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
strmDestruct(&pThis->tVars.disk.pWrite);
strmDestruct(&pThis->tVars.disk.pRead);
@@ -719,7 +719,7 @@ static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
{
DEFiRet;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite));
CHKiRet(strmFlush(pThis->tVars.disk.pWrite));
@@ -750,7 +750,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
DEFiRet;
rsRetVal iRetLocal;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
/* calling the consumer is quite different here than it is from a worker thread */
iRetLocal = pThis->pConsumer(pThis->pUsr, pUsr);
@@ -781,7 +781,7 @@ queueAdd(queue_t *pThis, void *pUsr)
{
DEFiRet;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
CHKiRet(pThis->qAdd(pThis, pUsr));
++pThis->iQueueSize;
@@ -799,7 +799,7 @@ queueDel(queue_t *pThis, void *pUsr)
{
DEFiRet;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
/* we do NOT abort if we encounter an error, because otherwise the queue
* will not be decremented, what will most probably result in an endless loop.
@@ -835,7 +835,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
rsRetVal iRetLocal;
ISOBJ_TYPE_assert(pThis, queue);
- assert(pThis->pqParent == NULL); /* detect invalid calling sequence */
+ ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis));
@@ -1052,9 +1052,9 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
DEFiRet;
queue_t *pThis;
- assert(ppThis != NULL);
- assert(pConsumer != NULL);
- assert(iWorkerThreads >= 0);
+ ASSERT(ppThis != NULL);
+ ASSERT(pConsumer != NULL);
+ ASSERT(iWorkerThreads >= 0);
if((pThis = (queue_t *)calloc(1, sizeof(queue_t))) == NULL) {
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -1106,7 +1106,9 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
}
finalize_it:
+RUNLOG_VAR("%x", pThis->iObjCooCKiE );
OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
+RUNLOG_VAR("%x", pThis->iObjCooCKiE );
RETiRet;
}
@@ -1159,6 +1161,7 @@ static int queueChkDiscardMsg(queue_t *pThis, int iQueueSize, int bRunsDA, void
int iSeverity;
ISOBJ_TYPE_assert(pThis, queue);
+RUNLOG_VAR("%p", pUsr);
ISOBJ_assert(pUsr);
if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) {
@@ -1363,7 +1366,7 @@ queueRegOnWrkrShutdown(queue_t *pThis)
if(pThis->pqParent != NULL) {
RUNLOG_VAR("%p", pThis->pqParent);
RUNLOG_VAR("%p", pThis->pqParent->pWtpDA);
- assert(pThis->pqParent->pWtpDA != NULL);
+ ASSERT(pThis->pqParent->pWtpDA != NULL);
pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
#if 0
@@ -1413,7 +1416,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
uchar pszBuf[64];
size_t lenBuf;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
/* finalize some initializations that could not yet be done because it is
* influenced by properties which might have been set after queueConstruct ()
@@ -1519,7 +1522,7 @@ static rsRetVal queuePersist(queue_t *pThis)
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
- assert(pThis != NULL);
+ ASSERT(pThis != NULL);
if(pThis->qType != QUEUETYPE_DISK) {
if(pThis->iQueueSize > 0) {
@@ -1614,7 +1617,7 @@ rsRetVal queueDestruct(queue_t **ppThis)
DEFiRet;
queue_t *pThis;
- assert(ppThis != NULL);
+ ASSERT(ppThis != NULL);
pThis = *ppThis;
ISOBJ_TYPE_assert(pThis, queue);
@@ -1891,7 +1894,7 @@ static rsRetVal queueSetProperty(queue_t *pThis, property_t *pProp)
DEFiRet;
ISOBJ_TYPE_assert(pThis, queue);
- assert(pProp != NULL);
+ ASSERT(pProp != NULL);
if(isProp("iQueueSize")) {
pThis->iQueueSize = pProp->val.vInt;
diff --git a/sync.c b/sync.c
index af2119a3..913469a0 100644
--- a/sync.c
+++ b/sync.c
@@ -60,7 +60,9 @@ SyncObjExit(pthread_mutex_t **mut)
void
lockObj(pthread_mutex_t *mut)
{
- pthread_mutex_lock(mut);
+ BEGINfunc
+ d_pthread_mutex_lock(mut);
+ ENDfunc
}
/* unlock an object. The synchronization tool (mutex) must be passed in.
@@ -68,6 +70,8 @@ lockObj(pthread_mutex_t *mut)
void
unlockObj(pthread_mutex_t *mut)
{
- pthread_mutex_unlock(mut);
+ BEGINfunc
+ d_pthread_mutex_unlock(mut);
+ ENDfunc
}
#endif /* #ifndef NDEBUG */
diff --git a/sync.h b/sync.h
index 384c0d9c..204737dc 100644
--- a/sync.h
+++ b/sync.h
@@ -39,8 +39,8 @@
* are better to trace in the stackframe.
*/
#ifdef NDEBUG
-#define LockObj(x) pthread_mutex_lock((x)->Sync_mut)
-#define UnlockObj(x) pthread_mutex_unlock((x)->Sync_mut)
+#define LockObj(x) d_pthread_mutex_lock((x)->Sync_mut)
+#define UnlockObj(x) d_pthread_mutex_unlock((x)->Sync_mut)
#else
#define LockObj(x) lockObj((x)->Sync_mut)
#define UnlockObj(x) unlockObj((x)->Sync_mut)
diff --git a/syslogd.c b/syslogd.c
index c9ba1c53..f5d9adb7 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -295,17 +295,12 @@ static int bFinished = 0; /* used by termination signal handler, read-only excep
* termination.
*/
-/*
- * Intervals at which we flush out "message repeated" messages,
+/* Intervals at which we flush out "message repeated" messages,
* in seconds after previous message is logged. After each flush,
* we move to the next interval until we reach the largest.
+ * TODO: this shall go into action object! -- rgerhards, 2008-01-29
*/
-int repeatinterval[] = { 30, 60 }; /* # of secs before flush */
-#define MAXREPEAT ((int)((sizeof(repeatinterval) / sizeof(repeatinterval[0])) - 1))
-#define REPEATTIME(f) ((f)->f_time + repeatinterval[(f)->f_repeatcount])
-#define BACKOFF(f) { if (++(f)->f_repeatcount > MAXREPEAT) \
- (f)->f_repeatcount = MAXREPEAT; \
- }
+int repeatinterval[2] = { 30, 60 }; /* # of secs before flush */
#define LIST_DELIMITER ':' /* delimiter between two hosts */
@@ -386,7 +381,7 @@ uchar *pszWorkDir = NULL;/* name of rsyslog's spool directory (without trailing
static unsigned int Forwarding = 0;
char LocalHostName[MAXHOSTNAMELEN+1];/* our hostname - read-only after startup */
char *LocalDomain; /* our local domain name - read-only after startup */
-static int MarkInterval = 20 * 60; /* interval between marks in seconds - read-only after startup */
+int MarkInterval = 20 * 60; /* interval between marks in seconds - read-only after startup */
int family = PF_UNSPEC; /* protocol family (IPv4, IPv6 or both), set via cmdline */
int send_to_all = 0; /* send message to all IPv4/IPv6 addresses */
static int NoFork = 0; /* don't fork - don't run in daemon mode - read-only after startup */
@@ -558,7 +553,6 @@ static uchar template_StdPgSQLFmt[] = "\"insert into SystemEvents (Message, Faci
/* up to the next comment, prototypes that should be removed by reordering */
/* Function prototypes. */
static char **crunch_list(char *list);
-static rsRetVal fprintlog(action_t *pAction);
static void reapchild();
static void debug_switch();
static rsRetVal cfline(uchar *line, selector_t **pfCurr);
@@ -1677,99 +1671,6 @@ static void callActionMutClean(void *arg)
}
-/* call the configured action. Does all necessary housekeeping.
- * rgerhards, 2007-08-01
- */
-static rsRetVal callAction(msg_t *pMsg, action_t *pAction)
-{
- DEFiRet;
- int iCancelStateSave;
-
- assert(pMsg != NULL);
- assert(pAction != NULL);
-
- /* Make sure nodbody else modifies/uses this action object. Right now, this
- * is important because of "message repeated n times" processing, later it will
- * become important when we (possibly) have multiple worker threads.
- * rgerhards, 2007-12-11
- */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- LockObj(pAction);
- pthread_cleanup_push(callActionMutClean, pAction->Sync_mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
-
- /* first, we need to check if this is a disabled
- * entry. If so, we must not further process it.
- * rgerhards 2005-09-26
- * In the future, disabled modules may be re-probed from time
- * to time. They are in a perfectly legal state, except that the
- * doAction method indicated that it wanted to be disabled - but
- * we do not consider this is a solution for eternity... So we
- * should check from time to time if affairs have improved.
- * rgerhards, 2007-07-24
- */
- if(pAction->bEnabled == 0) {
- ABORT_FINALIZE(RS_RET_OK);
- }
-
- if(actionIsSuspended(pAction)) {
- CHKiRet(actionTryResume(pAction));
- }
-
- /* don't output marks to recently written files */
- if ((pMsg->msgFlags & MARK) && (time(NULL) - pAction->f_time) < MarkInterval / 2) {
- ABORT_FINALIZE(RS_RET_OK);
- }
-
- /* suppress duplicate messages
- */
- if ((pAction->f_ReduceRepeated == 1) && pAction->f_pMsg != NULL &&
- (pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) &&
- !strcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) &&
- !strcmp(getHOSTNAME(pMsg), getHOSTNAME(pAction->f_pMsg)) &&
- !strcmp(getPROCID(pMsg), getPROCID(pAction->f_pMsg)) &&
- !strcmp(getAPPNAME(pMsg), getAPPNAME(pAction->f_pMsg))) {
- pAction->f_prevcount++;
- dbgprintf("msg repeated %d times, %ld sec of %d.\n",
- pAction->f_prevcount, time(NULL) - pAction->f_time,
- repeatinterval[pAction->f_repeatcount]);
- /* use current message, so we have the new timestamp (means we need to discard previous one) */
- MsgDestruct(&pAction->f_pMsg);
- pAction->f_pMsg = MsgAddRef(pMsg);
- /* If domark would have logged this by now, flush it now (so we don't hold
- * isolated messages), but back off so we'll flush less often in the future.
- */
- if(time(NULL) > REPEATTIME(pAction)) {
- iRet = fprintlog(pAction);
- BACKOFF(pAction);
- }
- } else {
- /* new message, save it */
- /* first check if we have a previous message stored
- * if so, emit and then discard it first
- */
- if(pAction->f_pMsg != NULL) {
- if(pAction->f_prevcount > 0)
- fprintlog(pAction);
- /* we do not care about iRet above - I think it's right but if we have
- * some troubles, you know where to look at ;) -- rgerhards, 2007-08-01
- */
- MsgDestruct(&pAction->f_pMsg);
- }
- pAction->f_pMsg = MsgAddRef(pMsg);
- /* call the output driver */
- iRet = fprintlog(pAction);
- }
-
-finalize_it:
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- UnlockObj(pAction);
- pthread_cleanup_pop(0); /* remove mutex cleanup handler */
- pthread_setcancelstate(iCancelStateSave, NULL);
- RETiRet;
-}
-
-
/* helper to processMsg(), used to call the configured actions. It is
* executed from within llExecFunc() of the action list.
* rgerhards, 2007-08-02
@@ -1792,7 +1693,7 @@ DEFFUNC_llExecFunc(processMsgDoActions)
ABORT_FINALIZE(RS_RET_OK);
}
- iRetMod = callAction(pDoActData->pMsg, pAction);
+ iRetMod = actionCallAction(pAction, pDoActData->pMsg);
if(iRetMod == RS_RET_DISCARDMSG) {
ABORT_FINALIZE(RS_RET_DISCARDMSG);
} else if(iRetMod == RS_RET_SUSPENDED) {
@@ -2333,88 +2234,6 @@ logmsg(int pri, msg_t *pMsg, int flags)
}
-/* rgerhards 2004-11-09: fprintlog() is the actual driver for
- * the output channel. It receives the channel description (f) as
- * well as the message and outputs them according to the channel
- * semantics. The message is typically already contained in the
- * channel save buffer (f->f_prevline). This is not only the case
- * when a message was already repeated but also when a new message
- * arrived.
- * rgerhards 2007-08-01: interface changed to use action_t
- * rgerhards, 2007-12-11: please note: THIS METHOD MUST ONLY BE
- * CALLED AFTER THE CALLER HAS LOCKED THE pAction OBJECT! We do
- * not do this here. Failing to do so results in all kinds of
- * "interesting" problems!
- */
-rsRetVal
-fprintlog(action_t *pAction)
-{
- msg_t *pMsgSave; /* to save current message pointer, necessary to restore
- it in case it needs to be updated (e.g. repeated msgs) */
- DEFiRet;
-
- pMsgSave = NULL; /* indicate message poiner not saved */
- /* first check if this is a regular message or the repeation of
- * a previous message. If so, we need to change the message text
- * to "last message repeated n times" and then go ahead and write
- * it. Please note that we can not modify the message object, because
- * that would update it in other selectors as well. As such, we first
- * need to create a local copy of the message, which we than can update.
- * rgerhards, 2007-07-10
- */
- if(pAction->f_prevcount > 1) {
- msg_t *pMsg;
- uchar szRepMsg[64];
- snprintf((char*)szRepMsg, sizeof(szRepMsg), "last message repeated %d times",
- pAction->f_prevcount);
-
- if((pMsg = MsgDup(pAction->f_pMsg)) == NULL) {
- /* it failed - nothing we can do against it... */
- dbgprintf("Message duplication failed, dropping repeat message.\n");
- ABORT_FINALIZE(RS_RET_ERR);
- }
-
- /* We now need to update the other message properties.
- * ... RAWMSG is a problem ... Please note that digital
- * signatures inside the message are also invalidated.
- */
- getCurrTime(&(pMsg->tRcvdAt));
- getCurrTime(&(pMsg->tTIMESTAMP));
- MsgSetMSG(pMsg, (char*)szRepMsg);
- MsgSetRawMsg(pMsg, (char*)szRepMsg);
-
- pMsgSave = pAction->f_pMsg; /* save message pointer for later restoration */
- pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */
- }
-
- dbgprintf("Called fprintlog, logging to %s", modGetStateName(pAction->pMod));
-
- time(&pAction->f_time); /* we need this for message repeation processing */
-
- /* When we reach this point, we have a valid, non-disabled action.
- * So let's execute it. -- rgerhards, 2007-07-24
- */
- iRet = actionDoAction(pAction);
-
-finalize_it:
- if(pMsgSave != NULL) {
- /* we had saved the original message pointer. That was
- * done because we needed to create a temporary one
- * (most often for "message repeated n time" handling). If so,
- * we need to restore the original one now, so that procesing
- * can continue as normal. We also need to discard the temporary
- * one, as we do not like memory leaks ;) Please note that the original
- * message object will be discarded by our callers, so this is nothing
- * of our business. rgerhards, 2007-07-10
- */
- MsgDestruct(&pAction->f_pMsg);
- pAction->f_pMsg = pMsgSave; /* restore it */
- }
-
- RETiRet;
-}
-
-
static void
reapchild()
{
@@ -2445,7 +2264,7 @@ DEFFUNC_llExecFunc(flushRptdMsgsActions)
dbgprintf("flush %s: repeated %d times, %d sec.\n",
modGetStateName(pAction->pMod), pAction->f_prevcount,
repeatinterval[pAction->f_repeatcount]);
- fprintlog(pAction);
+ actionWriteToAction(pAction);
BACKOFF(pAction);
}
UnlockObj(pAction);
@@ -3006,7 +2825,7 @@ DEFFUNC_llExecFunc(freeSelectorsActions)
/* flush any pending output */
if(pAction->f_prevcount) {
- fprintlog(pAction);
+ actionWriteToAction(pAction);
}
return RS_RET_OK; /* never fails ;) */
diff --git a/syslogd.h b/syslogd.h
index 55a5a648..17b45161 100644
--- a/syslogd.h
+++ b/syslogd.h
@@ -81,6 +81,19 @@ extern int DisableDNS;
extern char **StripDomains;
extern char *LocalDomain;
extern int bDropMalPTRMsgs;
-extern char ctty[];
+extern char ctty[];
+extern int MarkInterval;
+
+/* Intervals at which we flush out "message repeated" messages,
+ * in seconds after previous message is logged. After each flush,
+ * we move to the next interval until we reach the largest.
+ * TODO: move this to action object!
+ */
+extern int repeatinterval[2];
+#define MAXREPEAT ((int)((sizeof(repeatinterval) / sizeof(repeatinterval[0])) - 1))
+#define REPEATTIME(f) ((f)->f_time + repeatinterval[(f)->f_repeatcount])
+#define BACKOFF(f) { if (++(f)->f_repeatcount > MAXREPEAT) \
+ (f)->f_repeatcount = MAXREPEAT; \
+ }
#endif /* #ifndef SYSLOGD_H_INCLUDED */