diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/imfile/imfile.c | 66 | ||||
-rw-r--r-- | plugins/imptcp/imptcp.c | 269 | ||||
-rw-r--r-- | plugins/imudp/Makefile.am | 2 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 168 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 111 | ||||
-rw-r--r-- | plugins/omhdfs/omhdfs.c | 2 | ||||
-rw-r--r-- | plugins/ommongodb/Makefile.am | 11 | ||||
-rw-r--r-- | plugins/ommongodb/README | 23 | ||||
-rw-r--r-- | plugins/ommongodb/ommongodb.c | 280 | ||||
-rw-r--r-- | plugins/ommysql/ommysql.c | 39 | ||||
-rw-r--r-- | plugins/pmaixforwardedfrom/Makefile.am | 8 | ||||
-rw-r--r-- | plugins/pmaixforwardedfrom/pmaixforwardedfrom.c | 167 | ||||
-rw-r--r-- | plugins/pmcisconames/Makefile.am | 8 | ||||
-rw-r--r-- | plugins/pmcisconames/pmcisconames.c | 177 | ||||
-rw-r--r-- | plugins/pmsnare/Makefile.am | 8 | ||||
-rw-r--r-- | plugins/pmsnare/pmsnare.c | 238 |
16 files changed, 1415 insertions, 162 deletions
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index 36a2c015..bd44fd55 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -48,6 +48,7 @@ #include "unicode-helper.h" #include "prop.h" #include "stringbuf.h" +#include "ruleset.h" MODULE_TYPE_INPUT /* must be present for input modules, do not remove */ @@ -60,6 +61,7 @@ DEFobjCurrIf(glbl) DEFobjCurrIf(datetime) DEFobjCurrIf(strm) DEFobjCurrIf(prop) +DEFobjCurrIf(ruleset) typedef struct fileInfo_s { uchar *pszFileName; @@ -71,6 +73,8 @@ typedef struct fileInfo_s { int nRecords; /**< How many records did we process before persisting the stream? */ int iPersistStateInterval; /**< how often should state be persisted? (0=on close only) */ strm_t *pStrm; /* its stream (NULL if not assigned) */ + int readMode; /* which mode to use in ReadMulteLine call? */ + ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */ } fileInfo_t; @@ -85,6 +89,8 @@ static int iPollInterval = 10; /* number of seconds to sleep when there was no f static int iPersistStateInterval = 0; /* how often if state file to be persisted? (default 0->never) */ static int iFacility = 128; /* local0 */ static int iSeverity = 5; /* notice, as of rfc 3164 */ +static int readMode = 0; /* mode to use for ReadMultiLine call */ +static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ static int iFilPtr = 0; /* number of files to be monitored; pointer to next free spot during config */ #define MAX_INPUT_FILES 100 @@ -114,6 +120,7 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine) MsgSetTAG(pMsg, pInfo->pszTag, pInfo->lenTag); pMsg->iFacility = LOG_FAC(pInfo->iFacility); pMsg->iSeverity = LOG_PRI(pInfo->iSeverity); + MsgSetRuleset(pMsg, pInfo->pRuleset); CHKiRet(submitMsg(pMsg)); finalize_it: RETiRet; @@ -211,8 +218,8 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) } /* loop below will be exited when strmReadLine() returns EOF */ - while(1) { - CHKiRet(strm.ReadLine(pThis->pStrm, &pCStr)); + while(glbl.GetGlobalInputTermState() == 0) { + CHKiRet(strm.ReadLine(pThis->pStrm, &pCStr, pThis->readMode)); *pbHadFileData = 1; /* this is just a flag, so set it and forget it */ CHKiRet(enqLine(pThis, pCStr)); /* process line */ rsCStrDestruct(&pCStr); /* discard string (must be done by us!) */ @@ -287,23 +294,24 @@ BEGINrunInput int bHadFileData; /* were there at least one file with data during this run? */ CODESTARTrunInput pthread_cleanup_push(inputModuleCleanup, NULL); - while(1) { - + while(glbl.GetGlobalInputTermState() == 0) { do { bHadFileData = 0; for(i = 0 ; i < iFilPtr ; ++i) { + if(glbl.GetGlobalInputTermState() == 1) + break; /* terminate input! */ pollFile(&files[i], &bHadFileData); } - } while(iFilPtr > 1 && bHadFileData == 1); /* warning: do...while()! */ + } while(iFilPtr > 1 && bHadFileData == 1 && glbl.GetGlobalInputTermState() == 0); /* warning: do...while()! */ /* Note: the additional 10ns wait is vitally important. It guards rsyslog against totally * hogging the CPU if the users selects a polling interval of 0 seconds. It doesn't hurt any * other valid scenario. So do not remove. -- rgerhards, 2008-02-14 */ - srSleep(iPollInterval, 10); - + if(glbl.GetGlobalInputTermState() == 0) + srSleep(iPollInterval, 10); } - /*NOTREACHED*/ + DBGPRINTF("imfile: terminating upon request of rsyslog core\n"); pthread_cleanup_pop(0); /* just for completeness, but never called... */ RETiRet; /* use it to make sure the housekeeping is done! */ @@ -396,6 +404,13 @@ CODESTARTafterRun ENDafterRun +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + /* The following entry points are defined in module-template.h. * In general, they need to be present, but you do NOT need to provide * any code here. @@ -408,12 +423,14 @@ CODESTARTmodExit objRelease(glbl, CORE_COMPONENT); objRelease(errmsg, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); + objRelease(ruleset, CORE_COMPONENT); ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt @@ -447,6 +464,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a iPollInterval = 10; iFacility = 128; /* local0 */ iSeverity = 5; /* notice, as of rfc 3164 */ + readMode = 0; + pBindRuleset = NULL; RETiRet; } @@ -489,6 +508,8 @@ static rsRetVal addMonitor(void __attribute__((unused)) *pVal, uchar *pNewVal) pThis->iFacility = iFacility; pThis->iPersistStateInterval = iPersistStateInterval; pThis->nRecords = 0; + pThis->readMode = readMode; + pThis->pRuleset = pBindRuleset; iPersistStateInterval = 0; } else { errmsg.LogError(0, RS_RET_OUT_OF_DESRIPTORS, "Too many file monitors configured - ignoring this one"); @@ -504,6 +525,29 @@ finalize_it: RETiRet; } + +/* accept a new ruleset to bind. Checks if it exists and complains, if not */ +static rsRetVal +setRuleset(void __attribute__((unused)) *pVal, uchar *pszName) +{ + ruleset_t *pRuleset; + rsRetVal localRet; + DEFiRet; + + localRet = ruleset.GetRuleset(&pRuleset, pszName); + if(localRet == RS_RET_NOT_FOUND) { + errmsg.LogError(0, NO_ERRCODE, "error: ruleset '%s' not found - ignored", pszName); + } + CHKiRet(localRet); + pBindRuleset = pRuleset; + DBGPRINTF("imfile current bind ruleset %p: '%s'\n", pRuleset, pszName); + +finalize_it: + free(pszName); /* no longer needed */ + RETiRet; +} + + /* modInit() is called once the module is loaded. It must perform all module-wide * initialization tasks. There are also a number of housekeeping tasks that the * framework requires. These are handled by the macros. Please note that the @@ -521,8 +565,10 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(strm, CORE_COMPONENT)); + CHKiRet(objUse(ruleset, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); + DBGPRINTF("imfile: version %s initializing\n", VERSION); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilename", 0, eCmdHdlrGetWord, NULL, &pszFileName, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfiletag", 0, eCmdHdlrGetWord, @@ -535,8 +581,12 @@ CODEmodInit_QueryRegCFSLineHdlr NULL, &iFacility, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepollinterval", 0, eCmdHdlrInt, NULL, &iPollInterval, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilereadmode", 0, eCmdHdlrInt, + NULL, &readMode, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepersiststateinterval", 0, eCmdHdlrInt, NULL, &iPersistStateInterval, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilebindruleset", 0, eCmdHdlrGetWord, + setRuleset, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); /* that command ads a new file! */ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputrunfilemonitor", 0, eCmdHdlrGetWord, addMonitor, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 2ff292c2..63447a72 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -10,7 +10,7 @@ * * File begun on 2010-08-10 by RGerhards * - * Copyright 2007-2010 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -83,7 +83,8 @@ DEFobjCurrIf(datetime) DEFobjCurrIf(errmsg) DEFobjCurrIf(ruleset) - +/* forward references */ +static void * wrkr(void *myself); /* config settings */ typedef struct configSettings_s { @@ -92,6 +93,7 @@ typedef struct configSettings_s { uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */ uchar *lstnIP; /* which IP we should listen on? */ ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + int wrkrMax; /* max number of workers (actually "helper workers") */ } configSettings_t; static configSettings_t cs; @@ -117,6 +119,7 @@ struct ptcpsrv_s { ruleset_t *pRuleset; ptcplstn_t *pLstn; /* root of our listeners */ ptcpsess_t *pSess; /* root of our sessions */ + pthread_mutex_t mutSessLst; }; /* the ptcp session object. Describes a single active session. @@ -154,6 +157,20 @@ struct ptcplstn_s { }; +/* The following structure controls the worker threads. Global data is + * needed for their access. + */ +static struct wrkrInfo_s { + pthread_t tid; /* the worker's thread ID */ + pthread_cond_t run; + struct epoll_event *event; /* event == NULL -> idle */ + long long unsigned numCalled; /* how often was this called */ +} wrkrInfo[16]; +static pthread_mutex_t wrkrMut; +static pthread_cond_t wrkrIdle; +static int wrkrRunning; + + /* type of object stored in epoll descriptor */ typedef enum { epolld_lstn, @@ -171,20 +188,10 @@ struct epolld_s { /* global data */ -//static permittedPeers_t *pPermPeersRoot = NULL; +pthread_attr_t wrkrThrdAttr; /* Attribute for session threads; read only after startup */ static ptcpsrv_t *pSrvRoot = NULL; static int epollfd = -1; /* (sole) descriptor for epoll */ static int iMaxLine; /* maximum size of a single message */ -/* we use a single static receive buffer, as this module is not multi-threaded. Keeping - * the buffer in the data segment is probably a little bit more efficient than on the stack - * (but at least I can't believe it will ever be less efficient ;) -- rgerhards, 2010-08-10 - * Note that we do NOT (yet?) provide a config setting to set the buffer size. For usual - * syslog traffic, it should be large enough. Also keep in mind that we run under a virtual - * memory system, so if we do not use large parts of the buffer, that's no issue at - * all -- it'll just use up address space. On the other hand, it would be silly to page in - * or page out some data just to get space for the IO buffer. - */ -static char rcvBuf[128*1024]; /* forward definitions */ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal); @@ -209,6 +216,7 @@ static void destructSrv(ptcpsrv_t *pSrv) { prop.Destruct(&pSrv->pInputName); + pthread_mutex_destroy(&pSrv->mutSessLst); free(pSrv->port); free(pSrv); } @@ -678,6 +686,7 @@ static inline void initConfigSettings(void) { cs.bEmitMsgOnClose = 0; + cs.wrkrMax = 2; cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; cs.pszInputName = NULL; cs.pRuleset = NULL; @@ -790,10 +799,12 @@ addSess(ptcpsrv_t *pSrv, int sock, prop_t *peerName, prop_t *peerIP) /* add to start of server's listener list */ pSess->prev = NULL; + pthread_mutex_lock(&pSrv->mutSessLst); pSess->next = pSrv->pSess; if(pSrv->pSess != NULL) pSrv->pSess->prev = pSess; pSrv->pSess = pSess; + pthread_mutex_unlock(&pSrv->mutSessLst); iRet = addEPollSock(epolld_sess, pSess, sock, &pSess->epd); @@ -816,10 +827,8 @@ closeSess(ptcpsess_t *pSess) CHKiRet(removeEPollSock(sock, pSess->epd)); close(sock); + pthread_mutex_lock(&pSess->pSrv->mutSessLst); /* finally unlink session from structures */ -//fprintf(stderr, "closing session %d next %p, prev %p\n", pSess->sock, pSess->next, pSess->prev); -//DBGPRINTF("imptcp: pSess->next %p\n", pSess->next); -//DBGPRINTF("imptcp: pSess->prev %p\n", pSess->prev); if(pSess->next != NULL) pSess->next->prev = pSess->prev; if(pSess->prev == NULL) { @@ -828,6 +837,7 @@ closeSess(ptcpsess_t *pSess) } else { pSess->prev->next = pSess->next; } + pthread_mutex_unlock(&pSess->pSrv->mutSessLst); /* unlinked, now remove structure */ destructSess(pSess); @@ -838,21 +848,6 @@ finalize_it: } -#if 0 -/* set permitted peer -- rgerhards, 2008-05-19 - */ -static rsRetVal -setPermittedPeer(void __attribute__((unused)) *pVal, uchar *pszID) -{ - DEFiRet; - CHKiRet(net.AddPermittedPeer(&pPermPeersRoot, pszID)); - free(pszID); /* no longer needed, but we need to free as of interface def */ -finalize_it: - RETiRet; -} -#endif - - /* accept a new ruleset to bind. Checks if it exists and complains, if not */ static rsRetVal setRuleset(void __attribute__((unused)) *pVal, uchar *pszName) { @@ -880,6 +875,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa ptcpsrv_t *pSrv; CHKmalloc(pSrv = malloc(sizeof(ptcpsrv_t))); + pthread_mutex_init(&pSrv->mutSessLst, NULL); pSrv->pSess = NULL; pSrv->pLstn = NULL; pSrv->bEmitMsgOnClose = cs.bEmitMsgOnClose; @@ -911,6 +907,46 @@ finalize_it: } +/* destroy worker pool structures and wait for workers to terminate + */ +static inline void +startWorkerPool(void) +{ + int i; + wrkrRunning = 0; + if(cs.wrkrMax > 16) + cs.wrkrMax = 16; /* TODO: make dynamic? */ + pthread_mutex_init(&wrkrMut, NULL); + pthread_cond_init(&wrkrIdle, NULL); + for(i = 0 ; i < cs.wrkrMax ; ++i) { + /* init worker info structure! */ + pthread_cond_init(&wrkrInfo[i].run, NULL); + wrkrInfo[i].event = NULL; + wrkrInfo[i].numCalled = 0; + pthread_create(&wrkrInfo[i].tid, &wrkrThrdAttr, wrkr, &(wrkrInfo[i])); + } + +} + +/* destroy worker pool structures and wait for workers to terminate + */ +static inline void +stopWorkerPool(void) +{ + int i; + for(i = 0 ; i < cs.wrkrMax ; ++i) { + pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */ + pthread_join(wrkrInfo[i].tid, NULL); + DBGPRINTF("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); + pthread_cond_destroy(&wrkrInfo[i].run); + } + pthread_cond_destroy(&wrkrIdle); + pthread_mutex_destroy(&wrkrMut); + +} + + + /* start up all listeners * This is a one-time stop once the module is set to start. */ @@ -922,7 +958,7 @@ startupServers() pSrv = pSrvRoot; while(pSrv != NULL) { - DBGPRINTF("Starting up ptcp server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName); + DBGPRINTF("imptcp: starting up server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName); startupSrv(pSrv); pSrv = pSrv->pNext; } @@ -944,9 +980,9 @@ lstnActivity(ptcplstn_t *pLstn) DEFiRet; DBGPRINTF("imptcp: new connection on listen socket %d\n", pLstn->sock); - while(1) { + while(glbl.GetGlobalInputTermState() == 0) { localRet = AcceptConnReq(pLstn->sock, &newSock, &peerName, &peerIP); - if(localRet == RS_RET_NO_MORE_DATA) + if(localRet == RS_RET_NO_MORE_DATA || glbl.GetGlobalInputTermState() == 1) break; CHKiRet(localRet); CHKiRet(addSess(pLstn->pSrv, newSock, peerName, peerIP)); @@ -965,6 +1001,7 @@ sessActivity(ptcpsess_t *pSess) { int lenRcv; int lenBuf; + char rcvBuf[128*1024]; DEFiRet; DBGPRINTF("imptcp: new activity on session socket %d\n", pSess->sock); @@ -1002,35 +1039,127 @@ finalize_it: } +/* This function is called to process a single request. This may + * be carried out by the main worker or a helper. It can be run + * concurrently. + */ +static inline void +processWorkItem(struct epoll_event *event) +{ + epolld_t *epd; + + epd = (epolld_t*) event->data.ptr; + switch(epd->typ) { + case epolld_lstn: + lstnActivity((ptcplstn_t *) epd->ptr); + break; + case epolld_sess: + sessActivity((ptcpsess_t *) epd->ptr); + break; + default: + errmsg.LogError(0, RS_RET_INTERNAL_ERROR, + "error: invalid epolld_type_t %d after epoll", epd->typ); + break; + } +} + + +/* This function is called to process a complete workset, that + * is a set of events returned from epoll. + */ +static inline void +processWorkSet(int nEvents, struct epoll_event events[]) +{ + int iEvt; + int i; + int remainEvents; + + remainEvents = nEvents; + for(iEvt = 0 ; (iEvt < nEvents) && (glbl.GetGlobalInputTermState() == 0) ; ++iEvt) { + if(remainEvents == 1) { + /* process self, save context switch */ + processWorkItem(events+iEvt); + } else { + pthread_mutex_lock(&wrkrMut); + /* check if there is a free worker */ + for(i = 0 ; (i < cs.wrkrMax) && (wrkrInfo[i].event != NULL) ; ++i) + /*do search*/; + if(i < cs.wrkrMax) { + /* worker free -> use it! */ + wrkrInfo[i].event = events+iEvt; + ++wrkrRunning; + pthread_cond_signal(&wrkrInfo[i].run); + pthread_mutex_unlock(&wrkrMut); + } else { + pthread_mutex_unlock(&wrkrMut); + /* no free worker, so we process this one ourselfs */ + processWorkItem(events+iEvt); + } + } + --remainEvents; + } + + if(nEvents > 1) { + /* we now need to wait until all workers finish. This is because the + * rest of this module can not handle the concurrency introduced + * by workers running during the epoll call. + */ + pthread_mutex_lock(&wrkrMut); + while(wrkrRunning > 0) { + pthread_cond_wait(&wrkrIdle, &wrkrMut); + } + pthread_mutex_unlock(&wrkrMut); + } + +} + + +/* worker to process incoming requests + */ +static void * +wrkr(void *myself) +{ + struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself; + + pthread_mutex_lock(&wrkrMut); + while(1) { + while(me->event == NULL && glbl.GetGlobalInputTermState() == 0) { + pthread_cond_wait(&me->run, &wrkrMut); + } + if(glbl.GetGlobalInputTermState() == 1) + break; + pthread_mutex_unlock(&wrkrMut); + + ++me->numCalled; + processWorkItem(me->event); + + pthread_mutex_lock(&wrkrMut); + me->event = NULL; /* indicate we are free again */ + --wrkrRunning; + pthread_cond_signal(&wrkrIdle); + } + pthread_mutex_unlock(&wrkrMut); + + return NULL; +} + + /* This function is called to gather input. */ BEGINrunInput - int i; - int nfds; - struct epoll_event events[1]; - epolld_t *epd; + int nEvents; + struct epoll_event events[128]; CODESTARTrunInput - DBGPRINTF("imptcp now beginning to process input data\n"); - /* v5 TODO: consentual termination mode */ - while(1) { + startWorkerPool(); + DBGPRINTF("imptcp: now beginning to process input data\n"); + while(glbl.GetGlobalInputTermState() == 0) { DBGPRINTF("imptcp going on epoll_wait\n"); - nfds = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1); - for(i = 0 ; i < nfds ; ++i) { /* support for larger batches (later, TODO) */ - epd = (epolld_t*) events[i].data.ptr; - switch(epd->typ) { - case epolld_lstn: - lstnActivity((ptcplstn_t *) epd->ptr); - break; - case epolld_sess: - sessActivity((ptcpsess_t *) epd->ptr); - break; - default: - errmsg.LogError(0, RS_RET_INTERNAL_ERROR, - "error: invalid epolld_type_t %d after epoll", epd->typ); - break; - } - } + nEvents = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1); + DBGPRINTF("imptcp: epoll returned %d events\n", nEvents); + processWorkSet(nEvents, events); } + DBGPRINTF("imptcp: successfully terminated\n"); + /* we stop the worker pool in AfterRun, in case we get cancelled for some reason (old Interface) */ ENDrunInput @@ -1038,7 +1167,6 @@ ENDrunInput BEGINwillRun CODESTARTwillRun /* first apply some config settings */ - //net.PrintAllowedSenders(2); /* TCP */ iMaxLine = glbl.GetMaxLine(); /* get maximum size we currently support */ if(pSrvRoot == NULL) { @@ -1104,8 +1232,8 @@ shutdownSrv(ptcpsrv_t *pSrv) BEGINafterRun ptcpsrv_t *pSrv, *srvDel; CODESTARTafterRun - /* do cleanup here */ - //net.clearAllowedSenders(UCHAR_CONSTANT("TCP")); + stopWorkerPool(); + /* we need to close everything that is still open */ pSrv = pSrvRoot; while(pSrv != NULL) { @@ -1121,12 +1249,7 @@ ENDafterRun BEGINmodExit CODESTARTmodExit -#if 0 - if(pPermPeersRoot != NULL) { - net.DestructPermittedPeers(&pPermPeersRoot); - } -#endif - + pthread_attr_destroy(&wrkrThrdAttr); /* release objects we used */ objRelease(glbl, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); @@ -1141,6 +1264,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { cs.bEmitMsgOnClose = 0; + cs.wrkrMax = 2; cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; free(cs.pszInputName); cs.pszInputName = NULL; @@ -1150,10 +1274,17 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus } +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt @@ -1170,6 +1301,10 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(ruleset, CORE_COMPONENT)); + /* initialize "read-only" thread attributes */ + pthread_attr_init(&wrkrThrdAttr); + pthread_attr_setstacksize(&wrkrThrdAttr, 2048*1024); + /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord, addTCPListener, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); @@ -1177,6 +1312,8 @@ CODEmodInit_QueryRegCFSLineHdlr eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt, NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverhelperthreads"), 0, eCmdHdlrInt, + NULL, &cs.wrkrMax, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverinputname"), 0, eCmdHdlrGetWord, NULL, &cs.pszInputName, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverlistenip"), 0, diff --git a/plugins/imudp/Makefile.am b/plugins/imudp/Makefile.am index 517b1287..bc64b8c8 100644 --- a/plugins/imudp/Makefile.am +++ b/plugins/imudp/Makefile.am @@ -3,4 +3,4 @@ pkglib_LTLIBRARIES = imudp.la imudp_la_SOURCES = imudp.c imudp_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) imudp_la_LDFLAGS = -module -avoid-version -imudp_la_LIBADD = +imudp_la_LIBADD = $(IMUDP_LIBS) diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index b960322e..d347b0ac 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -35,6 +35,9 @@ #if HAVE_SYS_EPOLL_H # include <sys/epoll.h> #endif +#ifdef HAVE_SCHED_H +# include <sched.h> +#endif #include "rsyslog.h" #include "dirty.h" #include "net.h" @@ -78,14 +81,103 @@ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a * termination if we can not get it. -- rgerhards, 2007-12-27 */ static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */ +static uchar *pszSchedPolicy = NULL; /* scheduling policy string */ +static int iSchedPolicy; /* scheduling policy as SCHED_xxx */ +static int iSchedPrio; /* scheduling priority */ +static int seen_iSchedPrio = 0; /* have we seen scheduling priority in the config file? */ static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ -static uchar *pszSchedPolicy = NULL; /**< scheduling policy (string) */ -static int iSchedPrio = -1; /**< scheduling priority (must not be negative) */ #define TIME_REQUERY_DFLT 2 static int iTimeRequery = TIME_REQUERY_DFLT;/* how often is time to be queried inside tight recv loop? 0=always */ /* config settings */ +static rsRetVal check_scheduling_priority(int report_error) +{ + DEFiRet; + +#ifdef HAVE_SCHED_GET_PRIORITY_MAX + if (iSchedPrio < sched_get_priority_min(iSchedPolicy) || + iSchedPrio > sched_get_priority_max(iSchedPolicy)) { + if (report_error) + errmsg.LogError(errno, NO_ERRCODE, + "imudp: scheduling priority %d out of range (%d - %d)" + " for scheduling policy '%s' - ignoring settings", + iSchedPrio, + sched_get_priority_min(iSchedPolicy), + sched_get_priority_max(iSchedPolicy), + pszSchedPolicy); + ABORT_FINALIZE(RS_RET_VALIDATION_RUN); + } +#endif + +finalize_it: + RETiRet; +} + +/* Set scheduling priority in the supplied variable (will be iSchedPrio) + * and record that we have seen the directive (in seen_iSchedPrio). + */ +static rsRetVal set_scheduling_priority(void *pVal, int value) +{ + DEFiRet; + + if (seen_iSchedPrio) { + errmsg.LogError(0, NO_ERRCODE, "directive already seen"); + ABORT_FINALIZE(RS_RET_VALIDATION_RUN); + } + *(int *)pVal = value; + seen_iSchedPrio = 1; + if (pszSchedPolicy != NULL) + CHKiRet(check_scheduling_priority(1)); + +finalize_it: + RETiRet; +} + +/* Set scheduling policy in iSchedPolicy */ +static rsRetVal set_scheduling_policy(void *pVal, uchar *pNewVal) +{ + int have_sched_policy = 0; + DEFiRet; + + if (pszSchedPolicy != NULL) { + errmsg.LogError(0, NO_ERRCODE, "directive already seen"); + ABORT_FINALIZE(RS_RET_VALIDATION_RUN); + } + *((uchar**)pVal) = pNewVal; /* pVal is pszSchedPolicy */ + if (0) { /* trick to use conditional compilation */ +#ifdef SCHED_FIFO + } else if (!strcasecmp((char*)pszSchedPolicy, "fifo")) { + iSchedPolicy = SCHED_FIFO; + have_sched_policy = 1; +#endif +#ifdef SCHED_RR + } else if (!strcasecmp((char*)pszSchedPolicy, "rr")) { + iSchedPolicy = SCHED_RR; + have_sched_policy = 1; +#endif +#ifdef SCHED_OTHER + } else if (!strcasecmp((char*)pszSchedPolicy, "other")) { + iSchedPolicy = SCHED_OTHER; + have_sched_policy = 1; +#endif + } else { + errmsg.LogError(errno, NO_ERRCODE, + "imudp: invalid scheduling policy '%s' " + "- ignoring setting", pszSchedPolicy); + } + if (have_sched_policy == 0) { + free(pszSchedPolicy); + pszSchedPolicy = NULL; + ABORT_FINALIZE(RS_RET_VALIDATION_RUN); + } + if (seen_iSchedPrio) + CHKiRet(check_scheduling_priority(1)); + +finalize_it: + RETiRet; +} + /* This function is called when a new listener shall be added. It takes * the configured parameters, tries to bind the socket and, if that @@ -296,6 +388,41 @@ finalize_it: RETiRet; } +static void set_thread_schedparam(void) +{ + struct sched_param sparam; + + if (pszSchedPolicy != NULL && seen_iSchedPrio == 0) { + errmsg.LogError(0, NO_ERRCODE, + "imudp: scheduling policy set, but without priority - ignoring settings"); + } else if (pszSchedPolicy == NULL && seen_iSchedPrio != 0) { + errmsg.LogError(0, NO_ERRCODE, + "imudp: scheduling priority set, but without policy - ignoring settings"); + } else if (pszSchedPolicy != NULL && seen_iSchedPrio != 0 && + check_scheduling_priority(0) == 0) { +#ifndef HAVE_PTHREAD_SETSCHEDPARAM + errmsg.LogError(0, NO_ERRCODE, + "imudp: cannot set thread scheduling policy, " + "pthread_setschedparam() not available"); +#else + int err; + + memset(&sparam, 0, sizeof sparam); + sparam.sched_priority = iSchedPrio; + dbgprintf("imudp trying to set sched policy to '%s', prio %d\n", + pszSchedPolicy, iSchedPrio); + err = pthread_setschedparam(pthread_self(), iSchedPolicy, &sparam); + if (err != 0) { + errmsg.LogError(err, NO_ERRCODE, "imudp: pthread_setschedparam() failed"); + } +#endif + } + + if (pszSchedPolicy != NULL) { + free(pszSchedPolicy); + pszSchedPolicy = NULL; + } +} /* This function implements the main reception loop. Depending on the environment, * we either use the traditional (but slower) select() or the Linux-specific epoll() @@ -319,6 +446,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) /* start "name caching" algo by making sure the previous system indicator * is invalidated. */ + set_thread_schedparam(); bIsPermitted = 0; memset(&frominetPrev, 0, sizeof(frominetPrev)); @@ -386,6 +514,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) /* start "name caching" algo by making sure the previous system indicator * is invalidated. */ + set_thread_schedparam(); bIsPermitted = 0; memset(&frominetPrev, 0, sizeof(frominetPrev)); DBGPRINTF("imudp uses select()\n"); @@ -448,7 +577,6 @@ ENDrunInput /* initialize and return if will run or not */ BEGINwillRun - struct sched_param sparam; CODESTARTwillRun /* we need to create the inputName property (only once during our lifetime) */ CHKiRet(prop.Construct(&pInputName)); @@ -457,40 +585,6 @@ CODESTARTwillRun net.PrintAllowedSenders(1); /* UDP */ net.HasRestrictions(UCHAR_CONSTANT("UDP"), &bDoACLCheck); /* UDP */ - - if(pszSchedPolicy == NULL) { - if(iSchedPrio != -1) { - errmsg.LogError(errno, NO_ERRCODE, "imudp: scheduling policy not set, but " - "priority - ignoring settings"); - } - } else { - if(iSchedPrio == -1) { - errmsg.LogError(errno, NO_ERRCODE, "imudp: scheduling policy set, but no " - "priority - ignoring settings"); - } - sparam.sched_priority = iSchedPrio; - dbgprintf("imudp trying to set sched policy to '%s', prio %d\n", - pszSchedPolicy, iSchedPrio); - if(0) { /* trick to use conditional compilation */ -# ifdef SCHED_FIFO - } else if(!strcasecmp((char*)pszSchedPolicy, "fifo")) { - pthread_setschedparam(pthread_self(), SCHED_FIFO, &sparam); -# endif -# ifdef SCHED_RR - } else if(!strcasecmp((char*)pszSchedPolicy, "rr")) { - pthread_setschedparam(pthread_self(), SCHED_RR, &sparam); -# endif -# ifdef SCHED_OTHER - } else if(!strcasecmp((char*)pszSchedPolicy, "other")) { - pthread_setschedparam(pthread_self(), SCHED_OTHER, &sparam); -# endif - } else { - errmsg.LogError(errno, NO_ERRCODE, "imudp: invliad scheduling policy '%s' " - "ignoring settings", pszSchedPolicy); - } - free(pszSchedPolicy); - pszSchedPolicy = NULL; - } /* if we could not set up any listners, there is no point in running... */ if(udpLstnSocks == NULL) diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index c1168c87..b90a3363 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -46,6 +46,7 @@ #include "net.h" #include "glbl.h" #include "msg.h" +#include "parser.h" #include "prop.h" #include "debug.h" #include "unlimited_select.h" @@ -81,6 +82,7 @@ DEF_IMOD_STATIC_DATA DEFobjCurrIf(errmsg) DEFobjCurrIf(glbl) DEFobjCurrIf(prop) +DEFobjCurrIf(parser) DEFobjCurrIf(datetime) DEFobjCurrIf(statsobj) @@ -143,7 +145,7 @@ static int startIndexUxLocalSockets; /* process fd from that index on (used to * read-only after startup */ static int nfd = 1; /* number of Unix sockets open / read-only after startup */ -static int bSysSockFromSystemd = 0; /* Did we receive the system socket from systemd? */ +static int sd_fds = 0; /* number of systemd activated sockets */ /* config settings */ static int bOmitLocalLogging = 0; @@ -372,41 +374,32 @@ openLogSocket(lstn_t *pLstn) if(pLstn->sockName[0] == '\0') return -1; - if (ustrcmp(pLstn->sockName, UCHAR_CONSTANT(_PATH_LOG)) == 0) { - bSysSockFromSystemd = 0; /* set default */ - int r; - - /* System log socket code. Check whether an FD was passed in from systemd. If - * so, it's the /dev/log socket, so use it. */ - - r = sd_listen_fds(0); - if (r < 0) { - errmsg.LogError(-r, NO_ERRCODE, "Failed to acquire systemd socket"); - ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); - } - - if (r > 1) { - errmsg.LogError(EINVAL, NO_ERRCODE, "Wrong number of systemd sockets passed"); - ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); - } - - if (r == 1) { - pLstn->fd = SD_LISTEN_FDS_START; - r = sd_is_socket_unix(pLstn->fd, SOCK_DGRAM, -1, _PATH_LOG, 0); - if (r < 0) { - errmsg.LogError(-r, NO_ERRCODE, "Failed to verify systemd socket type"); - ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); - } - - if (!r) { - errmsg.LogError(EINVAL, NO_ERRCODE, "Passed systemd socket of wrong type"); - ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); - } - bSysSockFromSystemd = 1; /* indicate we got the socket from systemd */ - } else { - CHKiRet(createLogSocket(pLstn)); + pLstn->fd = -1; + + if (sd_fds > 0) { + /* Check if the current socket is a systemd activated one. + * If so, just use it. + */ + int fd; + + for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + sd_fds; fd++) { + if( sd_is_socket_unix(fd, SOCK_DGRAM, -1, (const char*) pLstn->sockName, 0) == 1) { + /* ok, it matches -- just use as is */ + pLstn->fd = fd; + + dbgprintf("imuxsock: Acquired UNIX socket '%s' (fd %d) from systemd.\n", + pLstn->sockName, pLstn->fd); + break; + } + /* + * otherwise it either didn't matched *this* socket and + * we just continue to check the next one or there were + * an error and we will create a new socket bellow. + */ } - } else { + } + + if (pLstn->fd == -1) { CHKiRet(createLogSocket(pLstn)); } @@ -510,6 +503,7 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred) { msg_t *pMsg; int lenMsg; + int offs; int i; uchar *parse; int pri; @@ -527,13 +521,14 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred) */ parse = pRcv; lenMsg = lenRcv; + offs = 1; /* '<' */ - parse++; lenMsg--; /* '<' */ + parse++; pri = 0; - while(lenMsg && isdigit(*parse)) { + while(offs < lenMsg && isdigit(*parse)) { pri = pri * 10 + *parse - '0'; ++parse; - --lenMsg; + ++offs; } facil = LOG_FAC(pri); sever = LOG_PRI(pri); @@ -552,12 +547,14 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred) /* we now create our own message object and submit it to the queue */ CHKiRet(msgConstructWithTime(&pMsg, &st, tt)); MsgSetRawMsg(pMsg, (char*)pRcv, lenRcv); + parser.SanitizeMsg(pMsg); + lenMsg = pMsg->iLenRawMsg - offs; MsgSetInputName(pMsg, pInputName); MsgSetFlowControlType(pMsg, pLstn->flowCtl); pMsg->iFacility = facil; pMsg->iSeverity = sever; - MsgSetAfterPRIOffs(pMsg, lenRcv - lenMsg); + MsgSetAfterPRIOffs(pMsg, offs); parse++; lenMsg--; /* '>' */ @@ -577,7 +574,7 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred) fixPID(bufParseTAG, &i, cred); MsgSetTAG(pMsg, bufParseTAG, i); - MsgSetMSGoffs(pMsg, lenRcv - lenMsg); + MsgSetMSGoffs(pMsg, pMsg->iLenRawMsg - lenMsg); if(pLstn->bParseHost) { pMsg->msgFlags = pLstn->flags | PARSE_HOSTNAME; @@ -609,7 +606,9 @@ static rsRetVal readSocket(lstn_t *pLstn) int iMaxLine; struct msghdr msgh; struct iovec msgiov; +# if HAVE_SCM_CREDENTIALS struct cmsghdr *cm; +# endif struct ucred *cred; uchar bufRcv[4096+1]; char aux[128]; @@ -633,11 +632,13 @@ static rsRetVal readSocket(lstn_t *pLstn) memset(&msgh, 0, sizeof(msgh)); memset(&msgiov, 0, sizeof(msgiov)); +# if HAVE_SCM_CREDENTIALS if(pLstn->bUseCreds) { memset(&aux, 0, sizeof(aux)); msgh.msg_control = aux; msgh.msg_controllen = sizeof(aux); } +# endif msgiov.iov_base = pRcv; msgiov.iov_len = iMaxLine; msgh.msg_iov = &msgiov; @@ -774,12 +775,18 @@ CODESTARTwillRun listeners[0].bUseCreds = (bWritePidSysSock || ratelimitIntervalSysSock) ? 1 : 0; listeners[0].bWritePid = bWritePidSysSock; + sd_fds = sd_listen_fds(0); + if (sd_fds < 0) { + errmsg.LogError(-sd_fds, NO_ERRCODE, "imuxsock: Failed to acquire systemd socket"); + ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX); + } + /* initialize and return if will run or not */ actSocks = 0; for (i = startIndexUxLocalSockets ; i < nfd ; i++) { if(openLogSocket(&(listeners[i])) == RS_RET_OK) { ++actSocks; - dbgprintf("Opened UNIX socket '%s' (fd %d).\n", listeners[i].sockName, listeners[i].fd); + dbgprintf("imuxsock: Opened UNIX socket '%s' (fd %d).\n", listeners[i].sockName, listeners[i].fd); } } @@ -806,15 +813,19 @@ CODESTARTafterRun if (listeners[i].fd != -1) close(listeners[i].fd); - /* Clean-up files. If systemd passed us a socket it is - * systemd's job to clean it up.*/ - if(bSysSockFromSystemd) { - DBGPRINTF("imuxsock: got system socket from systemd, not unlinking it\n"); - i = 1; - } else - i = startIndexUxLocalSockets; - for(; i < nfd; i++) + /* Clean-up files. */ + for(i = startIndexUxLocalSockets; i < nfd; i++) if (listeners[i].sockName && listeners[i].fd != -1) { + + /* If systemd passed us a socket it is systemd's job to clean it up. + * Do not unlink it -- we will get same socket (node) from systemd + * e.g. on restart again. + */ + if (sd_fds > 0 && + listeners[i].fd >= SD_LISTEN_FDS_START && + listeners[i].fd < SD_LISTEN_FDS_START + sd_fds) + continue; + DBGPRINTF("imuxsock: unlinking unix socket file[%d] %s\n", i, listeners[i].sockName); unlink((char*) listeners[i].sockName); } @@ -835,6 +846,7 @@ BEGINmodExit CODESTARTmodExit statsobj.Destruct(&modStats); + objRelease(parser, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); objRelease(errmsg, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); @@ -896,6 +908,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(prop, CORE_COMPONENT)); CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(objUse(datetime, CORE_COMPONENT)); + CHKiRet(objUse(parser, CORE_COMPONENT)); dbgprintf("imuxsock version %s initializing\n", PACKAGE_VERSION); diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c index 1fe9b46b..1bf10bd7 100644 --- a/plugins/omhdfs/omhdfs.c +++ b/plugins/omhdfs/omhdfs.c @@ -477,7 +477,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKmalloc(files = create_hashtable(20, hash_from_string, key_equals_string, fileObjDestruct4Hashtable)); - CHKiRet(regCfSysLineHdlr((uchar *)"omhdfscs.fileName", 0, eCmdHdlrGetWord, NULL, &cs.fileName, NULL, eConfObjAction)); + CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &cs.fileName, NULL, eConfObjAction)); CHKiRet(regCfSysLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &cs.hdfsHost, NULL, eConfObjAction)); CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &cs.hdfsPort, NULL, eConfObjAction)); CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &cs.dfltTplName, NULL, eConfObjAction)); diff --git a/plugins/ommongodb/Makefile.am b/plugins/ommongodb/Makefile.am new file mode 100644 index 00000000..1b0e23a1 --- /dev/null +++ b/plugins/ommongodb/Makefile.am @@ -0,0 +1,11 @@ +mongodir = ./mongo-c-driver/src +pkglib_LTLIBRARIES = ommongodb.la + +ommongodb_la_SOURCES = ommongodb.c +ommongodb_la_SOURCES += $(mongodir)/bson.c $(mongodir)/mongo.c $(mongodir)/md5.c $(mongodir)/numbers.c + +ommongodb_la_CPPFLAGS = -DMONGO_HAVE_STDINT -Imongo-c-driver/src $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) +ommongodb_la_LDFLAGS = -module -avoid-version +ommongodb_la_LIBADD = + +EXTRA_DIST = diff --git a/plugins/ommongodb/README b/plugins/ommongodb/README new file mode 100644 index 00000000..cea3f3bc --- /dev/null +++ b/plugins/ommongodb/README @@ -0,0 +1,23 @@ +plugin to use MongoDB as backend. + +tested in ubuntu 10.04 and ubuntu 10.10 + +configuration: + +in your /etc/rsyslog.conf, together with other modules: +$ModLoad ommongodb # provides mongodb support + +then in your /etc/rsyslog.d (check your distribution way to organize the configuration..) you create a file 10-mongodb.conf with the following content: + +#the format for the driver is :ommongodb:ip:db:collection;StdMongoDBFmt +#if you want to change what is logged in the db, the template, you must change the source code since the keys are hardcoded +$template StdMongoDBFmt,"%msg%%syslogfacility%%HOSTNAME%%syslogpriority%" +*.* :ommongodb:127.0.0.1,syslog,logs;StdMongoDBFmt + + +TODO +we must ensure that the collection is a capped collection +refactor my code :-) + +email Victor Pereira <victor.pereira@bigrails.com> +twitter twitter.com/vpereira diff --git a/plugins/ommongodb/ommongodb.c b/plugins/ommongodb/ommongodb.c new file mode 100644 index 00000000..8e19105f --- /dev/null +++ b/plugins/ommongodb/ommongodb.c @@ -0,0 +1,280 @@ +#include <stdio.h> +#include <string.h> +#include <stdlib.h> +#include <ctype.h> +#include <errno.h> +#include <assert.h> +#include <signal.h> +#include <time.h> +#include "bson.h" +#include "mongo.h" +#include "config.h" +#include "rsyslog.h" +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "cfsysline.h" +#include "mongo-c-driver/src/mongo.h" + +#define countof(X) ( (size_t) ( sizeof(X)/sizeof*(X) ) ) + +#define DEFAULT_SERVER "127.0.0.1" +#define DEFAULT_DATABASE "syslog" +#define DEFAULT_COLLECTION "log" +#define DEFAULT_DB_COLLECTION "syslog.log" + +//i just defined some constants, i couldt not find the limit +#define MONGO_DB_NAME_SIZE 128 +#define MONGO_COLLECTION_NAME_SIZE 128 + +MODULE_TYPE_OUTPUT +/* internal structures + */ +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) + +typedef struct _instanceData { + mongo_connection conn[1]; /* ptr */ + mongo_connection_options opts[1]; + mongo_conn_return status; + char db[MONGO_DB_NAME_SIZE]; + char collection[MONGO_COLLECTION_NAME_SIZE]; + char dbcollection[MONGO_DB_NAME_SIZE + MONGO_COLLECTION_NAME_SIZE + 1]; + unsigned uLastMongoDBErrno; + //unsigned iSrvPort; /* sample: server port */ +} instanceData; + +char db[_DB_MAXDBLEN+2]; +static int iSrvPort = 27017; +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + /* use this to specify if select features are supported by this + * plugin. If not, the framework will handle that. Currently, only + * RepeatedMsgReduction ("last message repeated n times") is optional. + */ + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + +static void closeMongoDB(instanceData *pData) +{ + ASSERT(pData != NULL); + + if(pData->conn != NULL) { + mongo_destroy( pData->conn ); + memset(pData->conn,0x00,sizeof(mongo_connection)); + } +} + +BEGINfreeInstance +CODESTARTfreeInstance + closeMongoDB(pData); +ENDfreeInstance + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo + /* nothing special here */ +ENDdbgPrintInstInfo + +/* log a database error with descriptive message. + * We check if we have a valid MongoDB handle. If not, we simply + * report an error + */ +static void reportDBError(instanceData *pData, int bSilent) +{ + char errMsg[512]; + bson ErrObj; + + ASSERT(pData != NULL); + + /* output log message */ + errno = 0; + if(pData->conn == NULL) { + errmsg.LogError(0, NO_ERRCODE, "unknown DB error occured - could not obtain MongoDB handle"); + } else { /* we can ask mysql for the error description... */ + //we should handle the error. if bSilent is set then we should print as debug + mongo_cmd_get_last_error(pData->conn, pData->db, &ErrObj); + bson_destroy(&ErrObj); + } + + return; +} + +/* The following function is responsible for initializing a + * MySQL connection. + * Initially added 2004-10-28 mmeckelein + */ +static rsRetVal initMongoDB(instanceData *pData, int bSilent) +{ + DEFiRet; + + ASSERT(pData != NULL); + ASSERT(pData->conn == NULL); + + //I'm trying to fallback to a default here + if(pData->opts->port == 0) + pData->opts->port = 27017; + + if(pData->opts->host == 0x00) + strcpy(pData->opts->host,DEFAULT_SERVER); + + if(pData->dbcollection == 0x00) + strcpy(pData->dbcollection,DEFAULT_DB_COLLECTION); + + pData->status = mongo_connect(pData->conn, pData->opts ); + + switch (pData->status) { + case mongo_conn_success: + fprintf(stderr, "connection succeeded\n" ); + iRet = RS_RET_OK; + break; + case mongo_conn_bad_arg: + errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle"); + fprintf(stderr, "bad arguments\n" ); + iRet = RS_RET_SUSPENDED; + break; + case mongo_conn_no_socket: + errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle"); + fprintf(stderr, "no socket\n" ); + iRet = RS_RET_SUSPENDED; + break; + case mongo_conn_fail: + errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle"); + fprintf(stderr, "connection failed\n" ); + iRet = RS_RET_SUSPENDED; + break; + case mongo_conn_not_master: + errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle"); + fprintf(stderr, "not master\n" ); + iRet = RS_RET_SUSPENDED; + break; + } + RETiRet; +} + +//we must implement it +rsRetVal writeMongoDB(uchar *psz, instanceData *pData) +{ + char mydate[32]; + char **szParams; + bson b[1]; + bson_buffer buf[1]; + bson_buffer_init( buf ); + bson_append_new_oid(buf, "_id" ); + memset(mydate,0x00,32); + + + DEFiRet; + + ASSERT(psz != NULL); + ASSERT(pData != NULL); + + + /* see if we are ready to proceed */ + if(pData->conn == NULL) { + CHKiRet(initMongoDB(pData, 0)); + } + +szParams = (char**)(void*) psz; +//We can make it beter +//if you change the fields in your template, we must update it here +//there is any C_metaprogramming_ninja there? :-) +if(countof(szParams) > 0) +{ + bson_append_string( buf, "msg", szParams[0]); + bson_append_string( buf, "facility",szParams[1]); + bson_append_string( buf, "hostname", szParams[2] ); + bson_append_string(buf, "priority",szParams[3]); + bson_append_int(buf,"count",countof(szParams)); + bson_from_buffer( b, buf ); + mongo_insert(pData->conn, pData->dbcollection, b ); +} + +if(b) + bson_destroy(b); + + + finalize_it: + if(iRet == RS_RET_OK) { + pData->uLastMongoDBErrno = 0; /* reset error for error supression */ + } + + + RETiRet; +} + +BEGINtryResume +CODESTARTtryResume + if(pData->conn == NULL) { + iRet = initMongoDB(pData, 1); + } +ENDtryResume + +BEGINdoAction +CODESTARTdoAction + iRet = writeMongoDB(ppString[0], pData); +ENDdoAction + +BEGINparseSelectorAct + //int iMongoDBPropErr = 0; +CODESTARTparseSelectorAct +CODE_STD_STRING_REQUESTparseSelectorAct(1) + + if(!strncmp((char*) p, ":ommongodb:", sizeof(":ommongodb:") - 1)) { + p += sizeof(":ommongodb:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ + } else { + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); + } + + CHKiRet(createInstance(&pData)); + + if(getSubString(&p, pData->opts->host, MAXHOSTNAMELEN+1, ',')) + strcpy(pData->opts->host,DEFAULT_SERVER); + + //we must define the max db name + if(getSubString(&p,pData->db,255,',')) + strcpy(pData->db,DEFAULT_DATABASE); + if(getSubString(&p,pData->collection,255,';')) + strcpy(pData->collection,DEFAULT_COLLECTION); + if(*(p-1) == ';') + --p; + + + CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_TPL_AS_ARRAY, (uchar*) " StdMongoDBFmt")); + + + pData->opts->port = (unsigned) iSrvPort; /* set configured port */ + sprintf(pData->dbcollection,"%s.%s",pData->db,pData->collection); + CHKiRet(initMongoDB(pData, 0)); + +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + + +BEGINmodExit +CODESTARTmodExit +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +ENDqueryEtryPt + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); + DBGPRINTF("ompgsql: module compiled with rsyslog version %s.\n", VERSION); + DBGPRINTF("ompgsql: %susing transactional output interface.\n", bCoreSupportsBatching ? "" : "not "); +ENDmodInit
\ No newline at end of file diff --git a/plugins/ommysql/ommysql.c b/plugins/ommysql/ommysql.c index 5b44d687..4b9d2f7e 100644 --- a/plugins/ommysql/ommysql.c +++ b/plugins/ommysql/ommysql.c @@ -62,10 +62,14 @@ typedef struct _instanceData { char f_dbuid[_DB_MAXUNAMELEN+1]; /* DB user */ char f_dbpwd[_DB_MAXPWDLEN+1]; /* DB user's password */ unsigned uLastMySQLErrno; /* last errno returned by MySQL or 0 if all is well */ + uchar * f_configfile; /* MySQL Client Configuration File */ + uchar * f_configsection; /* MySQL Client Configuration Section */ } instanceData; typedef struct configSettings_s { int iSrvPort; /* database server port */ + uchar *pszMySQLConfigFile; /* MySQL Client Configuration File */ + uchar *pszMySQLConfigSection; /* MySQL Client Configuration Section */ } configSettings_t; SCOPING_SUPPORT; /* must be set AFTER configSettings_t is defined */ @@ -101,6 +105,14 @@ static void closeMySQL(instanceData *pData) mysql_close(pData->f_hmysql); pData->f_hmysql = NULL; } + if(pData->f_configfile!=NULL){ + free(pData->f_configfile); + pData->f_configfile=NULL; + } + if(pData->f_configsection!=NULL){ + free(pData->f_configsection); + pData->f_configsection=NULL; + } } BEGINfreeInstance @@ -162,6 +174,25 @@ static rsRetVal initMySQL(instanceData *pData, int bSilent) errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MySQL handle"); iRet = RS_RET_SUSPENDED; } else { /* we could get the handle, now on with work... */ + mysql_options(pData->f_hmysql,MYSQL_READ_DEFAULT_GROUP,((pData->f_configsection!=NULL)?(char*)pData->f_configsection:"client")); + if(pData->f_configfile!=NULL){ + FILE * fp; + fp=fopen((char*)pData->f_configfile,"r"); + int err=errno; + if(fp==NULL){ + char msg[512]; + snprintf(msg,sizeof(msg)/sizeof(char),"Could not open '%s' for reading",pData->f_configfile); + if(bSilent) { + char errStr[512]; + rs_strerror_r(err, errStr, sizeof(errStr)); + dbgprintf("mysql configuration error(%d): %s - %s\n",err,msg,errStr); + } else + errmsg.LogError(err,NO_ERRCODE,"mysql configuration error: %s\n",msg); + } else { + fclose(fp); + mysql_options(pData->f_hmysql,MYSQL_READ_DEFAULT_FILE,pData->f_configfile); + } + } /* Connect to database */ if(mysql_real_connect(pData->f_hmysql, pData->f_dbsrv, pData->f_dbuid, pData->f_dbpwd, pData->f_dbname, pData->f_dbsrvPort, NULL, 0) == NULL) { @@ -288,6 +319,8 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) ABORT_FINALIZE(RS_RET_INVALID_PARAMS); } else { pData->f_dbsrvPort = (unsigned) cs.iSrvPort; /* set configured port */ + pData->f_configfile = cs.pszMySQLConfigFile; + pData->f_configsection = cs.pszMySQLConfigSection; pData->f_hmysql = NULL; /* initialize, but connect only on first message (important for queued mode!) */ } @@ -312,6 +345,10 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a { DEFiRet; cs.iSrvPort = 0; /* zero is the default port */ + free(cs.pszMySQLConfigFile); + cs.pszMySQLConfigFile = NULL; + free(cs.pszMySQLConfigSection); + cs.pszMySQLConfigSection = NULL; RETiRet; } @@ -323,6 +360,8 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); /* register our config handlers */ CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionommysqlserverport", 0, eCmdHdlrInt, NULL, &cs.iSrvPort, STD_LOADABLE_MODULE_ID, eConfObjAction)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"ommysqlconfigfile",0,eCmdHdlrGetWord,NULL,&cs.pszMySQLConfigFile,STD_LOADABLE_MODULE_ID, eConfObjAction)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"ommysqlconfigsection",0,eCmdHdlrGetWord,NULL,&cs.pszMySQLConfigSection,STD_LOADABLE_MODULE_ID, eConfObjAction)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID, eConfObjAction)); ENDmodInit diff --git a/plugins/pmaixforwardedfrom/Makefile.am b/plugins/pmaixforwardedfrom/Makefile.am new file mode 100644 index 00000000..af359d31 --- /dev/null +++ b/plugins/pmaixforwardedfrom/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = pmaixforwardedfrom.la
+
+pmaixforwardedfrom_la_SOURCES = pmaixforwardedfrom.c
+pmaixforwardedfrom_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) -I ../../tools
+pmaixforwardedfrom_la_LDFLAGS = -module -avoid-version
+pmaixforwardedfrom_la_LIBADD =
+
+EXTRA_DIST =
diff --git a/plugins/pmaixforwardedfrom/pmaixforwardedfrom.c b/plugins/pmaixforwardedfrom/pmaixforwardedfrom.c new file mode 100644 index 00000000..11634199 --- /dev/null +++ b/plugins/pmaixforwardedfrom/pmaixforwardedfrom.c @@ -0,0 +1,167 @@ +/* pmaixforwardedfrom.c + * + * this detects logs sent by Cisco devices that mangle their syslog output when you tell them to log by name by adding ' :' between the name and the %XXX-X-XXXXXXX: tag + * + * instead of actually parsing the message, this modifies the message and then falls through to allow a later parser to handle the now modified message + * + * created 2010-12-13 by David Lang based on pmlastmsg + * + * This file is part of rsyslog. + * + * Rsyslog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Rsyslog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ +#include "config.h" +#include "rsyslog.h" +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <ctype.h> +#include "conf.h" +#include "syslogd-types.h" +#include "template.h" +#include "msg.h" +#include "module-template.h" +#include "glbl.h" +#include "errmsg.h" +#include "parser.h" +#include "datetime.h" +#include "unicode-helper.h" + +MODULE_TYPE_PARSER +PARSER_NAME("rsyslog.aixforwardedfrom") + +/* internal structures + */ +DEF_PMOD_STATIC_DATA +DEFobjCurrIf(errmsg) +DEFobjCurrIf(glbl) +DEFobjCurrIf(parser) +DEFobjCurrIf(datetime) + + +/* static data */ +static int bParseHOSTNAMEandTAG; /* cache for the equally-named global param - performance enhancement */ + + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATUREAutomaticSanitazion) + iRet = RS_RET_OK; + if(eFeat == sFEATUREAutomaticPRIParsing) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + +BEGINparse + uchar *p2parse; + uchar *opening; + int lenMsg; +#define OpeningText "Message forwarded from " +CODESTARTparse + dbgprintf("Message will now be parsed by fix AIX Forwarded From parser.\n"); + assert(pMsg != NULL); + assert(pMsg->pszRawMsg != NULL); + lenMsg = pMsg->iLenRawMsg - pMsg->offAfterPRI; /* note: offAfterPRI is already the number of PRI chars (do not add one!) */ + p2parse = pMsg->pszRawMsg + pMsg->offAfterPRI; /* point to start of text, after PRI */ + + /* check if this message is of the type we handle in this (very limited) parser */ + /* first, we permit SP */ + while(lenMsg && *p2parse == ' ') { + --lenMsg; + ++p2parse; + } +dbgprintf("pmaixforwardedfrom: msg to look at: [%d]'%s'\n", lenMsg, p2parse); + if((unsigned) lenMsg < 42) { + /* too short, can not be "our" message */ + /* minimum message, 16 character timestamp, 'Message forwarded from ", 1 character name, ': '*/ +dbgprintf("msg too short!\n"); + ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE); + } + + /* skip over timestamp */ + lenMsg -=16; + p2parse +=16; + /* if there is the string "Message forwarded from " were the hostname should be */ + if(strncasecmp((char*) p2parse, OpeningText, sizeof(OpeningText)-1) != 0) { + /* wrong opening text */ +dbgprintf("not a AIX message forwarded from mangled log!\n"); + ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE); + } + /* bump the message portion up by 23 characters to overwrite the "Message forwarded from " with the hostname */ + lenMsg -=23; + memmove(p2parse, p2parse + 23, lenMsg); + *(p2parse + lenMsg) = '\n'; + *(p2parse + lenMsg + 1) = '\0'; + pMsg->iLenRawMsg -=23; + pMsg->iLenMSG -=23; + /* now look for the : after the hostname to walk past the hostname, also watch for a space in case this isn't really an AIX log, but has a similar preamble */ + while(lenMsg && *p2parse != ' ' && *p2parse != ':') { + --lenMsg; + ++p2parse; + } + if (lenMsg && *p2parse != ':') { +dbgprintf("not a AIX message forwarded from mangled log but similar enough that the preamble has been removed\n"); + ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE); + } + /* bump the message portion up by one character to overwrite the extra : */ + lenMsg -=1; + memmove(p2parse, p2parse + 1, lenMsg); + *(p2parse + lenMsg) = '\n'; + *(p2parse + lenMsg + 1) = '\0'; + pMsg->iLenRawMsg -=1; + pMsg->iLenMSG -=1; + /* now, claim to abort so that something else can parse the now modified message */ + DBGPRINTF("pmaixforwardedfrom: new mesage: [%d]'%s'\n", lenMsg, pMsg->pszRawMsg + pMsg->offAfterPRI); + ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE); + +finalize_it: +ENDparse + + +BEGINmodExit +CODESTARTmodExit + /* release what we no longer need */ + objRelease(errmsg, CORE_COMPONENT); + objRelease(glbl, CORE_COMPONENT); + objRelease(parser, CORE_COMPONENT); + objRelease(datetime, CORE_COMPONENT); +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_PMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +ENDqueryEtryPt + + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(parser, CORE_COMPONENT)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); + + DBGPRINTF("aixforwardedfrom parser init called, compiled with version %s\n", VERSION); + bParseHOSTNAMEandTAG = glbl.GetParseHOSTNAMEandTAG(); /* cache value, is set only during rsyslogd option processing */ + + +ENDmodInit + +/* vim:set ai: + */ diff --git a/plugins/pmcisconames/Makefile.am b/plugins/pmcisconames/Makefile.am new file mode 100644 index 00000000..16ed347d --- /dev/null +++ b/plugins/pmcisconames/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = pmcisconames.la + +pmcisconames_la_SOURCES = pmcisconames.c +pmcisconames_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) -I ../../tools +pmcisconames_la_LDFLAGS = -module -avoid-version +pmcisconames_la_LIBADD = + +EXTRA_DIST = diff --git a/plugins/pmcisconames/pmcisconames.c b/plugins/pmcisconames/pmcisconames.c new file mode 100644 index 00000000..4171e688 --- /dev/null +++ b/plugins/pmcisconames/pmcisconames.c @@ -0,0 +1,177 @@ +/* pmcisconames.c + * + * this detects logs sent by Cisco devices that mangle their syslog output when you tell them to log by name by adding ' :' between the name and the %XXX-X-XXXXXXX: tag + * + * instead of actually parsing the message, this modifies the message and then falls through to allow a later parser to handle the now modified message + * + * created 2010-12-13 by David Lang based on pmlastmsg + * + * This file is part of rsyslog. + * + * Rsyslog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Rsyslog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ +#include "config.h" +#include "rsyslog.h" +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <ctype.h> +#include "conf.h" +#include "syslogd-types.h" +#include "template.h" +#include "msg.h" +#include "module-template.h" +#include "glbl.h" +#include "errmsg.h" +#include "parser.h" +#include "datetime.h" +#include "unicode-helper.h" + +MODULE_TYPE_PARSER +PARSER_NAME("rsyslog.cisconames") + +/* internal structures + */ +DEF_PMOD_STATIC_DATA +DEFobjCurrIf(errmsg) +DEFobjCurrIf(glbl) +DEFobjCurrIf(parser) +DEFobjCurrIf(datetime) + + +/* static data */ +static int bParseHOSTNAMEandTAG; /* cache for the equally-named global param - performance enhancement */ + + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATUREAutomaticSanitazion) + iRet = RS_RET_OK; + if(eFeat == sFEATUREAutomaticPRIParsing) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + +BEGINparse + uchar *p2parse; + int lenMsg; +#define OpeningText ": %" +CODESTARTparse + dbgprintf("Message will now be parsed by fix Cisco Names parser.\n"); + assert(pMsg != NULL); + assert(pMsg->pszRawMsg != NULL); + lenMsg = pMsg->iLenRawMsg - pMsg->offAfterPRI; /* note: offAfterPRI is already the number of PRI chars (do not add one!) */ + p2parse = pMsg->pszRawMsg + pMsg->offAfterPRI; /* point to start of text, after PRI */ + + /* check if this message is of the type we handle in this (very limited) parser */ + /* first, we permit SP */ + while(lenMsg && *p2parse == ' ') { + --lenMsg; + ++p2parse; + } +dbgprintf("pmcisconames: msg to look at: [%d]'%s'\n", lenMsg, p2parse); + if((unsigned) lenMsg < 34) { + /* too short, can not be "our" message */ + /* minimum message, 16 character timestamp, 1 character name, ' : %ASA-1-000000: '*/ +dbgprintf("msg too short!\n"); + ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE); + } + /* check if the timestamp is a 16 character or 21 character timestamp + 'Mmm DD HH:MM:SS ' spaces at 3,6,15 : at 9,12 + 'Mmm DD YYYY HH:MM:SS ' spaces at 3,6,11,20 : at 14,17 + check for the : first as that will differentiate the two conditions the fastest + this allows the compiler to short circuit the rst of the tests if it is the wrong timestamp + but still check the rest to see if it looks correct + */ + if ( *(p2parse + 9) == ':' && *(p2parse + 12) == ':' && *(p2parse + 3) == ' ' && *(p2parse + 6) == ' ' && *(p2parse + 15) == ' ') { + /* skip over timestamp */ + dbgprintf("short timestamp found\n"); + lenMsg -=16; + p2parse +=16; + } else { + if ( *(p2parse + 14) == ':' && *(p2parse + 17) == ':' && *(p2parse + 3) == ' ' && *(p2parse + 6) == ' ' && *(p2parse + 11) == ' ' && *(p2parse + 20) == ' ') { + /* skip over timestamp */ + dbgprintf("long timestamp found\n"); + lenMsg -=21; + p2parse +=21; + } else { + dbgprintf("timestamp is not one of the valid formats\n"); + ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE); + } + } + /* now look for the next space to walk past the hostname */ + while(lenMsg && *p2parse != ' ') { + --lenMsg; + ++p2parse; + } + /* skip the space after the hostname */ + lenMsg -=1; + p2parse +=1; + /* if the syslog tag is : and the next thing starts with a % assume that this is a mangled cisco log and fix it */ + if(strncasecmp((char*) p2parse, OpeningText, sizeof(OpeningText)-1) != 0) { + /* wrong opening text */ +dbgprintf("not a cisco name mangled log!\n"); + ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE); + } + /* bump the message portion up by two characters to overwrite the extra : */ + lenMsg -=2; + memmove(p2parse, p2parse + 2, lenMsg); + *(p2parse + lenMsg) = '\n'; + *(p2parse + lenMsg + 1) = '\0'; + pMsg->iLenRawMsg -=2; + pMsg->iLenMSG -=2; + /* now, claim to abort so that something else can parse the now modified message */ + DBGPRINTF("pmcisconames: new mesage: [%d]'%s'\n", lenMsg, p2parse); + ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE); + +finalize_it: +ENDparse + + +BEGINmodExit +CODESTARTmodExit + /* release what we no longer need */ + objRelease(errmsg, CORE_COMPONENT); + objRelease(glbl, CORE_COMPONENT); + objRelease(parser, CORE_COMPONENT); + objRelease(datetime, CORE_COMPONENT); +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_PMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +ENDqueryEtryPt + + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(parser, CORE_COMPONENT)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); + + DBGPRINTF("cisconames parser init called, compiled with version %s\n", VERSION); + bParseHOSTNAMEandTAG = glbl.GetParseHOSTNAMEandTAG(); /* cache value, is set only during rsyslogd option processing */ + + +ENDmodInit + +/* vim:set ai: + */ diff --git a/plugins/pmsnare/Makefile.am b/plugins/pmsnare/Makefile.am new file mode 100644 index 00000000..5b2696ac --- /dev/null +++ b/plugins/pmsnare/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = pmsnare.la + +pmsnare_la_SOURCES = pmsnare.c +pmsnare_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) -I ../../tools +pmsnare_la_LDFLAGS = -module -avoid-version +pmsnare_la_LIBADD = + +EXTRA_DIST = diff --git a/plugins/pmsnare/pmsnare.c b/plugins/pmsnare/pmsnare.c new file mode 100644 index 00000000..4a9880d4 --- /dev/null +++ b/plugins/pmsnare/pmsnare.c @@ -0,0 +1,238 @@ +/* pmsnare.c + * + * this detects logs sent by Snare and cleans them up so that they can be processed by the normal parser + * + * there are two variations of this, if the client is set to 'syslog' mode it sends + * + * <pri>timestamp<sp>hostname<sp>tag<tab>otherstuff + * + * if the client is not set to syslog it sends + * + * hostname<tab>tag<tab>otherstuff + * + * ToDo, take advantage of items in the message itself to set more friendly information + * where the normal parser will find it by re-writing more of the message + * + * Intereting information includes: + * + * in the case of windows snare messages: + * the system hostname is field 12 + * the severity is field 3 (criticality ranging form 0 to 4) + * the source of the log is field 4 and may be able to be mapped to facility + * + * + * created 2010-12-13 by David Lang based on pmlastmsg + * + * This file is part of rsyslog. + * + * Rsyslog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Rsyslog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ +#include "config.h" +#include "rsyslog.h" +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <ctype.h> +#include "conf.h" +#include "syslogd-types.h" +#include "template.h" +#include "msg.h" +#include "module-template.h" +#include "glbl.h" +#include "errmsg.h" +#include "parser.h" +#include "datetime.h" +#include "unicode-helper.h" + +MODULE_TYPE_PARSER +PARSER_NAME("rsyslog.snare") + +/* internal structures + */ +DEF_PMOD_STATIC_DATA +DEFobjCurrIf(errmsg) +DEFobjCurrIf(glbl) +DEFobjCurrIf(parser) +DEFobjCurrIf(datetime) + + +/* static data */ +static int bParseHOSTNAMEandTAG; /* cache for the equally-named global param - performance enhancement */ + + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATUREAutomaticSanitazion) + iRet = RS_RET_OK; + if(eFeat == sFEATUREAutomaticPRIParsing) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + +BEGINparse + uchar *p2parse; + int lenMsg; + int snaremessage; + int tablength; + +CODESTARTparse + #define TabRepresentation "#011" + tablength=sizeof(TabRepresentation); + dbgprintf("Message will now be parsed by fix Snare parser.\n"); + assert(pMsg != NULL); + assert(pMsg->pszRawMsg != NULL); + + /* check if this message is of the type we handle in this (very limited) parser + + find out if we have a space separated or tab separated for the first item + if tab separated see if the second word is one of our expected tags + if so replace the tabs with spaces so that hostname and syslog tag are going to be parsed properly + optionally replace the hostname at the beginning of the message with one from later in the message + else, wrong message, abort + else, assume that we have a valid timestamp, move over to the syslog tag + if that is tab separated from the rest of the message and one of our expected tags + if so, replace the tab with a space so that it will be parsed properly + optionally replace the hostname at the beginning of the message withone from later in the message + + */ + snaremessage=0; + lenMsg = pMsg->iLenRawMsg - pMsg->offAfterPRI; /* note: offAfterPRI is already the number of PRI chars (do not add one!) */ + p2parse = pMsg->pszRawMsg + pMsg->offAfterPRI; /* point to start of text, after PRI */ + dbgprintf("pmsnare: msg to look at: [%d]'%s'\n", lenMsg, p2parse); + if((unsigned) lenMsg < 30) { + /* too short, can not be "our" message */ + dbgprintf("msg too short!\n"); + ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE); + } + + while(lenMsg && *p2parse != ' ' && *p2parse != '\t' && *p2parse != '#') { + --lenMsg; + ++p2parse; + } + dbgprintf("pmsnare: separator [%d]'%s' msg after the first separator: [%d]'%s'\n", tablength,TabRepresentation,lenMsg, p2parse); + if ((lenMsg > tablength) && (*p2parse == '\t' || strncasecmp((char*) p2parse, TabRepresentation , tablength-1) == 0)) { + //if ((lenMsg > tablength) && (*p2parse == '\t' || *p2parse == '#')) { + dbgprintf("pmsnare: tab separated message\n"); + if(strncasecmp((char*) (p2parse + tablength - 1), "MSWinEventLog", 13) == 0) { + snaremessage=13; /* 0 means not a snare message, a number is how long the tag is */ + } + if(strncasecmp((char*) (p2parse + tablength - 1), "LinuxKAudit", 11) == 0) { + snaremessage=11; /* 0 means not a snare message, a number is how long the tag is */ + } + if(snaremessage) { + /* replace the tab with a space and if needed move the message portion up by the length of TabRepresentation -2 characters to overwrite the extra : */ + *p2parse = ' '; + lenMsg -=(tablength-2); + p2parse++; + lenMsg--; + memmove(p2parse, p2parse + (tablength-2), lenMsg); + *(p2parse + lenMsg) = '\n'; + *(p2parse + lenMsg + 1) = '\0'; + pMsg->iLenRawMsg -=(tablength-2); + pMsg->iLenMSG -=(tablength-2); + p2parse += snaremessage; + lenMsg -= snaremessage; + *p2parse = ' '; + p2parse++; + lenMsg--; + lenMsg -=(tablength-2); + memmove(p2parse, p2parse + (tablength-2), lenMsg); + *(p2parse + lenMsg) = '\n'; + *(p2parse + lenMsg + 1) = '\0'; + pMsg->iLenRawMsg -=(tablength-2); + pMsg->iLenMSG -=(tablength-2); + dbgprintf("found a Snare message with snare not set to send syslog messages\n"); + } + } else { + /* go back to the beginning of the message */ + lenMsg = pMsg->iLenRawMsg - pMsg->offAfterPRI; /* note: offAfterPRI is already the number of PRI chars (do not add one!) */ + p2parse = pMsg->pszRawMsg + pMsg->offAfterPRI; /* point to start of text, after PRI */ + /* skip over timestamp and space*/ + lenMsg -=17; + p2parse +=17; + /* skip over what should be the hostname */ + while(lenMsg && *p2parse != ' ') { + --lenMsg; + ++p2parse; + } + if (lenMsg){ + --lenMsg; + ++p2parse; + } + dbgprintf("pmsnare: separator [%d]'%s' msg after the timestamp and hostname: [%d]'%s'\n", tablength,TabRepresentation,lenMsg, p2parse); + if(lenMsg > 13 && strncasecmp((char*) p2parse, "MSWinEventLog", 13) == 0) { + snaremessage=13; /* 0 means not a snare message, a number is how long the tag is */ + } + if(lenMsg > 11 && strncasecmp((char*) p2parse, "LinuxKAudit", 11) == 0) { + snaremessage=11; /* 0 means not a snare message, a number is how long the tag is */ + } + if(snaremessage) { + p2parse += snaremessage; + lenMsg -= snaremessage; + *p2parse = ' '; + p2parse++; + lenMsg--; + lenMsg -=(tablength-2); + memmove(p2parse, p2parse + (tablength-2), lenMsg); + *(p2parse + lenMsg) = '\n'; + *(p2parse + lenMsg + 1) = '\0'; + pMsg->iLenRawMsg -=(tablength-2); + pMsg->iLenMSG -=(tablength-2); + dbgprintf("found a Snare message with snare set to send syslog messages\n"); + } + + } + DBGPRINTF("pmsnare: new message: [%d]'%s'\n", lenMsg, pMsg->pszRawMsg + pMsg->offAfterPRI); + ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE); + +finalize_it: +ENDparse + + +BEGINmodExit +CODESTARTmodExit + /* release what we no longer need */ + objRelease(errmsg, CORE_COMPONENT); + objRelease(glbl, CORE_COMPONENT); + objRelease(parser, CORE_COMPONENT); + objRelease(datetime, CORE_COMPONENT); +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_PMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +ENDqueryEtryPt + + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(parser, CORE_COMPONENT)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); + + DBGPRINTF("snare parser init called, compiled with version %s\n", VERSION); + bParseHOSTNAMEandTAG = glbl.GetParseHOSTNAMEandTAG(); /* cache value, is set only during rsyslogd option processing */ + + +ENDmodInit + +/* vim:set ai: + */ |