diff options
-rw-r--r-- | action.c | 200 | ||||
-rw-r--r-- | action.h | 2 | ||||
-rw-r--r-- | debug.h | 2 | ||||
-rw-r--r-- | obj-types.h | 25 | ||||
-rw-r--r-- | plugins/imklog/imklog.c | 6 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 2 | ||||
-rw-r--r-- | queue.c | 51 | ||||
-rw-r--r-- | sync.c | 8 | ||||
-rw-r--r-- | sync.h | 4 | ||||
-rw-r--r-- | syslogd.c | 195 | ||||
-rw-r--r-- | syslogd.h | 15 |
11 files changed, 270 insertions, 240 deletions
@@ -72,7 +72,7 @@ static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA ena */ rsRetVal actionDestruct(action_t *pThis) { - assert(pThis != NULL); + ASSERT(pThis != NULL); if(pThis->pMod != NULL) pThis->pMod->freeInstance(pThis->pModData); @@ -99,7 +99,7 @@ rsRetVal actionConstruct(action_t **ppThis) DEFiRet; action_t *pThis; - assert(ppThis != NULL); + ASSERT(ppThis != NULL); if((pThis = (action_t*) calloc(1, sizeof(action_t))) == NULL) { ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); @@ -122,10 +122,13 @@ actionConstructFinalize(action_t *pThis) { DEFiRet; - assert(pThis != NULL); + ASSERT(pThis != NULL); /* create queue */ +RUNLOG_VAR("%d", ActionQueType); CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, 10, (rsRetVal (*)(void*,void*))actionCallDoAction)); +RUNLOG_VAR("%p", pThis->pQueue); +RUNLOG_VAR("%x", pThis->pQueue->iObjCooCKiE ); /* ... set some properties ... */ @@ -170,7 +173,7 @@ static rsRetVal actionResume(action_t *pThis) { DEFiRet; - assert(pThis != NULL); + ASSERT(pThis != NULL); pThis->bSuspended = 0; RETiRet; @@ -192,7 +195,7 @@ rsRetVal actionSuspend(action_t *pThis) { DEFiRet; - assert(pThis != NULL); + ASSERT(pThis != NULL); pThis->bSuspended = 1; pThis->ttResumeRtry = time(NULL) + pThis->iResumeInterval; pThis->iNbrResRtry = 0; /* tell that we did not yet retry to resume */ @@ -209,7 +212,7 @@ rsRetVal actionTryResume(action_t *pThis) DEFiRet; time_t ttNow; - assert(pThis != NULL); + ASSERT(pThis != NULL); ttNow = time(NULL); /* do the system call just once */ @@ -274,7 +277,9 @@ rsRetVal actionDoAction(action_t *pAction) { DEFiRet; - iRet = queueEnqObj(pAction->pQueue, (void*) pAction->f_pMsg); +RUNLOG_VAR("%p", pAction->f_pMsg); + ISOBJ_TYPE_assert(pAction->f_pMsg, Msg); + iRet = queueEnqObj(pAction->pQueue, (void*) MsgAddRef(pAction->f_pMsg)); RETiRet; } @@ -290,7 +295,7 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) int i; int iSleepPeriod; - assert(pAction != NULL); + ASSERT(pAction != NULL); /* here we must loop to process all requested strings */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { @@ -371,6 +376,185 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT } +/* rgerhards 2004-11-09: fprintlog() is the actual driver for + * the output channel. It receives the channel description (f) as + * well as the message and outputs them according to the channel + * semantics. The message is typically already contained in the + * channel save buffer (f->f_prevline). This is not only the case + * when a message was already repeated but also when a new message + * arrived. + * rgerhards 2007-08-01: interface changed to use action_t + * rgerhards, 2007-12-11: please note: THIS METHOD MUST ONLY BE + * CALLED AFTER THE CALLER HAS LOCKED THE pAction OBJECT! We do + * not do this here. Failing to do so results in all kinds of + * "interesting" problems! + * RGERHARDS, 2008-01-29: + * This is now the action caller and has been renamed. + */ +rsRetVal +actionWriteToAction(action_t *pAction) +{ + msg_t *pMsgSave; /* to save current message pointer, necessary to restore + it in case it needs to be updated (e.g. repeated msgs) */ + DEFiRet; + + pMsgSave = NULL; /* indicate message poiner not saved */ + /* first check if this is a regular message or the repeation of + * a previous message. If so, we need to change the message text + * to "last message repeated n times" and then go ahead and write + * it. Please note that we can not modify the message object, because + * that would update it in other selectors as well. As such, we first + * need to create a local copy of the message, which we than can update. + * rgerhards, 2007-07-10 + */ + if(pAction->f_prevcount > 1) { + msg_t *pMsg; + uchar szRepMsg[64]; + snprintf((char*)szRepMsg, sizeof(szRepMsg), "last message repeated %d times", + pAction->f_prevcount); + + if((pMsg = MsgDup(pAction->f_pMsg)) == NULL) { + /* it failed - nothing we can do against it... */ + dbgprintf("Message duplication failed, dropping repeat message.\n"); + ABORT_FINALIZE(RS_RET_ERR); + } + + /* We now need to update the other message properties. + * ... RAWMSG is a problem ... Please note that digital + * signatures inside the message are also invalidated. + */ + getCurrTime(&(pMsg->tRcvdAt)); + getCurrTime(&(pMsg->tTIMESTAMP)); + MsgSetMSG(pMsg, (char*)szRepMsg); + MsgSetRawMsg(pMsg, (char*)szRepMsg); + + pMsgSave = pAction->f_pMsg; /* save message pointer for later restoration */ + pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */ + } + + dbgprintf("Called action, logging to %s", modGetStateName(pAction->pMod)); + + time(&pAction->f_time); /* we need this for message repeation processing */ + + /* When we reach this point, we have a valid, non-disabled action. + * So let's execute it. -- rgerhards, 2007-07-24 + */ + iRet = actionDoAction(pAction); + +finalize_it: + if(pMsgSave != NULL) { + /* we had saved the original message pointer. That was + * done because we needed to create a temporary one + * (most often for "message repeated n time" handling). If so, + * we need to restore the original one now, so that procesing + * can continue as normal. We also need to discard the temporary + * one, as we do not like memory leaks ;) Please note that the original + * message object will be discarded by our callers, so this is nothing + * of our business. rgerhards, 2007-07-10 + */ + MsgDestruct(&pAction->f_pMsg); + pAction->f_pMsg = pMsgSave; /* restore it */ + } + + RETiRet; +} + + + + + +/* call the configured action. Does all necessary housekeeping. + * rgerhards, 2007-08-01 + */ +rsRetVal +actionCallAction(action_t *pAction, msg_t *pMsg) +{ + DEFiRet; + int iCancelStateSave; + + ISOBJ_TYPE_assert(pMsg, Msg); + ASSERT(pAction != NULL); + + /* Make sure nodbody else modifies/uses this action object. Right now, this + * is important because of "message repeated n times" processing and potentially + * multiple worker threads. -- rgerhards, 2007-12-11 + */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + LockObj(pAction); + pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); + pthread_setcancelstate(iCancelStateSave, NULL); + + /* first, we need to check if this is a disabled + * entry. If so, we must not further process it. + * rgerhards 2005-09-26 + * In the future, disabled modules may be re-probed from time + * to time. They are in a perfectly legal state, except that the + * doAction method indicated that it wanted to be disabled - but + * we do not consider this is a solution for eternity... So we + * should check from time to time if affairs have improved. + * rgerhards, 2007-07-24 + */ + if(pAction->bEnabled == 0) { + ABORT_FINALIZE(RS_RET_OK); + } + + if(actionIsSuspended(pAction)) { + CHKiRet(actionTryResume(pAction)); + } + + /* don't output marks to recently written files */ + if ((pMsg->msgFlags & MARK) && (time(NULL) - pAction->f_time) < MarkInterval / 2) { + ABORT_FINALIZE(RS_RET_OK); + } + + /* suppress duplicate messages + */ + if ((pAction->f_ReduceRepeated == 1) && pAction->f_pMsg != NULL && + (pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) && + !strcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) && + !strcmp(getHOSTNAME(pMsg), getHOSTNAME(pAction->f_pMsg)) && + !strcmp(getPROCID(pMsg), getPROCID(pAction->f_pMsg)) && + !strcmp(getAPPNAME(pMsg), getAPPNAME(pAction->f_pMsg))) { + pAction->f_prevcount++; + dbgprintf("msg repeated %d times, %ld sec of %d.\n", + pAction->f_prevcount, time(NULL) - pAction->f_time, + repeatinterval[pAction->f_repeatcount]); + /* use current message, so we have the new timestamp (means we need to discard previous one) */ + MsgDestruct(&pAction->f_pMsg); + pAction->f_pMsg = MsgAddRef(pMsg); + /* If domark would have logged this by now, flush it now (so we don't hold + * isolated messages), but back off so we'll flush less often in the future. + */ + if(time(NULL) > REPEATTIME(pAction)) { + iRet = actionWriteToAction(pAction); + BACKOFF(pAction); + } + } else { + /* new message, save it */ + /* first check if we have a previous message stored + * if so, emit and then discard it first + */ + if(pAction->f_pMsg != NULL) { + if(pAction->f_prevcount > 0) + actionWriteToAction(pAction); + /* we do not care about iRet above - I think it's right but if we have + * some troubles, you know where to look at ;) -- rgerhards, 2007-08-01 + */ + MsgDestruct(&pAction->f_pMsg); + } + pAction->f_pMsg = MsgAddRef(pMsg); + /* call the output driver */ + iRet = actionWriteToAction(pAction); + } + +finalize_it: + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + UnlockObj(pAction); + pthread_cleanup_pop(0); /* remove mutex cleanup handler */ + pthread_setcancelstate(iCancelStateSave, NULL); + RETiRet; +} + /* add our cfsysline handlers * rgerhards, 2008-01-28 @@ -78,6 +78,8 @@ rsRetVal actionSuspend(action_t *pThis); rsRetVal actionDbgPrint(action_t *pThis); rsRetVal actionSetGlobalResumeInterval(int iNewVal); rsRetVal actionDoAction(action_t *pAction); +rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg); +rsRetVal actionWriteToAction(action_t *pAction); #if 1 #define actionIsSuspended(pThis) ((pThis)->bSuspended == 1) @@ -99,9 +99,11 @@ void dbgPrintAllDebugInfo(void); #ifdef RTINST # define BEGINfunc static dbgFuncDB_t dbgFuncDB=dbgFuncDB_t_INITIALIZER; int dbgCALLStaCK_POP_POINT = dbgEntrFunc(&dbgFuncDB,__LINE__); # define ENDfunc dbgExitFunc(&dbgFuncDB, dbgCALLStaCK_POP_POINT); +# define ASSERT(x) do { if(!(x)) dbgPrintAllDebugInfo(); assert(x); } while(0); #else # define BEGINfunc # define ENDfunc +# define ASSERT(x) #endif #ifdef RTINST # define RUNLOG dbgSetExecLocation(dbgCALLStaCK_POP_POINT, __LINE__); dbgprintf("%s:%d: %s: log point\n", __FILE__, __LINE__, __func__) diff --git a/obj-types.h b/obj-types.h index 1e607d0c..01842a83 100644 --- a/obj-types.h +++ b/obj-types.h @@ -30,6 +30,11 @@ #include "stringbuf.h" +/* base object data, present in all objects */ +typedef struct objData_s { + uchar *pName; +} objData_t; + /* property types */ typedef enum { /* do NOT start at 0 to detect uninitialized types after calloc() */ PROPTYPE_PSZ = 1, @@ -85,9 +90,11 @@ typedef struct objInfo_s { rsRetVal (*objMethods[OBJ_NUM_METHODS])(); } objInfo_t; +/* TODO: move obj_t at front of struct */ typedef struct obj { /* the dummy struct that each derived class can be casted to */ objInfo_t *pObjInfo; #ifndef NDEBUG /* this means if debug... */ + objData_t objData; unsigned int iObjCooCKiE; /* must always be 0xBADEFEE for a valid object */ #endif } obj_t; @@ -98,25 +105,21 @@ typedef struct obj { /* the dummy struct that each derived class can be casted t #ifndef NDEBUG /* this means if debug... */ # define BEGINobjInstance \ objInfo_t *pObjInfo; \ + objData_t objData; \ unsigned int iObjCooCKiE; /* prevent name conflict, thus the strange name */ # define ISOBJ_assert(pObj) \ do { \ - if(pObj == NULL) dbgPrintAllDebugInfo(); \ - assert((pObj) != NULL); \ - if(((obj_t*)(pObj))->iObjCooCKiE != (unsigned) 0xBADEFEE) dbgPrintAllDebugInfo(); \ - assert((unsigned) ((obj_t*)(pObj))->iObjCooCKiE == (unsigned) 0xBADEFEE); \ + ASSERT((pObj) != NULL); \ + ASSERT((unsigned) ((obj_t*)(pObj))->iObjCooCKiE == (unsigned) 0xBADEFEE); \ } while(0); # define ISOBJ_TYPE_assert(pObj, objType) \ do { \ - if(pObj == NULL) dbgPrintAllDebugInfo(); \ - assert(pObj != NULL); \ - if(((obj_t*)(pObj))->iObjCooCKiE != (unsigned) 0xBADEFEE) dbgPrintAllDebugInfo(); \ - assert((unsigned) pObj->iObjCooCKiE == (unsigned) 0xBADEFEE); \ - if(objGetObjID(pObj) != OBJ##objType) dbgPrintAllDebugInfo(); \ - assert(objGetObjID(pObj) == OBJ##objType); \ + ASSERT(pObj != NULL); \ + ASSERT((unsigned) pObj->iObjCooCKiE == (unsigned) 0xBADEFEE); \ + ASSERT(objGetObjID(pObj) == OBJ##objType); \ } while(0); #else /* non-debug mode, no checks but much faster */ -# define BEGINobjInstance objInfo_t *pObjInfo; +# define BEGINobjInstance objInfo_t *pObjInfo; objData_t objData; # define ISOBJ_TYPE_assert(pObj, objType) # define ISOBJ_assert(pObj) #endif diff --git a/plugins/imklog/imklog.c b/plugins/imklog/imklog.c index 427d1b5e..d4007899 100644 --- a/plugins/imklog/imklog.c +++ b/plugins/imklog/imklog.c @@ -145,7 +145,7 @@ static rsRetVal writeSyslogV(int iPRI, const char *szFmt, va_list va) logmsg(iPRI, pMsg, INTERNAL_MSG); finalize_it: - return iRet; + RETiRet; } /* And now the same with variable arguments */ @@ -211,7 +211,7 @@ rsRetVal Syslog(int priority, char *fmt, ...) va_end(ap); } - return iRet; + RETiRet; } @@ -623,7 +623,7 @@ CODESTARTrunInput break; } } - return iRet; + RETiRet; ENDrunInput diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index baa193d5..d68e3772 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -117,7 +117,7 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) finalize_it: free(pNewVal); /* in any case, this is no longer needed */ - return iRet; + RETiRet; } @@ -115,7 +115,7 @@ queueTurnOffDAMode(queue_t *pThis) DEFiRet; ISOBJ_TYPE_assert(pThis, queue); - assert(pThis->bRunsDA); + ASSERT(pThis->bRunsDA); /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need * to wait for its startup... -- rgerhards, 2008-01-25 @@ -386,7 +386,7 @@ static rsRetVal qConstructFixedArray(queue_t *pThis) { DEFiRet; - assert(pThis != NULL); + ASSERT(pThis != NULL); if(pThis->iMaxQueueSize == 0) ABORT_FINALIZE(RS_RET_QSIZE_ZERO); @@ -409,7 +409,7 @@ static rsRetVal qDestructFixedArray(queue_t *pThis) { DEFiRet; - assert(pThis != NULL); + ASSERT(pThis != NULL); if(pThis->tVars.farray.pBuf != NULL) free(pThis->tVars.farray.pBuf); @@ -421,7 +421,7 @@ static rsRetVal qAddFixedArray(queue_t *pThis, void* in) { DEFiRet; - assert(pThis != NULL); + ASSERT(pThis != NULL); pThis->tVars.farray.pBuf[pThis->tVars.farray.tail] = in; pThis->tVars.farray.tail++; if (pThis->tVars.farray.tail == pThis->iMaxQueueSize) @@ -434,7 +434,7 @@ static rsRetVal qDelFixedArray(queue_t *pThis, void **out) { DEFiRet; - assert(pThis != NULL); + ASSERT(pThis != NULL); *out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.head]; pThis->tVars.farray.head++; @@ -450,7 +450,7 @@ static rsRetVal qConstructLinkedList(queue_t *pThis) { DEFiRet; - assert(pThis != NULL); + ASSERT(pThis != NULL); pThis->tVars.linklist.pRoot = 0; pThis->tVars.linklist.pLast = 0; @@ -479,7 +479,7 @@ static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr) DEFiRet; qLinkedList_t *pEntry; - assert(pThis != NULL); + ASSERT(pThis != NULL); if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) { ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } @@ -503,8 +503,8 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) DEFiRet; qLinkedList_t *pEntry; - assert(pThis != NULL); - assert(pThis->tVars.linklist.pRoot != NULL); + ASSERT(pThis != NULL); + ASSERT(pThis->tVars.linklist.pRoot != NULL); pEntry = pThis->tVars.linklist.pRoot; *ppUsr = pEntry->pUsr; @@ -654,7 +654,7 @@ static rsRetVal qConstructDisk(queue_t *pThis) DEFiRet; int bRestarted = 0; - assert(pThis != NULL); + ASSERT(pThis != NULL); /* and now check if there is some persistent information that needs to be read in */ iRet = queueTryLoadPersistedInfo(pThis); @@ -704,7 +704,7 @@ static rsRetVal qDestructDisk(queue_t *pThis) { DEFiRet; - assert(pThis != NULL); + ASSERT(pThis != NULL); strmDestruct(&pThis->tVars.disk.pWrite); strmDestruct(&pThis->tVars.disk.pRead); @@ -719,7 +719,7 @@ static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) { DEFiRet; - assert(pThis != NULL); + ASSERT(pThis != NULL); CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite)); CHKiRet(strmFlush(pThis->tVars.disk.pWrite)); @@ -750,7 +750,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr) DEFiRet; rsRetVal iRetLocal; - assert(pThis != NULL); + ASSERT(pThis != NULL); /* calling the consumer is quite different here than it is from a worker thread */ iRetLocal = pThis->pConsumer(pThis->pUsr, pUsr); @@ -781,7 +781,7 @@ queueAdd(queue_t *pThis, void *pUsr) { DEFiRet; - assert(pThis != NULL); + ASSERT(pThis != NULL); CHKiRet(pThis->qAdd(pThis, pUsr)); ++pThis->iQueueSize; @@ -799,7 +799,7 @@ queueDel(queue_t *pThis, void *pUsr) { DEFiRet; - assert(pThis != NULL); + ASSERT(pThis != NULL); /* we do NOT abort if we encounter an error, because otherwise the queue * will not be decremented, what will most probably result in an endless loop. @@ -835,7 +835,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) rsRetVal iRetLocal; ISOBJ_TYPE_assert(pThis, queue); - assert(pThis->pqParent == NULL); /* detect invalid calling sequence */ + ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis)); @@ -1052,9 +1052,9 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, DEFiRet; queue_t *pThis; - assert(ppThis != NULL); - assert(pConsumer != NULL); - assert(iWorkerThreads >= 0); + ASSERT(ppThis != NULL); + ASSERT(pConsumer != NULL); + ASSERT(iWorkerThreads >= 0); if((pThis = (queue_t *)calloc(1, sizeof(queue_t))) == NULL) { ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); @@ -1106,7 +1106,9 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, } finalize_it: +RUNLOG_VAR("%x", pThis->iObjCooCKiE ); OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP +RUNLOG_VAR("%x", pThis->iObjCooCKiE ); RETiRet; } @@ -1159,6 +1161,7 @@ static int queueChkDiscardMsg(queue_t *pThis, int iQueueSize, int bRunsDA, void int iSeverity; ISOBJ_TYPE_assert(pThis, queue); +RUNLOG_VAR("%p", pUsr); ISOBJ_assert(pUsr); if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) { @@ -1363,7 +1366,7 @@ queueRegOnWrkrShutdown(queue_t *pThis) if(pThis->pqParent != NULL) { RUNLOG_VAR("%p", pThis->pqParent); RUNLOG_VAR("%p", pThis->pqParent->pWtpDA); - assert(pThis->pqParent->pWtpDA != NULL); + ASSERT(pThis->pqParent->pWtpDA != NULL); pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ #if 0 @@ -1413,7 +1416,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ uchar pszBuf[64]; size_t lenBuf; - assert(pThis != NULL); + ASSERT(pThis != NULL); /* finalize some initializations that could not yet be done because it is * influenced by properties which might have been set after queueConstruct () @@ -1519,7 +1522,7 @@ static rsRetVal queuePersist(queue_t *pThis) uchar pszQIFNam[MAXFNAME]; size_t lenQIFNam; - assert(pThis != NULL); + ASSERT(pThis != NULL); if(pThis->qType != QUEUETYPE_DISK) { if(pThis->iQueueSize > 0) { @@ -1614,7 +1617,7 @@ rsRetVal queueDestruct(queue_t **ppThis) DEFiRet; queue_t *pThis; - assert(ppThis != NULL); + ASSERT(ppThis != NULL); pThis = *ppThis; ISOBJ_TYPE_assert(pThis, queue); @@ -1891,7 +1894,7 @@ static rsRetVal queueSetProperty(queue_t *pThis, property_t *pProp) DEFiRet; ISOBJ_TYPE_assert(pThis, queue); - assert(pProp != NULL); + ASSERT(pProp != NULL); if(isProp("iQueueSize")) { pThis->iQueueSize = pProp->val.vInt; @@ -60,7 +60,9 @@ SyncObjExit(pthread_mutex_t **mut) void lockObj(pthread_mutex_t *mut) { - pthread_mutex_lock(mut); + BEGINfunc + d_pthread_mutex_lock(mut); + ENDfunc } /* unlock an object. The synchronization tool (mutex) must be passed in. @@ -68,6 +70,8 @@ lockObj(pthread_mutex_t *mut) void unlockObj(pthread_mutex_t *mut) { - pthread_mutex_unlock(mut); + BEGINfunc + d_pthread_mutex_unlock(mut); + ENDfunc } #endif /* #ifndef NDEBUG */ @@ -39,8 +39,8 @@ * are better to trace in the stackframe. */ #ifdef NDEBUG -#define LockObj(x) pthread_mutex_lock((x)->Sync_mut) -#define UnlockObj(x) pthread_mutex_unlock((x)->Sync_mut) +#define LockObj(x) d_pthread_mutex_lock((x)->Sync_mut) +#define UnlockObj(x) d_pthread_mutex_unlock((x)->Sync_mut) #else #define LockObj(x) lockObj((x)->Sync_mut) #define UnlockObj(x) unlockObj((x)->Sync_mut) @@ -295,17 +295,12 @@ static int bFinished = 0; /* used by termination signal handler, read-only excep * termination. */ -/* - * Intervals at which we flush out "message repeated" messages, +/* Intervals at which we flush out "message repeated" messages, * in seconds after previous message is logged. After each flush, * we move to the next interval until we reach the largest. + * TODO: this shall go into action object! -- rgerhards, 2008-01-29 */ -int repeatinterval[] = { 30, 60 }; /* # of secs before flush */ -#define MAXREPEAT ((int)((sizeof(repeatinterval) / sizeof(repeatinterval[0])) - 1)) -#define REPEATTIME(f) ((f)->f_time + repeatinterval[(f)->f_repeatcount]) -#define BACKOFF(f) { if (++(f)->f_repeatcount > MAXREPEAT) \ - (f)->f_repeatcount = MAXREPEAT; \ - } +int repeatinterval[2] = { 30, 60 }; /* # of secs before flush */ #define LIST_DELIMITER ':' /* delimiter between two hosts */ @@ -386,7 +381,7 @@ uchar *pszWorkDir = NULL;/* name of rsyslog's spool directory (without trailing static unsigned int Forwarding = 0; char LocalHostName[MAXHOSTNAMELEN+1];/* our hostname - read-only after startup */ char *LocalDomain; /* our local domain name - read-only after startup */ -static int MarkInterval = 20 * 60; /* interval between marks in seconds - read-only after startup */ +int MarkInterval = 20 * 60; /* interval between marks in seconds - read-only after startup */ int family = PF_UNSPEC; /* protocol family (IPv4, IPv6 or both), set via cmdline */ int send_to_all = 0; /* send message to all IPv4/IPv6 addresses */ static int NoFork = 0; /* don't fork - don't run in daemon mode - read-only after startup */ @@ -558,7 +553,6 @@ static uchar template_StdPgSQLFmt[] = "\"insert into SystemEvents (Message, Faci /* up to the next comment, prototypes that should be removed by reordering */ /* Function prototypes. */ static char **crunch_list(char *list); -static rsRetVal fprintlog(action_t *pAction); static void reapchild(); static void debug_switch(); static rsRetVal cfline(uchar *line, selector_t **pfCurr); @@ -1677,99 +1671,6 @@ static void callActionMutClean(void *arg) } -/* call the configured action. Does all necessary housekeeping. - * rgerhards, 2007-08-01 - */ -static rsRetVal callAction(msg_t *pMsg, action_t *pAction) -{ - DEFiRet; - int iCancelStateSave; - - assert(pMsg != NULL); - assert(pAction != NULL); - - /* Make sure nodbody else modifies/uses this action object. Right now, this - * is important because of "message repeated n times" processing, later it will - * become important when we (possibly) have multiple worker threads. - * rgerhards, 2007-12-11 - */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - LockObj(pAction); - pthread_cleanup_push(callActionMutClean, pAction->Sync_mut); - pthread_setcancelstate(iCancelStateSave, NULL); - - /* first, we need to check if this is a disabled - * entry. If so, we must not further process it. - * rgerhards 2005-09-26 - * In the future, disabled modules may be re-probed from time - * to time. They are in a perfectly legal state, except that the - * doAction method indicated that it wanted to be disabled - but - * we do not consider this is a solution for eternity... So we - * should check from time to time if affairs have improved. - * rgerhards, 2007-07-24 - */ - if(pAction->bEnabled == 0) { - ABORT_FINALIZE(RS_RET_OK); - } - - if(actionIsSuspended(pAction)) { - CHKiRet(actionTryResume(pAction)); - } - - /* don't output marks to recently written files */ - if ((pMsg->msgFlags & MARK) && (time(NULL) - pAction->f_time) < MarkInterval / 2) { - ABORT_FINALIZE(RS_RET_OK); - } - - /* suppress duplicate messages - */ - if ((pAction->f_ReduceRepeated == 1) && pAction->f_pMsg != NULL && - (pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) && - !strcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) && - !strcmp(getHOSTNAME(pMsg), getHOSTNAME(pAction->f_pMsg)) && - !strcmp(getPROCID(pMsg), getPROCID(pAction->f_pMsg)) && - !strcmp(getAPPNAME(pMsg), getAPPNAME(pAction->f_pMsg))) { - pAction->f_prevcount++; - dbgprintf("msg repeated %d times, %ld sec of %d.\n", - pAction->f_prevcount, time(NULL) - pAction->f_time, - repeatinterval[pAction->f_repeatcount]); - /* use current message, so we have the new timestamp (means we need to discard previous one) */ - MsgDestruct(&pAction->f_pMsg); - pAction->f_pMsg = MsgAddRef(pMsg); - /* If domark would have logged this by now, flush it now (so we don't hold - * isolated messages), but back off so we'll flush less often in the future. - */ - if(time(NULL) > REPEATTIME(pAction)) { - iRet = fprintlog(pAction); - BACKOFF(pAction); - } - } else { - /* new message, save it */ - /* first check if we have a previous message stored - * if so, emit and then discard it first - */ - if(pAction->f_pMsg != NULL) { - if(pAction->f_prevcount > 0) - fprintlog(pAction); - /* we do not care about iRet above - I think it's right but if we have - * some troubles, you know where to look at ;) -- rgerhards, 2007-08-01 - */ - MsgDestruct(&pAction->f_pMsg); - } - pAction->f_pMsg = MsgAddRef(pMsg); - /* call the output driver */ - iRet = fprintlog(pAction); - } - -finalize_it: - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - UnlockObj(pAction); - pthread_cleanup_pop(0); /* remove mutex cleanup handler */ - pthread_setcancelstate(iCancelStateSave, NULL); - RETiRet; -} - - /* helper to processMsg(), used to call the configured actions. It is * executed from within llExecFunc() of the action list. * rgerhards, 2007-08-02 @@ -1792,7 +1693,7 @@ DEFFUNC_llExecFunc(processMsgDoActions) ABORT_FINALIZE(RS_RET_OK); } - iRetMod = callAction(pDoActData->pMsg, pAction); + iRetMod = actionCallAction(pAction, pDoActData->pMsg); if(iRetMod == RS_RET_DISCARDMSG) { ABORT_FINALIZE(RS_RET_DISCARDMSG); } else if(iRetMod == RS_RET_SUSPENDED) { @@ -2333,88 +2234,6 @@ logmsg(int pri, msg_t *pMsg, int flags) } -/* rgerhards 2004-11-09: fprintlog() is the actual driver for - * the output channel. It receives the channel description (f) as - * well as the message and outputs them according to the channel - * semantics. The message is typically already contained in the - * channel save buffer (f->f_prevline). This is not only the case - * when a message was already repeated but also when a new message - * arrived. - * rgerhards 2007-08-01: interface changed to use action_t - * rgerhards, 2007-12-11: please note: THIS METHOD MUST ONLY BE - * CALLED AFTER THE CALLER HAS LOCKED THE pAction OBJECT! We do - * not do this here. Failing to do so results in all kinds of - * "interesting" problems! - */ -rsRetVal -fprintlog(action_t *pAction) -{ - msg_t *pMsgSave; /* to save current message pointer, necessary to restore - it in case it needs to be updated (e.g. repeated msgs) */ - DEFiRet; - - pMsgSave = NULL; /* indicate message poiner not saved */ - /* first check if this is a regular message or the repeation of - * a previous message. If so, we need to change the message text - * to "last message repeated n times" and then go ahead and write - * it. Please note that we can not modify the message object, because - * that would update it in other selectors as well. As such, we first - * need to create a local copy of the message, which we than can update. - * rgerhards, 2007-07-10 - */ - if(pAction->f_prevcount > 1) { - msg_t *pMsg; - uchar szRepMsg[64]; - snprintf((char*)szRepMsg, sizeof(szRepMsg), "last message repeated %d times", - pAction->f_prevcount); - - if((pMsg = MsgDup(pAction->f_pMsg)) == NULL) { - /* it failed - nothing we can do against it... */ - dbgprintf("Message duplication failed, dropping repeat message.\n"); - ABORT_FINALIZE(RS_RET_ERR); - } - - /* We now need to update the other message properties. - * ... RAWMSG is a problem ... Please note that digital - * signatures inside the message are also invalidated. - */ - getCurrTime(&(pMsg->tRcvdAt)); - getCurrTime(&(pMsg->tTIMESTAMP)); - MsgSetMSG(pMsg, (char*)szRepMsg); - MsgSetRawMsg(pMsg, (char*)szRepMsg); - - pMsgSave = pAction->f_pMsg; /* save message pointer for later restoration */ - pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */ - } - - dbgprintf("Called fprintlog, logging to %s", modGetStateName(pAction->pMod)); - - time(&pAction->f_time); /* we need this for message repeation processing */ - - /* When we reach this point, we have a valid, non-disabled action. - * So let's execute it. -- rgerhards, 2007-07-24 - */ - iRet = actionDoAction(pAction); - -finalize_it: - if(pMsgSave != NULL) { - /* we had saved the original message pointer. That was - * done because we needed to create a temporary one - * (most often for "message repeated n time" handling). If so, - * we need to restore the original one now, so that procesing - * can continue as normal. We also need to discard the temporary - * one, as we do not like memory leaks ;) Please note that the original - * message object will be discarded by our callers, so this is nothing - * of our business. rgerhards, 2007-07-10 - */ - MsgDestruct(&pAction->f_pMsg); - pAction->f_pMsg = pMsgSave; /* restore it */ - } - - RETiRet; -} - - static void reapchild() { @@ -2445,7 +2264,7 @@ DEFFUNC_llExecFunc(flushRptdMsgsActions) dbgprintf("flush %s: repeated %d times, %d sec.\n", modGetStateName(pAction->pMod), pAction->f_prevcount, repeatinterval[pAction->f_repeatcount]); - fprintlog(pAction); + actionWriteToAction(pAction); BACKOFF(pAction); } UnlockObj(pAction); @@ -3006,7 +2825,7 @@ DEFFUNC_llExecFunc(freeSelectorsActions) /* flush any pending output */ if(pAction->f_prevcount) { - fprintlog(pAction); + actionWriteToAction(pAction); } return RS_RET_OK; /* never fails ;) */ @@ -81,6 +81,19 @@ extern int DisableDNS; extern char **StripDomains; extern char *LocalDomain; extern int bDropMalPTRMsgs; -extern char ctty[]; +extern char ctty[]; +extern int MarkInterval; + +/* Intervals at which we flush out "message repeated" messages, + * in seconds after previous message is logged. After each flush, + * we move to the next interval until we reach the largest. + * TODO: move this to action object! + */ +extern int repeatinterval[2]; +#define MAXREPEAT ((int)((sizeof(repeatinterval) / sizeof(repeatinterval[0])) - 1)) +#define REPEATTIME(f) ((f)->f_time + repeatinterval[(f)->f_repeatcount]) +#define BACKOFF(f) { if (++(f)->f_repeatcount > MAXREPEAT) \ + (f)->f_repeatcount = MAXREPEAT; \ + } #endif /* #ifndef SYSLOGD_H_INCLUDED */ |