summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog3
-rw-r--r--action.c43
-rw-r--r--action.h3
-rw-r--r--plugins/ommysql/ommysql.c23
-rw-r--r--syslogd.c6
5 files changed, 50 insertions, 28 deletions
diff --git a/ChangeLog b/ChangeLog
index f5569f31..a31980a2 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -25,6 +25,9 @@ Version 3.11.0 (rgerhards), 2008-01-??
guaranteed delivery of messages (if the output supports it, of course)
- fixed bug in output module interface, see
http://sourceforge.net/tracker/index.php?func=detail&aid=1881008&group_id=123448&atid=696552
+- changed the ommysql output plugin so that the (lengthy) connection
+ initialization now takes place in message processing. This works much
+ better with the new queued action mode (fast startup)
---------------------------------------------------------------------------
Version 3.10.3 (rgerhards), 2008-01-28
- fixed a bug with standard template definitions (not a big deal) - thanks
diff --git a/action.c b/action.c
index b6b37381..77a22259 100644
--- a/action.c
+++ b/action.c
@@ -128,10 +128,9 @@ rsRetVal actionDestruct(action_t *pThis)
msgDestruct(&pThis->f_pMsg);
SYNC_OBJ_TOOL_EXIT(pThis);
+ pthread_mutex_destroy(&pThis->mutActExec);
if(pThis->ppTpl != NULL)
free(pThis->ppTpl);
- if(pThis->ppMsgs != NULL)
- free(pThis->ppMsgs);
free(pThis);
return RS_RET_OK;
@@ -154,6 +153,7 @@ rsRetVal actionConstruct(action_t **ppThis)
pThis->iResumeInterval = glbliActionResumeInterval;
pThis->iResumeRetryCount = glbliActionResumeRetryCount;
+ pthread_mutex_init(&pThis->mutActExec, NULL);
SYNC_OBJ_TOOL_INIT(pThis);
/* indicate we have a new action */
@@ -335,15 +335,31 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg)
int iRetries;
int i;
int iSleepPeriod;
+ int bCallAction;
+ int iCancelStateSave;
+ uchar **ppMsgs; /* array of message pointers for doAction */
ASSERT(pAction != NULL);
+ /* create the array for doAction() message pointers */
+ if((ppMsgs = calloc(pAction->iNumTpls, sizeof(uchar *))) == NULL) {
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+
/* here we must loop to process all requested strings */
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
- CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &pAction->ppMsgs[i]));
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i])));
}
-
iRetries = 0;
+ /* We now must guard the output module against execution by multiple threads. The
+ * plugin interface specifies that output modules must not be thread-safe (except
+ * if they notify us they are - functionality not yet implemented...).
+ * rgerhards, 2008-01-30
+ */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(&pAction->mutActExec);
+ pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
+ pthread_setcancelstate(iCancelStateSave, NULL);
do {
/* on first invocation, this if should never be true. We just put it at the top
* of the loop so that processing (and code) is simplified. This code is actually
@@ -358,11 +374,14 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg)
/* first check if we are suspended and, if so, retry */
if(actionIsSuspended(pAction)) {
iRet = actionTryResume(pAction);
+ bCallAction = 0;
+ } else {
+ bCallAction = 1;
}
- if(iRet == RS_RET_OK) {
+ if(bCallAction) {
/* call configured action */
- iRet = pAction->pMod->mod.om.doAction(pAction->ppMsgs, pMsg->msgFlags, pAction->pModData);
+ iRet = pAction->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pAction->pModData);
}
} while(iRet == RS_RET_SUSPENDED && (pAction->iResumeRetryCount == -1 || iRetries < pAction->iResumeRetryCount)); /* do...while! */
@@ -378,16 +397,15 @@ RUNLOG_STR("out of retry loop");
actionSuspend(pAction);
}
- if(iRet == RS_RET_OK)
- pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
+ pthread_cleanup_pop(1); /* unlock mutex */
finalize_it:
/* cleanup */
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
- if(pAction->ppMsgs[i] != NULL) {
- free(pAction->ppMsgs[i]);
- pAction->ppMsgs[i] = NULL;
+ if(ppMsgs[i] != NULL) {
+ free(ppMsgs[i]);
}
+ free(ppMsgs);
}
msgDestruct(&pMsg); /* we are now finished with the message */
@@ -489,6 +507,9 @@ actionWriteToAction(action_t *pAction)
*/
iRet = queueEnqObj(pAction->pQueue, (void*) MsgAddRef(pAction->f_pMsg));
+ if(iRet == RS_RET_OK)
+ pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
+
finalize_it:
if(pMsgSave != NULL) {
/* we had saved the original message pointer. That was
diff --git a/action.h b/action.h
index 5bcdc461..09a3616b 100644
--- a/action.h
+++ b/action.h
@@ -55,14 +55,13 @@ struct action_s {
int iNumTpls; /* number of array entries for template element below */
struct template **ppTpl;/* array of template to use - strings must be passed to doAction
* in this order. */
-
- uchar **ppMsgs; /* array of message pointers for doAction */
struct msg* f_pMsg; /* pointer to the message (this will replace the other vars with msg
* content later). This is preserved after the message has been
* processed - it is also used to detect duplicates.
*/
queue_t *pQueue; /* action queue */
SYNC_OBJ_TOOL; /* required for mutex support */
+ pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */
};
typedef struct action_s action_t;
diff --git a/plugins/ommysql/ommysql.c b/plugins/ommysql/ommysql.c
index 107419a5..2c79e363 100644
--- a/plugins/ommysql/ommysql.c
+++ b/plugins/ommysql/ommysql.c
@@ -78,7 +78,7 @@ ENDisCompatibleWithFeature
*/
static void closeMySQL(instanceData *pData)
{
- assert(pData != NULL);
+ ASSERT(pData != NULL);
if(pData->f_hmysql != NULL) { /* just to be on the safe side... */
mysql_server_end();
@@ -113,7 +113,7 @@ static void reportDBError(instanceData *pData, int bSilent)
char errMsg[512];
unsigned uMySQLErrno;
- assert(pData != NULL);
+ ASSERT(pData != NULL);
/* output log message */
errno = 0;
@@ -143,8 +143,8 @@ static rsRetVal initMySQL(instanceData *pData, int bSilent)
{
DEFiRet;
- assert(pData != NULL);
- assert(pData->f_hmysql == NULL);
+ ASSERT(pData != NULL);
+ ASSERT(pData->f_hmysql == NULL);
pData->f_hmysql = mysql_init(NULL);
if(pData->f_hmysql == NULL) {
@@ -160,7 +160,7 @@ static rsRetVal initMySQL(instanceData *pData, int bSilent)
}
}
- return iRet;
+ RETiRet;
}
@@ -172,8 +172,13 @@ rsRetVal writeMySQL(uchar *psz, instanceData *pData)
{
DEFiRet;
- assert(psz != NULL);
- assert(pData != NULL);
+ ASSERT(psz != NULL);
+ ASSERT(pData != NULL);
+
+ /* see if we are ready to proceed */
+ if(pData->f_hmysql == NULL) {
+ CHKiRet(initMySQL(pData, 0));
+ }
/* try insert */
if(mysql_query(pData->f_hmysql, (char*)psz)) {
@@ -193,7 +198,7 @@ finalize_it:
pData->uLastMySQLErrno = 0; /* reset error for error supression */
}
- return iRet;
+ RETiRet;
}
@@ -273,7 +278,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
logerror("Trouble with MySQL connection properties. -MySQL logging disabled");
ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
} else {
- CHKiRet(initMySQL(pData, 0));
+ pData->f_hmysql = NULL; /* initialize, but connect only on first message (important for queued mode!) */
}
CODE_STD_FINALIZERparseSelectorAct
diff --git a/syslogd.c b/syslogd.c
index ed59f0d5..d47197d4 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -3758,12 +3758,6 @@ rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStr
if(pAction->iNumTpls > 0) {
/* we first need to create the template pointer array */
if((pAction->ppTpl = calloc(pAction->iNumTpls, sizeof(struct template *))) == NULL) {
- glblHadMemShortage = 1;
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
- /* and now the array for doAction() message pointers */
- if((pAction->ppMsgs = calloc(pAction->iNumTpls, sizeof(uchar *))) == NULL) {
- glblHadMemShortage = 1;
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
}