diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-03 12:05:50 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-07-03 12:05:50 +0200 |
commit | 22328c72aef2df73741afefe0e8b53d3ad83f15d (patch) | |
tree | 4a0b311508ebb28a0dc72e12993f704c2709d46a /tools | |
parent | da933a7e105acf814d5e7955d39d29eab3a96613 (diff) | |
parent | 7c0ca7553738161c681c0d0600de99e9fabee81d (diff) | |
download | rsyslog-22328c72aef2df73741afefe0e8b53d3ad83f15d.tar.gz rsyslog-22328c72aef2df73741afefe0e8b53d3ad83f15d.tar.xz rsyslog-22328c72aef2df73741afefe0e8b53d3ad83f15d.zip |
Merge branch 'v5-devel'
Conflicts:
ChangeLog
runtime/datetime.c
Diffstat (limited to 'tools')
-rw-r--r-- | tools/syslogd.c | 264 |
1 files changed, 255 insertions, 9 deletions
diff --git a/tools/syslogd.c b/tools/syslogd.c index f7253a8e..e3824d9e 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -129,12 +129,15 @@ #include "omfile.h" #include "omdiscard.h" #include "threads.h" +#include "wti.h" #include "queue.h" #include "stream.h" #include "conf.h" #include "errmsg.h" #include "datetime.h" #include "parser.h" +//#include "sysvar.h" +#include "batch.h" #include "unicode-helper.h" #include "ruleset.h" #include "rule.h" @@ -293,6 +296,7 @@ static int iMainMsgQtoWrkShutdown = 60000; /* timeout for worker thread shutdo static int iMainMsgQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ static int iMainMsgQDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */ static int64 iMainMsgQueMaxDiskSpace = 0; /* max disk space allocated 0 ==> unlimited */ +static int iMainMsgQueDeqBatchSize = 32; /* dequeue batch size */ static int bMainMsgQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ static int iMainMsgQueueDeqtWinFromHr = 0; /* hour begin of time frame when queue is to be dequeued */ static int iMainMsgQueueDeqtWinToHr = 25; /* hour begin of time frame when queue is to be dequeued */ @@ -359,6 +363,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a bMainMsgQSaveOnShutdown = 1; MainMsgQueType = QUEUETYPE_FIXED_ARRAY; iMainMsgQueMaxDiskSpace = 0; + iMainMsgQueDeqBatchSize = 32; glbliActionResumeRetryCount = 0; return RS_RET_OK; @@ -915,25 +920,263 @@ finalize_it: RETiRet; } +#if 0 // Code obsoleted by merge imfile, but check for changes if there are problems +//thus I leave it in for the time being TODO: remove +/* This functions looks at the given message and checks if it matches the + * provided filter condition. If so, it returns true, else it returns + * false. This is a helper to logmsg() and meant to drive the decision + * process if a message is to be processed or not. As I expect this + * decision code to grow more complex over time AND logmsg() is already + * a very lengthy function, I thought a separate function is more appropriate. + * 2005-09-19 rgerhards + * 2008-02-25 rgerhards: changed interface, now utilizes iRet, bProcessMsg + * returns is message should be procesed. + */ +static rsRetVal shouldProcessThisMessage(selector_t *f, msg_t *pMsg, int *bProcessMsg) +{ + DEFiRet; + unsigned short pbMustBeFreed; + char *pszPropVal; + int bRet = 0; + vm_t *pVM = NULL; + var_t *pResult = NULL; + + assert(f != NULL); + assert(pMsg != NULL); + + /* we first have a look at the global, BSD-style block filters (for tag + * and host). Only if they match, we evaluate the actual filter. + * rgerhards, 2005-10-18 + */ + if(f->eHostnameCmpMode == HN_NO_COMP) { + /* EMPTY BY INTENSION - we check this value first, because + * it is the one most often used, so this saves us time! + */ + } else if(f->eHostnameCmpMode == HN_COMP_MATCH) { + if(rsCStrSzStrCmp(f->pCSHostnameComp, (uchar*) getHOSTNAME(pMsg), getHOSTNAMELen(pMsg))) { + /* not equal, so we are already done... */ + dbgprintf("hostname filter '+%s' does not match '%s'\n", + rsCStrGetSzStrNoNULL(f->pCSHostnameComp), getHOSTNAME(pMsg)); + FINALIZE; + } + } else { /* must be -hostname */ + if(!rsCStrSzStrCmp(f->pCSHostnameComp, (uchar*) getHOSTNAME(pMsg), getHOSTNAMELen(pMsg))) { + /* not equal, so we are already done... */ + dbgprintf("hostname filter '-%s' does not match '%s'\n", + rsCStrGetSzStrNoNULL(f->pCSHostnameComp), getHOSTNAME(pMsg)); + FINALIZE; + } + } + + if(f->pCSProgNameComp != NULL) { + int bInv = 0, bEqv = 0, offset = 0; + if(*(rsCStrGetSzStrNoNULL(f->pCSProgNameComp)) == '-') { + if(*(rsCStrGetSzStrNoNULL(f->pCSProgNameComp) + 1) == '-') + offset = 1; + else { + bInv = 1; + offset = 1; + } + } + if(!rsCStrOffsetSzStrCmp(f->pCSProgNameComp, offset, (uchar*) getProgramName(pMsg), getProgramNameLen(pMsg))) + bEqv = 1; + + if((!bEqv && !bInv) || (bEqv && bInv)) { + /* not equal or inverted selection, so we are already done... */ + dbgprintf("programname filter '%s' does not match '%s'\n", + rsCStrGetSzStrNoNULL(f->pCSProgNameComp), getProgramName(pMsg)); + FINALIZE; + } + } + + /* done with the BSD-style block filters */ + + if(f->f_filter_type == FILTER_PRI) { + /* skip messages that are incorrect priority */ + if ( (f->f_filterData.f_pmask[pMsg->iFacility] == TABLE_NOPRI) || \ + ((f->f_filterData.f_pmask[pMsg->iFacility] & (1<<pMsg->iSeverity)) == 0) ) + bRet = 0; + else + bRet = 1; + } else if(f->f_filter_type == FILTER_EXPR) { + CHKiRet(vm.Construct(&pVM)); + CHKiRet(vm.ConstructFinalize(pVM)); + CHKiRet(vm.SetMsg(pVM, pMsg)); + CHKiRet(vm.ExecProg(pVM, f->f_filterData.f_expr->pVmprg)); + CHKiRet(vm.PopBoolFromStack(pVM, &pResult)); + dbgprintf("result of expression evaluation: %lld\n", pResult->val.num); + /* VM is destructed on function exit */ + bRet = (pResult->val.num) ? 1 : 0; + } else { + assert(f->f_filter_type == FILTER_PROP); /* assert() just in case... */ + pszPropVal = MsgGetProp(pMsg, NULL, f->f_filterData.prop.pCSPropName, &pbMustBeFreed); + + /* Now do the compares (short list currently ;)) */ + switch(f->f_filterData.prop.operation ) { + case FIOP_CONTAINS: + if(rsCStrLocateInSzStr(f->f_filterData.prop.pCSCompValue, (uchar*) pszPropVal) != -1) + bRet = 1; + break; + case FIOP_ISEQUAL: + if(rsCStrSzStrCmp(f->f_filterData.prop.pCSCompValue, + (uchar*) pszPropVal, strlen(pszPropVal)) == 0) + bRet = 1; /* process message! */ + break; + case FIOP_STARTSWITH: + if(rsCStrSzStrStartsWithCStr(f->f_filterData.prop.pCSCompValue, + (uchar*) pszPropVal, strlen(pszPropVal)) == 0) + bRet = 1; /* process message! */ + break; + case FIOP_REGEX: + if(rsCStrSzStrMatchRegex(f->f_filterData.prop.pCSCompValue, + (unsigned char*) pszPropVal, 0, &f->f_filterData.prop.regex_cache) == RS_RET_OK) + bRet = 1; + break; + case FIOP_EREREGEX: + if(rsCStrSzStrMatchRegex(f->f_filterData.prop.pCSCompValue, + (unsigned char*) pszPropVal, 1, &f->f_filterData.prop.regex_cache) == RS_RET_OK) + bRet = 1; + break; + default: + /* here, it handles NOP (for performance reasons) */ + assert(f->f_filterData.prop.operation == FIOP_NOP); + bRet = 1; /* as good as any other default ;) */ + break; + } + + /* now check if the value must be negated */ + if(f->f_filterData.prop.isNegated) + bRet = (bRet == 1) ? 0 : 1; + + if(Debug) { + dbgprintf("Filter: check for property '%s' (value '%s') ", + rsCStrGetSzStrNoNULL(f->f_filterData.prop.pCSPropName), + pszPropVal); + if(f->f_filterData.prop.isNegated) + dbgprintf("NOT "); + dbgprintf("%s '%s': %s\n", + getFIOPName(f->f_filterData.prop.operation), + rsCStrGetSzStrNoNULL(f->f_filterData.prop.pCSCompValue), + bRet ? "TRUE" : "FALSE"); + } + + /* cleanup */ + if(pbMustBeFreed) + free(pszPropVal); + } + +finalize_it: + /* destruct in any case, not just on error, but it makes error handling much easier */ + if(pVM != NULL) + vm.Destruct(&pVM); + + if(pResult != NULL) + var.Destruct(&pResult); + + *bProcessMsg = bRet; + RETiRet; +} + + +/* helper to processMsg(), used to call the configured actions. It is + * executed from within llExecFunc() of the action list. + * rgerhards, 2007-08-02 + */ +typedef struct processMsgDoActions_s { + int bPrevWasSuspended; /* was the previous action suspended? */ + msg_t *pMsg; +} processMsgDoActions_t; +DEFFUNC_llExecFunc(processMsgDoActions) +{ + DEFiRet; + rsRetVal iRetMod; /* return value of module - we do not always pass that back */ + action_t *pAction = (action_t*) pData; + processMsgDoActions_t *pDoActData = (processMsgDoActions_t*) pParam; + + assert(pAction != NULL); + + if((pAction->bExecWhenPrevSusp == 1) && (pDoActData->bPrevWasSuspended == 0)) { + dbgprintf("not calling action because the previous one is not suspended\n"); + ABORT_FINALIZE(RS_RET_OK); + } + + /* MULTIQUEUE: look at this below! (I say: batch states!) */ + iRetMod = actionCallAction(pAction, pDoActData->pMsg); + if(iRetMod == RS_RET_DISCARDMSG) { + ABORT_FINALIZE(RS_RET_DISCARDMSG); + } else if(iRetMod == RS_RET_SUSPENDED) { + /* indicate suspension for next module to be called */ + pDoActData->bPrevWasSuspended = 1; + } else { + pDoActData->bPrevWasSuspended = 0; + } + +finalize_it: + RETiRet; +} + + +/* Process (consume) a received message from the main queue. Here, messages are + * filtered and those where the filter evaluates to true are passed to the action + * queue for further processing. + * rgerhards, 2005-10-13 + */ +static void +processMsg(msg_t *pMsg) +{ + selector_t *f; + int bContinue; + int bProcessMsg; + processMsgDoActions_t DoActData; + rsRetVal iRet; + + BEGINfunc + assert(pMsg != NULL); + + /* log the message to the particular outputs */ + + bContinue = 1; + for (f = Files; f != NULL && bContinue ; f = f->f_next) { + /* first check the filters... */ + iRet = shouldProcessThisMessage(f, pMsg, &bProcessMsg); + if(!bProcessMsg) { + continue; + } + + /* ok -- from here, we have action-specific code, nothing really selector-specific -- rger 2007-08-01 */ + DoActData.pMsg = pMsg; + DoActData.bPrevWasSuspended = 0; + if(llExecFunc(&f->llActList, processMsgDoActions, (void*)&DoActData) == RS_RET_DISCARDMSG) + bContinue = 0; + } + ENDfunc +} + +#endif // if 0 from merge omfile /* The consumer of dequeued messages. This function is called by the * queue engine on dequeueing of a message. It runs on a SEPARATE - * THREAD. - * Please note: the message object is destructed by the queue itself! + * THREAD. It receives an array of pointers, which it must iterate + * over. We do not do any further batching, as this is of no benefit + * for the main queue. */ static rsRetVal -msgConsumer(void __attribute__((unused)) *notNeeded, void *pUsr) +msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch) { + int i; + msg_t *pMsg; DEFiRet; - msg_t *pMsg = (msg_t*) pUsr; - assert(pMsg != NULL); + assert(pBatch != NULL); - if((pMsg->msgFlags & NEEDS_PARSING) != 0) { - parseMsg(pMsg); + for(i = 0 ; i < pBatch->nElem ; i++) { + pMsg = (msg_t*) pBatch->pElem[i].pUsrp; + DBGPRINTF("msgConsumer processes msg %d/%d\n", i, pBatch->nElem); + if((pMsg->msgFlags & NEEDS_PARSING) != 0) { + parseMsg(pMsg); + } + ruleset.ProcessMsg(pMsg); } - ruleset.ProcessMsg(pMsg); - msgDestruct(&pMsg); RETiRet; } @@ -1588,6 +1831,7 @@ static void doDie(int sig) # define MSG1 "DoDie called.\n" # define MSG2 "DoDie called 5 times - unconditional exit\n" static int iRetries = 0; /* debug aid */ + dbgprintf(MSG1); if(Debug) write(1, MSG1, sizeof(MSG1) - 1); if(iRetries++ == 4) { @@ -2304,6 +2548,7 @@ init() setQPROP(qqueueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize); setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace); + setQPROP(qqueueSetiDeqBatchSize, "$MainMsgQueueDequeueBatchSize", iMainMsgQueDeqBatchSize); setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName); setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt); setQPROP(qqueueSetbSyncQueueFiles, "$MainMsgQueueSyncQueueFiles", bMainMsgQSyncQeueFiles); @@ -2650,6 +2895,7 @@ static rsRetVal loadBuildInModules(void) CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iMainMsgQDeqSlowdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iMainMsgQWrkMinMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeuebatchsize", 0, eCmdHdlrSize, NULL, &iMainMsgQueDeqBatchSize, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxDiskSpace, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iMainMsgQueueDeqtWinFromHr, NULL)); |