summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-30 13:07:44 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-30 13:07:44 +0000
commit61b10104612b3d776399f853f399e64ffe175e65 (patch)
treea6653e9e25d3da57451a699a2be9780eadf42da8 /action.c
parent91b5178c124417b419854cae35204b6742605af5 (diff)
downloadrsyslog-61b10104612b3d776399f853f399e64ffe175e65.tar.gz
rsyslog-61b10104612b3d776399f853f399e64ffe175e65.tar.xz
rsyslog-61b10104612b3d776399f853f399e64ffe175e65.zip
- changed the ommysql output plugin so that the (lengthy) connection
initialization now takes place in message processing. This works much better with the new queued action mode (fast startup) - fixed a newly introduced bug that caused output module's doAction entry point to be called on more than one thread under some circumstances
Diffstat (limited to 'action.c')
-rw-r--r--action.c43
1 files changed, 32 insertions, 11 deletions
diff --git a/action.c b/action.c
index b6b37381..77a22259 100644
--- a/action.c
+++ b/action.c
@@ -128,10 +128,9 @@ rsRetVal actionDestruct(action_t *pThis)
msgDestruct(&pThis->f_pMsg);
SYNC_OBJ_TOOL_EXIT(pThis);
+ pthread_mutex_destroy(&pThis->mutActExec);
if(pThis->ppTpl != NULL)
free(pThis->ppTpl);
- if(pThis->ppMsgs != NULL)
- free(pThis->ppMsgs);
free(pThis);
return RS_RET_OK;
@@ -154,6 +153,7 @@ rsRetVal actionConstruct(action_t **ppThis)
pThis->iResumeInterval = glbliActionResumeInterval;
pThis->iResumeRetryCount = glbliActionResumeRetryCount;
+ pthread_mutex_init(&pThis->mutActExec, NULL);
SYNC_OBJ_TOOL_INIT(pThis);
/* indicate we have a new action */
@@ -335,15 +335,31 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg)
int iRetries;
int i;
int iSleepPeriod;
+ int bCallAction;
+ int iCancelStateSave;
+ uchar **ppMsgs; /* array of message pointers for doAction */
ASSERT(pAction != NULL);
+ /* create the array for doAction() message pointers */
+ if((ppMsgs = calloc(pAction->iNumTpls, sizeof(uchar *))) == NULL) {
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+
/* here we must loop to process all requested strings */
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
- CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &pAction->ppMsgs[i]));
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i])));
}
-
iRetries = 0;
+ /* We now must guard the output module against execution by multiple threads. The
+ * plugin interface specifies that output modules must not be thread-safe (except
+ * if they notify us they are - functionality not yet implemented...).
+ * rgerhards, 2008-01-30
+ */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(&pAction->mutActExec);
+ pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
+ pthread_setcancelstate(iCancelStateSave, NULL);
do {
/* on first invocation, this if should never be true. We just put it at the top
* of the loop so that processing (and code) is simplified. This code is actually
@@ -358,11 +374,14 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg)
/* first check if we are suspended and, if so, retry */
if(actionIsSuspended(pAction)) {
iRet = actionTryResume(pAction);
+ bCallAction = 0;
+ } else {
+ bCallAction = 1;
}
- if(iRet == RS_RET_OK) {
+ if(bCallAction) {
/* call configured action */
- iRet = pAction->pMod->mod.om.doAction(pAction->ppMsgs, pMsg->msgFlags, pAction->pModData);
+ iRet = pAction->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pAction->pModData);
}
} while(iRet == RS_RET_SUSPENDED && (pAction->iResumeRetryCount == -1 || iRetries < pAction->iResumeRetryCount)); /* do...while! */
@@ -378,16 +397,15 @@ RUNLOG_STR("out of retry loop");
actionSuspend(pAction);
}
- if(iRet == RS_RET_OK)
- pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
+ pthread_cleanup_pop(1); /* unlock mutex */
finalize_it:
/* cleanup */
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
- if(pAction->ppMsgs[i] != NULL) {
- free(pAction->ppMsgs[i]);
- pAction->ppMsgs[i] = NULL;
+ if(ppMsgs[i] != NULL) {
+ free(ppMsgs[i]);
}
+ free(ppMsgs);
}
msgDestruct(&pMsg); /* we are now finished with the message */
@@ -489,6 +507,9 @@ actionWriteToAction(action_t *pAction)
*/
iRet = queueEnqObj(pAction->pQueue, (void*) MsgAddRef(pAction->f_pMsg));
+ if(iRet == RS_RET_OK)
+ pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
+
finalize_it:
if(pMsgSave != NULL) {
/* we had saved the original message pointer. That was