summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/imfile/imfile.c66
-rw-r--r--plugins/imptcp/imptcp.c269
-rw-r--r--plugins/imudp/Makefile.am2
-rw-r--r--plugins/imudp/imudp.c168
-rw-r--r--plugins/imuxsock/imuxsock.c111
-rw-r--r--plugins/omhdfs/omhdfs.c2
-rw-r--r--plugins/ommongodb/Makefile.am11
-rw-r--r--plugins/ommongodb/README23
-rw-r--r--plugins/ommongodb/ommongodb.c280
-rw-r--r--plugins/ommysql/ommysql.c39
-rw-r--r--plugins/pmaixforwardedfrom/Makefile.am8
-rw-r--r--plugins/pmaixforwardedfrom/pmaixforwardedfrom.c167
-rw-r--r--plugins/pmcisconames/Makefile.am8
-rw-r--r--plugins/pmcisconames/pmcisconames.c177
-rw-r--r--plugins/pmsnare/Makefile.am8
-rw-r--r--plugins/pmsnare/pmsnare.c238
16 files changed, 1415 insertions, 162 deletions
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c
index 36a2c015..bd44fd55 100644
--- a/plugins/imfile/imfile.c
+++ b/plugins/imfile/imfile.c
@@ -48,6 +48,7 @@
#include "unicode-helper.h"
#include "prop.h"
#include "stringbuf.h"
+#include "ruleset.h"
MODULE_TYPE_INPUT /* must be present for input modules, do not remove */
@@ -60,6 +61,7 @@ DEFobjCurrIf(glbl)
DEFobjCurrIf(datetime)
DEFobjCurrIf(strm)
DEFobjCurrIf(prop)
+DEFobjCurrIf(ruleset)
typedef struct fileInfo_s {
uchar *pszFileName;
@@ -71,6 +73,8 @@ typedef struct fileInfo_s {
int nRecords; /**< How many records did we process before persisting the stream? */
int iPersistStateInterval; /**< how often should state be persisted? (0=on close only) */
strm_t *pStrm; /* its stream (NULL if not assigned) */
+ int readMode; /* which mode to use in ReadMulteLine call? */
+ ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */
} fileInfo_t;
@@ -85,6 +89,8 @@ static int iPollInterval = 10; /* number of seconds to sleep when there was no f
static int iPersistStateInterval = 0; /* how often if state file to be persisted? (default 0->never) */
static int iFacility = 128; /* local0 */
static int iSeverity = 5; /* notice, as of rfc 3164 */
+static int readMode = 0; /* mode to use for ReadMultiLine call */
+static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
static int iFilPtr = 0; /* number of files to be monitored; pointer to next free spot during config */
#define MAX_INPUT_FILES 100
@@ -114,6 +120,7 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine)
MsgSetTAG(pMsg, pInfo->pszTag, pInfo->lenTag);
pMsg->iFacility = LOG_FAC(pInfo->iFacility);
pMsg->iSeverity = LOG_PRI(pInfo->iSeverity);
+ MsgSetRuleset(pMsg, pInfo->pRuleset);
CHKiRet(submitMsg(pMsg));
finalize_it:
RETiRet;
@@ -211,8 +218,8 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
}
/* loop below will be exited when strmReadLine() returns EOF */
- while(1) {
- CHKiRet(strm.ReadLine(pThis->pStrm, &pCStr));
+ while(glbl.GetGlobalInputTermState() == 0) {
+ CHKiRet(strm.ReadLine(pThis->pStrm, &pCStr, pThis->readMode));
*pbHadFileData = 1; /* this is just a flag, so set it and forget it */
CHKiRet(enqLine(pThis, pCStr)); /* process line */
rsCStrDestruct(&pCStr); /* discard string (must be done by us!) */
@@ -287,23 +294,24 @@ BEGINrunInput
int bHadFileData; /* were there at least one file with data during this run? */
CODESTARTrunInput
pthread_cleanup_push(inputModuleCleanup, NULL);
- while(1) {
-
+ while(glbl.GetGlobalInputTermState() == 0) {
do {
bHadFileData = 0;
for(i = 0 ; i < iFilPtr ; ++i) {
+ if(glbl.GetGlobalInputTermState() == 1)
+ break; /* terminate input! */
pollFile(&files[i], &bHadFileData);
}
- } while(iFilPtr > 1 && bHadFileData == 1); /* warning: do...while()! */
+ } while(iFilPtr > 1 && bHadFileData == 1 && glbl.GetGlobalInputTermState() == 0); /* warning: do...while()! */
/* Note: the additional 10ns wait is vitally important. It guards rsyslog against totally
* hogging the CPU if the users selects a polling interval of 0 seconds. It doesn't hurt any
* other valid scenario. So do not remove. -- rgerhards, 2008-02-14
*/
- srSleep(iPollInterval, 10);
-
+ if(glbl.GetGlobalInputTermState() == 0)
+ srSleep(iPollInterval, 10);
}
- /*NOTREACHED*/
+ DBGPRINTF("imfile: terminating upon request of rsyslog core\n");
pthread_cleanup_pop(0); /* just for completeness, but never called... */
RETiRet; /* use it to make sure the housekeeping is done! */
@@ -396,6 +404,13 @@ CODESTARTafterRun
ENDafterRun
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
/* The following entry points are defined in module-template.h.
* In general, they need to be present, but you do NOT need to provide
* any code here.
@@ -408,12 +423,14 @@ CODESTARTmodExit
objRelease(glbl, CORE_COMPONENT);
objRelease(errmsg, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
+ objRelease(ruleset, CORE_COMPONENT);
ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt
@@ -447,6 +464,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
iPollInterval = 10;
iFacility = 128; /* local0 */
iSeverity = 5; /* notice, as of rfc 3164 */
+ readMode = 0;
+ pBindRuleset = NULL;
RETiRet;
}
@@ -489,6 +508,8 @@ static rsRetVal addMonitor(void __attribute__((unused)) *pVal, uchar *pNewVal)
pThis->iFacility = iFacility;
pThis->iPersistStateInterval = iPersistStateInterval;
pThis->nRecords = 0;
+ pThis->readMode = readMode;
+ pThis->pRuleset = pBindRuleset;
iPersistStateInterval = 0;
} else {
errmsg.LogError(0, RS_RET_OUT_OF_DESRIPTORS, "Too many file monitors configured - ignoring this one");
@@ -504,6 +525,29 @@ finalize_it:
RETiRet;
}
+
+/* accept a new ruleset to bind. Checks if it exists and complains, if not */
+static rsRetVal
+setRuleset(void __attribute__((unused)) *pVal, uchar *pszName)
+{
+ ruleset_t *pRuleset;
+ rsRetVal localRet;
+ DEFiRet;
+
+ localRet = ruleset.GetRuleset(&pRuleset, pszName);
+ if(localRet == RS_RET_NOT_FOUND) {
+ errmsg.LogError(0, NO_ERRCODE, "error: ruleset '%s' not found - ignored", pszName);
+ }
+ CHKiRet(localRet);
+ pBindRuleset = pRuleset;
+ DBGPRINTF("imfile current bind ruleset %p: '%s'\n", pRuleset, pszName);
+
+finalize_it:
+ free(pszName); /* no longer needed */
+ RETiRet;
+}
+
+
/* modInit() is called once the module is loaded. It must perform all module-wide
* initialization tasks. There are also a number of housekeeping tasks that the
* framework requires. These are handled by the macros. Please note that the
@@ -521,8 +565,10 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(strm, CORE_COMPONENT));
+ CHKiRet(objUse(ruleset, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
+ DBGPRINTF("imfile: version %s initializing\n", VERSION);
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilename", 0, eCmdHdlrGetWord,
NULL, &pszFileName, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfiletag", 0, eCmdHdlrGetWord,
@@ -535,8 +581,12 @@ CODEmodInit_QueryRegCFSLineHdlr
NULL, &iFacility, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepollinterval", 0, eCmdHdlrInt,
NULL, &iPollInterval, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilereadmode", 0, eCmdHdlrInt,
+ NULL, &readMode, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepersiststateinterval", 0, eCmdHdlrInt,
NULL, &iPersistStateInterval, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilebindruleset", 0, eCmdHdlrGetWord,
+ setRuleset, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
/* that command ads a new file! */
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputrunfilemonitor", 0, eCmdHdlrGetWord,
addMonitor, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 2ff292c2..63447a72 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -10,7 +10,7 @@
*
* File begun on 2010-08-10 by RGerhards
*
- * Copyright 2007-2010 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -83,7 +83,8 @@ DEFobjCurrIf(datetime)
DEFobjCurrIf(errmsg)
DEFobjCurrIf(ruleset)
-
+/* forward references */
+static void * wrkr(void *myself);
/* config settings */
typedef struct configSettings_s {
@@ -92,6 +93,7 @@ typedef struct configSettings_s {
uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
uchar *lstnIP; /* which IP we should listen on? */
ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ int wrkrMax; /* max number of workers (actually "helper workers") */
} configSettings_t;
static configSettings_t cs;
@@ -117,6 +119,7 @@ struct ptcpsrv_s {
ruleset_t *pRuleset;
ptcplstn_t *pLstn; /* root of our listeners */
ptcpsess_t *pSess; /* root of our sessions */
+ pthread_mutex_t mutSessLst;
};
/* the ptcp session object. Describes a single active session.
@@ -154,6 +157,20 @@ struct ptcplstn_s {
};
+/* The following structure controls the worker threads. Global data is
+ * needed for their access.
+ */
+static struct wrkrInfo_s {
+ pthread_t tid; /* the worker's thread ID */
+ pthread_cond_t run;
+ struct epoll_event *event; /* event == NULL -> idle */
+ long long unsigned numCalled; /* how often was this called */
+} wrkrInfo[16];
+static pthread_mutex_t wrkrMut;
+static pthread_cond_t wrkrIdle;
+static int wrkrRunning;
+
+
/* type of object stored in epoll descriptor */
typedef enum {
epolld_lstn,
@@ -171,20 +188,10 @@ struct epolld_s {
/* global data */
-//static permittedPeers_t *pPermPeersRoot = NULL;
+pthread_attr_t wrkrThrdAttr; /* Attribute for session threads; read only after startup */
static ptcpsrv_t *pSrvRoot = NULL;
static int epollfd = -1; /* (sole) descriptor for epoll */
static int iMaxLine; /* maximum size of a single message */
-/* we use a single static receive buffer, as this module is not multi-threaded. Keeping
- * the buffer in the data segment is probably a little bit more efficient than on the stack
- * (but at least I can't believe it will ever be less efficient ;) -- rgerhards, 2010-08-10
- * Note that we do NOT (yet?) provide a config setting to set the buffer size. For usual
- * syslog traffic, it should be large enough. Also keep in mind that we run under a virtual
- * memory system, so if we do not use large parts of the buffer, that's no issue at
- * all -- it'll just use up address space. On the other hand, it would be silly to page in
- * or page out some data just to get space for the IO buffer.
- */
-static char rcvBuf[128*1024];
/* forward definitions */
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal);
@@ -209,6 +216,7 @@ static void
destructSrv(ptcpsrv_t *pSrv)
{
prop.Destruct(&pSrv->pInputName);
+ pthread_mutex_destroy(&pSrv->mutSessLst);
free(pSrv->port);
free(pSrv);
}
@@ -678,6 +686,7 @@ static inline void
initConfigSettings(void)
{
cs.bEmitMsgOnClose = 0;
+ cs.wrkrMax = 2;
cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
cs.pszInputName = NULL;
cs.pRuleset = NULL;
@@ -790,10 +799,12 @@ addSess(ptcpsrv_t *pSrv, int sock, prop_t *peerName, prop_t *peerIP)
/* add to start of server's listener list */
pSess->prev = NULL;
+ pthread_mutex_lock(&pSrv->mutSessLst);
pSess->next = pSrv->pSess;
if(pSrv->pSess != NULL)
pSrv->pSess->prev = pSess;
pSrv->pSess = pSess;
+ pthread_mutex_unlock(&pSrv->mutSessLst);
iRet = addEPollSock(epolld_sess, pSess, sock, &pSess->epd);
@@ -816,10 +827,8 @@ closeSess(ptcpsess_t *pSess)
CHKiRet(removeEPollSock(sock, pSess->epd));
close(sock);
+ pthread_mutex_lock(&pSess->pSrv->mutSessLst);
/* finally unlink session from structures */
-//fprintf(stderr, "closing session %d next %p, prev %p\n", pSess->sock, pSess->next, pSess->prev);
-//DBGPRINTF("imptcp: pSess->next %p\n", pSess->next);
-//DBGPRINTF("imptcp: pSess->prev %p\n", pSess->prev);
if(pSess->next != NULL)
pSess->next->prev = pSess->prev;
if(pSess->prev == NULL) {
@@ -828,6 +837,7 @@ closeSess(ptcpsess_t *pSess)
} else {
pSess->prev->next = pSess->next;
}
+ pthread_mutex_unlock(&pSess->pSrv->mutSessLst);
/* unlinked, now remove structure */
destructSess(pSess);
@@ -838,21 +848,6 @@ finalize_it:
}
-#if 0
-/* set permitted peer -- rgerhards, 2008-05-19
- */
-static rsRetVal
-setPermittedPeer(void __attribute__((unused)) *pVal, uchar *pszID)
-{
- DEFiRet;
- CHKiRet(net.AddPermittedPeer(&pPermPeersRoot, pszID));
- free(pszID); /* no longer needed, but we need to free as of interface def */
-finalize_it:
- RETiRet;
-}
-#endif
-
-
/* accept a new ruleset to bind. Checks if it exists and complains, if not */
static rsRetVal setRuleset(void __attribute__((unused)) *pVal, uchar *pszName)
{
@@ -880,6 +875,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa
ptcpsrv_t *pSrv;
CHKmalloc(pSrv = malloc(sizeof(ptcpsrv_t)));
+ pthread_mutex_init(&pSrv->mutSessLst, NULL);
pSrv->pSess = NULL;
pSrv->pLstn = NULL;
pSrv->bEmitMsgOnClose = cs.bEmitMsgOnClose;
@@ -911,6 +907,46 @@ finalize_it:
}
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static inline void
+startWorkerPool(void)
+{
+ int i;
+ wrkrRunning = 0;
+ if(cs.wrkrMax > 16)
+ cs.wrkrMax = 16; /* TODO: make dynamic? */
+ pthread_mutex_init(&wrkrMut, NULL);
+ pthread_cond_init(&wrkrIdle, NULL);
+ for(i = 0 ; i < cs.wrkrMax ; ++i) {
+ /* init worker info structure! */
+ pthread_cond_init(&wrkrInfo[i].run, NULL);
+ wrkrInfo[i].event = NULL;
+ wrkrInfo[i].numCalled = 0;
+ pthread_create(&wrkrInfo[i].tid, &wrkrThrdAttr, wrkr, &(wrkrInfo[i]));
+ }
+
+}
+
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static inline void
+stopWorkerPool(void)
+{
+ int i;
+ for(i = 0 ; i < cs.wrkrMax ; ++i) {
+ pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */
+ pthread_join(wrkrInfo[i].tid, NULL);
+ DBGPRINTF("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled);
+ pthread_cond_destroy(&wrkrInfo[i].run);
+ }
+ pthread_cond_destroy(&wrkrIdle);
+ pthread_mutex_destroy(&wrkrMut);
+
+}
+
+
+
/* start up all listeners
* This is a one-time stop once the module is set to start.
*/
@@ -922,7 +958,7 @@ startupServers()
pSrv = pSrvRoot;
while(pSrv != NULL) {
- DBGPRINTF("Starting up ptcp server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName);
+ DBGPRINTF("imptcp: starting up server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName);
startupSrv(pSrv);
pSrv = pSrv->pNext;
}
@@ -944,9 +980,9 @@ lstnActivity(ptcplstn_t *pLstn)
DEFiRet;
DBGPRINTF("imptcp: new connection on listen socket %d\n", pLstn->sock);
- while(1) {
+ while(glbl.GetGlobalInputTermState() == 0) {
localRet = AcceptConnReq(pLstn->sock, &newSock, &peerName, &peerIP);
- if(localRet == RS_RET_NO_MORE_DATA)
+ if(localRet == RS_RET_NO_MORE_DATA || glbl.GetGlobalInputTermState() == 1)
break;
CHKiRet(localRet);
CHKiRet(addSess(pLstn->pSrv, newSock, peerName, peerIP));
@@ -965,6 +1001,7 @@ sessActivity(ptcpsess_t *pSess)
{
int lenRcv;
int lenBuf;
+ char rcvBuf[128*1024];
DEFiRet;
DBGPRINTF("imptcp: new activity on session socket %d\n", pSess->sock);
@@ -1002,35 +1039,127 @@ finalize_it:
}
+/* This function is called to process a single request. This may
+ * be carried out by the main worker or a helper. It can be run
+ * concurrently.
+ */
+static inline void
+processWorkItem(struct epoll_event *event)
+{
+ epolld_t *epd;
+
+ epd = (epolld_t*) event->data.ptr;
+ switch(epd->typ) {
+ case epolld_lstn:
+ lstnActivity((ptcplstn_t *) epd->ptr);
+ break;
+ case epolld_sess:
+ sessActivity((ptcpsess_t *) epd->ptr);
+ break;
+ default:
+ errmsg.LogError(0, RS_RET_INTERNAL_ERROR,
+ "error: invalid epolld_type_t %d after epoll", epd->typ);
+ break;
+ }
+}
+
+
+/* This function is called to process a complete workset, that
+ * is a set of events returned from epoll.
+ */
+static inline void
+processWorkSet(int nEvents, struct epoll_event events[])
+{
+ int iEvt;
+ int i;
+ int remainEvents;
+
+ remainEvents = nEvents;
+ for(iEvt = 0 ; (iEvt < nEvents) && (glbl.GetGlobalInputTermState() == 0) ; ++iEvt) {
+ if(remainEvents == 1) {
+ /* process self, save context switch */
+ processWorkItem(events+iEvt);
+ } else {
+ pthread_mutex_lock(&wrkrMut);
+ /* check if there is a free worker */
+ for(i = 0 ; (i < cs.wrkrMax) && (wrkrInfo[i].event != NULL) ; ++i)
+ /*do search*/;
+ if(i < cs.wrkrMax) {
+ /* worker free -> use it! */
+ wrkrInfo[i].event = events+iEvt;
+ ++wrkrRunning;
+ pthread_cond_signal(&wrkrInfo[i].run);
+ pthread_mutex_unlock(&wrkrMut);
+ } else {
+ pthread_mutex_unlock(&wrkrMut);
+ /* no free worker, so we process this one ourselfs */
+ processWorkItem(events+iEvt);
+ }
+ }
+ --remainEvents;
+ }
+
+ if(nEvents > 1) {
+ /* we now need to wait until all workers finish. This is because the
+ * rest of this module can not handle the concurrency introduced
+ * by workers running during the epoll call.
+ */
+ pthread_mutex_lock(&wrkrMut);
+ while(wrkrRunning > 0) {
+ pthread_cond_wait(&wrkrIdle, &wrkrMut);
+ }
+ pthread_mutex_unlock(&wrkrMut);
+ }
+
+}
+
+
+/* worker to process incoming requests
+ */
+static void *
+wrkr(void *myself)
+{
+ struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself;
+
+ pthread_mutex_lock(&wrkrMut);
+ while(1) {
+ while(me->event == NULL && glbl.GetGlobalInputTermState() == 0) {
+ pthread_cond_wait(&me->run, &wrkrMut);
+ }
+ if(glbl.GetGlobalInputTermState() == 1)
+ break;
+ pthread_mutex_unlock(&wrkrMut);
+
+ ++me->numCalled;
+ processWorkItem(me->event);
+
+ pthread_mutex_lock(&wrkrMut);
+ me->event = NULL; /* indicate we are free again */
+ --wrkrRunning;
+ pthread_cond_signal(&wrkrIdle);
+ }
+ pthread_mutex_unlock(&wrkrMut);
+
+ return NULL;
+}
+
+
/* This function is called to gather input.
*/
BEGINrunInput
- int i;
- int nfds;
- struct epoll_event events[1];
- epolld_t *epd;
+ int nEvents;
+ struct epoll_event events[128];
CODESTARTrunInput
- DBGPRINTF("imptcp now beginning to process input data\n");
- /* v5 TODO: consentual termination mode */
- while(1) {
+ startWorkerPool();
+ DBGPRINTF("imptcp: now beginning to process input data\n");
+ while(glbl.GetGlobalInputTermState() == 0) {
DBGPRINTF("imptcp going on epoll_wait\n");
- nfds = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1);
- for(i = 0 ; i < nfds ; ++i) { /* support for larger batches (later, TODO) */
- epd = (epolld_t*) events[i].data.ptr;
- switch(epd->typ) {
- case epolld_lstn:
- lstnActivity((ptcplstn_t *) epd->ptr);
- break;
- case epolld_sess:
- sessActivity((ptcpsess_t *) epd->ptr);
- break;
- default:
- errmsg.LogError(0, RS_RET_INTERNAL_ERROR,
- "error: invalid epolld_type_t %d after epoll", epd->typ);
- break;
- }
- }
+ nEvents = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1);
+ DBGPRINTF("imptcp: epoll returned %d events\n", nEvents);
+ processWorkSet(nEvents, events);
}
+ DBGPRINTF("imptcp: successfully terminated\n");
+ /* we stop the worker pool in AfterRun, in case we get cancelled for some reason (old Interface) */
ENDrunInput
@@ -1038,7 +1167,6 @@ ENDrunInput
BEGINwillRun
CODESTARTwillRun
/* first apply some config settings */
- //net.PrintAllowedSenders(2); /* TCP */
iMaxLine = glbl.GetMaxLine(); /* get maximum size we currently support */
if(pSrvRoot == NULL) {
@@ -1104,8 +1232,8 @@ shutdownSrv(ptcpsrv_t *pSrv)
BEGINafterRun
ptcpsrv_t *pSrv, *srvDel;
CODESTARTafterRun
- /* do cleanup here */
- //net.clearAllowedSenders(UCHAR_CONSTANT("TCP"));
+ stopWorkerPool();
+
/* we need to close everything that is still open */
pSrv = pSrvRoot;
while(pSrv != NULL) {
@@ -1121,12 +1249,7 @@ ENDafterRun
BEGINmodExit
CODESTARTmodExit
-#if 0
- if(pPermPeersRoot != NULL) {
- net.DestructPermittedPeers(&pPermPeersRoot);
- }
-#endif
-
+ pthread_attr_destroy(&wrkrThrdAttr);
/* release objects we used */
objRelease(glbl, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
@@ -1141,6 +1264,7 @@ static rsRetVal
resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
{
cs.bEmitMsgOnClose = 0;
+ cs.wrkrMax = 2;
cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
free(cs.pszInputName);
cs.pszInputName = NULL;
@@ -1150,10 +1274,17 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus
}
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt
@@ -1170,6 +1301,10 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(ruleset, CORE_COMPONENT));
+ /* initialize "read-only" thread attributes */
+ pthread_attr_init(&wrkrThrdAttr);
+ pthread_attr_setstacksize(&wrkrThrdAttr, 2048*1024);
+
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord,
addTCPListener, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
@@ -1177,6 +1312,8 @@ CODEmodInit_QueryRegCFSLineHdlr
eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt,
NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverhelperthreads"), 0, eCmdHdlrInt,
+ NULL, &cs.wrkrMax, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverinputname"), 0,
eCmdHdlrGetWord, NULL, &cs.pszInputName, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverlistenip"), 0,
diff --git a/plugins/imudp/Makefile.am b/plugins/imudp/Makefile.am
index 517b1287..bc64b8c8 100644
--- a/plugins/imudp/Makefile.am
+++ b/plugins/imudp/Makefile.am
@@ -3,4 +3,4 @@ pkglib_LTLIBRARIES = imudp.la
imudp_la_SOURCES = imudp.c
imudp_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS)
imudp_la_LDFLAGS = -module -avoid-version
-imudp_la_LIBADD =
+imudp_la_LIBADD = $(IMUDP_LIBS)
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index b960322e..d347b0ac 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -35,6 +35,9 @@
#if HAVE_SYS_EPOLL_H
# include <sys/epoll.h>
#endif
+#ifdef HAVE_SCHED_H
+# include <sched.h>
+#endif
#include "rsyslog.h"
#include "dirty.h"
#include "net.h"
@@ -78,14 +81,103 @@ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a
* termination if we can not get it. -- rgerhards, 2007-12-27
*/
static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */
+static uchar *pszSchedPolicy = NULL; /* scheduling policy string */
+static int iSchedPolicy; /* scheduling policy as SCHED_xxx */
+static int iSchedPrio; /* scheduling priority */
+static int seen_iSchedPrio = 0; /* have we seen scheduling priority in the config file? */
static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
-static uchar *pszSchedPolicy = NULL; /**< scheduling policy (string) */
-static int iSchedPrio = -1; /**< scheduling priority (must not be negative) */
#define TIME_REQUERY_DFLT 2
static int iTimeRequery = TIME_REQUERY_DFLT;/* how often is time to be queried inside tight recv loop? 0=always */
/* config settings */
+static rsRetVal check_scheduling_priority(int report_error)
+{
+ DEFiRet;
+
+#ifdef HAVE_SCHED_GET_PRIORITY_MAX
+ if (iSchedPrio < sched_get_priority_min(iSchedPolicy) ||
+ iSchedPrio > sched_get_priority_max(iSchedPolicy)) {
+ if (report_error)
+ errmsg.LogError(errno, NO_ERRCODE,
+ "imudp: scheduling priority %d out of range (%d - %d)"
+ " for scheduling policy '%s' - ignoring settings",
+ iSchedPrio,
+ sched_get_priority_min(iSchedPolicy),
+ sched_get_priority_max(iSchedPolicy),
+ pszSchedPolicy);
+ ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+ }
+#endif
+
+finalize_it:
+ RETiRet;
+}
+
+/* Set scheduling priority in the supplied variable (will be iSchedPrio)
+ * and record that we have seen the directive (in seen_iSchedPrio).
+ */
+static rsRetVal set_scheduling_priority(void *pVal, int value)
+{
+ DEFiRet;
+
+ if (seen_iSchedPrio) {
+ errmsg.LogError(0, NO_ERRCODE, "directive already seen");
+ ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+ }
+ *(int *)pVal = value;
+ seen_iSchedPrio = 1;
+ if (pszSchedPolicy != NULL)
+ CHKiRet(check_scheduling_priority(1));
+
+finalize_it:
+ RETiRet;
+}
+
+/* Set scheduling policy in iSchedPolicy */
+static rsRetVal set_scheduling_policy(void *pVal, uchar *pNewVal)
+{
+ int have_sched_policy = 0;
+ DEFiRet;
+
+ if (pszSchedPolicy != NULL) {
+ errmsg.LogError(0, NO_ERRCODE, "directive already seen");
+ ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+ }
+ *((uchar**)pVal) = pNewVal; /* pVal is pszSchedPolicy */
+ if (0) { /* trick to use conditional compilation */
+#ifdef SCHED_FIFO
+ } else if (!strcasecmp((char*)pszSchedPolicy, "fifo")) {
+ iSchedPolicy = SCHED_FIFO;
+ have_sched_policy = 1;
+#endif
+#ifdef SCHED_RR
+ } else if (!strcasecmp((char*)pszSchedPolicy, "rr")) {
+ iSchedPolicy = SCHED_RR;
+ have_sched_policy = 1;
+#endif
+#ifdef SCHED_OTHER
+ } else if (!strcasecmp((char*)pszSchedPolicy, "other")) {
+ iSchedPolicy = SCHED_OTHER;
+ have_sched_policy = 1;
+#endif
+ } else {
+ errmsg.LogError(errno, NO_ERRCODE,
+ "imudp: invalid scheduling policy '%s' "
+ "- ignoring setting", pszSchedPolicy);
+ }
+ if (have_sched_policy == 0) {
+ free(pszSchedPolicy);
+ pszSchedPolicy = NULL;
+ ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+ }
+ if (seen_iSchedPrio)
+ CHKiRet(check_scheduling_priority(1));
+
+finalize_it:
+ RETiRet;
+}
+
/* This function is called when a new listener shall be added. It takes
* the configured parameters, tries to bind the socket and, if that
@@ -296,6 +388,41 @@ finalize_it:
RETiRet;
}
+static void set_thread_schedparam(void)
+{
+ struct sched_param sparam;
+
+ if (pszSchedPolicy != NULL && seen_iSchedPrio == 0) {
+ errmsg.LogError(0, NO_ERRCODE,
+ "imudp: scheduling policy set, but without priority - ignoring settings");
+ } else if (pszSchedPolicy == NULL && seen_iSchedPrio != 0) {
+ errmsg.LogError(0, NO_ERRCODE,
+ "imudp: scheduling priority set, but without policy - ignoring settings");
+ } else if (pszSchedPolicy != NULL && seen_iSchedPrio != 0 &&
+ check_scheduling_priority(0) == 0) {
+#ifndef HAVE_PTHREAD_SETSCHEDPARAM
+ errmsg.LogError(0, NO_ERRCODE,
+ "imudp: cannot set thread scheduling policy, "
+ "pthread_setschedparam() not available");
+#else
+ int err;
+
+ memset(&sparam, 0, sizeof sparam);
+ sparam.sched_priority = iSchedPrio;
+ dbgprintf("imudp trying to set sched policy to '%s', prio %d\n",
+ pszSchedPolicy, iSchedPrio);
+ err = pthread_setschedparam(pthread_self(), iSchedPolicy, &sparam);
+ if (err != 0) {
+ errmsg.LogError(err, NO_ERRCODE, "imudp: pthread_setschedparam() failed");
+ }
+#endif
+ }
+
+ if (pszSchedPolicy != NULL) {
+ free(pszSchedPolicy);
+ pszSchedPolicy = NULL;
+ }
+}
/* This function implements the main reception loop. Depending on the environment,
* we either use the traditional (but slower) select() or the Linux-specific epoll()
@@ -319,6 +446,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
/* start "name caching" algo by making sure the previous system indicator
* is invalidated.
*/
+ set_thread_schedparam();
bIsPermitted = 0;
memset(&frominetPrev, 0, sizeof(frominetPrev));
@@ -386,6 +514,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
/* start "name caching" algo by making sure the previous system indicator
* is invalidated.
*/
+ set_thread_schedparam();
bIsPermitted = 0;
memset(&frominetPrev, 0, sizeof(frominetPrev));
DBGPRINTF("imudp uses select()\n");
@@ -448,7 +577,6 @@ ENDrunInput
/* initialize and return if will run or not */
BEGINwillRun
- struct sched_param sparam;
CODESTARTwillRun
/* we need to create the inputName property (only once during our lifetime) */
CHKiRet(prop.Construct(&pInputName));
@@ -457,40 +585,6 @@ CODESTARTwillRun
net.PrintAllowedSenders(1); /* UDP */
net.HasRestrictions(UCHAR_CONSTANT("UDP"), &bDoACLCheck); /* UDP */
-
- if(pszSchedPolicy == NULL) {
- if(iSchedPrio != -1) {
- errmsg.LogError(errno, NO_ERRCODE, "imudp: scheduling policy not set, but "
- "priority - ignoring settings");
- }
- } else {
- if(iSchedPrio == -1) {
- errmsg.LogError(errno, NO_ERRCODE, "imudp: scheduling policy set, but no "
- "priority - ignoring settings");
- }
- sparam.sched_priority = iSchedPrio;
- dbgprintf("imudp trying to set sched policy to '%s', prio %d\n",
- pszSchedPolicy, iSchedPrio);
- if(0) { /* trick to use conditional compilation */
-# ifdef SCHED_FIFO
- } else if(!strcasecmp((char*)pszSchedPolicy, "fifo")) {
- pthread_setschedparam(pthread_self(), SCHED_FIFO, &sparam);
-# endif
-# ifdef SCHED_RR
- } else if(!strcasecmp((char*)pszSchedPolicy, "rr")) {
- pthread_setschedparam(pthread_self(), SCHED_RR, &sparam);
-# endif
-# ifdef SCHED_OTHER
- } else if(!strcasecmp((char*)pszSchedPolicy, "other")) {
- pthread_setschedparam(pthread_self(), SCHED_OTHER, &sparam);
-# endif
- } else {
- errmsg.LogError(errno, NO_ERRCODE, "imudp: invliad scheduling policy '%s' "
- "ignoring settings", pszSchedPolicy);
- }
- free(pszSchedPolicy);
- pszSchedPolicy = NULL;
- }
/* if we could not set up any listners, there is no point in running... */
if(udpLstnSocks == NULL)
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index c1168c87..b90a3363 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -46,6 +46,7 @@
#include "net.h"
#include "glbl.h"
#include "msg.h"
+#include "parser.h"
#include "prop.h"
#include "debug.h"
#include "unlimited_select.h"
@@ -81,6 +82,7 @@ DEF_IMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
DEFobjCurrIf(prop)
+DEFobjCurrIf(parser)
DEFobjCurrIf(datetime)
DEFobjCurrIf(statsobj)
@@ -143,7 +145,7 @@ static int startIndexUxLocalSockets; /* process fd from that index on (used to
* read-only after startup
*/
static int nfd = 1; /* number of Unix sockets open / read-only after startup */
-static int bSysSockFromSystemd = 0; /* Did we receive the system socket from systemd? */
+static int sd_fds = 0; /* number of systemd activated sockets */
/* config settings */
static int bOmitLocalLogging = 0;
@@ -372,41 +374,32 @@ openLogSocket(lstn_t *pLstn)
if(pLstn->sockName[0] == '\0')
return -1;
- if (ustrcmp(pLstn->sockName, UCHAR_CONSTANT(_PATH_LOG)) == 0) {
- bSysSockFromSystemd = 0; /* set default */
- int r;
-
- /* System log socket code. Check whether an FD was passed in from systemd. If
- * so, it's the /dev/log socket, so use it. */
-
- r = sd_listen_fds(0);
- if (r < 0) {
- errmsg.LogError(-r, NO_ERRCODE, "Failed to acquire systemd socket");
- ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
- }
-
- if (r > 1) {
- errmsg.LogError(EINVAL, NO_ERRCODE, "Wrong number of systemd sockets passed");
- ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
- }
-
- if (r == 1) {
- pLstn->fd = SD_LISTEN_FDS_START;
- r = sd_is_socket_unix(pLstn->fd, SOCK_DGRAM, -1, _PATH_LOG, 0);
- if (r < 0) {
- errmsg.LogError(-r, NO_ERRCODE, "Failed to verify systemd socket type");
- ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
- }
-
- if (!r) {
- errmsg.LogError(EINVAL, NO_ERRCODE, "Passed systemd socket of wrong type");
- ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
- }
- bSysSockFromSystemd = 1; /* indicate we got the socket from systemd */
- } else {
- CHKiRet(createLogSocket(pLstn));
+ pLstn->fd = -1;
+
+ if (sd_fds > 0) {
+ /* Check if the current socket is a systemd activated one.
+ * If so, just use it.
+ */
+ int fd;
+
+ for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + sd_fds; fd++) {
+ if( sd_is_socket_unix(fd, SOCK_DGRAM, -1, (const char*) pLstn->sockName, 0) == 1) {
+ /* ok, it matches -- just use as is */
+ pLstn->fd = fd;
+
+ dbgprintf("imuxsock: Acquired UNIX socket '%s' (fd %d) from systemd.\n",
+ pLstn->sockName, pLstn->fd);
+ break;
+ }
+ /*
+ * otherwise it either didn't matched *this* socket and
+ * we just continue to check the next one or there were
+ * an error and we will create a new socket bellow.
+ */
}
- } else {
+ }
+
+ if (pLstn->fd == -1) {
CHKiRet(createLogSocket(pLstn));
}
@@ -510,6 +503,7 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
{
msg_t *pMsg;
int lenMsg;
+ int offs;
int i;
uchar *parse;
int pri;
@@ -527,13 +521,14 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
*/
parse = pRcv;
lenMsg = lenRcv;
+ offs = 1; /* '<' */
- parse++; lenMsg--; /* '<' */
+ parse++;
pri = 0;
- while(lenMsg && isdigit(*parse)) {
+ while(offs < lenMsg && isdigit(*parse)) {
pri = pri * 10 + *parse - '0';
++parse;
- --lenMsg;
+ ++offs;
}
facil = LOG_FAC(pri);
sever = LOG_PRI(pri);
@@ -552,12 +547,14 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
/* we now create our own message object and submit it to the queue */
CHKiRet(msgConstructWithTime(&pMsg, &st, tt));
MsgSetRawMsg(pMsg, (char*)pRcv, lenRcv);
+ parser.SanitizeMsg(pMsg);
+ lenMsg = pMsg->iLenRawMsg - offs;
MsgSetInputName(pMsg, pInputName);
MsgSetFlowControlType(pMsg, pLstn->flowCtl);
pMsg->iFacility = facil;
pMsg->iSeverity = sever;
- MsgSetAfterPRIOffs(pMsg, lenRcv - lenMsg);
+ MsgSetAfterPRIOffs(pMsg, offs);
parse++; lenMsg--; /* '>' */
@@ -577,7 +574,7 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred)
fixPID(bufParseTAG, &i, cred);
MsgSetTAG(pMsg, bufParseTAG, i);
- MsgSetMSGoffs(pMsg, lenRcv - lenMsg);
+ MsgSetMSGoffs(pMsg, pMsg->iLenRawMsg - lenMsg);
if(pLstn->bParseHost) {
pMsg->msgFlags = pLstn->flags | PARSE_HOSTNAME;
@@ -609,7 +606,9 @@ static rsRetVal readSocket(lstn_t *pLstn)
int iMaxLine;
struct msghdr msgh;
struct iovec msgiov;
+# if HAVE_SCM_CREDENTIALS
struct cmsghdr *cm;
+# endif
struct ucred *cred;
uchar bufRcv[4096+1];
char aux[128];
@@ -633,11 +632,13 @@ static rsRetVal readSocket(lstn_t *pLstn)
memset(&msgh, 0, sizeof(msgh));
memset(&msgiov, 0, sizeof(msgiov));
+# if HAVE_SCM_CREDENTIALS
if(pLstn->bUseCreds) {
memset(&aux, 0, sizeof(aux));
msgh.msg_control = aux;
msgh.msg_controllen = sizeof(aux);
}
+# endif
msgiov.iov_base = pRcv;
msgiov.iov_len = iMaxLine;
msgh.msg_iov = &msgiov;
@@ -774,12 +775,18 @@ CODESTARTwillRun
listeners[0].bUseCreds = (bWritePidSysSock || ratelimitIntervalSysSock) ? 1 : 0;
listeners[0].bWritePid = bWritePidSysSock;
+ sd_fds = sd_listen_fds(0);
+ if (sd_fds < 0) {
+ errmsg.LogError(-sd_fds, NO_ERRCODE, "imuxsock: Failed to acquire systemd socket");
+ ABORT_FINALIZE(RS_RET_ERR_CRE_AFUX);
+ }
+
/* initialize and return if will run or not */
actSocks = 0;
for (i = startIndexUxLocalSockets ; i < nfd ; i++) {
if(openLogSocket(&(listeners[i])) == RS_RET_OK) {
++actSocks;
- dbgprintf("Opened UNIX socket '%s' (fd %d).\n", listeners[i].sockName, listeners[i].fd);
+ dbgprintf("imuxsock: Opened UNIX socket '%s' (fd %d).\n", listeners[i].sockName, listeners[i].fd);
}
}
@@ -806,15 +813,19 @@ CODESTARTafterRun
if (listeners[i].fd != -1)
close(listeners[i].fd);
- /* Clean-up files. If systemd passed us a socket it is
- * systemd's job to clean it up.*/
- if(bSysSockFromSystemd) {
- DBGPRINTF("imuxsock: got system socket from systemd, not unlinking it\n");
- i = 1;
- } else
- i = startIndexUxLocalSockets;
- for(; i < nfd; i++)
+ /* Clean-up files. */
+ for(i = startIndexUxLocalSockets; i < nfd; i++)
if (listeners[i].sockName && listeners[i].fd != -1) {
+
+ /* If systemd passed us a socket it is systemd's job to clean it up.
+ * Do not unlink it -- we will get same socket (node) from systemd
+ * e.g. on restart again.
+ */
+ if (sd_fds > 0 &&
+ listeners[i].fd >= SD_LISTEN_FDS_START &&
+ listeners[i].fd < SD_LISTEN_FDS_START + sd_fds)
+ continue;
+
DBGPRINTF("imuxsock: unlinking unix socket file[%d] %s\n", i, listeners[i].sockName);
unlink((char*) listeners[i].sockName);
}
@@ -835,6 +846,7 @@ BEGINmodExit
CODESTARTmodExit
statsobj.Destruct(&modStats);
+ objRelease(parser, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
objRelease(errmsg, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
@@ -896,6 +908,7 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(prop, CORE_COMPONENT));
CHKiRet(objUse(statsobj, CORE_COMPONENT));
CHKiRet(objUse(datetime, CORE_COMPONENT));
+ CHKiRet(objUse(parser, CORE_COMPONENT));
dbgprintf("imuxsock version %s initializing\n", PACKAGE_VERSION);
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c
index 1fe9b46b..1bf10bd7 100644
--- a/plugins/omhdfs/omhdfs.c
+++ b/plugins/omhdfs/omhdfs.c
@@ -477,7 +477,7 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKmalloc(files = create_hashtable(20, hash_from_string, key_equals_string,
fileObjDestruct4Hashtable));
- CHKiRet(regCfSysLineHdlr((uchar *)"omhdfscs.fileName", 0, eCmdHdlrGetWord, NULL, &cs.fileName, NULL, eConfObjAction));
+ CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsfilename", 0, eCmdHdlrGetWord, NULL, &cs.fileName, NULL, eConfObjAction));
CHKiRet(regCfSysLineHdlr((uchar *)"omhdfshost", 0, eCmdHdlrGetWord, NULL, &cs.hdfsHost, NULL, eConfObjAction));
CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsport", 0, eCmdHdlrInt, NULL, &cs.hdfsPort, NULL, eConfObjAction));
CHKiRet(regCfSysLineHdlr((uchar *)"omhdfsdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &cs.dfltTplName, NULL, eConfObjAction));
diff --git a/plugins/ommongodb/Makefile.am b/plugins/ommongodb/Makefile.am
new file mode 100644
index 00000000..1b0e23a1
--- /dev/null
+++ b/plugins/ommongodb/Makefile.am
@@ -0,0 +1,11 @@
+mongodir = ./mongo-c-driver/src
+pkglib_LTLIBRARIES = ommongodb.la
+
+ommongodb_la_SOURCES = ommongodb.c
+ommongodb_la_SOURCES += $(mongodir)/bson.c $(mongodir)/mongo.c $(mongodir)/md5.c $(mongodir)/numbers.c
+
+ommongodb_la_CPPFLAGS = -DMONGO_HAVE_STDINT -Imongo-c-driver/src $(RSRT_CFLAGS) $(PTHREADS_CFLAGS)
+ommongodb_la_LDFLAGS = -module -avoid-version
+ommongodb_la_LIBADD =
+
+EXTRA_DIST =
diff --git a/plugins/ommongodb/README b/plugins/ommongodb/README
new file mode 100644
index 00000000..cea3f3bc
--- /dev/null
+++ b/plugins/ommongodb/README
@@ -0,0 +1,23 @@
+plugin to use MongoDB as backend.
+
+tested in ubuntu 10.04 and ubuntu 10.10
+
+configuration:
+
+in your /etc/rsyslog.conf, together with other modules:
+$ModLoad ommongodb # provides mongodb support
+
+then in your /etc/rsyslog.d (check your distribution way to organize the configuration..) you create a file 10-mongodb.conf with the following content:
+
+#the format for the driver is :ommongodb:ip:db:collection;StdMongoDBFmt
+#if you want to change what is logged in the db, the template, you must change the source code since the keys are hardcoded
+$template StdMongoDBFmt,"%msg%%syslogfacility%%HOSTNAME%%syslogpriority%"
+*.* :ommongodb:127.0.0.1,syslog,logs;StdMongoDBFmt
+
+
+TODO
+we must ensure that the collection is a capped collection
+refactor my code :-)
+
+email Victor Pereira <victor.pereira@bigrails.com>
+twitter twitter.com/vpereira
diff --git a/plugins/ommongodb/ommongodb.c b/plugins/ommongodb/ommongodb.c
new file mode 100644
index 00000000..8e19105f
--- /dev/null
+++ b/plugins/ommongodb/ommongodb.c
@@ -0,0 +1,280 @@
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <ctype.h>
+#include <errno.h>
+#include <assert.h>
+#include <signal.h>
+#include <time.h>
+#include "bson.h"
+#include "mongo.h"
+#include "config.h"
+#include "rsyslog.h"
+#include "conf.h"
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "module-template.h"
+#include "errmsg.h"
+#include "cfsysline.h"
+#include "mongo-c-driver/src/mongo.h"
+
+#define countof(X) ( (size_t) ( sizeof(X)/sizeof*(X) ) )
+
+#define DEFAULT_SERVER "127.0.0.1"
+#define DEFAULT_DATABASE "syslog"
+#define DEFAULT_COLLECTION "log"
+#define DEFAULT_DB_COLLECTION "syslog.log"
+
+//i just defined some constants, i couldt not find the limit
+#define MONGO_DB_NAME_SIZE 128
+#define MONGO_COLLECTION_NAME_SIZE 128
+
+MODULE_TYPE_OUTPUT
+/* internal structures
+ */
+DEF_OMOD_STATIC_DATA
+DEFobjCurrIf(errmsg)
+
+typedef struct _instanceData {
+ mongo_connection conn[1]; /* ptr */
+ mongo_connection_options opts[1];
+ mongo_conn_return status;
+ char db[MONGO_DB_NAME_SIZE];
+ char collection[MONGO_COLLECTION_NAME_SIZE];
+ char dbcollection[MONGO_DB_NAME_SIZE + MONGO_COLLECTION_NAME_SIZE + 1];
+ unsigned uLastMongoDBErrno;
+ //unsigned iSrvPort; /* sample: server port */
+} instanceData;
+
+char db[_DB_MAXDBLEN+2];
+static int iSrvPort = 27017;
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ /* use this to specify if select features are supported by this
+ * plugin. If not, the framework will handle that. Currently, only
+ * RepeatedMsgReduction ("last message repeated n times") is optional.
+ */
+ if(eFeat == sFEATURERepeatedMsgReduction)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+static void closeMongoDB(instanceData *pData)
+{
+ ASSERT(pData != NULL);
+
+ if(pData->conn != NULL) {
+ mongo_destroy( pData->conn );
+ memset(pData->conn,0x00,sizeof(mongo_connection));
+ }
+}
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ closeMongoDB(pData);
+ENDfreeInstance
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ /* nothing special here */
+ENDdbgPrintInstInfo
+
+/* log a database error with descriptive message.
+ * We check if we have a valid MongoDB handle. If not, we simply
+ * report an error
+ */
+static void reportDBError(instanceData *pData, int bSilent)
+{
+ char errMsg[512];
+ bson ErrObj;
+
+ ASSERT(pData != NULL);
+
+ /* output log message */
+ errno = 0;
+ if(pData->conn == NULL) {
+ errmsg.LogError(0, NO_ERRCODE, "unknown DB error occured - could not obtain MongoDB handle");
+ } else { /* we can ask mysql for the error description... */
+ //we should handle the error. if bSilent is set then we should print as debug
+ mongo_cmd_get_last_error(pData->conn, pData->db, &ErrObj);
+ bson_destroy(&ErrObj);
+ }
+
+ return;
+}
+
+/* The following function is responsible for initializing a
+ * MySQL connection.
+ * Initially added 2004-10-28 mmeckelein
+ */
+static rsRetVal initMongoDB(instanceData *pData, int bSilent)
+{
+ DEFiRet;
+
+ ASSERT(pData != NULL);
+ ASSERT(pData->conn == NULL);
+
+ //I'm trying to fallback to a default here
+ if(pData->opts->port == 0)
+ pData->opts->port = 27017;
+
+ if(pData->opts->host == 0x00)
+ strcpy(pData->opts->host,DEFAULT_SERVER);
+
+ if(pData->dbcollection == 0x00)
+ strcpy(pData->dbcollection,DEFAULT_DB_COLLECTION);
+
+ pData->status = mongo_connect(pData->conn, pData->opts );
+
+ switch (pData->status) {
+ case mongo_conn_success:
+ fprintf(stderr, "connection succeeded\n" );
+ iRet = RS_RET_OK;
+ break;
+ case mongo_conn_bad_arg:
+ errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle");
+ fprintf(stderr, "bad arguments\n" );
+ iRet = RS_RET_SUSPENDED;
+ break;
+ case mongo_conn_no_socket:
+ errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle");
+ fprintf(stderr, "no socket\n" );
+ iRet = RS_RET_SUSPENDED;
+ break;
+ case mongo_conn_fail:
+ errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle");
+ fprintf(stderr, "connection failed\n" );
+ iRet = RS_RET_SUSPENDED;
+ break;
+ case mongo_conn_not_master:
+ errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MongoDB handle");
+ fprintf(stderr, "not master\n" );
+ iRet = RS_RET_SUSPENDED;
+ break;
+ }
+ RETiRet;
+}
+
+//we must implement it
+rsRetVal writeMongoDB(uchar *psz, instanceData *pData)
+{
+ char mydate[32];
+ char **szParams;
+ bson b[1];
+ bson_buffer buf[1];
+ bson_buffer_init( buf );
+ bson_append_new_oid(buf, "_id" );
+ memset(mydate,0x00,32);
+
+
+ DEFiRet;
+
+ ASSERT(psz != NULL);
+ ASSERT(pData != NULL);
+
+
+ /* see if we are ready to proceed */
+ if(pData->conn == NULL) {
+ CHKiRet(initMongoDB(pData, 0));
+ }
+
+szParams = (char**)(void*) psz;
+//We can make it beter
+//if you change the fields in your template, we must update it here
+//there is any C_metaprogramming_ninja there? :-)
+if(countof(szParams) > 0)
+{
+ bson_append_string( buf, "msg", szParams[0]);
+ bson_append_string( buf, "facility",szParams[1]);
+ bson_append_string( buf, "hostname", szParams[2] );
+ bson_append_string(buf, "priority",szParams[3]);
+ bson_append_int(buf,"count",countof(szParams));
+ bson_from_buffer( b, buf );
+ mongo_insert(pData->conn, pData->dbcollection, b );
+}
+
+if(b)
+ bson_destroy(b);
+
+
+ finalize_it:
+ if(iRet == RS_RET_OK) {
+ pData->uLastMongoDBErrno = 0; /* reset error for error supression */
+ }
+
+
+ RETiRet;
+}
+
+BEGINtryResume
+CODESTARTtryResume
+ if(pData->conn == NULL) {
+ iRet = initMongoDB(pData, 1);
+ }
+ENDtryResume
+
+BEGINdoAction
+CODESTARTdoAction
+ iRet = writeMongoDB(ppString[0], pData);
+ENDdoAction
+
+BEGINparseSelectorAct
+ //int iMongoDBPropErr = 0;
+CODESTARTparseSelectorAct
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
+
+ if(!strncmp((char*) p, ":ommongodb:", sizeof(":ommongodb:") - 1)) {
+ p += sizeof(":ommongodb:") - 1; /* eat indicator sequence (-1 because of '\0'!) */
+ } else {
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+ }
+
+ CHKiRet(createInstance(&pData));
+
+ if(getSubString(&p, pData->opts->host, MAXHOSTNAMELEN+1, ','))
+ strcpy(pData->opts->host,DEFAULT_SERVER);
+
+ //we must define the max db name
+ if(getSubString(&p,pData->db,255,','))
+ strcpy(pData->db,DEFAULT_DATABASE);
+ if(getSubString(&p,pData->collection,255,';'))
+ strcpy(pData->collection,DEFAULT_COLLECTION);
+ if(*(p-1) == ';')
+ --p;
+
+
+ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_TPL_AS_ARRAY, (uchar*) " StdMongoDBFmt"));
+
+
+ pData->opts->port = (unsigned) iSrvPort; /* set configured port */
+ sprintf(pData->dbcollection,"%s.%s",pData->db,pData->collection);
+ CHKiRet(initMongoDB(pData, 0));
+
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+
+BEGINmodExit
+CODESTARTmodExit
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+ENDqueryEtryPt
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
+ DBGPRINTF("ompgsql: module compiled with rsyslog version %s.\n", VERSION);
+ DBGPRINTF("ompgsql: %susing transactional output interface.\n", bCoreSupportsBatching ? "" : "not ");
+ENDmodInit \ No newline at end of file
diff --git a/plugins/ommysql/ommysql.c b/plugins/ommysql/ommysql.c
index 5b44d687..4b9d2f7e 100644
--- a/plugins/ommysql/ommysql.c
+++ b/plugins/ommysql/ommysql.c
@@ -62,10 +62,14 @@ typedef struct _instanceData {
char f_dbuid[_DB_MAXUNAMELEN+1]; /* DB user */
char f_dbpwd[_DB_MAXPWDLEN+1]; /* DB user's password */
unsigned uLastMySQLErrno; /* last errno returned by MySQL or 0 if all is well */
+ uchar * f_configfile; /* MySQL Client Configuration File */
+ uchar * f_configsection; /* MySQL Client Configuration Section */
} instanceData;
typedef struct configSettings_s {
int iSrvPort; /* database server port */
+ uchar *pszMySQLConfigFile; /* MySQL Client Configuration File */
+ uchar *pszMySQLConfigSection; /* MySQL Client Configuration Section */
} configSettings_t;
SCOPING_SUPPORT; /* must be set AFTER configSettings_t is defined */
@@ -101,6 +105,14 @@ static void closeMySQL(instanceData *pData)
mysql_close(pData->f_hmysql);
pData->f_hmysql = NULL;
}
+ if(pData->f_configfile!=NULL){
+ free(pData->f_configfile);
+ pData->f_configfile=NULL;
+ }
+ if(pData->f_configsection!=NULL){
+ free(pData->f_configsection);
+ pData->f_configsection=NULL;
+ }
}
BEGINfreeInstance
@@ -162,6 +174,25 @@ static rsRetVal initMySQL(instanceData *pData, int bSilent)
errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MySQL handle");
iRet = RS_RET_SUSPENDED;
} else { /* we could get the handle, now on with work... */
+ mysql_options(pData->f_hmysql,MYSQL_READ_DEFAULT_GROUP,((pData->f_configsection!=NULL)?(char*)pData->f_configsection:"client"));
+ if(pData->f_configfile!=NULL){
+ FILE * fp;
+ fp=fopen((char*)pData->f_configfile,"r");
+ int err=errno;
+ if(fp==NULL){
+ char msg[512];
+ snprintf(msg,sizeof(msg)/sizeof(char),"Could not open '%s' for reading",pData->f_configfile);
+ if(bSilent) {
+ char errStr[512];
+ rs_strerror_r(err, errStr, sizeof(errStr));
+ dbgprintf("mysql configuration error(%d): %s - %s\n",err,msg,errStr);
+ } else
+ errmsg.LogError(err,NO_ERRCODE,"mysql configuration error: %s\n",msg);
+ } else {
+ fclose(fp);
+ mysql_options(pData->f_hmysql,MYSQL_READ_DEFAULT_FILE,pData->f_configfile);
+ }
+ }
/* Connect to database */
if(mysql_real_connect(pData->f_hmysql, pData->f_dbsrv, pData->f_dbuid,
pData->f_dbpwd, pData->f_dbname, pData->f_dbsrvPort, NULL, 0) == NULL) {
@@ -288,6 +319,8 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
} else {
pData->f_dbsrvPort = (unsigned) cs.iSrvPort; /* set configured port */
+ pData->f_configfile = cs.pszMySQLConfigFile;
+ pData->f_configsection = cs.pszMySQLConfigSection;
pData->f_hmysql = NULL; /* initialize, but connect only on first message (important for queued mode!) */
}
@@ -312,6 +345,10 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
{
DEFiRet;
cs.iSrvPort = 0; /* zero is the default port */
+ free(cs.pszMySQLConfigFile);
+ cs.pszMySQLConfigFile = NULL;
+ free(cs.pszMySQLConfigSection);
+ cs.pszMySQLConfigSection = NULL;
RETiRet;
}
@@ -323,6 +360,8 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
/* register our config handlers */
CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionommysqlserverport", 0, eCmdHdlrInt, NULL, &cs.iSrvPort, STD_LOADABLE_MODULE_ID, eConfObjAction));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"ommysqlconfigfile",0,eCmdHdlrGetWord,NULL,&cs.pszMySQLConfigFile,STD_LOADABLE_MODULE_ID, eConfObjAction));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"ommysqlconfigsection",0,eCmdHdlrGetWord,NULL,&cs.pszMySQLConfigSection,STD_LOADABLE_MODULE_ID, eConfObjAction));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID, eConfObjAction));
ENDmodInit
diff --git a/plugins/pmaixforwardedfrom/Makefile.am b/plugins/pmaixforwardedfrom/Makefile.am
new file mode 100644
index 00000000..af359d31
--- /dev/null
+++ b/plugins/pmaixforwardedfrom/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = pmaixforwardedfrom.la
+
+pmaixforwardedfrom_la_SOURCES = pmaixforwardedfrom.c
+pmaixforwardedfrom_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) -I ../../tools
+pmaixforwardedfrom_la_LDFLAGS = -module -avoid-version
+pmaixforwardedfrom_la_LIBADD =
+
+EXTRA_DIST =
diff --git a/plugins/pmaixforwardedfrom/pmaixforwardedfrom.c b/plugins/pmaixforwardedfrom/pmaixforwardedfrom.c
new file mode 100644
index 00000000..11634199
--- /dev/null
+++ b/plugins/pmaixforwardedfrom/pmaixforwardedfrom.c
@@ -0,0 +1,167 @@
+/* pmaixforwardedfrom.c
+ *
+ * this detects logs sent by Cisco devices that mangle their syslog output when you tell them to log by name by adding ' :' between the name and the %XXX-X-XXXXXXX: tag
+ *
+ * instead of actually parsing the message, this modifies the message and then falls through to allow a later parser to handle the now modified message
+ *
+ * created 2010-12-13 by David Lang based on pmlastmsg
+ *
+ * This file is part of rsyslog.
+ *
+ * Rsyslog is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Rsyslog is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <ctype.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "template.h"
+#include "msg.h"
+#include "module-template.h"
+#include "glbl.h"
+#include "errmsg.h"
+#include "parser.h"
+#include "datetime.h"
+#include "unicode-helper.h"
+
+MODULE_TYPE_PARSER
+PARSER_NAME("rsyslog.aixforwardedfrom")
+
+/* internal structures
+ */
+DEF_PMOD_STATIC_DATA
+DEFobjCurrIf(errmsg)
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(parser)
+DEFobjCurrIf(datetime)
+
+
+/* static data */
+static int bParseHOSTNAMEandTAG; /* cache for the equally-named global param - performance enhancement */
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATUREAutomaticSanitazion)
+ iRet = RS_RET_OK;
+ if(eFeat == sFEATUREAutomaticPRIParsing)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
+BEGINparse
+ uchar *p2parse;
+ uchar *opening;
+ int lenMsg;
+#define OpeningText "Message forwarded from "
+CODESTARTparse
+ dbgprintf("Message will now be parsed by fix AIX Forwarded From parser.\n");
+ assert(pMsg != NULL);
+ assert(pMsg->pszRawMsg != NULL);
+ lenMsg = pMsg->iLenRawMsg - pMsg->offAfterPRI; /* note: offAfterPRI is already the number of PRI chars (do not add one!) */
+ p2parse = pMsg->pszRawMsg + pMsg->offAfterPRI; /* point to start of text, after PRI */
+
+ /* check if this message is of the type we handle in this (very limited) parser */
+ /* first, we permit SP */
+ while(lenMsg && *p2parse == ' ') {
+ --lenMsg;
+ ++p2parse;
+ }
+dbgprintf("pmaixforwardedfrom: msg to look at: [%d]'%s'\n", lenMsg, p2parse);
+ if((unsigned) lenMsg < 42) {
+ /* too short, can not be "our" message */
+ /* minimum message, 16 character timestamp, 'Message forwarded from ", 1 character name, ': '*/
+dbgprintf("msg too short!\n");
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+ }
+
+ /* skip over timestamp */
+ lenMsg -=16;
+ p2parse +=16;
+ /* if there is the string "Message forwarded from " were the hostname should be */
+ if(strncasecmp((char*) p2parse, OpeningText, sizeof(OpeningText)-1) != 0) {
+ /* wrong opening text */
+dbgprintf("not a AIX message forwarded from mangled log!\n");
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+ }
+ /* bump the message portion up by 23 characters to overwrite the "Message forwarded from " with the hostname */
+ lenMsg -=23;
+ memmove(p2parse, p2parse + 23, lenMsg);
+ *(p2parse + lenMsg) = '\n';
+ *(p2parse + lenMsg + 1) = '\0';
+ pMsg->iLenRawMsg -=23;
+ pMsg->iLenMSG -=23;
+ /* now look for the : after the hostname to walk past the hostname, also watch for a space in case this isn't really an AIX log, but has a similar preamble */
+ while(lenMsg && *p2parse != ' ' && *p2parse != ':') {
+ --lenMsg;
+ ++p2parse;
+ }
+ if (lenMsg && *p2parse != ':') {
+dbgprintf("not a AIX message forwarded from mangled log but similar enough that the preamble has been removed\n");
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+ }
+ /* bump the message portion up by one character to overwrite the extra : */
+ lenMsg -=1;
+ memmove(p2parse, p2parse + 1, lenMsg);
+ *(p2parse + lenMsg) = '\n';
+ *(p2parse + lenMsg + 1) = '\0';
+ pMsg->iLenRawMsg -=1;
+ pMsg->iLenMSG -=1;
+ /* now, claim to abort so that something else can parse the now modified message */
+ DBGPRINTF("pmaixforwardedfrom: new mesage: [%d]'%s'\n", lenMsg, pMsg->pszRawMsg + pMsg->offAfterPRI);
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+
+finalize_it:
+ENDparse
+
+
+BEGINmodExit
+CODESTARTmodExit
+ /* release what we no longer need */
+ objRelease(errmsg, CORE_COMPONENT);
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(parser, CORE_COMPONENT);
+ objRelease(datetime, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_PMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(parser, CORE_COMPONENT));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
+
+ DBGPRINTF("aixforwardedfrom parser init called, compiled with version %s\n", VERSION);
+ bParseHOSTNAMEandTAG = glbl.GetParseHOSTNAMEandTAG(); /* cache value, is set only during rsyslogd option processing */
+
+
+ENDmodInit
+
+/* vim:set ai:
+ */
diff --git a/plugins/pmcisconames/Makefile.am b/plugins/pmcisconames/Makefile.am
new file mode 100644
index 00000000..16ed347d
--- /dev/null
+++ b/plugins/pmcisconames/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = pmcisconames.la
+
+pmcisconames_la_SOURCES = pmcisconames.c
+pmcisconames_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) -I ../../tools
+pmcisconames_la_LDFLAGS = -module -avoid-version
+pmcisconames_la_LIBADD =
+
+EXTRA_DIST =
diff --git a/plugins/pmcisconames/pmcisconames.c b/plugins/pmcisconames/pmcisconames.c
new file mode 100644
index 00000000..4171e688
--- /dev/null
+++ b/plugins/pmcisconames/pmcisconames.c
@@ -0,0 +1,177 @@
+/* pmcisconames.c
+ *
+ * this detects logs sent by Cisco devices that mangle their syslog output when you tell them to log by name by adding ' :' between the name and the %XXX-X-XXXXXXX: tag
+ *
+ * instead of actually parsing the message, this modifies the message and then falls through to allow a later parser to handle the now modified message
+ *
+ * created 2010-12-13 by David Lang based on pmlastmsg
+ *
+ * This file is part of rsyslog.
+ *
+ * Rsyslog is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Rsyslog is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <ctype.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "template.h"
+#include "msg.h"
+#include "module-template.h"
+#include "glbl.h"
+#include "errmsg.h"
+#include "parser.h"
+#include "datetime.h"
+#include "unicode-helper.h"
+
+MODULE_TYPE_PARSER
+PARSER_NAME("rsyslog.cisconames")
+
+/* internal structures
+ */
+DEF_PMOD_STATIC_DATA
+DEFobjCurrIf(errmsg)
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(parser)
+DEFobjCurrIf(datetime)
+
+
+/* static data */
+static int bParseHOSTNAMEandTAG; /* cache for the equally-named global param - performance enhancement */
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATUREAutomaticSanitazion)
+ iRet = RS_RET_OK;
+ if(eFeat == sFEATUREAutomaticPRIParsing)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
+BEGINparse
+ uchar *p2parse;
+ int lenMsg;
+#define OpeningText ": %"
+CODESTARTparse
+ dbgprintf("Message will now be parsed by fix Cisco Names parser.\n");
+ assert(pMsg != NULL);
+ assert(pMsg->pszRawMsg != NULL);
+ lenMsg = pMsg->iLenRawMsg - pMsg->offAfterPRI; /* note: offAfterPRI is already the number of PRI chars (do not add one!) */
+ p2parse = pMsg->pszRawMsg + pMsg->offAfterPRI; /* point to start of text, after PRI */
+
+ /* check if this message is of the type we handle in this (very limited) parser */
+ /* first, we permit SP */
+ while(lenMsg && *p2parse == ' ') {
+ --lenMsg;
+ ++p2parse;
+ }
+dbgprintf("pmcisconames: msg to look at: [%d]'%s'\n", lenMsg, p2parse);
+ if((unsigned) lenMsg < 34) {
+ /* too short, can not be "our" message */
+ /* minimum message, 16 character timestamp, 1 character name, ' : %ASA-1-000000: '*/
+dbgprintf("msg too short!\n");
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+ }
+ /* check if the timestamp is a 16 character or 21 character timestamp
+ 'Mmm DD HH:MM:SS ' spaces at 3,6,15 : at 9,12
+ 'Mmm DD YYYY HH:MM:SS ' spaces at 3,6,11,20 : at 14,17
+ check for the : first as that will differentiate the two conditions the fastest
+ this allows the compiler to short circuit the rst of the tests if it is the wrong timestamp
+ but still check the rest to see if it looks correct
+ */
+ if ( *(p2parse + 9) == ':' && *(p2parse + 12) == ':' && *(p2parse + 3) == ' ' && *(p2parse + 6) == ' ' && *(p2parse + 15) == ' ') {
+ /* skip over timestamp */
+ dbgprintf("short timestamp found\n");
+ lenMsg -=16;
+ p2parse +=16;
+ } else {
+ if ( *(p2parse + 14) == ':' && *(p2parse + 17) == ':' && *(p2parse + 3) == ' ' && *(p2parse + 6) == ' ' && *(p2parse + 11) == ' ' && *(p2parse + 20) == ' ') {
+ /* skip over timestamp */
+ dbgprintf("long timestamp found\n");
+ lenMsg -=21;
+ p2parse +=21;
+ } else {
+ dbgprintf("timestamp is not one of the valid formats\n");
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+ }
+ }
+ /* now look for the next space to walk past the hostname */
+ while(lenMsg && *p2parse != ' ') {
+ --lenMsg;
+ ++p2parse;
+ }
+ /* skip the space after the hostname */
+ lenMsg -=1;
+ p2parse +=1;
+ /* if the syslog tag is : and the next thing starts with a % assume that this is a mangled cisco log and fix it */
+ if(strncasecmp((char*) p2parse, OpeningText, sizeof(OpeningText)-1) != 0) {
+ /* wrong opening text */
+dbgprintf("not a cisco name mangled log!\n");
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+ }
+ /* bump the message portion up by two characters to overwrite the extra : */
+ lenMsg -=2;
+ memmove(p2parse, p2parse + 2, lenMsg);
+ *(p2parse + lenMsg) = '\n';
+ *(p2parse + lenMsg + 1) = '\0';
+ pMsg->iLenRawMsg -=2;
+ pMsg->iLenMSG -=2;
+ /* now, claim to abort so that something else can parse the now modified message */
+ DBGPRINTF("pmcisconames: new mesage: [%d]'%s'\n", lenMsg, p2parse);
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+
+finalize_it:
+ENDparse
+
+
+BEGINmodExit
+CODESTARTmodExit
+ /* release what we no longer need */
+ objRelease(errmsg, CORE_COMPONENT);
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(parser, CORE_COMPONENT);
+ objRelease(datetime, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_PMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(parser, CORE_COMPONENT));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
+
+ DBGPRINTF("cisconames parser init called, compiled with version %s\n", VERSION);
+ bParseHOSTNAMEandTAG = glbl.GetParseHOSTNAMEandTAG(); /* cache value, is set only during rsyslogd option processing */
+
+
+ENDmodInit
+
+/* vim:set ai:
+ */
diff --git a/plugins/pmsnare/Makefile.am b/plugins/pmsnare/Makefile.am
new file mode 100644
index 00000000..5b2696ac
--- /dev/null
+++ b/plugins/pmsnare/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = pmsnare.la
+
+pmsnare_la_SOURCES = pmsnare.c
+pmsnare_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) -I ../../tools
+pmsnare_la_LDFLAGS = -module -avoid-version
+pmsnare_la_LIBADD =
+
+EXTRA_DIST =
diff --git a/plugins/pmsnare/pmsnare.c b/plugins/pmsnare/pmsnare.c
new file mode 100644
index 00000000..4a9880d4
--- /dev/null
+++ b/plugins/pmsnare/pmsnare.c
@@ -0,0 +1,238 @@
+/* pmsnare.c
+ *
+ * this detects logs sent by Snare and cleans them up so that they can be processed by the normal parser
+ *
+ * there are two variations of this, if the client is set to 'syslog' mode it sends
+ *
+ * <pri>timestamp<sp>hostname<sp>tag<tab>otherstuff
+ *
+ * if the client is not set to syslog it sends
+ *
+ * hostname<tab>tag<tab>otherstuff
+ *
+ * ToDo, take advantage of items in the message itself to set more friendly information
+ * where the normal parser will find it by re-writing more of the message
+ *
+ * Intereting information includes:
+ *
+ * in the case of windows snare messages:
+ * the system hostname is field 12
+ * the severity is field 3 (criticality ranging form 0 to 4)
+ * the source of the log is field 4 and may be able to be mapped to facility
+ *
+ *
+ * created 2010-12-13 by David Lang based on pmlastmsg
+ *
+ * This file is part of rsyslog.
+ *
+ * Rsyslog is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Rsyslog is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <ctype.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "template.h"
+#include "msg.h"
+#include "module-template.h"
+#include "glbl.h"
+#include "errmsg.h"
+#include "parser.h"
+#include "datetime.h"
+#include "unicode-helper.h"
+
+MODULE_TYPE_PARSER
+PARSER_NAME("rsyslog.snare")
+
+/* internal structures
+ */
+DEF_PMOD_STATIC_DATA
+DEFobjCurrIf(errmsg)
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(parser)
+DEFobjCurrIf(datetime)
+
+
+/* static data */
+static int bParseHOSTNAMEandTAG; /* cache for the equally-named global param - performance enhancement */
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATUREAutomaticSanitazion)
+ iRet = RS_RET_OK;
+ if(eFeat == sFEATUREAutomaticPRIParsing)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
+BEGINparse
+ uchar *p2parse;
+ int lenMsg;
+ int snaremessage;
+ int tablength;
+
+CODESTARTparse
+ #define TabRepresentation "#011"
+ tablength=sizeof(TabRepresentation);
+ dbgprintf("Message will now be parsed by fix Snare parser.\n");
+ assert(pMsg != NULL);
+ assert(pMsg->pszRawMsg != NULL);
+
+ /* check if this message is of the type we handle in this (very limited) parser
+
+ find out if we have a space separated or tab separated for the first item
+ if tab separated see if the second word is one of our expected tags
+ if so replace the tabs with spaces so that hostname and syslog tag are going to be parsed properly
+ optionally replace the hostname at the beginning of the message with one from later in the message
+ else, wrong message, abort
+ else, assume that we have a valid timestamp, move over to the syslog tag
+ if that is tab separated from the rest of the message and one of our expected tags
+ if so, replace the tab with a space so that it will be parsed properly
+ optionally replace the hostname at the beginning of the message withone from later in the message
+
+ */
+ snaremessage=0;
+ lenMsg = pMsg->iLenRawMsg - pMsg->offAfterPRI; /* note: offAfterPRI is already the number of PRI chars (do not add one!) */
+ p2parse = pMsg->pszRawMsg + pMsg->offAfterPRI; /* point to start of text, after PRI */
+ dbgprintf("pmsnare: msg to look at: [%d]'%s'\n", lenMsg, p2parse);
+ if((unsigned) lenMsg < 30) {
+ /* too short, can not be "our" message */
+ dbgprintf("msg too short!\n");
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+ }
+
+ while(lenMsg && *p2parse != ' ' && *p2parse != '\t' && *p2parse != '#') {
+ --lenMsg;
+ ++p2parse;
+ }
+ dbgprintf("pmsnare: separator [%d]'%s' msg after the first separator: [%d]'%s'\n", tablength,TabRepresentation,lenMsg, p2parse);
+ if ((lenMsg > tablength) && (*p2parse == '\t' || strncasecmp((char*) p2parse, TabRepresentation , tablength-1) == 0)) {
+ //if ((lenMsg > tablength) && (*p2parse == '\t' || *p2parse == '#')) {
+ dbgprintf("pmsnare: tab separated message\n");
+ if(strncasecmp((char*) (p2parse + tablength - 1), "MSWinEventLog", 13) == 0) {
+ snaremessage=13; /* 0 means not a snare message, a number is how long the tag is */
+ }
+ if(strncasecmp((char*) (p2parse + tablength - 1), "LinuxKAudit", 11) == 0) {
+ snaremessage=11; /* 0 means not a snare message, a number is how long the tag is */
+ }
+ if(snaremessage) {
+ /* replace the tab with a space and if needed move the message portion up by the length of TabRepresentation -2 characters to overwrite the extra : */
+ *p2parse = ' ';
+ lenMsg -=(tablength-2);
+ p2parse++;
+ lenMsg--;
+ memmove(p2parse, p2parse + (tablength-2), lenMsg);
+ *(p2parse + lenMsg) = '\n';
+ *(p2parse + lenMsg + 1) = '\0';
+ pMsg->iLenRawMsg -=(tablength-2);
+ pMsg->iLenMSG -=(tablength-2);
+ p2parse += snaremessage;
+ lenMsg -= snaremessage;
+ *p2parse = ' ';
+ p2parse++;
+ lenMsg--;
+ lenMsg -=(tablength-2);
+ memmove(p2parse, p2parse + (tablength-2), lenMsg);
+ *(p2parse + lenMsg) = '\n';
+ *(p2parse + lenMsg + 1) = '\0';
+ pMsg->iLenRawMsg -=(tablength-2);
+ pMsg->iLenMSG -=(tablength-2);
+ dbgprintf("found a Snare message with snare not set to send syslog messages\n");
+ }
+ } else {
+ /* go back to the beginning of the message */
+ lenMsg = pMsg->iLenRawMsg - pMsg->offAfterPRI; /* note: offAfterPRI is already the number of PRI chars (do not add one!) */
+ p2parse = pMsg->pszRawMsg + pMsg->offAfterPRI; /* point to start of text, after PRI */
+ /* skip over timestamp and space*/
+ lenMsg -=17;
+ p2parse +=17;
+ /* skip over what should be the hostname */
+ while(lenMsg && *p2parse != ' ') {
+ --lenMsg;
+ ++p2parse;
+ }
+ if (lenMsg){
+ --lenMsg;
+ ++p2parse;
+ }
+ dbgprintf("pmsnare: separator [%d]'%s' msg after the timestamp and hostname: [%d]'%s'\n", tablength,TabRepresentation,lenMsg, p2parse);
+ if(lenMsg > 13 && strncasecmp((char*) p2parse, "MSWinEventLog", 13) == 0) {
+ snaremessage=13; /* 0 means not a snare message, a number is how long the tag is */
+ }
+ if(lenMsg > 11 && strncasecmp((char*) p2parse, "LinuxKAudit", 11) == 0) {
+ snaremessage=11; /* 0 means not a snare message, a number is how long the tag is */
+ }
+ if(snaremessage) {
+ p2parse += snaremessage;
+ lenMsg -= snaremessage;
+ *p2parse = ' ';
+ p2parse++;
+ lenMsg--;
+ lenMsg -=(tablength-2);
+ memmove(p2parse, p2parse + (tablength-2), lenMsg);
+ *(p2parse + lenMsg) = '\n';
+ *(p2parse + lenMsg + 1) = '\0';
+ pMsg->iLenRawMsg -=(tablength-2);
+ pMsg->iLenMSG -=(tablength-2);
+ dbgprintf("found a Snare message with snare set to send syslog messages\n");
+ }
+
+ }
+ DBGPRINTF("pmsnare: new message: [%d]'%s'\n", lenMsg, pMsg->pszRawMsg + pMsg->offAfterPRI);
+ ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
+
+finalize_it:
+ENDparse
+
+
+BEGINmodExit
+CODESTARTmodExit
+ /* release what we no longer need */
+ objRelease(errmsg, CORE_COMPONENT);
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(parser, CORE_COMPONENT);
+ objRelease(datetime, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_PMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(parser, CORE_COMPONENT));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
+
+ DBGPRINTF("snare parser init called, compiled with version %s\n", VERSION);
+ bParseHOSTNAMEandTAG = glbl.GetParseHOSTNAMEandTAG(); /* cache value, is set only during rsyslogd option processing */
+
+
+ENDmodInit
+
+/* vim:set ai:
+ */