diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2011-06-06 09:17:27 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2011-06-06 09:17:27 +0200 |
commit | ee403c2ad745039add6cd34ec1e9a55aa9d51160 (patch) | |
tree | 0d5a6f5463d91a7fa160532e2f3e094590ff4506 /plugins | |
parent | 06633d3fac69a3d380b36ce728a6f2278561b4bb (diff) | |
parent | ffdc33e3f178ad85ba5c2c9f7fcee98b743e9d5e (diff) | |
download | rsyslog-ee403c2ad745039add6cd34ec1e9a55aa9d51160.tar.gz rsyslog-ee403c2ad745039add6cd34ec1e9a55aa9d51160.tar.xz rsyslog-ee403c2ad745039add6cd34ec1e9a55aa9d51160.zip |
Merge branch 'v5-stable-imfile-batches' into v5-devel
Conflicts:
ChangeLog
configure.ac
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/imfile/imfile.c | 23 | ||||
-rw-r--r-- | plugins/mmsnmptrapd/mmsnmptrapd.c | 3 | ||||
-rw-r--r-- | plugins/omlibdbi/omlibdbi.c | 1 |
3 files changed, 23 insertions, 4 deletions
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index cac3a55d..9bc84220 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -64,6 +64,7 @@ DEFobjCurrIf(strm) DEFobjCurrIf(prop) DEFobjCurrIf(ruleset) +#define NUM_MULTISUB 1024 /* max number of submits -- TODO: make configurable */ typedef struct fileInfo_s { uchar *pszFileName; uchar *pszTag; @@ -71,11 +72,13 @@ typedef struct fileInfo_s { uchar *pszStateFile; /* file in which state between runs is to be stored */ int iFacility; int iSeverity; + int maxLinesAtOnce; int nRecords; /**< How many records did we process before persisting the stream? */ int iPersistStateInterval; /**< how often should state be persisted? (0=on close only) */ strm_t *pStrm; /* its stream (NULL if not assigned) */ int readMode; /* which mode to use in ReadMulteLine call? */ ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + multi_submit_t multiSub; } fileInfo_t; @@ -91,6 +94,7 @@ static int iPersistStateInterval = 0; /* how often if state file to be persisted static int iFacility = 128; /* local0 */ static int iSeverity = 5; /* notice, as of rfc 3164 */ static int readMode = 0; /* mode to use for ReadMultiLine call */ +static int maxLinesAtOnce = 10240; /* how many lines to process in a row? */ static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ static int iFilPtr = 0; /* number of files to be monitored; pointer to next free spot during config */ @@ -122,7 +126,9 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine) pMsg->iFacility = LOG_FAC(pInfo->iFacility); pMsg->iSeverity = LOG_PRI(pInfo->iSeverity); MsgSetRuleset(pMsg, pInfo->pRuleset); - CHKiRet(submitMsg(pMsg)); + pInfo->multiSub.ppMsgs[pInfo->multiSub.nElem++] = pMsg; + if(pInfo->multiSub.nElem == pInfo->multiSub.maxElem) + CHKiRet(multiSubmitMsg(&pInfo->multiSub)); finalize_it: RETiRet; } @@ -206,6 +212,7 @@ static void pollFileCancelCleanup(void *pArg) static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) { cstr_t *pCStr = NULL; + int nProcessed = 0; DEFiRet; ASSERT(pbHadFileData != NULL); @@ -220,7 +227,10 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) /* loop below will be exited when strmReadLine() returns EOF */ while(glbl.GetGlobalInputTermState() == 0) { + if(pThis->maxLinesAtOnce != 0 && nProcessed >= pThis->maxLinesAtOnce) + break; CHKiRet(strm.ReadLine(pThis->pStrm, &pCStr, pThis->readMode)); + ++nProcessed; *pbHadFileData = 1; /* this is just a flag, so set it and forget it */ CHKiRet(enqLine(pThis, pCStr)); /* process line */ rsCStrDestruct(&pCStr); /* discard string (must be done by us!) */ @@ -231,6 +241,10 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) } finalize_it: + if(pThis->multiSub.nElem > 0) { + /* submit everything that was not yet submitted */ + CHKiRet(multiSubmitMsg(&pThis->multiSub)); + } ; /*EMPTY STATEMENT - needed to keep compiler happy - see below! */ /* Note: the problem above is that pthread:cleanup_pop() is a macro which * evaluates to something like "} while(0);". So the code would become @@ -475,6 +489,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a iSeverity = 5; /* notice, as of rfc 3164 */ readMode = 0; pBindRuleset = NULL; + maxLinesAtOnce = 10240; RETiRet; } @@ -513,8 +528,12 @@ static rsRetVal addMonitor(void __attribute__((unused)) *pVal, uchar *pNewVal) pThis->pszStateFile = (uchar*) strdup((char*) pszStateFile); } + CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(NUM_MULTISUB * sizeof(msg_t*))); + pThis->multiSub.maxElem = NUM_MULTISUB; + pThis->multiSub.nElem = 0; pThis->iSeverity = iSeverity; pThis->iFacility = iFacility; + pThis->maxLinesAtOnce = maxLinesAtOnce; pThis->iPersistStateInterval = iPersistStateInterval; pThis->nRecords = 0; pThis->readMode = readMode; @@ -592,6 +611,8 @@ CODEmodInit_QueryRegCFSLineHdlr NULL, &iPollInterval, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilereadmode", 0, eCmdHdlrInt, NULL, &readMode, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilemaxlinesatonce", 0, eCmdHdlrSize, + NULL, &maxLinesAtOnce, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepersiststateinterval", 0, eCmdHdlrInt, NULL, &iPersistStateInterval, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilebindruleset", 0, eCmdHdlrGetWord, diff --git a/plugins/mmsnmptrapd/mmsnmptrapd.c b/plugins/mmsnmptrapd/mmsnmptrapd.c index 9311d5c6..b78046ee 100644 --- a/plugins/mmsnmptrapd/mmsnmptrapd.c +++ b/plugins/mmsnmptrapd/mmsnmptrapd.c @@ -37,9 +37,6 @@ #include <errno.h> #include <unistd.h> #include <ctype.h> -#include <liblognorm.h> -#include <libestr.h> -#include <libee/libee.h> #include "conf.h" #include "msg.h" #include "syslogd-types.h" diff --git a/plugins/omlibdbi/omlibdbi.c b/plugins/omlibdbi/omlibdbi.c index 7fcf9631..e6f3fbd9 100644 --- a/plugins/omlibdbi/omlibdbi.c +++ b/plugins/omlibdbi/omlibdbi.c @@ -43,6 +43,7 @@ #include "dirty.h" #include "syslogd-types.h" #include "cfsysline.h" +#include "conf.h" #include "srUtils.h" #include "template.h" #include "module-template.h" |