diff options
-rw-r--r-- | ChangeLog | 3 | ||||
-rw-r--r-- | action.c | 43 | ||||
-rw-r--r-- | action.h | 3 | ||||
-rw-r--r-- | plugins/ommysql/ommysql.c | 23 | ||||
-rw-r--r-- | syslogd.c | 6 |
5 files changed, 50 insertions, 28 deletions
@@ -25,6 +25,9 @@ Version 3.11.0 (rgerhards), 2008-01-?? guaranteed delivery of messages (if the output supports it, of course) - fixed bug in output module interface, see http://sourceforge.net/tracker/index.php?func=detail&aid=1881008&group_id=123448&atid=696552 +- changed the ommysql output plugin so that the (lengthy) connection + initialization now takes place in message processing. This works much + better with the new queued action mode (fast startup) --------------------------------------------------------------------------- Version 3.10.3 (rgerhards), 2008-01-28 - fixed a bug with standard template definitions (not a big deal) - thanks @@ -128,10 +128,9 @@ rsRetVal actionDestruct(action_t *pThis) msgDestruct(&pThis->f_pMsg); SYNC_OBJ_TOOL_EXIT(pThis); + pthread_mutex_destroy(&pThis->mutActExec); if(pThis->ppTpl != NULL) free(pThis->ppTpl); - if(pThis->ppMsgs != NULL) - free(pThis->ppMsgs); free(pThis); return RS_RET_OK; @@ -154,6 +153,7 @@ rsRetVal actionConstruct(action_t **ppThis) pThis->iResumeInterval = glbliActionResumeInterval; pThis->iResumeRetryCount = glbliActionResumeRetryCount; + pthread_mutex_init(&pThis->mutActExec, NULL); SYNC_OBJ_TOOL_INIT(pThis); /* indicate we have a new action */ @@ -335,15 +335,31 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) int iRetries; int i; int iSleepPeriod; + int bCallAction; + int iCancelStateSave; + uchar **ppMsgs; /* array of message pointers for doAction */ ASSERT(pAction != NULL); + /* create the array for doAction() message pointers */ + if((ppMsgs = calloc(pAction->iNumTpls, sizeof(uchar *))) == NULL) { + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } + /* here we must loop to process all requested strings */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &pAction->ppMsgs[i])); + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]))); } - iRetries = 0; + /* We now must guard the output module against execution by multiple threads. The + * plugin interface specifies that output modules must not be thread-safe (except + * if they notify us they are - functionality not yet implemented...). + * rgerhards, 2008-01-30 + */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(&pAction->mutActExec); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); + pthread_setcancelstate(iCancelStateSave, NULL); do { /* on first invocation, this if should never be true. We just put it at the top * of the loop so that processing (and code) is simplified. This code is actually @@ -358,11 +374,14 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) /* first check if we are suspended and, if so, retry */ if(actionIsSuspended(pAction)) { iRet = actionTryResume(pAction); + bCallAction = 0; + } else { + bCallAction = 1; } - if(iRet == RS_RET_OK) { + if(bCallAction) { /* call configured action */ - iRet = pAction->pMod->mod.om.doAction(pAction->ppMsgs, pMsg->msgFlags, pAction->pModData); + iRet = pAction->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pAction->pModData); } } while(iRet == RS_RET_SUSPENDED && (pAction->iResumeRetryCount == -1 || iRetries < pAction->iResumeRetryCount)); /* do...while! */ @@ -378,16 +397,15 @@ RUNLOG_STR("out of retry loop"); actionSuspend(pAction); } - if(iRet == RS_RET_OK) - pAction->f_prevcount = 0; /* message processed, so we start a new cycle */ + pthread_cleanup_pop(1); /* unlock mutex */ finalize_it: /* cleanup */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { - if(pAction->ppMsgs[i] != NULL) { - free(pAction->ppMsgs[i]); - pAction->ppMsgs[i] = NULL; + if(ppMsgs[i] != NULL) { + free(ppMsgs[i]); } + free(ppMsgs); } msgDestruct(&pMsg); /* we are now finished with the message */ @@ -489,6 +507,9 @@ actionWriteToAction(action_t *pAction) */ iRet = queueEnqObj(pAction->pQueue, (void*) MsgAddRef(pAction->f_pMsg)); + if(iRet == RS_RET_OK) + pAction->f_prevcount = 0; /* message processed, so we start a new cycle */ + finalize_it: if(pMsgSave != NULL) { /* we had saved the original message pointer. That was @@ -55,14 +55,13 @@ struct action_s { int iNumTpls; /* number of array entries for template element below */ struct template **ppTpl;/* array of template to use - strings must be passed to doAction * in this order. */ - - uchar **ppMsgs; /* array of message pointers for doAction */ struct msg* f_pMsg; /* pointer to the message (this will replace the other vars with msg * content later). This is preserved after the message has been * processed - it is also used to detect duplicates. */ queue_t *pQueue; /* action queue */ SYNC_OBJ_TOOL; /* required for mutex support */ + pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */ }; typedef struct action_s action_t; diff --git a/plugins/ommysql/ommysql.c b/plugins/ommysql/ommysql.c index 107419a5..2c79e363 100644 --- a/plugins/ommysql/ommysql.c +++ b/plugins/ommysql/ommysql.c @@ -78,7 +78,7 @@ ENDisCompatibleWithFeature */ static void closeMySQL(instanceData *pData) { - assert(pData != NULL); + ASSERT(pData != NULL); if(pData->f_hmysql != NULL) { /* just to be on the safe side... */ mysql_server_end(); @@ -113,7 +113,7 @@ static void reportDBError(instanceData *pData, int bSilent) char errMsg[512]; unsigned uMySQLErrno; - assert(pData != NULL); + ASSERT(pData != NULL); /* output log message */ errno = 0; @@ -143,8 +143,8 @@ static rsRetVal initMySQL(instanceData *pData, int bSilent) { DEFiRet; - assert(pData != NULL); - assert(pData->f_hmysql == NULL); + ASSERT(pData != NULL); + ASSERT(pData->f_hmysql == NULL); pData->f_hmysql = mysql_init(NULL); if(pData->f_hmysql == NULL) { @@ -160,7 +160,7 @@ static rsRetVal initMySQL(instanceData *pData, int bSilent) } } - return iRet; + RETiRet; } @@ -172,8 +172,13 @@ rsRetVal writeMySQL(uchar *psz, instanceData *pData) { DEFiRet; - assert(psz != NULL); - assert(pData != NULL); + ASSERT(psz != NULL); + ASSERT(pData != NULL); + + /* see if we are ready to proceed */ + if(pData->f_hmysql == NULL) { + CHKiRet(initMySQL(pData, 0)); + } /* try insert */ if(mysql_query(pData->f_hmysql, (char*)psz)) { @@ -193,7 +198,7 @@ finalize_it: pData->uLastMySQLErrno = 0; /* reset error for error supression */ } - return iRet; + RETiRet; } @@ -273,7 +278,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) logerror("Trouble with MySQL connection properties. -MySQL logging disabled"); ABORT_FINALIZE(RS_RET_INVALID_PARAMS); } else { - CHKiRet(initMySQL(pData, 0)); + pData->f_hmysql = NULL; /* initialize, but connect only on first message (important for queued mode!) */ } CODE_STD_FINALIZERparseSelectorAct @@ -3758,12 +3758,6 @@ rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStr if(pAction->iNumTpls > 0) { /* we first need to create the template pointer array */ if((pAction->ppTpl = calloc(pAction->iNumTpls, sizeof(struct template *))) == NULL) { - glblHadMemShortage = 1; - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } - /* and now the array for doAction() message pointers */ - if((pAction->ppMsgs = calloc(pAction->iNumTpls, sizeof(uchar *))) == NULL) { - glblHadMemShortage = 1; ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } } |