diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 141 |
1 files changed, 108 insertions, 33 deletions
@@ -58,6 +58,7 @@ static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0, static time_t iActExecEveryNthOccurTO = 0; /* timeout for n-occurence setting (in seconds, 0=never) */ static int glbliActionResumeInterval = 30; int glbliActionResumeRetryCount = 0; /* how often should suspended actions be retried? */ +static int bActionRepMsgHasMsg = 0; /* last messsage repeated... has msg fragment in it */ /* main message queue and its configuration parameters */ static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ @@ -179,7 +180,7 @@ rsRetVal actionDestruct(action_t *pThis) ASSERT(pThis != NULL); if(pThis->pQueue != NULL) { - queueDestruct(&pThis->pQueue); + qqueueDestruct(&pThis->pQueue); } if(pThis->pMod != NULL) @@ -254,7 +255,7 @@ actionConstructFinalize(action_t *pThis) * to be run on multiple threads. So far, this is forbidden by the interface * spec. -- rgerhards, 2008-01-30 */ - CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction)); + CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction)); obj.SetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -267,24 +268,24 @@ actionConstructFinalize(action_t *pThis) errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ } - queueSetpUsr(pThis->pQueue, pThis); - setQPROP(queueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace); - setQPROP(queueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); - setQPROPstr(queueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); - setQPROP(queueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); - setQPROP(queueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown ); - setQPROP(queueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown); - setQPROP(queueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown); - setQPROP(queueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq); - setQPROP(queueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark); - setQPROP(queueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark); - setQPROP(queueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark); - setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity); - setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs); - setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown); - setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown); - setQPROP(queueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr); - setQPROP(queueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr); + qqueueSetpUsr(pThis->pQueue, pThis); + setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace); + setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); + setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); + setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); + setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown ); + setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown); + setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown); + setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq); + setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark); + setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark); + setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark); + setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity); + setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs); + setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown); + setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown); + setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr); + setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr); # undef setQPROP # undef setQPROPstr @@ -293,7 +294,7 @@ actionConstructFinalize(action_t *pThis) bActionQSaveOnShutdown, iActionQueMaxDiskSpace); - CHKiRet(queueStart(pThis->pQueue)); + CHKiRet(qqueueStart(pThis->pQueue)); dbgprintf("Action %p: queue %p created\n", pThis, pThis->pQueue); /* and now reset the queue params (see comment in its function header!) */ @@ -352,7 +353,13 @@ static rsRetVal actionTryResume(action_t *pThis) ASSERT(pThis != NULL); - ttNow = getActNow(pThis); /* cache "now" */ + /* for resume handling, we must always obtain a fresh timestamp. We used + * to use the action timestamp, but in this case we will never reach a + * point where a resumption is actually tried, because the action timestamp + * is always in the past. So we can not avoid doing a fresh time() call + * here. -- rgerhards, 2009-03-18 + */ + time(&ttNow); /* cache "now" */ /* first check if it is time for a re-try */ if(ttNow > pThis->ttResumeRtry) { @@ -418,6 +425,7 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) DEFiRet; int iRetries; int i; + int iArr; int iSleepPeriod; int bCallAction; int iCancelStateSave; @@ -432,7 +440,15 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) /* here we must loop to process all requested strings */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]))); + switch(pAction->eParamPassing) { + case ACT_STRING_PASSING: + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]))); + break; + case ACT_ARRAY_PASSING: + CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(ppMsgs[i]))); + break; + default:assert(0); /* software bug if this happens! */ + } } iRetries = 0; /* We now must guard the output module against execution by multiple threads. The @@ -488,7 +504,19 @@ finalize_it: /* cleanup */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { if(ppMsgs[i] != NULL) { - d_free(ppMsgs[i]); + switch(pAction->eParamPassing) { + case ACT_ARRAY_PASSING: + iArr = 0; + while(((char **)ppMsgs[i])[iArr] != NULL) + d_free(((char **)ppMsgs[i])[iArr++]); + d_free(ppMsgs[i]); + break; + case ACT_STRING_PASSING: + d_free(ppMsgs[i]); + break; + default: + assert(0); + } } } d_free(ppMsgs); @@ -498,6 +526,39 @@ finalize_it: } #pragma GCC diagnostic warning "-Wempty-body" + +/* call the HUP handler for a given action, if such a handler is defined. The + * action mutex is locked, because the HUP handler most probably needs to modify + * some internal state information. + * rgerhards, 2008-10-22 + */ +#pragma GCC diagnostic ignored "-Wempty-body" +rsRetVal +actionCallHUPHdlr(action_t *pAction) +{ + DEFiRet; + int iCancelStateSave; + + ASSERT(pAction != NULL); + DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP); + + if(pAction->pMod->doHUP == NULL) { + FINALIZE; /* no HUP handler, so we are done ;) */ + } + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(&pAction->mutActExec); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); + pthread_setcancelstate(iCancelStateSave, NULL); + CHKiRet(pAction->pMod->doHUP(pAction->pModData)); + pthread_cleanup_pop(1); /* unlock mutex */ + +finalize_it: + RETiRet; +} +#pragma GCC diagnostic warning "-Wempty-body" + + /* set the action message queue mode * TODO: probably move this into queue object, merge with MainMsgQueue! * rgerhards, 2008-01-28 @@ -588,9 +649,7 @@ actionWriteToAction(action_t *pAction) */ if(pAction->f_prevcount > 1) { msg_t *pMsg; - uchar szRepMsg[64]; - snprintf((char*)szRepMsg, sizeof(szRepMsg), "last message repeated %d times", - pAction->f_prevcount); + uchar szRepMsg[1024]; if((pMsg = MsgDup(pAction->f_pMsg)) == NULL) { /* it failed - nothing we can do against it... */ @@ -598,11 +657,19 @@ actionWriteToAction(action_t *pAction) ABORT_FINALIZE(RS_RET_ERR); } + if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */ + snprintf((char*)szRepMsg, sizeof(szRepMsg), "last message repeated %d times", + pAction->f_prevcount); + } else { + snprintf((char*)szRepMsg, sizeof(szRepMsg), "message repeated %d times: [%.800s]", + pAction->f_prevcount, getMSG(pAction->f_pMsg)); + } + /* We now need to update the other message properties. * ... RAWMSG is a problem ... Please note that digital * signatures inside the message are also invalidated. */ - datetime.getCurrTime(&(pMsg->tRcvdAt)); + datetime.getCurrTime(&(pMsg->tRcvdAt), &(pMsg->ttGenTime)); memcpy(&pMsg->tTIMESTAMP, &pMsg->tRcvdAt, sizeof(struct syslogTime)); MsgSetMSG(pMsg, (char*)szRepMsg); MsgSetRawMsg(pMsg, (char*)szRepMsg); @@ -625,18 +692,17 @@ actionWriteToAction(action_t *pAction) dbgprintf("action not yet ready again to be executed, onceInterval %d, tCurr %d, tNext %d\n", (int) pAction->iSecsExecOnceInterval, (int) getActNow(pAction), (int) (pAction->iSecsExecOnceInterval + pAction->tLastExec)); + pAction->tLastExec = getActNow(pAction); /* re-init time flags */ FINALIZE; } - pAction->f_time = pAction->tLastExec = getActNow(pAction); /* re-init time flags */ - /* Note: tLastExec could be set in the if block above, but f_time causes us a hard time - * so far, I do not see a solution to getting rid of it. -- rgerhards, 2008-09-16 - */ + /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */ + pAction->f_time = pAction->f_pMsg->ttGenTime; /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = queueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg)); + iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg)); if(iRet == RS_RET_OK) pAction->f_prevcount = 0; /* message processed, so we start a new cycle */ @@ -785,6 +851,7 @@ actionAddCfSysLineHdrl(void) CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccur, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccurTO, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &bActionRepMsgHasMsg, NULL)); finalize_it: RETiRet; @@ -818,6 +885,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques pAction->iSecsExecOnceInterval = iActExecOnceInterval; pAction->iExecEveryNthOccur = iActExecEveryNthOccur; pAction->iExecEveryNthOccurTO = iActExecEveryNthOccurTO; + pAction->bRepMsgHasMsg = bActionRepMsgHasMsg; iActExecEveryNthOccur = 0; /* auto-reset */ iActExecEveryNthOccurTO = 0; /* auto-reset */ @@ -858,6 +926,13 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques ABORT_FINALIZE(RS_RET_RQD_TPLOPT_MISSING); } + /* set parameter-passing mode */ + if(iTplOpts & OMSR_TPL_AS_ARRAY) { + pAction->eParamPassing = ACT_ARRAY_PASSING; + } else { + pAction->eParamPassing = ACT_STRING_PASSING; + } + dbgprintf("template: '%s' assigned\n", pTplName); } |