diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2012-01-18 14:51:33 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2012-01-18 14:51:33 +0100 |
commit | 03be2fcd7cfe3355b8108fe8368a6a65ab98e9e9 (patch) | |
tree | 5b56babfece1bc2f5e3ee6cbf030859b72232eca | |
parent | ac9afc4149db314d9c480232d70216960342e3e4 (diff) | |
parent | ef34821a2737799f48c3032b9616418e4f7fa34f (diff) | |
download | rsyslog-03be2fcd7cfe3355b8108fe8368a6a65ab98e9e9.tar.gz rsyslog-03be2fcd7cfe3355b8108fe8368a6a65ab98e9e9.tar.xz rsyslog-03be2fcd7cfe3355b8108fe8368a6a65ab98e9e9.zip |
Merge branch 'v5-devel' into master
Conflicts:
ChangeLog
Makefile.am
configure.ac
doc/manual.html
plugins/imptcp/imptcp.c
plugins/imudp/imudp.c
plugins/imuxsock/imuxsock.c
runtime/parser.c
template.c
tools/omfwd.c
tools/syslogd.c
-rw-r--r-- | ChangeLog | 14 | ||||
-rw-r--r-- | Makefile.am | 5 | ||||
-rw-r--r-- | action.c | 2 | ||||
-rw-r--r-- | configure.ac | 36 | ||||
-rw-r--r-- | doc/imuxsock.html | 5 | ||||
-rw-r--r-- | doc/rsyslog_conf_global.html | 1 | ||||
-rw-r--r-- | doc/rsyslog_conf_templates.html | 4 | ||||
-rw-r--r-- | plugins/imptcp/imptcp.c | 59 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 146 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 6 | ||||
-rw-r--r-- | plugins/omelasticsearch/Makefile.am | 8 | ||||
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 270 | ||||
-rw-r--r-- | runtime/parser.c | 9 | ||||
-rw-r--r-- | runtime/rsconf.c | 3 | ||||
-rw-r--r-- | template.c | 92 | ||||
-rw-r--r-- | template.h | 9 | ||||
-rw-r--r-- | tools/omfwd.c | 73 | ||||
-rw-r--r-- | tools/syslogd.c | 8 |
18 files changed, 590 insertions, 160 deletions
@@ -334,11 +334,19 @@ expected that interfaces, even new ones, break during the initial syslog plain tcp input plugin (NOT supporting TLS!) [ported from v4] --------------------------------------------------------------------------- -Version 5.9.6 [V5-DEVEL], 20??-??-?? -- new stats counters "discarded.nf" and "discarded.full" for queue object. - Tells how many messages have been discarded due to queue full condition. +Version 5.9.6 [V5-DEVEL], 2012-??-?? +- $IMUXSockRateLimitInterval DEFAULT CHANGED, was 5, now 0 + The new default turns off rate limiting. This was chosen as people + experienced problems with rate-limiting activated by default. Now it + needs an explicit opt-in by setting this parameter. + Thanks to Chris Gaffney for suggesting to make it opt-in; thanks to + many unnamed others who already had complained at the time Chris made + the suggestion ;-) --------------------------------------------------------------------------- Version 5.9.5 [V5-DEVEL], 2011-11-29 +- new stats counters for imudp and imtcp +- new stats counters "discarded.nf" and "discarded.full" for queue object. + Tells how many messages have been discarded due to queue full condition. - enhanced module loader to not rely on PATH_MAX --------------------------------------------------------------------------- Version 5.9.4 [V5-DEVEL], 2011-11-29 diff --git a/Makefile.am b/Makefile.am index 49ece3a8..ff8a0e86 100644 --- a/Makefile.am +++ b/Makefile.am @@ -160,6 +160,10 @@ if ENABLE_OMHDFS SUBDIRS += plugins/omhdfs endif +if ENABLE_ELASTICSEARCH +SUBDIRS += plugins/omelasticsearch +endif + if ENABLE_MMSNMPTRAPD SUBDIRS += plugins/mmsnmptrapd endif @@ -243,5 +247,6 @@ DISTCHECK_CONFIGURE_FLAGS= --enable-gssapi_krb5 \ --enable-pmcisconames \ --enable-pmsnare \ --enable-mmsnmptrapd \ + --enable-elasticsearch \ --with-systemdsystemunitdir=$$dc_install_base/$(systemdsystemunitdir) ACLOCAL_AMFLAGS = -I m4 @@ -1923,7 +1923,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, } /* check required template options */ if( (iTplOpts & OMSR_RQD_TPL_OPT_SQL) - && (pAction->ppTpl[i]->optFormatForSQL == 0)) { + && (pAction->ppTpl[i]->optFormatEscape == 0)) { errno = 0; errmsg.LogError(0, RS_RET_RQD_TPLOPT_MISSING, "Action disabled. To use this action, you have to specify " "the SQL or stdSQL option in your template!\n"); diff --git a/configure.ac b/configure.ac index 89177fc3..aaf13996 100644 --- a/configure.ac +++ b/configure.ac @@ -671,6 +671,40 @@ AC_SUBST(SNMP_CFLAGS) AC_SUBST(SNMP_LIBS) +# elasticsearch support +AC_ARG_ENABLE(elasticsearch, + [AS_HELP_STRING([--enable-elasticsearch],[Enable elasticsearch output module @<:@default=no@:>@])], + [case "${enableval}" in + yes) enable_elasticsearch="yes" ;; + no) enable_elasticsearch="no" ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-elasticsearch) ;; + esac], + [enable_elasticsearch=no] +) +if test "x$enable_elasticsearch" = "xyes"; then + AC_CHECK_PROG( + [HAVE_CURL_CONFIG], + [curl-config], + [yes],,, + ) + if test "x${HAVE_CURL_CONFIG}" != "xyes"; then + AC_MSG_FAILURE([curl-config not found in PATH]) + fi + AC_CHECK_LIB( + [curl], + [curl_global_init], + [CURL_CFLAGS="`curl-config --cflags`" + CURL_LIBS="`curl-config --libs`" + ], + [AC_MSG_FAILURE([curl library is missing])], + [`curl-config --libs --cflags`] + ) +fi +AM_CONDITIONAL(ENABLE_ELASTICSEARCH, test x$enable_elasticsearch = xyes) +AC_SUBST(CURL_CFLAGS) +AC_SUBST(CURL_LIBS) + + # GnuTLS support AC_ARG_ENABLE(gnutls, [AS_HELP_STRING([--enable-gnutls],[Enable GNU TLS support @<:@default=no@:>@])], @@ -1220,6 +1254,7 @@ AC_CONFIG_FILES([Makefile \ plugins/omoracle/Makefile \ plugins/omudpspoof/Makefile \ plugins/mmnormalize/Makefile \ + plugins/omelasticsearch/Makefile \ plugins/sm_cust_bindcdr/Makefile \ plugins/mmsnmptrapd/Makefile \ plugins/cust1/Makefile \ @@ -1255,6 +1290,7 @@ echo " omprog module will be compiled: $enable_omprog" echo " output mongodb module will be compiled: $enable_ommongodb" echo " omstdout module will be compiled: $enable_omstdout" echo " omhdfs module will be compiled: $enable_omhdfs" +echo " omelasticsearch module will be compiled: $enable_elasticsearch" echo " omruleset module will be compiled: $enable_omruleset" echo " omdbalerting module will be compiled: $enable_omdbalerting" echo " omudpspoof module will be compiled: $enable_omudpspoof" diff --git a/doc/imuxsock.html b/doc/imuxsock.html index f80bc598..734ae889 100644 --- a/doc/imuxsock.html +++ b/doc/imuxsock.html @@ -65,7 +65,10 @@ you must turn it on (via $SystemLogSocketAnnotate and $InputUnixListenSocketAnno <li><b>$InputUnixListenSocketFlowControl</b> [on/<b>off</b>] - specifies if flow control should be applied to the next socket.</li> <li><b>$IMUXSockRateLimitInterval</b> [number] - specifies the rate-limiting -interval in seconds. Default value is 5 seconds. Set it to 0 to turn rate limiting off. +interval in seconds. Default value is 0, which turns off rate limiting. Set it to a number +of seconds (5 recommended) to activate rate-limiting. The default of 0 has been choosen in 5.9.6+, +as people experienced problems with this feature activated by default. Now it needs an +explicit opt-in by setting this parameter. </li> <li><b>$IMUXSockRateLimitBurst</b> [number] - specifies the rate-limiting burst in number of messages. Default is 200. diff --git a/doc/rsyslog_conf_global.html b/doc/rsyslog_conf_global.html index 21786a7f..b254f366 100644 --- a/doc/rsyslog_conf_global.html +++ b/doc/rsyslog_conf_global.html @@ -143,6 +143,7 @@ our paper on <a href="multi_ruleset.html">using multiple rule sets in rsyslog</a <li><a href="rsconf1_escape8bitcharsonreceive.html">$Escape8BitCharactersOnReceive</a></li> <li><a href="rsconf1_escapecontrolcharactersonreceive.html">$EscapeControlCharactersOnReceive</a></li> <li><b>$EscapeControlCharactersOnReceive</b> [<b>on</b>|off] - escape USASCII HT character</li> +<li>$SpaceLFOnReceive [on/<b>off</b>] - instructs rsyslogd to replace LF with spaces during message reception (sysklogd compatibility aid)</li> <li>$ErrorMessagesToStderr [<b>on</b>|off] - direct rsyslogd error message to stderr (in addition to other targets)</li> <li><a href="rsconf1_failonchownfailure.html">$FailOnChownFailure</a></li> <li><a href="rsconf1_filecreatemode.html">$FileCreateMode</a></li> diff --git a/doc/rsyslog_conf_templates.html b/doc/rsyslog_conf_templates.html index 23a02049..bd0b3253 100644 --- a/doc/rsyslog_conf_templates.html +++ b/doc/rsyslog_conf_templates.html @@ -146,6 +146,10 @@ with high-precision timestamps and timezone information</li> useful if you send messages to other syslogd's or rsyslogd below version 3.12.5.</li> +<li><span style="font-weight: bold;">RSYSLOG_SysklogdFileFormat</span> +- sysklogd compatible log file format. If used with options: $SpaceLFOnReceive on; +$EscapeControlCharactersOnReceive off; $DropTrailingLFOnReception off, +the log format will conform to sysklogd log format.</li> <li><span style="font-weight: bold;">RSYSLOG_ForwardFormat</span> - a new high-precision forwarding format very similar to the traditional one, but with high-precision timestamps and timezone diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 428f5123..f5868311 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -66,6 +66,7 @@ #include "datetime.h" #include "ruleset.h" #include "msg.h" +#include "statsobj.h" #include "net.h" /* for permittedPeers, may be removed when this is removed */ /* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */ @@ -84,6 +85,7 @@ DEFobjCurrIf(prop) DEFobjCurrIf(datetime) DEFobjCurrIf(errmsg) DEFobjCurrIf(ruleset) +DEFobjCurrIf(statsobj) /* forward references */ static void * wrkr(void *myself); @@ -161,7 +163,8 @@ struct ptcpsrv_s { * includes support for doubly-linked list. */ struct ptcpsess_s { - ptcpsrv_t *pSrv; /* our server */ +// ptcpsrv_t *pSrv; /* our server TODO: check remove! */ + ptcplstn_t *pLstn; /* our listener */ ptcpsess_t *prev, *next; int sock; epolld_t *epd; @@ -189,6 +192,8 @@ struct ptcplstn_s { ptcplstn_t *prev, *next; int sock; epolld_t *epd; + statsobj_t *stats; /* listener stats */ + STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) }; @@ -230,7 +235,7 @@ static int iMaxLine; /* maximum size of a single message */ /* forward definitions */ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal); -static rsRetVal addLstn(ptcpsrv_t *pSrv, int sock); +static rsRetVal addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6); /* some simple constructors/destructors */ @@ -276,6 +281,7 @@ startupSrv(ptcpsrv_t *pSrv) int sockflags; struct addrinfo hints, *res = NULL, *r; uchar *lstnIP; + int isIPv6 = 0; lstnIP = pSrv->lstnIP == NULL ? UCHAR_CONSTANT("") : pSrv->lstnIP; @@ -308,8 +314,9 @@ startupSrv(ptcpsrv_t *pSrv) continue; } -#ifdef IPV6_V6ONLY if(r->ai_family == AF_INET6) { + isIPv6 = 1; +#ifdef IPV6_V6ONLY int iOn = 1; if(setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&iOn, sizeof (iOn)) < 0) { @@ -317,8 +324,8 @@ startupSrv(ptcpsrv_t *pSrv) sock = -1; continue; } - } #endif + } if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)) < 0 ) { DBGPRINTF("error %d setting tcp socket option\n", errno); close(sock); @@ -380,7 +387,7 @@ startupSrv(ptcpsrv_t *pSrv) /* if we reach this point, we were able to obtain a valid socket, so we can * create our listener object. -- rgerhards, 2010-08-10 */ - CHKiRet(addLstn(pSrv, sock)); + CHKiRet(addLstn(pSrv, sock, isIPv6)); ++numSocks; } @@ -607,22 +614,25 @@ static rsRetVal doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub) { msg_t *pMsg; + ptcpsrv_t *pSrv; DEFiRet; if(pThis->iMsg == 0) { DBGPRINTF("discarding zero-sized message\n"); FINALIZE; } + pSrv = pThis->pLstn->pSrv; /* we now create our own message object and submit it to the queue */ CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); MsgSetRawMsg(pMsg, (char*)pThis->pMsg, pThis->iMsg); - MsgSetInputName(pMsg, pThis->pSrv->pInputName); + MsgSetInputName(pMsg, pSrv->pInputName); MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; MsgSetRcvFrom(pMsg, pThis->peerName); CHKiRet(MsgSetRcvFromIP(pMsg, pThis->peerIP)); - MsgSetRuleset(pMsg, pThis->pSrv->pRuleset); + MsgSetRuleset(pMsg, pSrv->pRuleset); + STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit); if(pMultiSub == NULL) { CHKiRet(submitMsg(pMsg)); @@ -704,7 +714,8 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG } if(( (c == '\n') - || ((pThis->pSrv->iAddtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->iAddtlFrameDelim)) + || ((pThis->pLstn->pSrv->iAddtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) + && (c == pThis->pLstn->pSrv->iAddtlFrameDelim)) ) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */ doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub); pThis->inputState = eAtStrtFram; @@ -864,14 +875,25 @@ finalize_it: /* add a listener to the server */ static rsRetVal -addLstn(ptcpsrv_t *pSrv, int sock) +addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6) { DEFiRet; ptcplstn_t *pLstn; + uchar statname[64]; CHKmalloc(pLstn = malloc(sizeof(ptcplstn_t))); pLstn->pSrv = pSrv; pLstn->sock = sock; + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&(pLstn->stats))); + snprintf((char*)statname, sizeof(statname), "imptcp(%s/%s/%s)", + (pSrv->lstnIP == NULL) ? "*" : (char*)pSrv->lstnIP, pSrv->port, + isIPv6 ? "IPv6" : "IPv4"); + statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(statsobj.SetName(pLstn->stats, statname)); + CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("submitted"), + ctrType_IntCtr, &(pLstn->ctrSubmit))); + CHKiRet(statsobj.ConstructFinalize(pLstn->stats)); /* add to start of server's listener list */ pLstn->prev = NULL; @@ -890,14 +912,15 @@ finalize_it: /* add a session to the server */ static rsRetVal -addSess(ptcpsrv_t *pSrv, int sock, prop_t *peerName, prop_t *peerIP) +addSess(ptcplstn_t *pLstn, int sock, prop_t *peerName, prop_t *peerIP) { DEFiRet; ptcpsess_t *pSess = NULL; + ptcpsrv_t *pSrv = pLstn->pSrv; CHKmalloc(pSess = malloc(sizeof(ptcpsess_t))); CHKmalloc(pSess->pMsg = malloc(iMaxLine * sizeof(uchar))); - pSess->pSrv = pSrv; + pSess->pLstn = pLstn; pSess->sock = sock; pSess->inputState = eAtStrtFram; pSess->iMsg = 0; @@ -935,17 +958,17 @@ closeSess(ptcpsess_t *pSess) CHKiRet(removeEPollSock(sock, pSess->epd)); close(sock); - pthread_mutex_lock(&pSess->pSrv->mutSessLst); + pthread_mutex_lock(&pSess->pLstn->pSrv->mutSessLst); /* finally unlink session from structures */ if(pSess->next != NULL) pSess->next->prev = pSess->prev; if(pSess->prev == NULL) { /* need to update root! */ - pSess->pSrv->pSess = pSess->next; + pSess->pLstn->pSrv->pSess = pSess->next; } else { pSess->prev->next = pSess->next; } - pthread_mutex_unlock(&pSess->pSrv->mutSessLst); + pthread_mutex_unlock(&pSess->pLstn->pSrv->mutSessLst); /* unlinked, now remove structure */ destructSess(pSess); @@ -1149,7 +1172,7 @@ lstnActivity(ptcplstn_t *pLstn) if(localRet == RS_RET_NO_MORE_DATA || glbl.GetGlobalInputTermState() == 1) break; CHKiRet(localRet); - CHKiRet(addSess(pLstn->pSrv, newSock, peerName, peerIP)); + CHKiRet(addSess(pLstn, newSock, peerName, peerIP)); } finalize_it: @@ -1180,7 +1203,7 @@ sessActivity(ptcpsess_t *pSess) CHKiRet(DataRcvd(pSess, rcvBuf, lenRcv)); } else if (lenRcv == 0) { /* session was closed, do clean-up */ - if(pSess->pSrv->bEmitMsgOnClose) { + if(pSess->pLstn->pSrv->bEmitMsgOnClose) { uchar *peerName; int lenPeer; prop.GetString(pSess->peerName, &peerName, &lenPeer); @@ -1445,6 +1468,8 @@ shutdownSrv(ptcpsrv_t *pSrv) pLstn = pSrv->pLstn; while(pLstn != NULL) { close(pLstn->sock); + statsobj.Destruct(&(pLstn->stats)); + /* now unlink listner */ lstnDel = pLstn; pLstn = pLstn->next; DBGPRINTF("imptcp shutdown listen socket %d\n", lstnDel->sock); @@ -1487,6 +1512,7 @@ CODESTARTmodExit pthread_attr_destroy(&wrkrThrdAttr); /* release objects we used */ objRelease(glbl, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); objRelease(net, LM_NET_FILENAME); objRelease(datetime, CORE_COMPONENT); @@ -1535,6 +1561,7 @@ CODESTARTmodInit CODEmodInit_QueryRegCFSLineHdlr /* request objects we use */ CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); CHKiRet(objUse(net, LM_NET_FILENAME)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index badad949..14032e1f 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -26,6 +26,7 @@ * A copy of the GPL can be found in the file "COPYING" in this distribution. */ #include "config.h" +#include <stdio.h> #include <stdlib.h> #include <assert.h> #include <string.h> @@ -70,8 +71,14 @@ DEFobjCurrIf(prop) DEFobjCurrIf(ruleset) DEFobjCurrIf(statsobj) -statsobj_t *modStats; -STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) + +static struct lstn_s { + struct lstn_s *next; + int sock; /* socket */ + ruleset_t *pRuleset; /* bound ruleset */ + statsobj_t *stats; /* listener stats */ + STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) +} *lcnfRoot = NULL, *lcnfLast = NULL; static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */ static int iMaxLine; /* maximum UDP message size supported */ @@ -79,9 +86,6 @@ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitte * This shall prevent remote DoS when the "discard on disallowed sender" * message is configured to be logged on occurance of such a case. */ -static int *udpLstnSocks = NULL; /* Internet datagram sockets, first element is nbr of elements - * read-only after init(), but beware of restart! */ -static ruleset_t **udpRulesets = NULL; /* ruleset to be used with sockets in question (entry 0 is empty) */ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a global and alloc * it so that we can check available memory in willRun() and request * termination if we can not get it. -- rgerhards, 2007-12-27 @@ -170,14 +174,18 @@ static inline rsRetVal addListner(instanceConf_t *inst) { DEFiRet; + uchar *bindAddr; int *newSocks; - int *tmpSocks; - int iSrc, iDst; - ruleset_t **tmpRulesets; + int iSrc; + struct lstn_s *newlcnfinfo; + uchar *bindName; + uchar *port; + uchar statname[64]; /* check which address to bind to. We could do this more compact, but have not * done so in order to make the code more readable. -- rgerhards, 2007-12-27 */ +#if 0 //<<<<<<< HEAD DBGPRINTF("imudp: trying to open port at %s:%s.\n", (inst->pszBindAddr == NULL) ? (uchar*)"*" : inst->pszBindAddr, inst->pszBindPort); @@ -222,10 +230,50 @@ addListner(instanceConf_t *inst) free(udpRulesets); udpRulesets = tmpRulesets; } +#else //======= + if(inst->pszBindAddr == NULL) + bindAddr = NULL; + else if(inst->pszBindAddr[0] == '*' && inst->pszBindAddr[1] == '\0') + bindAddr = NULL; + else + bindAddr = inst->pszBindAddr; + bindName = (bindAddr == NULL) ? (uchar*)"*" : bindAddr; + port = (inst->pszBindPort == NULL || *inst->pszBindPort == '\0') ? (uchar*) "514" : inst->pszBindPort; + + DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n", bindName, inst->pszBindPort); + + newSocks = net.create_udp_socket(bindAddr, port, 1); + if(newSocks != NULL) { + /* we now need to add the new sockets to the existing set */ + /* ready to copy */ + for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc) { + CHKmalloc(newlcnfinfo = (struct lstn_s*) MALLOC(sizeof(struct lstn_s))); + newlcnfinfo->next = NULL; + newlcnfinfo->sock = newSocks[iSrc]; + newlcnfinfo->pRuleset = inst->pBindRuleset; + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&(newlcnfinfo->stats))); + snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port); + statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname)); + CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"), + ctrType_IntCtr, &(newlcnfinfo->ctrSubmit))); + CHKiRet(statsobj.ConstructFinalize(newlcnfinfo->stats)); + /* link to list. Order must be preserved to take care for + * conflicting matches. + */ + if(lcnfRoot == NULL) + lcnfRoot = newlcnfinfo; + if(lcnfLast == NULL) + lcnfLast = newlcnfinfo; + else + lcnfLast->next = newlcnfinfo; +#endif //>>>>>>> ef34821a2737799f48c3032b9616418e4f7fa34f } } finalize_it: + free(newSocks); RETiRet; } @@ -255,8 +303,7 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta * on scheduling order. -- rgerhards, 2008-10-02 */ static inline rsRetVal -processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, - ruleset_t *pRuleset) +processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) { DEFiRet; int iNbrTimeUsed; @@ -276,7 +323,7 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, if(pThrd->bShallStop == TRUE) ABORT_FINALIZE(RS_RET_FORCE_TERM); socklen = sizeof(struct sockaddr_storage); - lenRcvBuf = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); + lenRcvBuf = recvfrom(lstn->sock, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); if(lenRcvBuf < 0) { if(errno != EINTR && errno != EAGAIN) { rs_strerror_r(errno, errStr, sizeof(errStr)); @@ -321,7 +368,7 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, *pbIsPermitted = 1; /* no check -> everything permitted */ } - DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", fd, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); + DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); if(*pbIsPermitted != 0) { if((runModConf->iTimeRequery == 0) || (iNbrTimeUsed++ % runModConf->iTimeRequery) == 0) { @@ -331,14 +378,14 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf); MsgSetInputName(pMsg, pInputName); - MsgSetRuleset(pMsg, pRuleset); + MsgSetRuleset(pMsg, lstn->pRuleset); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL; if(*pbIsPermitted == 2) pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ CHKiRet(msgSetFromSockinfo(pMsg, &frominet)); CHKiRet(submitMsg(pMsg)); - STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit); + STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); } } @@ -491,6 +538,8 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) struct epoll_event *udpEPollEvt = NULL; struct epoll_event currEvt[NUM_EPOLL_EVENTS]; char errStr[1024]; + struct lstn_s *lstn; + int nLstn; /* start "name caching" algo by making sure the previous system indicator * is invalidated. @@ -498,7 +547,11 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) bIsPermitted = 0; memset(&frominetPrev, 0, sizeof(frominetPrev)); - CHKmalloc(udpEPollEvt = calloc(udpLstnSocks[0], sizeof(struct epoll_event))); + /* count num listeners -- do it here in order to avoid inconsistency */ + nLstn = 0; + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) + ++nLstn; + CHKmalloc(udpEPollEvt = calloc(nLstn, sizeof(struct epoll_event))); #if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) DBGPRINTF("imudp uses epoll_create1()\n"); @@ -518,16 +571,18 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) /* fill the epoll set - we need to do this only once, as the set * can not change dyamically. */ - for (i = 0; i < *udpLstnSocks; i++) { - if (udpLstnSocks[i+1] != -1) { + i = 0; + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) { + if(lstn->sock != -1) { udpEPollEvt[i].events = EPOLLIN | EPOLLET; - udpEPollEvt[i].data.u64 = i+1; - if(epoll_ctl(efd, EPOLL_CTL_ADD, udpLstnSocks[i+1], &(udpEPollEvt[i])) < 0) { + udpEPollEvt[i].data.u64 = (long long unsigned) lstn; + if(epoll_ctl(efd, EPOLL_CTL_ADD, lstn->sock, &(udpEPollEvt[i])) < 0) { rs_strerror_r(errno, errStr, sizeof(errStr)); errmsg.LogError(errno, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n", - udpLstnSocks[i+1], errStr); + lstn->sock, errStr); } } + i++; } while(1) { @@ -539,8 +594,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) break; /* terminate input! */ for(i = 0 ; i < nfds ; ++i) { - processSocket(pThrd, udpLstnSocks[currEvt[i].data.u64], &frominetPrev, &bIsPermitted, - udpRulesets[currEvt[i].data.u64]); + processSocket(pThrd, (struct lstn_s*)currEvt[i].data.u64, &frominetPrev, &bIsPermitted); } } @@ -557,10 +611,10 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) DEFiRet; int maxfds; int nfds; - int i; fd_set readfds; struct sockaddr_storage frominetPrev; int bIsPermitted; + struct lstn_s *lstn; /* start "name caching" algo by making sure the previous system indicator * is invalidated. @@ -571,20 +625,17 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) while(1) { /* Add the Unix Domain Sockets to the list of read descriptors. - * rgerhards 2005-08-01: we must now check if there are - * any local sockets to listen to at all. If the -o option - * is given without -a, we do not need to listen at all.. */ maxfds = 0; FD_ZERO(&readfds); /* Add the UDP listen sockets to the list of read descriptors. */ - for (i = 0; i < *udpLstnSocks; i++) { - if (udpLstnSocks[i+1] != -1) { + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) { + if (lstn->sock != -1) { if(Debug) - net.debugListenInfo(udpLstnSocks[i+1], "UDP"); - FD_SET(udpLstnSocks[i+1], &readfds); - if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1]; + net.debugListenInfo(lstn->sock, "UDP"); + FD_SET(lstn->sock, &readfds); + if(lstn->sock>maxfds) maxfds=lstn->sock; } } if(Debug) { @@ -600,10 +651,9 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ - for(i = 0; nfds && i < *udpLstnSocks; i++) { - if(FD_ISSET(udpLstnSocks[i+1], &readfds)) { - processSocket(pThrd, udpLstnSocks[i+1], &frominetPrev, &bIsPermitted, - udpRulesets[i+1]); + for(lstn = lcnfRoot ; nfds && lstn != NULL ; lstn = lstn->next) { + if(FD_ISSET(lstn->sock, &readfds)) { + processSocket(pThrd, lstn, &frominetPrev, &bIsPermitted); --nfds; /* indicate we have processed one descriptor */ } } @@ -677,7 +727,7 @@ CODESTARTactivateCnfPrePrivDrop addListner(inst); } /* if we could not set up any listners, there is no point in running... */ - if(udpLstnSocks == NULL) { + if(lcnfRoot == NULL) { errmsg.LogError(0, NO_ERRCODE, "imudp: no listeners could be started, " "input not activated.\n"); ABORT_FINALIZE(RS_RET_NO_RUN); @@ -702,7 +752,7 @@ CODESTARTfreeCnf ENDfreeCnf /* This function is called to gather input. - * Note that udpLstnSocks must be non-NULL because otherwise we would not have + * Note that sock must be non-NULL because otherwise we would not have * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02 */ BEGINrunInput @@ -720,15 +770,18 @@ ENDwillRun BEGINafterRun + struct lstn_s *lstn, *lstnDel; CODESTARTafterRun /* do cleanup here */ net.clearAllowedSenders((uchar*)"UDP"); - if(udpLstnSocks != NULL) { - net.closeUDPListenSockets(udpLstnSocks); - udpLstnSocks = NULL; - free(udpRulesets); - udpRulesets = NULL; + for(lstn = lcnfRoot ; lstn != NULL ; ) { + statsobj.Destruct(&(lstn->stats)); + close(lstn->sock); + lstnDel = lstn; + lstn = lstn->next; + free(lstnDel); } + lcnfRoot = lcnfLast = NULL; if(pRcvBuf != NULL) { free(pRcvBuf); pRcvBuf = NULL; @@ -741,8 +794,6 @@ CODESTARTmodExit if(pInputName != NULL) prop.Destruct(&pInputName); - statsobj.Destruct(&modStats); - /* release what we no longer need */ objRelease(errmsg, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); @@ -821,13 +872,6 @@ CODEmodInit_QueryRegCFSLineHdlr NULL, &cs.iTimeRequery, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); - - /* support statistics gathering */ - CHKiRet(statsobj.Construct(&modStats)); - CHKiRet(statsobj.SetName(modStats, UCHAR_CONSTANT("imudp"))); - CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("submitted"), - ctrType_IntCtr, &ctrSubmit)); - CHKiRet(statsobj.ConstructFinalize(modStats)); ENDmodInit /* vim:set ai: */ diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index c5d6b1f1..22ead0cf 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -158,7 +158,7 @@ static int sd_fds = 0; /* number of systemd activated sockets */ /* config vars for legacy config system */ #define DFLT_bCreatePath 0 -#define DFLT_ratelimitInterval 5 +#define DFLT_ratelimitInterval 0 #define DFLT_ratelimitBurst 200 #define DFLT_ratelimitSeverity 1 /* do not rate-limit emergency messages */ static struct configSettings_s { @@ -799,7 +799,6 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim lenRcv = toffs + 1; } - /* we now create our own message object and submit it to the queue */ CHKiRet(msgConstructWithTime(&pMsg, &st, tt)); MsgSetRawMsg(pMsg, (char*)pRcv, lenRcv); @@ -820,8 +819,6 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim * datestamp or not .. and advance the parse pointer accordingly. */ datetime.ParseTIMESTAMP3164(&dummyTS, &parse, &lenMsg); - parse += 16; /* just skip timestamp */ - lenMsg -= 16; } else { if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) { DBGPRINTF("we have a problem, invalid timestamp in msg!\n"); @@ -1161,7 +1158,6 @@ CODESTARTafterRun /* Clean-up files. */ for(i = startIndexUxLocalSockets; i < nfd; i++) if (listeners[i].sockName && listeners[i].fd != -1) { - /* If systemd passed us a socket it is systemd's job to clean it up. * Do not unlink it -- we will get same socket (node) from systemd * e.g. on restart again. diff --git a/plugins/omelasticsearch/Makefile.am b/plugins/omelasticsearch/Makefile.am new file mode 100644 index 00000000..a574c72f --- /dev/null +++ b/plugins/omelasticsearch/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = omelasticsearch.la + +omelasticsearch_la_SOURCES = omelasticsearch.c +omelasticsearch_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) +omelasticsearch_la_LDFLAGS = -module -avoid-version +omelasticsearch_la_LIBADD = $(CURL_LIBS) + +EXTRA_DIST = diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c new file mode 100644 index 00000000..3bec1838 --- /dev/null +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -0,0 +1,270 @@ +/* omelasticsearch.c + * This is the http://www.elasticsearch.org/ output module. + * + * NOTE: read comments in module-template.h for more specifics! + * + * Copyright 2011 Nathan Scott. + * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <curl/curl.h> +#include <curl/types.h> +#include <curl/easy.h> +#include <assert.h> +#include <signal.h> +#include <errno.h> +#include <time.h> +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "statsobj.h" +#include "cfsysline.h" + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP + +/* internal structures */ +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) +DEFobjCurrIf(statsobj) + +statsobj_t *indexStats; +STATSCOUNTER_DEF(indexConFail, mutIndexConFail) +STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit) +STATSCOUNTER_DEF(indexFailed, mutIndexFailed) +STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) + +/* REST API for elasticsearch hits this URL: + * http://<hostName>:<restPort>/<searchIndex>/<searchType> + */ +typedef struct curl_slist HEADER; +typedef struct _instanceData { + CURL *curlHandle; /* libcurl session handle */ + HEADER *postHeader; /* json POST request info */ +} instanceData; + +/* config variables */ +static int restPort = 9200; +static char *hostName = "localhost"; +static char *searchIndex = "system"; +static char *searchType = "events"; + +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + +BEGINfreeInstance +CODESTARTfreeInstance + if (pData->postHeader) { + curl_slist_free_all(pData->postHeader); + pData->postHeader = NULL; + } + if (pData->curlHandle) { + curl_easy_cleanup(pData->curlHandle); + pData->curlHandle = NULL; + } +ENDfreeInstance + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo +ENDdbgPrintInstInfo + +BEGINtryResume +CODESTARTtryResume +ENDtryResume + +rsRetVal +curlPost(instanceData *instance, uchar *message) +{ + CURLcode code; + CURL *curl = instance->curlHandle; + int length = strlen((char *)message); + + curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, length); + code = curl_easy_perform(curl); + switch (code) { + case CURLE_COULDNT_RESOLVE_HOST: + case CURLE_COULDNT_RESOLVE_PROXY: + case CURLE_COULDNT_CONNECT: + case CURLE_WRITE_ERROR: + STATSCOUNTER_INC(indexConFail, mutIndexConFail); + return RS_RET_SUSPENDED; + default: + STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); + return RS_RET_OK; + } +} + +BEGINdoAction +CODESTARTdoAction + CHKiRet(curlPost(pData, ppString[0])); +finalize_it: +ENDdoAction + +/* elasticsearch POST result string ... useful for debugging */ +size_t +curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) +{ + unsigned int i; + char *p = (char *)ptr; + char *jsonData = (char *)userdata; + static char ok[] = "{\"ok\":true,"; + + ASSERT(size == 1); + + if (size == 1 && + nmemb > sizeof(ok)-1 && + strncmp(p, ok, sizeof(ok)-1) == 0) { + STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); + } else { + STATSCOUNTER_INC(indexFailed, mutIndexFailed); + if (Debug) { + DBGPRINTF("omelasticsearch request: %s\n", jsonData); + DBGPRINTF("omelasticsearch result: "); + for (i = 0; i < nmemb; i++) + DBGPRINTF("%c", p[i]); + DBGPRINTF("\n"); + } + } + return size * nmemb; +} + +static rsRetVal +curlSetup(instanceData *instance) +{ + char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ + HEADER *header; + CURL *handle; + + handle = curl_easy_init(); + if (handle == NULL) { + return RS_RET_OBJ_CREATION_FAILED; + } + + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", + hostName, restPort, searchIndex, searchType); + header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); + + curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); + curl_easy_setopt(handle, CURLOPT_URL, restURL); + curl_easy_setopt(handle, CURLOPT_POST, 1); + + instance->curlHandle = handle; + instance->postHeader = header; + + DBGPRINTF("omelasticsearch setup, using REST URL: %s\n", restURL); + return RS_RET_OK; +} + +BEGINparseSelectorAct +CODESTARTparseSelectorAct +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) { + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); + } + p += sizeof(":omelasticsearch:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ + CHKiRet(createInstance(&pData)); + + /* check if a non-standard template is to be applied */ + if(*(p-1) == ';') + --p; + CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) " StdJSONFmt")); + + /* all good, we can now initialise our private data */ + CHKiRet(curlSetup(pData)); +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + +BEGINmodExit +CODESTARTmodExit + curl_global_cleanup(); + statsobj.Destruct(&indexStats); + objRelease(errmsg, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); +ENDmodExit + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +ENDqueryEtryPt + +static rsRetVal +resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + DEFiRet; + restPort = 9200; + hostName = "localhost"; + searchIndex = "system"; + searchType = "events"; + RETiRet; +} + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); + + /* register config file handlers */ + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchindex", 0, eCmdHdlrGetWord, NULL, &searchIndex, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchtype", 0, eCmdHdlrGetWord, NULL, &searchType, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchhost", 0, eCmdHdlrGetWord, NULL, &hostName, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchport", 0, eCmdHdlrInt, NULL, &restPort, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); + + if (curl_global_init(CURL_GLOBAL_ALL) != 0) { + errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled"); + ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED); + } + + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&indexStats)); + CHKiRet(statsobj.SetName(indexStats, (uchar *)"elasticsearch")); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"connfail", + ctrType_IntCtr, &indexConFail)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submits", + ctrType_IntCtr, &indexSubmit)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed", + ctrType_IntCtr, &indexFailed)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"success", + ctrType_IntCtr, &indexSuccess)); + CHKiRet(statsobj.ConstructFinalize(indexStats)); +ENDmodInit + +/* vi:set ai: + */ diff --git a/runtime/parser.c b/runtime/parser.c index 14ccb49a..0c201fa5 100644 --- a/runtime/parser.c +++ b/runtime/parser.c @@ -60,6 +60,7 @@ DEFobjCurrIf(ruleset) /* config data */ static uchar cCCEscapeChar = '#';/* character to be used to start an escape sequence for control chars */ static int bEscapeCCOnRcv = 1; /* escape control characters on reception: 0 - no, 1 - yes */ +static int bSpaceLFOnRcv = 0; /* replace newlines with spaces on reception: 0 - no, 1 - yes */ static int bEscape8BitChars = 0; /* escape characters > 127 on reception: 0 - no, 1 - yes */ static int bEscapeTab = 1; /* escape tab control character when doing CC escapes: 0 - no, 1 - yes */ static int bDropTrailingLF = 1; /* drop trailing LF's on reception? */ @@ -354,9 +355,13 @@ SanitizeMsg(msg_t *pMsg) int bNeedSanitize = 0; for(iSrc = 0 ; iSrc < lenMsg ; iSrc++) { if(iscntrl(pszMsg[iSrc])) { + if(bSpaceLFOnRcv && pszMsg[iSrc] == '\n') + pszMsg[iSrc] = ' '; + else if(pszMsg[iSrc] == '\0' || bEscapeCCOnRcv) { bNeedSanitize = 1; - break; + if (!bSpaceLFOnRcv) + break; } } else if(pszMsg[iSrc] > 127 && bEscape8BitChars) { bNeedSanitize = 1; @@ -645,6 +650,7 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus { cCCEscapeChar = '#'; bEscapeCCOnRcv = 1; /* default is to escape control characters */ + bSpaceLFOnRcv = 0; bEscape8BitChars = 0; /* default is to escape control characters */ bEscapeTab = 1; /* default is to escape control characters */ bDropTrailingLF = 1; /* default is to drop trailing LF's on reception */ @@ -698,6 +704,7 @@ BEGINObjClassInit(parser, 1, OBJ_IS_CORE_MODULE) /* class, version */ CHKiRet(regCfSysLineHdlr((uchar *)"controlcharacterescapeprefix", 0, eCmdHdlrGetChar, NULL, &cCCEscapeChar, NULL, eConfObjGlobal)); CHKiRet(regCfSysLineHdlr((uchar *)"droptrailinglfonreception", 0, eCmdHdlrBinary, NULL, &bDropTrailingLF, NULL, eConfObjGlobal)); CHKiRet(regCfSysLineHdlr((uchar *)"escapecontrolcharactersonreceive", 0, eCmdHdlrBinary, NULL, &bEscapeCCOnRcv, NULL, eConfObjGlobal)); + CHKiRet(regCfSysLineHdlr((uchar *)"spacelfonreceive", 0, eCmdHdlrBinary, NULL, &bSpaceLFOnRcv, NULL, eConfObjGlobal)); CHKiRet(regCfSysLineHdlr((uchar *)"escape8bitcharactersonreceive", 0, eCmdHdlrBinary, NULL, &bEscape8BitChars, NULL, eConfObjGlobal)); CHKiRet(regCfSysLineHdlr((uchar *)"escapecontrolcharactertab", 0, eCmdHdlrBinary, NULL, &bEscapeTab, NULL, eConfObjGlobal)); CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL, eConfObjGlobal)); diff --git a/runtime/rsconf.c b/runtime/rsconf.c index 61e8ca96..c061f16f 100644 --- a/runtime/rsconf.c +++ b/runtime/rsconf.c @@ -96,6 +96,7 @@ static uchar template_StdUsrMsgFmt[] = "\" %syslogtag%%msg%\n\r\""; static uchar template_StdDBFmt[] = "\"insert into SystemEvents (Message, Facility, FromHost, Priority, DeviceReportedTime, ReceivedAt, InfoUnitID, SysLogTag) values ('%msg%', %syslogfacility%, '%HOSTNAME%', %syslogpriority%, '%timereported:::date-mysql%', '%timegenerated:::date-mysql%', %iut%, '%syslogtag%')\",SQL"; static uchar template_StdPgSQLFmt[] = "\"insert into SystemEvents (Message, Facility, FromHost, Priority, DeviceReportedTime, ReceivedAt, InfoUnitID, SysLogTag) values ('%msg%', %syslogfacility%, '%HOSTNAME%', %syslogpriority%, '%timereported:::date-pgsql%', '%timegenerated:::date-pgsql%', %iut%, '%syslogtag%')\",STDSQL"; static uchar template_spoofadr[] = "\"%fromhost-ip%\""; +static uchar template_StdJSONFmt[] = "\"{\\\"message\\\":\\\"%msg%\\\",\\\"fromhost\\\":\\\"%HOSTNAME%\\\",\\\"facility\\\":\\\"%syslogfacility-text%\\\",\\\"priority\\\":\\\"%syslogpriority-text%\\\",\\\"timereported\\\":\\\"%timereported:::date-rfc3339%\\\",\\\"timegenerated\\\":\\\"%timegenerated:::date-rfc3339%\\\"}\",JSON"; /* end templates */ void cnfDoCfsysline(char *ln); @@ -1184,6 +1185,8 @@ initLegacyConf(void) tplAddLine(ourConf, " StdDBFmt", &pTmp); pTmp = template_StdPgSQLFmt; tplAddLine(ourConf, " StdPgSQLFmt", &pTmp); + pTmp = template_StdJSONFmt; + tplAddLine(ourConf, " StdJSONFmt", &pTmp); pTmp = template_spoofadr; tplLastStaticInit(ourConf, tplAddLine(ourConf, "RSYSLOG_omudpspoofDfltSourceTpl", &pTmp)); @@ -50,6 +50,15 @@ DEFobjCurrIf(regexp) static int bFirstRegexpErrmsg = 1; /**< did we already do a "can't load regexp" error message? */ #endif +#warning check this merge +#if 1 +enum { + NO_ESCAPE = 0, + SQL_ESCAPE, + STDSQL_ESCAPE, + JSON_ESCAPE, +}; +#endif /* helper to tplToString and strgen's, extends buffer */ #define ALLOC_INC 128 @@ -120,10 +129,12 @@ rsRetVal tplToString(struct template *pTpl, msg_t *pMsg, uchar **ppBuf, size_t * * but they are handled in this way because of legacy (don't break any * existing thing). */ - if(pTpl->optFormatForSQL == 1) - doSQLEscape(&pVal, &iLenVal, &bMustBeFreed, 1); - else if(pTpl->optFormatForSQL == 2) - doSQLEscape(&pVal, &iLenVal, &bMustBeFreed, 0); + if(pTpl->optFormatEscape == SQL_ESCAPE) + doEscape(&pVal, &iLenVal, &bMustBeFreed, SQL_ESCAPE); + else if(pTpl->optFormatEscape == JSON_ESCAPE) + doEscape(&pVal, &iLenVal, &bMustBeFreed, JSON_ESCAPE); + else if(pTpl->optFormatEscape == STDSQL_ESCAPE) + doEscape(&pVal, &iLenVal, &bMustBeFreed, STDSQL_ESCAPE); } /* got source, now copy over */ if(iLenVal > 0) { /* may be zero depending on property */ @@ -211,27 +222,29 @@ finalize_it: } -/* Helper to doSQLEscape. This is called if doSQLEscape +/* Helper to doEscape. This is called if doEscape * runs out of memory allocating the escaped string. * Then we are in trouble. We can * NOT simply return the unmodified string because this * may cause SQL injection. But we also can not simply * abort the run, this would be a DoS. I think an appropriate - * measure is to remove the dangerous \' characters. We + * measure is to remove the dangerous \' characters (SQL). We * replace them by \", which will break the message and * signatures eventually present - but this is the * best thing we can do now (or does anybody * have a better idea?). rgerhards 2004-11-23 - * added support for "escapeMode" (so doSQLEscape for details). - * if mode = 1, then backslashes are changed to slashes. + * added support for escape mode (see doEscape for details). + * if mode = SQL_ESCAPE, then backslashes are changed to slashes. * rgerhards 2005-09-22 */ -static void doSQLEmergencyEscape(register uchar *p, int escapeMode) +static void doEmergencyEscape(register uchar *p, int mode) { while(*p) { - if(*p == '\'') + if((mode == SQL_ESCAPE||mode == STDSQL_ESCAPE) && *p == '\'') *p = '"'; - else if((escapeMode == 1) && (*p == '\\')) + else if((mode == JSON_ESCAPE) && *p == '"') + *p = '\''; + else if((mode == SQL_ESCAPE) && *p == '\\') *p = '/'; ++p; } @@ -256,14 +269,16 @@ static void doSQLEmergencyEscape(register uchar *p, int escapeMode) * smartness depends on config settings. So we add a new option to this * function that allows the caller to select if they want to standard or * "smart" encoding ;) - * new parameter escapeMode is 0 - standard sql, 1 - "smart" engines + * -- + * Parameter "mode" is STDSQL_ESCAPE, SQL_ESCAPE "smart" SQL engines, or + * JSON_ESCAPE for everyone requiring escaped JSON (e.g. ElasticSearch). * 2005-09-22 rgerhards */ rsRetVal -doSQLEscape(uchar **pp, size_t *pLen, unsigned short *pbMustBeFreed, int escapeMode) +doEscape(uchar **pp, size_t *pLen, unsigned short *pbMustBeFreed, int mode) { DEFiRet; - uchar *p; + uchar *p = NULL; int iLen; cstr_t *pStrB = NULL; uchar *pszGenerated; @@ -274,26 +289,32 @@ doSQLEscape(uchar **pp, size_t *pLen, unsigned short *pbMustBeFreed, int escapeM assert(pbMustBeFreed != NULL); /* first check if we need to do anything at all... */ - if(escapeMode == 0) + if(mode == STDSQL_ESCAPE) for(p = *pp ; *p && *p != '\'' ; ++p) ; - else + else if(mode == SQL_ESCAPE) for(p = *pp ; *p && *p != '\'' && *p != '\\' ; ++p) ; + else if(mode == JSON_ESCAPE) + for(p = *pp ; *p && *p != '"' ; ++p) + ; /* when we get out of the loop, we are either at the - * string terminator or the first \'. */ - if(*p == '\0') + * string terminator or the first character to escape */ + if(p && *p == '\0') FINALIZE; /* nothing to do in this case! */ p = *pp; iLen = *pLen; CHKiRet(cstrConstruct(&pStrB)); - + while(*p) { - if(*p == '\'') { - CHKiRet(cstrAppendChar(pStrB, (escapeMode == 0) ? '\'' : '\\')); + if((mode == SQL_ESCAPE || mode == STDSQL_ESCAPE) && *p == '\'') { + CHKiRet(cstrAppendChar(pStrB, (mode == STDSQL_ESCAPE) ? '\'' : '\\')); + iLen++; /* reflect the extra character */ + } else if((mode == SQL_ESCAPE) && *p == '\\') { + CHKiRet(cstrAppendChar(pStrB, '\\')); iLen++; /* reflect the extra character */ - } else if((escapeMode == 1) && (*p == '\\')) { + } else if((mode == JSON_ESCAPE) && *p == '"') { CHKiRet(cstrAppendChar(pStrB, '\\')); iLen++; /* reflect the extra character */ } @@ -312,7 +333,7 @@ doSQLEscape(uchar **pp, size_t *pLen, unsigned short *pbMustBeFreed, int escapeM finalize_it: if(iRet != RS_RET_OK) { - doSQLEmergencyEscape(*pp, escapeMode); + doEmergencyEscape(*pp, mode); if(pStrB != NULL) cstrDestruct(&pStrB); } @@ -889,11 +910,14 @@ tplAddTplMod(struct template *pTpl, uchar** ppRestOfConfLine) * acknowledged implementing the option. -- rgerhards, 2011-03-21 */ if(lenMod > 6 && !strcasecmp((char*) szMod + lenMod - 7, ",stdsql")) { - pTpl->optFormatForSQL = 2; - DBGPRINTF("strgen suports the stdsql option\n"); + pTpl->optFormatEscape = STDSQL_ESCAPE; + DBGPRINTF("strgen supports the stdsql option\n"); } else if(lenMod > 3 && !strcasecmp((char*) szMod+ lenMod - 4, ",sql")) { - pTpl->optFormatForSQL = 1; - DBGPRINTF("strgen suports the sql option\n"); + pTpl->optFormatEscape = SQL_ESCAPE; + DBGPRINTF("strgen supports the sql option\n"); + } else if(lenMod > 4 && !strcasecmp((char*) szMod+ lenMod - 4, ",json")) { + pTpl->optFormatEscape = JSON_ESCAPE; + DBGPRINTF("strgen supports the json option\n"); } finalize_it: @@ -1021,11 +1045,13 @@ struct template *tplAddLine(rsconf_t *conf, char* pName, uchar** ppRestOfConfLin * it anyhow... ;) rgerhards 2004-11-22 */ if(!strcmp(optBuf, "stdsql")) { - pTpl->optFormatForSQL = 2; + pTpl->optFormatEscape = STDSQL_ESCAPE; + } else if(!strcmp(optBuf, "json")) { + pTpl->optFormatEscape = JSON_ESCAPE; } else if(!strcmp(optBuf, "sql")) { - pTpl->optFormatForSQL = 1; + pTpl->optFormatEscape = SQL_ESCAPE; } else if(!strcmp(optBuf, "nosql")) { - pTpl->optFormatForSQL = 0; + pTpl->optFormatEscape = NO_ESCAPE; } else { dbgprintf("Invalid option '%s' ignored.\n", optBuf); } @@ -1190,9 +1216,11 @@ void tplPrintList(rsconf_t *conf) pTpl = conf->templates.root; while(pTpl != NULL) { dbgprintf("Template: Name='%s' ", pTpl->pszName == NULL? "NULL" : pTpl->pszName); - if(pTpl->optFormatForSQL == 1) + if(pTpl->optFormatEscape == SQL_ESCAPE) dbgprintf("[SQL-Format (MySQL)] "); - else if(pTpl->optFormatForSQL == 2) + else if(pTpl->optFormatEscape == JSON_ESCAPE) + dbgprintf("[JSON-Escaped Format] "); + else if(pTpl->optFormatEscape == STDSQL_ESCAPE) dbgprintf("[SQL-Format (standard SQL)] "); dbgprintf("\n"); pTpe = pTpl->pEntryRoot; @@ -37,9 +37,10 @@ struct template { int tpenElements; /* number of elements in templateEntry list */ struct templateEntry *pEntryRoot; struct templateEntry *pEntryLast; - char optFormatForSQL; /* in text fields, 0 - do not escape, - * 1 - escape quotes by double quotes, - * 2 - escape "the MySQL way" + char optFormatEscape; /* in text fields, 0 - do not escape, + * 1 - escape "the MySQL way" + * 2 - escape quotes by double quotes, + * 3 - escape double quotes for JSON. */ /* following are options. All are 0/1 defined (either on or off). * we use chars because they are faster than bit fields and smaller @@ -133,7 +134,7 @@ rsRetVal ExtendBuf(uchar **pBuf, size_t *pLenBuf, size_t iMinSize); */ rsRetVal tplToArray(struct template *pTpl, msg_t *pMsg, uchar*** ppArr); rsRetVal tplToString(struct template *pTpl, msg_t *pMsg, uchar** ppSz, size_t *); -rsRetVal doSQLEscape(uchar **pp, size_t *pLen, unsigned short *pbMustBeFreed, int escapeMode); +rsRetVal doEscape(uchar **pp, size_t *pLen, unsigned short *pbMustBeFreed, int escapeMode); rsRetVal templateInit(); diff --git a/tools/omfwd.c b/tools/omfwd.c index 8669a8de..cbde234b 100644 --- a/tools/omfwd.c +++ b/tools/omfwd.c @@ -4,24 +4,23 @@ * NOTE: read comments in module-template.h to understand how this file * works! * - * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2012 Adiscon GmbH. * * This file is part of rsyslog. * - * Rsyslog is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Rsyslog is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. - * - * A copy of the GPL can be found in the file "COPYING" in this distribution. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * * TODO v6 config: * - permitted peer *list* @@ -172,22 +171,6 @@ pData->bIsConnected = 0; // TODO: remove this variable altogether } -/* get the syslog forward port from selector_t. The passed in - * struct must be one that is setup for forwarding. - * rgerhards, 2007-06-28 - * We may change the implementation to try to lookup the port - * if it is unspecified. So far, we use the IANA default auf 514. - */ -static char *getFwdPt(instanceData *pData) -{ - assert(pData != NULL); - if(pData->port == NULL) - return("514"); - else - return(pData->port); -} - - /* destruct the TCP helper objects * This, for example, is needed after something went wrong. * This function is void because it "can not" fail. @@ -420,7 +403,7 @@ static rsRetVal TCPSendInit(void *pvData) } /* params set, now connect */ CHKiRet(netstrm.Connect(pData->pNetstrm, glbl.GetDefPFFamily(), - (uchar*)getFwdPt(pData), (uchar*)pData->target)); + (uchar*)pData->port, (uchar*)pData->target)); } finalize_it: @@ -453,9 +436,9 @@ static rsRetVal doTryResume(instanceData *pData) hints.ai_flags = AI_NUMERICSERV; hints.ai_family = glbl.GetDefPFFamily(); hints.ai_socktype = SOCK_DGRAM; - if((iErr = (getaddrinfo(pData->target, getFwdPt(pData), &hints, &res))) != 0) { + if((iErr = (getaddrinfo(pData->target, pData->port, &hints, &res))) != 0) { dbgprintf("could not get addrinfo for hostname '%s':'%s': %d%s\n", - pData->target, getFwdPt(pData), iErr, gai_strerror(iErr)); + pData->target, pData->port, iErr, gai_strerror(iErr)); ABORT_FINALIZE(RS_RET_SUSPENDED); } dbgprintf("%s found, resuming.\n", pData->target); @@ -494,15 +477,18 @@ ENDbeginTransaction BEGINdoAction - char *psz = NULL; /* temporary buffering */ + char *psz; /* temporary buffering */ register unsigned l; int iMaxLine; +# ifdef USE_NETZIP + Bytef *out = NULL; /* for compression */ +# endif CODESTARTdoAction CHKiRet(doTryResume(pData)); iMaxLine = glbl.GetMaxLine(); - dbgprintf(" %s:%s/%s\n", pData->target, getFwdPt(pData), + dbgprintf(" %s:%s/%s\n", pData->target, pData->port, pData->protocol == FORW_UDP ? "udp" : "tcp"); psz = (char*) ppString[0]; @@ -520,7 +506,6 @@ CODESTARTdoAction * rgerhards, 2006-11-30 */ if(pData->compressionLevel && (l > CONF_MIN_SIZE_FOR_COMPRESS)) { - Bytef *out; uLongf destLen = iMaxLine + iMaxLine/100 +12; /* recommended value from zlib doc */ uLong srcLen = l; int ret; @@ -541,14 +526,11 @@ CODESTARTdoAction * rgerhards, 2006-11-30 */ dbgprintf("Compression failed, sending uncompressed message\n"); - free(out); } else if(destLen+1 < l) { /* only use compression if there is a gain in using it! */ dbgprintf("there is gain in compression, so we do it\n"); psz = (char*) out; l = destLen + 1; /* take care for the "z" at message start! */ - } else { - free(out); } ++destLen; } @@ -569,10 +551,7 @@ CODESTARTdoAction } finalize_it: # ifdef USE_NETZIP - if((psz != NULL) && (psz != (char*) ppString[0])) { - /* we need to free temporary buffer, alloced above - Naoya Nakazawa, 2010-01-11 */ - free(psz); - } + free(out); /* is NULL if it was never used... */ # endif ENDdoAction @@ -900,12 +879,16 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) if(pData->port == NULL) { errmsg.LogError(0, NO_ERRCODE, "Could not get memory to store syslog forwarding port, " "using default port, results may not be what you intend\n"); - /* we leave f_forw.port set to NULL, this is then handled by getFwdPt(). */ + /* we leave f_forw.port set to NULL, this is then handled below */ } else { memcpy(pData->port, tmp, i); *(pData->port + i) = '\0'; } } + /* check if no port is set. If so, we use the IANA-assigned port of 514 */ + if(pData->port == NULL) { + CHKmalloc(pData->port = strdup("514")); + } /* now skip to template */ while(*p && *p != ';' && *p != '#' && !isspace((int) *p)) diff --git a/tools/syslogd.c b/tools/syslogd.c index a24bef68..4cfbd377 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -356,8 +356,15 @@ void untty(void) #else { int i; + pid_t pid; if(!Debug) { + pid = getpid(); + if (setpgid(pid, pid) < 0) { + perror("setpgid"); + exit(1); + } + i = open(_PATH_TTY, O_RDWR|O_CLOEXEC); if (i >= 0) { # if !defined(__hpux) @@ -1382,7 +1389,6 @@ static void printVersion(void) } - /* Method to initialize all global classes and use the objects that we need. * rgerhards, 2008-01-04 * rgerhards, 2008-04-16: the actual initialization is now carried out by the runtime |