diff options
-rw-r--r-- | ChangeLog | 4 | ||||
-rw-r--r-- | action.c | 11 | ||||
-rw-r--r-- | doc/dev_oplugins.html | 179 | ||||
-rw-r--r-- | plugins/ompgsql/ompgsql.c | 22 | ||||
-rw-r--r-- | runtime/module-template.h | 72 | ||||
-rw-r--r-- | runtime/modules.c | 29 | ||||
-rw-r--r-- | runtime/modules.h | 2 | ||||
-rw-r--r-- | runtime/rsyslog.h | 11 |
8 files changed, 313 insertions, 17 deletions
@@ -1,5 +1,9 @@ - added $MainMsgQueueDequeueBatchSize and $ActionQueueDequeueBatchSize configuration directives +- implemented a new transactional output module interface which provides + superior performance (for databases potentially far superior performance) +- increased ompgsql performance by adapting to new transactional + output module interface --------------------------------------------------------------------------- Version 4.3.1 [DEVEL] (rgerhards), 2009-04-?? - improved doc @@ -466,8 +466,7 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) d_pthread_mutex_lock(&pAction->mutActExec); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); pthread_setcancelstate(iCancelStateSave, NULL); - do { - /* on first invocation, this if should never be true. We just put it at the top + do { /* on first invocation, this if should never be true. We just put it at the top * of the loop so that processing (and code) is simplified. This code is actually * triggered on the 2nd+ invocation. -- rgerhards, 2008-01-30 */ @@ -490,6 +489,10 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) if(bCallAction) { /* call configured action */ + /* MULTIQUEUE: TODO: and this now gets us in trouble. If it was suspended, we can + * assume (and must so) that the action did not succeed. So we now need to redo all + * those messages from the batch that are not yet processed. 
+ */ iRet = pAction->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pAction->pModData); if(iRet == RS_RET_SUSPENDED) { dbgprintf("Action requested to be suspended, done that.\n"); @@ -546,11 +549,15 @@ actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp) assert(paUsrp != NULL); + if(pAction->pMod->mod.om.beginTransaction != NULL) + CHKiRet(pAction->pMod->mod.om.beginTransaction(pAction->pModData)); for(i = 0 ; i < paUsrp->nElem ; i++) { pMsg = (msg_t*) paUsrp->pUsrp[i]; dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg); CHKiRet(actionCallDoAction(pAction, pMsg)); } + if(pAction->pMod->mod.om.endTransaction != NULL) + CHKiRet(pAction->pMod->mod.om.endTransaction(pAction->pModData)); finalize_it: RETiRet; } diff --git a/doc/dev_oplugins.html b/doc/dev_oplugins.html index 5bfc974c..2e195028 100644 --- a/doc/dev_oplugins.html +++ b/doc/dev_oplugins.html @@ -144,19 +144,172 @@ array-passing capability not blindly be used.</b> In such cases, we can not guar plugin from segfaulting and if the plugin (as currently always) is run within rsyslog's process space, that results in a segfault for rsyslog. So do not do this. <h3>Batching of Messages</h3> -<p>With the current plugin interface, each message is passed via a separate call to the plugin. -This is annoying and costs performance in some uses cases (primarily for database outputs). -However, that's the way it (currently) is, no easy way around it. There are some ideas -to implement batching capabilities inside the rsyslog core, but without that the only -resort is to do it inside your plugin yourself. You are not prohibited from doing so. -There are some consequences, though: most importantly, the rsyslog core is no longer -intersted in messages that it passed to a plugin. As such, it will not try to make sure -the message is not lost before it was ultimately processed (because rsyslog, due to -doAction() returning successfully, thinks the message *was* ultimately processed). 
-<p>When the rsyslog core receives batching capabilities, this will be implemented in -a way that is fully compatible to the existing plugin interface. While we have not yet -thought about the implementation, that will probably mean that some new interfaces -or options be used to turn on batching capabilities. +<p>Starting with rsyslog 4.3.x, batching of output messages is supported. Previously, only +a single-message interface was supported. +<p>With the <b>single message</b> plugin interface, each message is passed via a separate call to the plugin. +Most importantly, the rsyslog engine assumes that each call to the plugin is a complete transaction +and as such assumes that messages are properly committed after the plugin returns to the engine. +<p>With the <b>batching</b> interface, rsyslog employs something along the line of +"transactions". Obviously, the rsyslog core can not make non-transactional outputs +to be fully transactional. But what it can do is let the output tell the core which +messages have been committed by the output and which not yet. The core can then take care +of those uncommitted messages when problems occur. For example, if a plugin has received +50 messages but not yet told the core that it committed them, and then returns an error state, the +core assumes that all these 50 messages were <b>not</b> written to the output. The core then +requeues all 50 messages and does the usual retry processing. Once the output plugin tells the +core that it is ready again to accept messages, the rsyslog core will provide it with these 50 +not yet committed messages again (actually, at this point, the rsyslog core no longer knows that +it is resubmitting the messages). If, on the contrary, the plugin had told rsyslog that 40 of these 50 +messages were committed (before it failed), then only 10 would have been requeued and resubmitted. 
+<p>In order to provide an efficient implementation, there are some (mild) constraints in that +transactional model: first of all, rsyslog itself specifies the ultimate transaction boundaries. +That is, it tells the plugin when a transaction begins and when it must finish. The plugin +is free to commit messages in between, but it <b>must</b> commit all work done when the core +tells it that the transaction ends. All messages passed in between a begin and end transaction +notification are called a batch of messages. They are passed in one by one, just as without +transaction support. Note that batch sizes are variable within the range of 1 to a user-configured +maximum limit. Most importantly, that means that plugins may receive batches of single messages, +so they are required to commit each message individually. If the plugin tries to be "smarter" +than the rsyslog engine and does not commit messages in those cases (for example), the plugin +puts message stream integrity at risk: once rsyslog has notified the plugin of transaction end, +it discards all messages as it considers them committed and safe. If now something goes wrong, +the rsyslog core does not try to recover lost messages (and keep in mind that "goes wrong" +includes such uncontrollable things as connection loss to a database server). So it is +highly recommended to fully abide by the plugin interface details, even though you may +think you can do it better. The second reason for that is that the core engine will +have configuration settings that enable the user to tune commit rate to their use-case +specific needs. And, as a relief: why would rsyslog ever decide to use batches of one? +There is a trivial case and that is when we have very low activity so that no queue of +messages builds up, in which case it makes sense to commit work as it arrives. +(As a side-note, there are some valid cases where a timeout-based commit feature makes sense. 
+This is also under evaluation and, once decided, the core will offer an interface plus a way +to preserve message stream integrity for properly-crafted plugins). +<p>The second restriction is that if a plugin makes commits in between (which is perfectly +legal) those commits must be in-order. So if a commit is made for message ten out of 50, +this means that messages one to nine are also committed. It would be possible to remove +this restriction, but we have decided to deliberately introduce it to simplify things. +<h3>Output Plugin Transaction Interface</h3> +<p>In order to keep compatible with existing output plugins (and because it introduces +no complexity), the transactional plugin interface is built on the traditional +non-transactional one. Well... actually the traditional interface was transactional +since its introduction, in the sense that each message was processed in its own +transaction. +<p>So the current <code>doAction()</code> entry point can be considered to have this +structure (from the transactional interface point of view): +<p><pre><code> +doAction() + { + beginTransaction() + ProcessMessage() + endTransaction() + } + </code></pre> +<p>For the <b>transactional interface</b>, we now move these implicit <code>beginTransaction()</code> +and <code>endTransaction()</code> calls out of the message processing body, resulting in such +a structure: +<p><pre><code> +beginTransaction() + { + /* prepare for transaction */ + } + +doAction() + { + ProcessMessage() + /* maybe do partial commits */ + } + +endTransaction() + { + /* commit (rest of) batch */ + } +</code></pre> +<p>And this calling structure actually is the transactional interface! It is as simple as this. +For the new interface, the core calls a <code>beginTransaction()</code> entry point inside the +plugin at the start of the batch. Similarly, the core calls <code>endTransaction()</code> at the +end of the batch. The plugin must implement these entry points according to its needs. 
+<p>But how does the core know when to use the old or the new calling interface? This is rather +easy: when loading a plugin, the core queries the plugin for the <code>beginTransaction()</code> +and <code>endTransaction()</code> entry points. If the plugin supports these, the new interface is +used. If the plugin does not support them, the old interface is used and rsyslog assumes that +a commit is done after each message. Note that there is no special "downlevel" handling +necessary to support this. In the case of the non-transactional interface, rsyslog considers +each completed call to <code>doAction</code> as a partial commit up to the current message. +So implementation inside the core is very straightforward. +<p>Actually, <b>we recommend that the transactional entry points only be defined by those +plugins that actually need them</b>. All others should not define them in which case +the default commit behaviour inside rsyslog will apply (thus removing complexity from the +plugin). +<p>In order to support partial commits, special return codes must be defined for +<code>doAction</code>. All those return codes mean that processing completed successfully. +But they convey additional information about the commit status as follows: +<p> +<table border="0"> +<tr> +<td valign="top"><i>RS_RET_OK</i></td> +<td>The record and all previous ones inside the batch have been committed. +(<i>Note:</i> this definition is what makes integrating plugins without the +transaction begin/end calls so easy - this is the traditional "success" return +state and if every call returns it, there is no need for actually calling +<code>endTransaction()</code>, because there is no transaction open).</td> +</tr> +<tr> +<td valign="top"><i>RS_RET_DEFER_COMMIT</i></td> +<td>The record has been processed, but is not yet committed. 
This is the +expected state for transaction-aware plugins.</td> +</tr> +<tr> +<td valign="top"><i>RS_RET_PREVIOUS_COMMITTED</i></td> +<td>The <b>previous</b> record inside the batch has been committed, but the +current one not yet. This state is introduced to support sources that fill up +buffers and commit once a buffer is completely filled. That may occur halfway +in the next record, so it may be important to be able to tell the +engine that everything up to the previous record is committed.</td> +</tr> +</table> +<p>Note that the typical <b>calling cycle</b> is <code>beginTransaction()</code>, +followed by <i>n</i> times +<code>doAction()</code> followed by <code>endTransaction()</code>. However, if either +<code>beginTransaction()</code> or <code>doAction()</code> returns an error state +(including RS_RET_SUSPENDED), then the transaction is considered aborted. As a result, the +remaining calls in this cycle (e.g. <code>endTransaction()</code>) are never made and a +new cycle (starting with <code>beginTransaction()</code>) is begun when processing resumes. +So an output plugin must expect and handle those partial cycles gracefully. +<p><b>The question remains how can a plugin know if the core supports batching?</b> +First of all, even if the engine would not know it, the plugin would return with RS_RET_DEFER_COMMIT, +which would then be treated as an error by the engine. This would effectively disable the +output, but cause no further harm (but may be harm enough in itself). +<p>The real solution is to enable the plugin to query the rsyslog core if this feature is +supported or not. At the time of the introduction of batching, no such query-interface +exists. So we introduce it with that release. What this means is that if a rsyslog core can +not provide this query interface, it is a core that was built before batching support +was available. So the absence of a query interface indicates that the transactional +interface is not available. 
One might now be tempted to think there is no need to do +the actual check, but it is recommended to ask the rsyslog engine explicitly if +the transactional interface is present and will be honored. This enables us to +create versions in the future which have, for whatever reason we do not yet know, no +support for this interface. +<p>The logic to do these checks is contained in the <code>INITChkCoreFeature</code> macro, +which can be used as follows: +<p><pre><code> +INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); +</code></pre> +<p>Here, bCoreSupportsBatching is a plugin-defined integer which after execution is +1 if batches (and thus the transactional interface) are supported and 0 otherwise. +CORE_FEATURE_BATCHING is the feature we are interested in. Future versions of rsyslog +may contain additional feature-test-macros (you can see all of them in +./runtime/rsyslog.h). +<p>Note that the ompgsql output plugin supports transactional mode in a hybrid way and +thus can be considered good example code. + +<h2>Open Issues</h2> +<ul> +<li>Processing errors handling +<li>reliable re-queue during error handling and queue termination +</ul> + + + <h3>Licensing</h3> <p>From the rsyslog point of view, plugins constitute separate projects. As such, we think plugins are not required to be compatible with GPLv3. 
However, this is diff --git a/plugins/ompgsql/ompgsql.c b/plugins/ompgsql/ompgsql.c index 6daac1c7..003cf6a8 100644 --- a/plugins/ompgsql/ompgsql.c +++ b/plugins/ompgsql/ompgsql.c @@ -170,6 +170,9 @@ tryExec(uchar *pszCmd, instanceData *pData) int bHadError = 0; /* try insert */ +BEGINfunc +RUNLOG_VAR("%p", pData->f_hpgsql); +RUNLOG_VAR("%s", pszCmd); pgRet = PQexec(pData->f_hpgsql, (char*)pszCmd); execState = PQresultStatus(pgRet); if(execState != PGRES_COMMAND_OK && execState != PGRES_TUPLES_OK) { @@ -178,6 +181,7 @@ tryExec(uchar *pszCmd, instanceData *pData) } PQclear(pgRet); +ENDfunc return(bHadError); } @@ -230,6 +234,14 @@ CODESTARTtryResume } ENDtryResume + +BEGINbeginTransaction +CODESTARTbeginTransaction +dbgprintf("ompgsql: beginTransaction\n"); + iRet = writePgSQL((uchar*) "begin", pData); /* TODO: make user-configurable */ +ENDbeginTransaction + + BEGINdoAction CODESTARTdoAction dbgprintf("\n"); @@ -237,6 +249,13 @@ CODESTARTdoAction ENDdoAction +BEGINendTransaction +CODESTARTendTransaction + iRet = writePgSQL((uchar*) "commit;", pData); /* TODO: make user-configurable */ +dbgprintf("ompgsql: endTransaction\n"); +ENDendTransaction + + BEGINparseSelectorAct int iPgSQLPropErr = 0; CODESTARTparseSelectorAct @@ -314,6 +333,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ ENDqueryEtryPt @@ -322,6 +342,8 @@ CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); + INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); + DBGPRINTF("ompgsql: %susing transactional output interface.\n", bCoreSupportsBatching ? 
"" : "not "); ENDmodInit /* vi:set ai: */ diff --git a/runtime/module-template.h b/runtime/module-template.h index 6f7d877c..3e963199 100644 --- a/runtime/module-template.h +++ b/runtime/module-template.h @@ -39,7 +39,8 @@ #define DEF_OMOD_STATIC_DATA \ DEF_MOD_STATIC_DATA \ - DEFobjCurrIf(obj) + DEFobjCurrIf(obj) \ + static __attribute__((unused)) int bCoreSupportsBatching; #define DEF_IMOD_STATIC_DATA \ DEF_MOD_STATIC_DATA \ DEFobjCurrIf(obj) @@ -160,6 +161,37 @@ static rsRetVal isCompatibleWithFeature(syslogFeature __attribute__((unused)) eF RETiRet;\ } + +/* beginTransaction() + * introduced in v4.3.3 -- rgerhards, 2009-04-27 + */ +#define BEGINbeginTransaction \ +static rsRetVal beginTransaction(instanceData __attribute__((unused)) *pData)\ +{\ + DEFiRet; + +#define CODESTARTbeginTransaction /* currently empty, but may be extended */ + +#define ENDbeginTransaction \ + RETiRet;\ +} + + +/* endTransaction() + * introduced in v4.3.3 -- rgerhards, 2009-04-27 + */ +#define BEGINendTransaction \ +static rsRetVal endTransaction(instanceData __attribute__((unused)) *pData)\ +{\ + DEFiRet; + +#define CODESTARTendTransaction /* currently empty, but may be extended */ + +#define ENDendTransaction \ + RETiRet;\ +} + + /* doAction() */ #define BEGINdoAction \ @@ -324,6 +356,18 @@ static rsRetVal queryEtryPt(uchar *name, rsRetVal (**pEtryPoint)())\ *pEtryPoint = tryResume;\ } + +/* the following definition is queryEtryPt block that must be added + * if an output module supports the transactional interface. + * rgerhards, 2009-04-27 + */ +#define CODEqueryEtryPt_TXIF_OMOD_QUERIES \ + else if(!strcmp((char*) name, "beginTransaction")) {\ + *pEtryPoint = beginTransaction;\ + } else if(!strcmp((char*) name, "endTransaction")) {\ + *pEtryPoint = endTransaction;\ + } + /* the following definition is the standard block for queryEtryPt for INPUT * modules. This can be used if no specific handling (e.g. to cover version * differences) is needed. 
@@ -393,6 +437,32 @@ finalize_it:\ } +/* now come some check functions, which enable a standard way of obtaining feature + * information from the core. feat is the to-be-tested feature and featVar is a + * variable that receives the result (0-not support, 1-supported). + * This must be a macro, so that it is put into the output's code. Otherwise, we + * would need to rely on a library entry point, which is what we intend to avoid ;) + * rgerhards, 2009-04-27 + */ +#define INITChkCoreFeature(featVar, feat) \ +{ \ + rsRetVal MACRO_Ret; \ + rsRetVal (*pQueryCoreFeatureSupport)(int*, unsigned); \ + int bSupportsIt; \ + featVar = 0; \ + MACRO_Ret = pHostQueryEtryPt((uchar*)"queryCoreFeatureSupport", &pQueryCoreFeatureSupport); \ + if(MACRO_Ret == RS_RET_OK) { \ + /* found entry point, so let's see if core supports it */ \ + CHKiRet((*pQueryCoreFeatureSupport)(&bSupportsIt, feat)); \ + if(bSupportsIt) \ + featVar = 1; \ + } else if(MACRO_Ret != RS_RET_ENTRY_POINT_NOT_FOUND) { \ + ABORT_FINALIZE(MACRO_Ret); /* Something else went wrong, what is not acceptable */ \ + } \ +} + + + /* definitions for host API queries */ #define CODEmodInit_QueryRegCFSLineHdlr \ CHKiRet(pHostQueryEtryPt((uchar*)"regCfSysLineHdlr", &omsdRegCFSLineHdlr)); diff --git a/runtime/modules.c b/runtime/modules.c index 9fdb48e7..024c1c9a 100644 --- a/runtime/modules.c +++ b/runtime/modules.c @@ -207,19 +207,38 @@ static void moduleDestruct(modInfo_t *pThis) } +/* This enables a module to query the core for specific features. + * rgerhards, 2009-04-22 + */ +static rsRetVal queryCoreFeatureSupport(int *pBool, unsigned uFeat) +{ + DEFiRet; + + if((pBool == NULL)) + ABORT_FINALIZE(RS_RET_PARAM_ERROR); + + *pBool = (uFeat & CORE_FEATURE_BATCHING) ? 1 : 0; + +finalize_it: + RETiRet; +} + + /* The following function is the queryEntryPoint for host-based entry points. * Modules may call it to get access to core interface functions. 
Please note * that utility functions can be accessed via shared libraries - at least this * is my current shool of thinking. * Please note that the implementation as a query interface allows to take * care of plug-in interface version differences. -- rgerhards, 2007-07-31 + * ... but often it better not to use a new interface. So we now add core + * functions here that a plugin may request. -- rgerhards, 2009-04-22 */ static rsRetVal queryHostEtryPt(uchar *name, rsRetVal (**pEtryPoint)()) { DEFiRet; if((name == NULL) || (pEtryPoint == NULL)) - return RS_RET_PARAM_ERROR; + ABORT_FINALIZE(RS_RET_PARAM_ERROR); if(!strcmp((char*) name, "regCfSysLineHdlr")) { *pEtryPoint = regCfSysLineHdlr; @@ -227,6 +246,8 @@ static rsRetVal queryHostEtryPt(uchar *name, rsRetVal (**pEtryPoint)()) *pEtryPoint = objGetObjInterface; } else if(!strcmp((char*) name, "OMSRgetSupportedTplOpts")) { *pEtryPoint = OMSRgetSupportedTplOpts; + } else if(!strcmp((char*) name, "queryCoreFeatureSupport")) { + *pEtryPoint = queryCoreFeatureSupport; } else { *pEtryPoint = NULL; /* to be on the safe side */ ABORT_FINALIZE(RS_RET_ENTRY_POINT_NOT_FOUND); @@ -402,6 +423,12 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_ localRet = (*pNew->modQueryEtryPt)((uchar*)"doHUP", &pNew->doHUP); if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) ABORT_FINALIZE(localRet); + localRet = (*pNew->modQueryEtryPt)((uchar*)"beginTransaction", &pNew->mod.om.beginTransaction); + if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) + ABORT_FINALIZE(localRet); + localRet = (*pNew->modQueryEtryPt)((uchar*)"endTransaction", &pNew->mod.om.endTransaction); + if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) + ABORT_FINALIZE(localRet); break; case eMOD_LIB: break; diff --git a/runtime/modules.h b/runtime/modules.h index 372529ee..e33bbbe1 100644 --- a/runtime/modules.h +++ b/runtime/modules.h @@ -110,7 +110,9 @@ typedef 
struct modInfo_s { struct {/* data for output modules */ /* below: perform the configured action */ + rsRetVal (*beginTransaction)(void*); rsRetVal (*doAction)(uchar**, unsigned, void*); + rsRetVal (*endTransaction)(void*); rsRetVal (*parseSelectorAct)(uchar**, void**,omodStringRequest_t**); } om; struct { /* data for library modules */ diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 74ea5270..3b2dff62 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -58,6 +58,15 @@ #endif +/* the rsyslog core provides information about present feature to plugins + * asking it. Below are feature-test macros which must be used to query + * features. Note that this must be powers of two, so that multiple queries + * can be combined. -- rgerhards, 2009-04-27 + */ +#define CORE_FEATURE_BATCHING 1 +/*#define CORE_FEATURE_whatever 2 ... and so on ... */ + + /* define some base data types */ typedef unsigned char uchar;/* get rid of the unhandy "unsigned char" */ typedef struct aUsrp_s aUsrp_t; @@ -268,6 +277,8 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_ERR_FORK = -2118, /**< error during fork() */ RS_RET_ERR_WRITE_PIPE = -2119, /**< error writing to pipe */ RS_RET_RSCORE_TOO_OLD = -2120, /**< rsyslog core is too old for ... (eg this plugin) */ + RS_RET_DEFER_COMMIT = -2121, /**< output plugin status: not yet committed (an OK state!) */ + RS_RET_PREVIOUS_COMMITTED = -2122, /**< output plugin status: previous record was committed (an OK state!) */ /* RainerScript error messages (range 1000.. 1999) */ RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */ |