summaryrefslogtreecommitdiffstats
path: root/plugins/imfile
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-06-27 12:33:26 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2011-06-27 12:33:26 +0200
commit8488d8c3c1e65cb4dacb1dddc71c9186ec9f8f37 (patch)
tree9f612b2808a1590e48cd0f43cb85efca3bb6f83f /plugins/imfile
parent2bd4e10a4dc909346d5a010edefb12c65ed77aec (diff)
parent47729f3b9362f7956c936088ac4bb703633cb33b (diff)
downloadrsyslog-8488d8c3c1e65cb4dacb1dddc71c9186ec9f8f37.tar.gz
rsyslog-8488d8c3c1e65cb4dacb1dddc71c9186ec9f8f37.tar.xz
rsyslog-8488d8c3c1e65cb4dacb1dddc71c9186ec9f8f37.zip
Merge branch 'v5-devel'
Conflicts: ChangeLog configure.ac doc/manual.html plugins/imfile/imfile.c plugins/imklog/imklog.c plugins/imptcp/imptcp.c plugins/imtcp/imtcp.c plugins/imuxsock/imuxsock.c plugins/mmsnmptrapd/mmsnmptrapd.c tools/omfile.c
Diffstat (limited to 'plugins/imfile')
-rw-r--r--plugins/imfile/imfile.c23
1 files changed, 22 insertions, 1 deletions
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c
index 4425c949..3537d809 100644
--- a/plugins/imfile/imfile.c
+++ b/plugins/imfile/imfile.c
@@ -64,6 +64,7 @@ DEFobjCurrIf(strm)
DEFobjCurrIf(prop)
DEFobjCurrIf(ruleset)
+#define NUM_MULTISUB 1024 /* max number of submits -- TODO: make configurable */
typedef struct fileInfo_s {
uchar *pszFileName;
uchar *pszTag;
@@ -71,11 +72,13 @@ typedef struct fileInfo_s {
uchar *pszStateFile; /* file in which state between runs is to be stored */
int iFacility;
int iSeverity;
+ int maxLinesAtOnce;
int nRecords; /**< How many records did we process before persisting the stream? */
int iPersistStateInterval; /**< how often should state be persisted? (0=on close only) */
strm_t *pStrm; /* its stream (NULL if not assigned) */
int readMode; /* which mode to use in ReadMulteLine call? */
ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ multi_submit_t multiSub;
} fileInfo_t;
@@ -95,6 +98,7 @@ static int iPersistStateInterval = 0; /* how often if state file to be persisted
static int iFacility = 128; /* local0 */
static int iSeverity = 5; /* notice, as of rfc 3164 */
static int readMode = 0; /* mode to use for ReadMultiLine call */
+static int maxLinesAtOnce = 10240; /* how many lines to process in a row? */
static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
static int iFilPtr = 0; /* number of files to be monitored; pointer to next free spot during config */
@@ -126,7 +130,9 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine)
pMsg->iFacility = LOG_FAC(pInfo->iFacility);
pMsg->iSeverity = LOG_PRI(pInfo->iSeverity);
MsgSetRuleset(pMsg, pInfo->pRuleset);
- CHKiRet(submitMsg(pMsg));
+ pInfo->multiSub.ppMsgs[pInfo->multiSub.nElem++] = pMsg;
+ if(pInfo->multiSub.nElem == pInfo->multiSub.maxElem)
+ CHKiRet(multiSubmitMsg(&pInfo->multiSub));
finalize_it:
RETiRet;
}
@@ -210,6 +216,7 @@ static void pollFileCancelCleanup(void *pArg)
static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
{
cstr_t *pCStr = NULL;
+ int nProcessed = 0;
DEFiRet;
ASSERT(pbHadFileData != NULL);
@@ -224,7 +231,10 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
/* loop below will be exited when strmReadLine() returns EOF */
while(glbl.GetGlobalInputTermState() == 0) {
+ if(pThis->maxLinesAtOnce != 0 && nProcessed >= pThis->maxLinesAtOnce)
+ break;
CHKiRet(strm.ReadLine(pThis->pStrm, &pCStr, pThis->readMode));
+ ++nProcessed;
*pbHadFileData = 1; /* this is just a flag, so set it and forget it */
CHKiRet(enqLine(pThis, pCStr)); /* process line */
rsCStrDestruct(&pCStr); /* discard string (must be done by us!) */
@@ -235,6 +245,10 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
}
finalize_it:
+ if(pThis->multiSub.nElem > 0) {
+ /* submit everything that was not yet submitted */
+ CHKiRet(multiSubmitMsg(&pThis->multiSub));
+ }
; /*EMPTY STATEMENT - needed to keep compiler happy - see below! */
/* Note: the problem above is that pthread:cleanup_pop() is a macro which
* evaluates to something like "} while(0);". So the code would become
@@ -480,6 +494,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
iSeverity = 5; /* notice, as of rfc 3164 */
readMode = 0;
pBindRuleset = NULL;
+ maxLinesAtOnce = 10240;
RETiRet;
}
@@ -518,8 +533,12 @@ static rsRetVal addMonitor(void __attribute__((unused)) *pVal, uchar *pNewVal)
pThis->pszStateFile = (uchar*) strdup((char*) pszStateFile);
}
+ CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(NUM_MULTISUB * sizeof(msg_t*)));
+ pThis->multiSub.maxElem = NUM_MULTISUB;
+ pThis->multiSub.nElem = 0;
pThis->iSeverity = iSeverity;
pThis->iFacility = iFacility;
+ pThis->maxLinesAtOnce = maxLinesAtOnce;
pThis->iPersistStateInterval = iPersistStateInterval;
pThis->nRecords = 0;
pThis->readMode = readMode;
@@ -597,6 +616,8 @@ CODEmodInit_QueryRegCFSLineHdlr
NULL, &iPollInterval, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilereadmode", 0, eCmdHdlrInt,
NULL, &readMode, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilemaxlinesatonce", 0, eCmdHdlrSize,
+ NULL, &maxLinesAtOnce, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepersiststateinterval", 0, eCmdHdlrInt,
NULL, &iPersistStateInterval, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilebindruleset", 0, eCmdHdlrGetWord,