From 59056371156616274d9970300b3ab02432201422 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 24 May 2011 09:34:14 +0200 Subject: enhanced imfile to support input batching --- plugins/imfile/imfile.c | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) (limited to 'plugins/imfile') diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index cac3a55d..415eb137 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -64,6 +64,7 @@ DEFobjCurrIf(strm) DEFobjCurrIf(prop) DEFobjCurrIf(ruleset) +#define NUM_MULTISUB 1024 /* max number of submits -- TODO: make configurable */ typedef struct fileInfo_s { uchar *pszFileName; uchar *pszTag; @@ -76,6 +77,7 @@ typedef struct fileInfo_s { strm_t *pStrm; /* its stream (NULL if not assigned) */ int readMode; /* which mode to use in ReadMulteLine call? */ ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + multi_submit_t multiSub; } fileInfo_t; @@ -122,7 +124,9 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine) pMsg->iFacility = LOG_FAC(pInfo->iFacility); pMsg->iSeverity = LOG_PRI(pInfo->iSeverity); MsgSetRuleset(pMsg, pInfo->pRuleset); - CHKiRet(submitMsg(pMsg)); + pInfo->multiSub.ppMsgs[pInfo->multiSub.nElem++] = pMsg; + if(pInfo->multiSub.nElem == pInfo->multiSub.maxElem) + CHKiRet(multiSubmitMsg(&pInfo->multiSub)); finalize_it: RETiRet; } @@ -229,8 +233,13 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) pThis->nRecords = 0; } } + /* NOTE: This is usually not reached due to loop exit via CHKiRet() only! */ finalize_it: + if(pThis->multiSub.nElem > 0) { + /* submit everything that was not yet submitted */ + CHKiRet(multiSubmitMsg(&pThis->multiSub)); + } ; /*EMPTY STATEMENT - needed to keep compiler happy - see below! */ /* Note: the problem above is that pthread:cleanup_pop() is a macro which * evaluates to something like "} while(0);". So the code would become @@ -513,6 +522,9 @@ static rsRetVal addMonitor(void __attribute__((unused)) *pVal, uchar *pNewVal) pThis->pszStateFile = (uchar*) strdup((char*) pszStateFile); } + CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(NUM_MULTISUB * sizeof(msg_t*))); + pThis->multiSub.maxElem = NUM_MULTISUB; + pThis->multiSub.nElem = 0; pThis->iSeverity = iSeverity; pThis->iFacility = iFacility; pThis->iPersistStateInterval = iPersistStateInterval; -- cgit From ffdc33e3f178ad85ba5c2c9f7fcee98b743e9d5e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 24 May 2011 10:09:43 +0200 Subject: imfile: added $InputFileMaxLinesAtOnce directive --- plugins/imfile/imfile.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'plugins/imfile') diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index 415eb137..9bc84220 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -72,6 +72,7 @@ typedef struct fileInfo_s { uchar *pszStateFile; /* file in which state between runs is to be stored */ int iFacility; int iSeverity; + int maxLinesAtOnce; int nRecords; /**< How many records did we process before persisting the stream? */ int iPersistStateInterval; /**< how often should state be persisted? (0=on close only) */ strm_t *pStrm; /* its stream (NULL if not assigned) */ @@ -93,6 +94,7 @@ static int iPersistStateInterval = 0; /* how often if state file to be persisted static int iFacility = 128; /* local0 */ static int iSeverity = 5; /* notice, as of rfc 3164 */ static int readMode = 0; /* mode to use for ReadMultiLine call */ +static int maxLinesAtOnce = 10240; /* how many lines to process in a row? */ static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ static int iFilPtr = 0; /* number of files to be monitored; pointer to next free spot during config */ @@ -210,6 +212,7 @@ static void pollFileCancelCleanup(void *pArg) static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) { cstr_t *pCStr = NULL; + int nProcessed = 0; DEFiRet; ASSERT(pbHadFileData != NULL); @@ -224,7 +227,10 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) /* loop below will be exited when strmReadLine() returns EOF */ while(glbl.GetGlobalInputTermState() == 0) { + if(pThis->maxLinesAtOnce != 0 && nProcessed >= pThis->maxLinesAtOnce) + break; CHKiRet(strm.ReadLine(pThis->pStrm, &pCStr, pThis->readMode)); + ++nProcessed; *pbHadFileData = 1; /* this is just a flag, so set it and forget it */ CHKiRet(enqLine(pThis, pCStr)); /* process line */ rsCStrDestruct(&pCStr); /* discard string (must be done by us!) */ @@ -233,7 +239,6 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) pThis->nRecords = 0; } } - /* NOTE: This is usually not reached due to loop exit via CHKiRet() only! */ finalize_it: if(pThis->multiSub.nElem > 0) { @@ -484,6 +489,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a iSeverity = 5; /* notice, as of rfc 3164 */ readMode = 0; pBindRuleset = NULL; + maxLinesAtOnce = 10240; RETiRet; } @@ -527,6 +533,7 @@ static rsRetVal addMonitor(void __attribute__((unused)) *pVal, uchar *pNewVal) pThis->multiSub.nElem = 0; pThis->iSeverity = iSeverity; pThis->iFacility = iFacility; + pThis->maxLinesAtOnce = maxLinesAtOnce; pThis->iPersistStateInterval = iPersistStateInterval; pThis->nRecords = 0; pThis->readMode = readMode; @@ -604,6 +611,8 @@ CODEmodInit_QueryRegCFSLineHdlr NULL, &iPollInterval, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilereadmode", 0, eCmdHdlrInt, NULL, &readMode, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilemaxlinesatonce", 0, eCmdHdlrSize, + NULL, &maxLinesAtOnce, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepersiststateinterval", 0, eCmdHdlrInt, NULL, &iPersistStateInterval, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilebindruleset", 0, eCmdHdlrGetWord, -- cgit