summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-06-24 10:52:06 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-06-24 10:52:06 +0200
commit8eb10a7105dce1dc3ffc44b291de73d82bd04799 (patch)
treee3c70740599127009b6009a2846d8274e67c8900 /action.c
parent675d46f5b59f64e378968baa5e0dec6810090287 (diff)
parentc53ca3a23429e750aeb6ab1c3600ae8ecb3c8ac6 (diff)
downloadrsyslog-8eb10a7105dce1dc3ffc44b291de73d82bd04799.tar.gz
rsyslog-8eb10a7105dce1dc3ffc44b291de73d82bd04799.tar.xz
rsyslog-8eb10a7105dce1dc3ffc44b291de73d82bd04799.zip
Merge branch 'omfile'
Conflicts: ChangeLog
Diffstat (limited to 'action.c')
-rw-r--r--action.c176
1 files changed, 107 insertions, 69 deletions
diff --git a/action.c b/action.c
index 51620fce..01bbfd13 100644
--- a/action.c
+++ b/action.c
@@ -4,7 +4,7 @@
*
* File begun on 2007-08-06 by RGerhards (extracted from syslogd.c)
*
- * Copyright 2007 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -72,6 +72,7 @@ static int iActionQueueNumWorkers = 1; /* number of worker threads for the mm
static uchar *pszActionQFName = NULL; /* prefix for the main message queue file */
static int64 iActionQueMaxFileSize = 1024*1024;
static int iActionQPersistUpdCnt = 0; /* persist queue info every n updates */
+static int bActionQSyncQeueFiles = 0; /* sync queue files */
static int iActionQtoQShutdown = 0; /* queue shutdown */
static int iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
static int iActionQtoEnq = 2000; /* timeout for queue enque */
@@ -151,6 +152,7 @@ actionResetQueueParams(void)
iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */
iActionQueMaxFileSize = 1024*1024;
iActionQPersistUpdCnt = 0; /* persist queue info every n updates */
+ bActionQSyncQeueFiles = 0;
iActionQtoQShutdown = 0; /* queue shutdown */
iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
iActionQtoEnq = 2000; /* timeout for queue enque */
@@ -176,6 +178,7 @@ actionResetQueueParams(void)
*/
rsRetVal actionDestruct(action_t *pThis)
{
+ int i;
DEFiRet;
ASSERT(pThis != NULL);
@@ -193,6 +196,33 @@ rsRetVal actionDestruct(action_t *pThis)
pthread_mutex_destroy(&pThis->mutActExec);
d_free(pThis->pszName);
d_free(pThis->ppTpl);
+
+ /* message ptr cleanup */
+ for(i = 0 ; i < pThis->iNumTpls ; ++i) {
+ if(pThis->ppMsgs[i] != NULL) {
+ switch(pThis->eParamPassing) {
+ case ACT_ARRAY_PASSING:
+#if 0 /* later! */
+ iArr = 0;
+ while(((char **)pThis->ppMsgs[i])[iArr] != NULL) {
+ d_free(((char **)pThis->ppMsgs[i])[iArr++]);
+ ((char **)pThis->ppMsgs[i])[iArr++] = NULL;
+ }
+ d_free(pThis->ppMsgs[i]);
+ pThis->ppMsgs[i] = NULL;
+#endif
+ break;
+ case ACT_STRING_PASSING:
+ d_free(pThis->ppMsgs[i]);
+ break;
+ default:
+ assert(0);
+ }
+ }
+ }
+ d_free(pThis->ppMsgs);
+ d_free(pThis->lenMsgs);
+
d_free(pThis);
RETiRet;
@@ -209,10 +239,7 @@ rsRetVal actionConstruct(action_t **ppThis)
ASSERT(ppThis != NULL);
- if((pThis = (action_t*) calloc(1, sizeof(action_t))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
-
+ CHKmalloc(pThis = (action_t*) calloc(1, sizeof(action_t)));
pThis->iResumeInterval = glbliActionResumeInterval;
pThis->iResumeRetryCount = glbliActionResumeRetryCount;
pThis->tLastOccur = time(NULL); /* done once per action on startup only */
@@ -273,6 +300,7 @@ actionConstructFinalize(action_t *pThis)
setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
+ setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", bActionQSyncQeueFiles);
setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown );
setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown);
setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown);
@@ -429,37 +457,33 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg)
int iSleepPeriod;
int bCallAction;
int iCancelStateSave;
- uchar **ppMsgs; /* array of message pointers for doAction */
ASSERT(pAction != NULL);
- /* create the array for doAction() message pointers */
- if((ppMsgs = calloc(pAction->iNumTpls, sizeof(uchar *))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
+ /* We now must guard the output module against execution by multiple threads. The
+ * plugin interface specifies that output modules must not be thread-safe (except
+ * if they notify us they are - functionality not yet implemented...).
+ * rgerhards, 2008-01-30
+ */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(&pAction->mutActExec);
+ pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
+ pthread_setcancelstate(iCancelStateSave, NULL);
/* here we must loop to process all requested strings */
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
- CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i])));
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pAction->ppMsgs[i]), &(pAction->lenMsgs[i])));
break;
case ACT_ARRAY_PASSING:
- CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(ppMsgs[i])));
+ CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pAction->ppMsgs[i])));
break;
default:assert(0); /* software bug if this happens! */
}
}
+
iRetries = 0;
- /* We now must guard the output module against execution by multiple threads. The
- * plugin interface specifies that output modules must not be thread-safe (except
- * if they notify us they are - functionality not yet implemented...).
- * rgerhards, 2008-01-30
- */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pAction->mutActExec);
- pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
- pthread_setcancelstate(iCancelStateSave, NULL);
do {
/* on first invocation, this if should never be true. We just put it at the top
* of the loop so that processing (and code) is simplified. This code is actually
@@ -484,7 +508,7 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg)
if(bCallAction) {
/* call configured action */
- iRet = pAction->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pAction->pModData);
+ iRet = pAction->pMod->mod.om.doAction(pAction->ppMsgs, pMsg->msgFlags, pAction->pModData);
if(iRet == RS_RET_SUSPENDED) {
dbgprintf("Action requested to be suspended, done that.\n");
actionSuspend(pAction, getActNow(pAction));
@@ -498,30 +522,31 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg)
pAction->bEnabled = 0; /* that's it... */
}
- pthread_cleanup_pop(1); /* unlock mutex */
-
finalize_it:
/* cleanup */
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
- if(ppMsgs[i] != NULL) {
+ if(pAction->ppMsgs[i] != NULL) {
switch(pAction->eParamPassing) {
case ACT_ARRAY_PASSING:
iArr = 0;
- while(((char **)ppMsgs[i])[iArr] != NULL)
- d_free(((char **)ppMsgs[i])[iArr++]);
- d_free(ppMsgs[i]);
+ while(((char **)pAction->ppMsgs[i])[iArr] != NULL) {
+ d_free(((char **)pAction->ppMsgs[i])[iArr++]);
+ ((char **)pAction->ppMsgs[i])[iArr++] = NULL;
+ }
+ d_free(pAction->ppMsgs[i]);
+ pAction->ppMsgs[i] = NULL;
break;
case ACT_STRING_PASSING:
- d_free(ppMsgs[i]);
break;
default:
assert(0);
}
}
}
- d_free(ppMsgs);
- msgDestruct(&pMsg); /* we are now finished with the message */
+ pthread_cleanup_pop(1); /* unlock mutex */
+
+ msgDestruct(&pMsg); /* we are now finished with the message */
RETiRet;
}
#pragma GCC diagnostic warning "-Wempty-body"
@@ -632,7 +657,7 @@ actionWriteToAction(action_t *pAction)
if(pAction->iNbrNoExec < pAction->iExecEveryNthOccur - 1) {
++pAction->iNbrNoExec;
dbgprintf("action %p passed %d times to execution - less than neded - discarding\n",
- pAction, pAction->iNbrNoExec);
+ pAction, pAction->iNbrNoExec);
FINALIZE;
} else {
pAction->iNbrNoExec = 0; /* we execute the action now, so the number of no execs is down to */
@@ -649,6 +674,7 @@ actionWriteToAction(action_t *pAction)
*/
if(pAction->f_prevcount > 1) {
msg_t *pMsg;
+ size_t lenRepMsg;
uchar szRepMsg[1024];
if((pMsg = MsgDup(pAction->f_pMsg)) == NULL) {
@@ -658,22 +684,19 @@ actionWriteToAction(action_t *pAction)
}
if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */
- snprintf((char*)szRepMsg, sizeof(szRepMsg), "last message repeated %d times",
+ lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " last message repeated %d times",
pAction->f_prevcount);
} else {
- snprintf((char*)szRepMsg, sizeof(szRepMsg), "message repeated %d times: [%.800s]",
+ lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " message repeated %d times: [%.800s]",
pAction->f_prevcount, getMSG(pAction->f_pMsg));
}
- /* We now need to update the other message properties.
- * ... RAWMSG is a problem ... Please note that digital
+ /* We now need to update the other message properties. Please note that digital
* signatures inside the message are also invalidated.
*/
datetime.getCurrTime(&(pMsg->tRcvdAt), &(pMsg->ttGenTime));
memcpy(&pMsg->tTIMESTAMP, &pMsg->tRcvdAt, sizeof(struct syslogTime));
- MsgSetMSG(pMsg, (char*)szRepMsg);
- MsgSetRawMsg(pMsg, (char*)szRepMsg);
-
+ MsgReplaceMSG(pMsg, szRepMsg, lenRepMsg);
pMsgSave = pAction->f_pMsg; /* save message pointer for later restoration */
pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */
}
@@ -726,32 +749,13 @@ finalize_it:
}
-/* call the configured action. Does all necessary housekeeping.
- * rgerhards, 2007-08-01
- * FYI: currently, this function is only called from the queue
- * consumer. So we (conceptually) run detached from the input
- * threads (which also means we may run much later than when the
- * message was generated).
+/* helper to actonCallAction, mostly needed because of this damn
+ * pthread_cleanup_push() POSIX macro...
*/
-#pragma GCC diagnostic ignored "-Wempty-body"
-rsRetVal
-actionCallAction(action_t *pAction, msg_t *pMsg)
+static rsRetVal
+doActionCallAction(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
- int iCancelStateSave;
-
- ISOBJ_TYPE_assert(pMsg, msg);
- ASSERT(pAction != NULL);
-
- /* Make sure nodbody else modifies/uses this action object. Right now, this
- * is important because of "message repeated n times" processing and potentially
- * multiple worker threads. -- rgerhards, 2007-12-11
- */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- LockObj(pAction);
- pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
-
/* first, we need to check if this is a disabled
* entry. If so, we must not further process it.
* rgerhards 2005-09-26
@@ -812,10 +816,43 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
}
finalize_it:
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- UnlockObj(pAction);
- pthread_cleanup_pop(0); /* remove mutex cleanup handler */
- pthread_setcancelstate(iCancelStateSave, NULL);
+ RETiRet;
+}
+
+/* call the configured action. Does all necessary housekeeping.
+ * rgerhards, 2007-08-01
+ * FYI: currently, this function is only called from the queue
+ * consumer. So we (conceptually) run detached from the input
+ * threads (which also means we may run much later than when the
+ * message was generated).
+ */
+#pragma GCC diagnostic ignored "-Wempty-body"
+rsRetVal
+actionCallAction(action_t *pAction, msg_t *pMsg)
+{
+ DEFiRet;
+ int iCancelStateSave;
+
+ ISOBJ_TYPE_assert(pMsg, msg);
+ ASSERT(pAction != NULL);
+
+ /* We need to lock the mutex only for repeated line processing.
+ * rgerhards, 2009-06-19
+ */
+ if(pAction->f_ReduceRepeated == 1) {
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ LockObj(pAction);
+ pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ iRet = doActionCallAction(pAction, pMsg);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ UnlockObj(pAction);
+ pthread_cleanup_pop(0); /* remove mutex cleanup handler */
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ } else {
+ iRet = doActionCallAction(pAction, pMsg);
+ }
+
RETiRet;
}
#pragma GCC diagnostic warning "-Wempty-body"
@@ -838,6 +875,7 @@ actionAddCfSysLineHdrl(void)
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bActionQSyncQeueFiles, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL));
@@ -901,9 +939,9 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
*/
if(pAction->iNumTpls > 0) {
/* we first need to create the template pointer array */
- if((pAction->ppTpl = calloc(pAction->iNumTpls, sizeof(struct template *))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
+ CHKmalloc(pAction->ppTpl = (struct template **)calloc(pAction->iNumTpls, sizeof(struct template *)));
+ CHKmalloc(pAction->ppMsgs = (uchar**) calloc(pAction->iNumTpls, sizeof(uchar *)));
+ CHKmalloc(pAction->lenMsgs = (size_t*) calloc(pAction->iNumTpls, sizeof(size_t)));
}
for(i = 0 ; i < pAction->iNumTpls ; ++i) {