summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c141
1 files changed, 108 insertions, 33 deletions
diff --git a/action.c b/action.c
index 5c5bdbe9..03073153 100644
--- a/action.c
+++ b/action.c
@@ -58,6 +58,7 @@ static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0,
static time_t iActExecEveryNthOccurTO = 0; /* timeout for n-occurence setting (in seconds, 0=never) */
static int glbliActionResumeInterval = 30;
int glbliActionResumeRetryCount = 0; /* how often should suspended actions be retried? */
+static int bActionRepMsgHasMsg = 0; /* last messsage repeated... has msg fragment in it */
/* main message queue and its configuration parameters */
static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
@@ -179,7 +180,7 @@ rsRetVal actionDestruct(action_t *pThis)
ASSERT(pThis != NULL);
if(pThis->pQueue != NULL) {
- queueDestruct(&pThis->pQueue);
+ qqueueDestruct(&pThis->pQueue);
}
if(pThis->pMod != NULL)
@@ -254,7 +255,7 @@ actionConstructFinalize(action_t *pThis)
* to be run on multiple threads. So far, this is forbidden by the interface
* spec. -- rgerhards, 2008-01-30
*/
- CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction));
+ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction));
obj.SetName((obj_t*) pThis->pQueue, pszQName);
/* ... set some properties ... */
@@ -267,24 +268,24 @@ actionConstructFinalize(action_t *pThis)
errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
}
- queueSetpUsr(pThis->pQueue, pThis);
- setQPROP(queueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace);
- setQPROP(queueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
- setQPROPstr(queueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
- setQPROP(queueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
- setQPROP(queueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown );
- setQPROP(queueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown);
- setQPROP(queueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown);
- setQPROP(queueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq);
- setQPROP(queueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark);
- setQPROP(queueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark);
- setQPROP(queueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark);
- setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity);
- setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs);
- setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown);
- setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown);
- setQPROP(queueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr);
- setQPROP(queueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr);
+ qqueueSetpUsr(pThis->pQueue, pThis);
+ setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace);
+ setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
+ setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
+ setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
+ setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown );
+ setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown);
+ setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown);
+ setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq);
+ setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark);
+ setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark);
+ setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark);
+ setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity);
+ setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs);
+ setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown);
+ setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown);
+ setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr);
+ setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr);
# undef setQPROP
# undef setQPROPstr
@@ -293,7 +294,7 @@ actionConstructFinalize(action_t *pThis)
bActionQSaveOnShutdown, iActionQueMaxDiskSpace);
- CHKiRet(queueStart(pThis->pQueue));
+ CHKiRet(qqueueStart(pThis->pQueue));
dbgprintf("Action %p: queue %p created\n", pThis, pThis->pQueue);
/* and now reset the queue params (see comment in its function header!) */
@@ -352,7 +353,13 @@ static rsRetVal actionTryResume(action_t *pThis)
ASSERT(pThis != NULL);
- ttNow = getActNow(pThis); /* cache "now" */
+ /* for resume handling, we must always obtain a fresh timestamp. We used
+ * to use the action timestamp, but in this case we will never reach a
+ * point where a resumption is actually tried, because the action timestamp
+ * is always in the past. So we can not avoid doing a fresh time() call
+ * here. -- rgerhards, 2009-03-18
+ */
+ time(&ttNow); /* cache "now" */
/* first check if it is time for a re-try */
if(ttNow > pThis->ttResumeRtry) {
@@ -418,6 +425,7 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg)
DEFiRet;
int iRetries;
int i;
+ int iArr;
int iSleepPeriod;
int bCallAction;
int iCancelStateSave;
@@ -432,7 +440,15 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg)
/* here we must loop to process all requested strings */
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
- CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i])));
+ switch(pAction->eParamPassing) {
+ case ACT_STRING_PASSING:
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i])));
+ break;
+ case ACT_ARRAY_PASSING:
+ CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(ppMsgs[i])));
+ break;
+ default:assert(0); /* software bug if this happens! */
+ }
}
iRetries = 0;
/* We now must guard the output module against execution by multiple threads. The
@@ -488,7 +504,19 @@ finalize_it:
/* cleanup */
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
if(ppMsgs[i] != NULL) {
- d_free(ppMsgs[i]);
+ switch(pAction->eParamPassing) {
+ case ACT_ARRAY_PASSING:
+ iArr = 0;
+ while(((char **)ppMsgs[i])[iArr] != NULL)
+ d_free(((char **)ppMsgs[i])[iArr++]);
+ d_free(ppMsgs[i]);
+ break;
+ case ACT_STRING_PASSING:
+ d_free(ppMsgs[i]);
+ break;
+ default:
+ assert(0);
+ }
}
}
d_free(ppMsgs);
@@ -498,6 +526,39 @@ finalize_it:
}
#pragma GCC diagnostic warning "-Wempty-body"
+
+/* call the HUP handler for a given action, if such a handler is defined. The
+ * action mutex is locked, because the HUP handler most probably needs to modify
+ * some internal state information.
+ * rgerhards, 2008-10-22
+ */
+#pragma GCC diagnostic ignored "-Wempty-body"
+rsRetVal
+actionCallHUPHdlr(action_t *pAction)
+{
+ DEFiRet;
+ int iCancelStateSave;
+
+ ASSERT(pAction != NULL);
+ DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP);
+
+ if(pAction->pMod->doHUP == NULL) {
+ FINALIZE; /* no HUP handler, so we are done ;) */
+ }
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(&pAction->mutActExec);
+ pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ CHKiRet(pAction->pMod->doHUP(pAction->pModData));
+ pthread_cleanup_pop(1); /* unlock mutex */
+
+finalize_it:
+ RETiRet;
+}
+#pragma GCC diagnostic warning "-Wempty-body"
+
+
/* set the action message queue mode
* TODO: probably move this into queue object, merge with MainMsgQueue!
* rgerhards, 2008-01-28
@@ -588,9 +649,7 @@ actionWriteToAction(action_t *pAction)
*/
if(pAction->f_prevcount > 1) {
msg_t *pMsg;
- uchar szRepMsg[64];
- snprintf((char*)szRepMsg, sizeof(szRepMsg), "last message repeated %d times",
- pAction->f_prevcount);
+ uchar szRepMsg[1024];
if((pMsg = MsgDup(pAction->f_pMsg)) == NULL) {
/* it failed - nothing we can do against it... */
@@ -598,11 +657,19 @@ actionWriteToAction(action_t *pAction)
ABORT_FINALIZE(RS_RET_ERR);
}
+ if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */
+ snprintf((char*)szRepMsg, sizeof(szRepMsg), "last message repeated %d times",
+ pAction->f_prevcount);
+ } else {
+ snprintf((char*)szRepMsg, sizeof(szRepMsg), "message repeated %d times: [%.800s]",
+ pAction->f_prevcount, getMSG(pAction->f_pMsg));
+ }
+
/* We now need to update the other message properties.
* ... RAWMSG is a problem ... Please note that digital
* signatures inside the message are also invalidated.
*/
- datetime.getCurrTime(&(pMsg->tRcvdAt));
+ datetime.getCurrTime(&(pMsg->tRcvdAt), &(pMsg->ttGenTime));
memcpy(&pMsg->tTIMESTAMP, &pMsg->tRcvdAt, sizeof(struct syslogTime));
MsgSetMSG(pMsg, (char*)szRepMsg);
MsgSetRawMsg(pMsg, (char*)szRepMsg);
@@ -625,18 +692,17 @@ actionWriteToAction(action_t *pAction)
dbgprintf("action not yet ready again to be executed, onceInterval %d, tCurr %d, tNext %d\n",
(int) pAction->iSecsExecOnceInterval, (int) getActNow(pAction),
(int) (pAction->iSecsExecOnceInterval + pAction->tLastExec));
+ pAction->tLastExec = getActNow(pAction); /* re-init time flags */
FINALIZE;
}
- pAction->f_time = pAction->tLastExec = getActNow(pAction); /* re-init time flags */
- /* Note: tLastExec could be set in the if block above, but f_time causes us a hard time
- * so far, I do not see a solution to getting rid of it. -- rgerhards, 2008-09-16
- */
+ /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */
+ pAction->f_time = pAction->f_pMsg->ttGenTime;
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- iRet = queueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg));
+ iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg));
if(iRet == RS_RET_OK)
pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
@@ -785,6 +851,7 @@ actionAddCfSysLineHdrl(void)
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccur, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccurTO, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &bActionRepMsgHasMsg, NULL));
finalize_it:
RETiRet;
@@ -818,6 +885,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
pAction->iSecsExecOnceInterval = iActExecOnceInterval;
pAction->iExecEveryNthOccur = iActExecEveryNthOccur;
pAction->iExecEveryNthOccurTO = iActExecEveryNthOccurTO;
+ pAction->bRepMsgHasMsg = bActionRepMsgHasMsg;
iActExecEveryNthOccur = 0; /* auto-reset */
iActExecEveryNthOccurTO = 0; /* auto-reset */
@@ -858,6 +926,13 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
ABORT_FINALIZE(RS_RET_RQD_TPLOPT_MISSING);
}
+ /* set parameter-passing mode */
+ if(iTplOpts & OMSR_TPL_AS_ARRAY) {
+ pAction->eParamPassing = ACT_ARRAY_PASSING;
+ } else {
+ pAction->eParamPassing = ACT_STRING_PASSING;
+ }
+
dbgprintf("template: '%s' assigned\n", pTplName);
}