From 61b10104612b3d776399f853f399e64ffe175e65 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 30 Jan 2008 13:07:44 +0000 Subject: - changed the ommysql output plugin so that the (lengthy) connection initialization now takes place in message processing. This works much better with the new queued action mode (fast startup) - fixed a newly introduced bug that caused output module's doAction entry point to be called on more than one thread under some circumstances --- action.c | 43 ++++++++++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 11 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index b6b37381..77a22259 100644 --- a/action.c +++ b/action.c @@ -128,10 +128,9 @@ rsRetVal actionDestruct(action_t *pThis) msgDestruct(&pThis->f_pMsg); SYNC_OBJ_TOOL_EXIT(pThis); + pthread_mutex_destroy(&pThis->mutActExec); if(pThis->ppTpl != NULL) free(pThis->ppTpl); - if(pThis->ppMsgs != NULL) - free(pThis->ppMsgs); free(pThis); return RS_RET_OK; @@ -154,6 +153,7 @@ rsRetVal actionConstruct(action_t **ppThis) pThis->iResumeInterval = glbliActionResumeInterval; pThis->iResumeRetryCount = glbliActionResumeRetryCount; + pthread_mutex_init(&pThis->mutActExec, NULL); SYNC_OBJ_TOOL_INIT(pThis); /* indicate we have a new action */ @@ -335,15 +335,31 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) int iRetries; int i; int iSleepPeriod; + int bCallAction; + int iCancelStateSave; + uchar **ppMsgs; /* array of message pointers for doAction */ ASSERT(pAction != NULL); + /* create the array for doAction() message pointers */ + if((ppMsgs = calloc(pAction->iNumTpls, sizeof(uchar *))) == NULL) { + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } + /* here we must loop to process all requested strings */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &pAction->ppMsgs[i])); + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]))); } - iRetries = 0; + /* We now must guard the output module against execution by multiple threads. The + * plugin interface specifies that output modules must not be thread-safe (except + * if they notify us they are - functionality not yet implemented...). + * rgerhards, 2008-01-30 + */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(&pAction->mutActExec); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); + pthread_setcancelstate(iCancelStateSave, NULL); do { /* on first invocation, this if should never be true. We just put it at the top * of the loop so that processing (and code) is simplified. This code is actually @@ -358,11 +374,14 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) /* first check if we are suspended and, if so, retry */ if(actionIsSuspended(pAction)) { iRet = actionTryResume(pAction); + bCallAction = 0; + } else { + bCallAction = 1; } - if(iRet == RS_RET_OK) { + if(bCallAction) { /* call configured action */ - iRet = pAction->pMod->mod.om.doAction(pAction->ppMsgs, pMsg->msgFlags, pAction->pModData); + iRet = pAction->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pAction->pModData); } } while(iRet == RS_RET_SUSPENDED && (pAction->iResumeRetryCount == -1 || iRetries < pAction->iResumeRetryCount)); /* do...while! */ @@ -378,16 +397,15 @@ RUNLOG_STR("out of retry loop"); actionSuspend(pAction); } - if(iRet == RS_RET_OK) - pAction->f_prevcount = 0; /* message processed, so we start a new cycle */ + pthread_cleanup_pop(1); /* unlock mutex */ finalize_it: /* cleanup */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { - if(pAction->ppMsgs[i] != NULL) { - free(pAction->ppMsgs[i]); - pAction->ppMsgs[i] = NULL; + if(ppMsgs[i] != NULL) { + free(ppMsgs[i]); } + free(ppMsgs); } msgDestruct(&pMsg); /* we are now finished with the message */ @@ -489,6 +507,9 @@ actionWriteToAction(action_t *pAction) */ iRet = queueEnqObj(pAction->pQueue, (void*) MsgAddRef(pAction->f_pMsg)); + if(iRet == RS_RET_OK) + pAction->f_prevcount = 0; /* message processed, so we start a new cycle */ + finalize_it: if(pMsgSave != NULL) { /* we had saved the original message pointer. That was -- cgit