diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2012-01-18 14:51:33 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2012-01-18 14:51:33 +0100 |
commit | 03be2fcd7cfe3355b8108fe8368a6a65ab98e9e9 (patch) | |
tree | 5b56babfece1bc2f5e3ee6cbf030859b72232eca /plugins | |
parent | ac9afc4149db314d9c480232d70216960342e3e4 (diff) | |
parent | ef34821a2737799f48c3032b9616418e4f7fa34f (diff) | |
download | rsyslog-03be2fcd7cfe3355b8108fe8368a6a65ab98e9e9.tar.gz rsyslog-03be2fcd7cfe3355b8108fe8368a6a65ab98e9e9.tar.xz rsyslog-03be2fcd7cfe3355b8108fe8368a6a65ab98e9e9.zip |
Merge branch 'v5-devel' into master
Conflicts:
ChangeLog
Makefile.am
configure.ac
doc/manual.html
plugins/imptcp/imptcp.c
plugins/imudp/imudp.c
plugins/imuxsock/imuxsock.c
runtime/parser.c
template.c
tools/omfwd.c
tools/syslogd.c
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/imptcp/imptcp.c | 59 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 146 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 6 | ||||
-rw-r--r-- | plugins/omelasticsearch/Makefile.am | 8 | ||||
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 270 |
5 files changed, 417 insertions, 72 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 428f5123..f5868311 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -66,6 +66,7 @@ #include "datetime.h" #include "ruleset.h" #include "msg.h" +#include "statsobj.h" #include "net.h" /* for permittedPeers, may be removed when this is removed */ /* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */ @@ -84,6 +85,7 @@ DEFobjCurrIf(prop) DEFobjCurrIf(datetime) DEFobjCurrIf(errmsg) DEFobjCurrIf(ruleset) +DEFobjCurrIf(statsobj) /* forward references */ static void * wrkr(void *myself); @@ -161,7 +163,8 @@ struct ptcpsrv_s { * includes support for doubly-linked list. */ struct ptcpsess_s { - ptcpsrv_t *pSrv; /* our server */ +// ptcpsrv_t *pSrv; /* our server TODO: check remove! */ + ptcplstn_t *pLstn; /* our listener */ ptcpsess_t *prev, *next; int sock; epolld_t *epd; @@ -189,6 +192,8 @@ struct ptcplstn_s { ptcplstn_t *prev, *next; int sock; epolld_t *epd; + statsobj_t *stats; /* listener stats */ + STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) }; @@ -230,7 +235,7 @@ static int iMaxLine; /* maximum size of a single message */ /* forward definitions */ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal); -static rsRetVal addLstn(ptcpsrv_t *pSrv, int sock); +static rsRetVal addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6); /* some simple constructors/destructors */ @@ -276,6 +281,7 @@ startupSrv(ptcpsrv_t *pSrv) int sockflags; struct addrinfo hints, *res = NULL, *r; uchar *lstnIP; + int isIPv6 = 0; lstnIP = pSrv->lstnIP == NULL ? UCHAR_CONSTANT("") : pSrv->lstnIP; @@ -308,8 +314,9 @@ startupSrv(ptcpsrv_t *pSrv) continue; } -#ifdef IPV6_V6ONLY if(r->ai_family == AF_INET6) { + isIPv6 = 1; +#ifdef IPV6_V6ONLY int iOn = 1; if(setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&iOn, sizeof (iOn)) < 0) { @@ -317,8 +324,8 @@ startupSrv(ptcpsrv_t *pSrv) sock = -1; continue; } - } #endif + } if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)) < 0 ) { DBGPRINTF("error %d setting tcp socket option\n", errno); close(sock); @@ -380,7 +387,7 @@ startupSrv(ptcpsrv_t *pSrv) /* if we reach this point, we were able to obtain a valid socket, so we can * create our listener object. -- rgerhards, 2010-08-10 */ - CHKiRet(addLstn(pSrv, sock)); + CHKiRet(addLstn(pSrv, sock, isIPv6)); ++numSocks; } @@ -607,22 +614,25 @@ static rsRetVal doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub) { msg_t *pMsg; + ptcpsrv_t *pSrv; DEFiRet; if(pThis->iMsg == 0) { DBGPRINTF("discarding zero-sized message\n"); FINALIZE; } + pSrv = pThis->pLstn->pSrv; /* we now create our own message object and submit it to the queue */ CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); MsgSetRawMsg(pMsg, (char*)pThis->pMsg, pThis->iMsg); - MsgSetInputName(pMsg, pThis->pSrv->pInputName); + MsgSetInputName(pMsg, pSrv->pInputName); MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; MsgSetRcvFrom(pMsg, pThis->peerName); CHKiRet(MsgSetRcvFromIP(pMsg, pThis->peerIP)); - MsgSetRuleset(pMsg, pThis->pSrv->pRuleset); + MsgSetRuleset(pMsg, pSrv->pRuleset); + STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit); if(pMultiSub == NULL) { CHKiRet(submitMsg(pMsg)); @@ -704,7 +714,8 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG } if(( (c == '\n') - || ((pThis->pSrv->iAddtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->iAddtlFrameDelim)) + || ((pThis->pLstn->pSrv->iAddtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) + && (c == pThis->pLstn->pSrv->iAddtlFrameDelim)) ) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */ doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub); pThis->inputState = eAtStrtFram; @@ -864,14 +875,25 @@ finalize_it: /* add a listener to the server */ static rsRetVal -addLstn(ptcpsrv_t *pSrv, int sock) +addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6) { DEFiRet; ptcplstn_t *pLstn; + uchar statname[64]; CHKmalloc(pLstn = malloc(sizeof(ptcplstn_t))); pLstn->pSrv = pSrv; pLstn->sock = sock; + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&(pLstn->stats))); + snprintf((char*)statname, sizeof(statname), "imptcp(%s/%s/%s)", + (pSrv->lstnIP == NULL) ? "*" : (char*)pSrv->lstnIP, pSrv->port, + isIPv6 ? "IPv6" : "IPv4"); + statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(statsobj.SetName(pLstn->stats, statname)); + CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("submitted"), + ctrType_IntCtr, &(pLstn->ctrSubmit))); + CHKiRet(statsobj.ConstructFinalize(pLstn->stats)); /* add to start of server's listener list */ pLstn->prev = NULL; @@ -890,14 +912,15 @@ finalize_it: /* add a session to the server */ static rsRetVal -addSess(ptcpsrv_t *pSrv, int sock, prop_t *peerName, prop_t *peerIP) +addSess(ptcplstn_t *pLstn, int sock, prop_t *peerName, prop_t *peerIP) { DEFiRet; ptcpsess_t *pSess = NULL; + ptcpsrv_t *pSrv = pLstn->pSrv; CHKmalloc(pSess = malloc(sizeof(ptcpsess_t))); CHKmalloc(pSess->pMsg = malloc(iMaxLine * sizeof(uchar))); - pSess->pSrv = pSrv; + pSess->pLstn = pLstn; pSess->sock = sock; pSess->inputState = eAtStrtFram; pSess->iMsg = 0; @@ -935,17 +958,17 @@ closeSess(ptcpsess_t *pSess) CHKiRet(removeEPollSock(sock, pSess->epd)); close(sock); - pthread_mutex_lock(&pSess->pSrv->mutSessLst); + pthread_mutex_lock(&pSess->pLstn->pSrv->mutSessLst); /* finally unlink session from structures */ if(pSess->next != NULL) pSess->next->prev = pSess->prev; if(pSess->prev == NULL) { /* need to update root! */ - pSess->pSrv->pSess = pSess->next; + pSess->pLstn->pSrv->pSess = pSess->next; } else { pSess->prev->next = pSess->next; } - pthread_mutex_unlock(&pSess->pSrv->mutSessLst); + pthread_mutex_unlock(&pSess->pLstn->pSrv->mutSessLst); /* unlinked, now remove structure */ destructSess(pSess); @@ -1149,7 +1172,7 @@ lstnActivity(ptcplstn_t *pLstn) if(localRet == RS_RET_NO_MORE_DATA || glbl.GetGlobalInputTermState() == 1) break; CHKiRet(localRet); - CHKiRet(addSess(pLstn->pSrv, newSock, peerName, peerIP)); + CHKiRet(addSess(pLstn, newSock, peerName, peerIP)); } finalize_it: @@ -1180,7 +1203,7 @@ sessActivity(ptcpsess_t *pSess) CHKiRet(DataRcvd(pSess, rcvBuf, lenRcv)); } else if (lenRcv == 0) { /* session was closed, do clean-up */ - if(pSess->pSrv->bEmitMsgOnClose) { + if(pSess->pLstn->pSrv->bEmitMsgOnClose) { uchar *peerName; int lenPeer; prop.GetString(pSess->peerName, &peerName, &lenPeer); @@ -1445,6 +1468,8 @@ shutdownSrv(ptcpsrv_t *pSrv) pLstn = pSrv->pLstn; while(pLstn != NULL) { close(pLstn->sock); + statsobj.Destruct(&(pLstn->stats)); + /* now unlink listner */ lstnDel = pLstn; pLstn = pLstn->next; DBGPRINTF("imptcp shutdown listen socket %d\n", lstnDel->sock); @@ -1487,6 +1512,7 @@ CODESTARTmodExit pthread_attr_destroy(&wrkrThrdAttr); /* release objects we used */ objRelease(glbl, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); objRelease(net, LM_NET_FILENAME); objRelease(datetime, CORE_COMPONENT); @@ -1535,6 +1561,7 @@ CODESTARTmodInit CODEmodInit_QueryRegCFSLineHdlr /* request objects we use */ CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); CHKiRet(objUse(net, LM_NET_FILENAME)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index badad949..14032e1f 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -26,6 +26,7 @@ * A copy of the GPL can be found in the file "COPYING" in this distribution. */ #include "config.h" +#include <stdio.h> #include <stdlib.h> #include <assert.h> #include <string.h> @@ -70,8 +71,14 @@ DEFobjCurrIf(prop) DEFobjCurrIf(ruleset) DEFobjCurrIf(statsobj) -statsobj_t *modStats; -STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) + +static struct lstn_s { + struct lstn_s *next; + int sock; /* socket */ + ruleset_t *pRuleset; /* bound ruleset */ + statsobj_t *stats; /* listener stats */ + STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) +} *lcnfRoot = NULL, *lcnfLast = NULL; static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */ static int iMaxLine; /* maximum UDP message size supported */ @@ -79,9 +86,6 @@ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitte * This shall prevent remote DoS when the "discard on disallowed sender" * message is configured to be logged on occurance of such a case. */ -static int *udpLstnSocks = NULL; /* Internet datagram sockets, first element is nbr of elements - * read-only after init(), but beware of restart! */ -static ruleset_t **udpRulesets = NULL; /* ruleset to be used with sockets in question (entry 0 is empty) */ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a global and alloc * it so that we can check available memory in willRun() and request * termination if we can not get it. -- rgerhards, 2007-12-27 @@ -170,14 +174,18 @@ static inline rsRetVal addListner(instanceConf_t *inst) { DEFiRet; + uchar *bindAddr; int *newSocks; - int *tmpSocks; - int iSrc, iDst; - ruleset_t **tmpRulesets; + int iSrc; + struct lstn_s *newlcnfinfo; + uchar *bindName; + uchar *port; + uchar statname[64]; /* check which address to bind to. We could do this more compact, but have not * done so in order to make the code more readable. -- rgerhards, 2007-12-27 */ +#if 0 //<<<<<<< HEAD DBGPRINTF("imudp: trying to open port at %s:%s.\n", (inst->pszBindAddr == NULL) ? (uchar*)"*" : inst->pszBindAddr, inst->pszBindPort); @@ -222,10 +230,50 @@ addListner(instanceConf_t *inst) free(udpRulesets); udpRulesets = tmpRulesets; } +#else //======= + if(inst->pszBindAddr == NULL) + bindAddr = NULL; + else if(inst->pszBindAddr[0] == '*' && inst->pszBindAddr[1] == '\0') + bindAddr = NULL; + else + bindAddr = inst->pszBindAddr; + bindName = (bindAddr == NULL) ? (uchar*)"*" : bindAddr; + port = (inst->pszBindPort == NULL || *inst->pszBindPort == '\0') ? (uchar*) "514" : inst->pszBindPort; + + DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n", bindName, inst->pszBindPort); + + newSocks = net.create_udp_socket(bindAddr, port, 1); + if(newSocks != NULL) { + /* we now need to add the new sockets to the existing set */ + /* ready to copy */ + for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc) { + CHKmalloc(newlcnfinfo = (struct lstn_s*) MALLOC(sizeof(struct lstn_s))); + newlcnfinfo->next = NULL; + newlcnfinfo->sock = newSocks[iSrc]; + newlcnfinfo->pRuleset = inst->pBindRuleset; + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&(newlcnfinfo->stats))); + snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port); + statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname)); + CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"), + ctrType_IntCtr, &(newlcnfinfo->ctrSubmit))); + CHKiRet(statsobj.ConstructFinalize(newlcnfinfo->stats)); + /* link to list. Order must be preserved to take care for + * conflicting matches. + */ + if(lcnfRoot == NULL) + lcnfRoot = newlcnfinfo; + if(lcnfLast == NULL) + lcnfLast = newlcnfinfo; + else + lcnfLast->next = newlcnfinfo; +#endif //>>>>>>> ef34821a2737799f48c3032b9616418e4f7fa34f } } finalize_it: + free(newSocks); RETiRet; } @@ -255,8 +303,7 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta * on scheduling order. -- rgerhards, 2008-10-02 */ static inline rsRetVal -processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, - ruleset_t *pRuleset) +processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) { DEFiRet; int iNbrTimeUsed; @@ -276,7 +323,7 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, if(pThrd->bShallStop == TRUE) ABORT_FINALIZE(RS_RET_FORCE_TERM); socklen = sizeof(struct sockaddr_storage); - lenRcvBuf = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); + lenRcvBuf = recvfrom(lstn->sock, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); if(lenRcvBuf < 0) { if(errno != EINTR && errno != EAGAIN) { rs_strerror_r(errno, errStr, sizeof(errStr)); @@ -321,7 +368,7 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, *pbIsPermitted = 1; /* no check -> everything permitted */ } - DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", fd, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); + DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); if(*pbIsPermitted != 0) { if((runModConf->iTimeRequery == 0) || (iNbrTimeUsed++ % runModConf->iTimeRequery) == 0) { @@ -331,14 +378,14 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf); MsgSetInputName(pMsg, pInputName); - MsgSetRuleset(pMsg, pRuleset); + MsgSetRuleset(pMsg, lstn->pRuleset); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL; if(*pbIsPermitted == 2) pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ CHKiRet(msgSetFromSockinfo(pMsg, &frominet)); CHKiRet(submitMsg(pMsg)); - STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit); + STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); } } @@ -491,6 +538,8 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) struct epoll_event *udpEPollEvt = NULL; struct epoll_event currEvt[NUM_EPOLL_EVENTS]; char errStr[1024]; + struct lstn_s *lstn; + int nLstn; /* start "name caching" algo by making sure the previous system indicator * is invalidated. @@ -498,7 +547,11 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) bIsPermitted = 0; memset(&frominetPrev, 0, sizeof(frominetPrev)); - CHKmalloc(udpEPollEvt = calloc(udpLstnSocks[0], sizeof(struct epoll_event))); + /* count num listeners -- do it here in order to avoid inconsistency */ + nLstn = 0; + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) + ++nLstn; + CHKmalloc(udpEPollEvt = calloc(nLstn, sizeof(struct epoll_event))); #if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) DBGPRINTF("imudp uses epoll_create1()\n"); @@ -518,16 +571,18 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) /* fill the epoll set - we need to do this only once, as the set * can not change dyamically. */ - for (i = 0; i < *udpLstnSocks; i++) { - if (udpLstnSocks[i+1] != -1) { + i = 0; + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) { + if(lstn->sock != -1) { udpEPollEvt[i].events = EPOLLIN | EPOLLET; - udpEPollEvt[i].data.u64 = i+1; - if(epoll_ctl(efd, EPOLL_CTL_ADD, udpLstnSocks[i+1], &(udpEPollEvt[i])) < 0) { + udpEPollEvt[i].data.u64 = (long long unsigned) lstn; + if(epoll_ctl(efd, EPOLL_CTL_ADD, lstn->sock, &(udpEPollEvt[i])) < 0) { rs_strerror_r(errno, errStr, sizeof(errStr)); errmsg.LogError(errno, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n", - udpLstnSocks[i+1], errStr); + lstn->sock, errStr); } } + i++; } while(1) { @@ -539,8 +594,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) break; /* terminate input! */ for(i = 0 ; i < nfds ; ++i) { - processSocket(pThrd, udpLstnSocks[currEvt[i].data.u64], &frominetPrev, &bIsPermitted, - udpRulesets[currEvt[i].data.u64]); + processSocket(pThrd, (struct lstn_s*)currEvt[i].data.u64, &frominetPrev, &bIsPermitted); } } @@ -557,10 +611,10 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) DEFiRet; int maxfds; int nfds; - int i; fd_set readfds; struct sockaddr_storage frominetPrev; int bIsPermitted; + struct lstn_s *lstn; /* start "name caching" algo by making sure the previous system indicator * is invalidated. @@ -571,20 +625,17 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) while(1) { /* Add the Unix Domain Sockets to the list of read descriptors. - * rgerhards 2005-08-01: we must now check if there are - * any local sockets to listen to at all. If the -o option - * is given without -a, we do not need to listen at all.. */ maxfds = 0; FD_ZERO(&readfds); /* Add the UDP listen sockets to the list of read descriptors. */ - for (i = 0; i < *udpLstnSocks; i++) { - if (udpLstnSocks[i+1] != -1) { + for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) { + if (lstn->sock != -1) { if(Debug) - net.debugListenInfo(udpLstnSocks[i+1], "UDP"); - FD_SET(udpLstnSocks[i+1], &readfds); - if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1]; + net.debugListenInfo(lstn->sock, "UDP"); + FD_SET(lstn->sock, &readfds); + if(lstn->sock>maxfds) maxfds=lstn->sock; } } if(Debug) { @@ -600,10 +651,9 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd) if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ - for(i = 0; nfds && i < *udpLstnSocks; i++) { - if(FD_ISSET(udpLstnSocks[i+1], &readfds)) { - processSocket(pThrd, udpLstnSocks[i+1], &frominetPrev, &bIsPermitted, - udpRulesets[i+1]); + for(lstn = lcnfRoot ; nfds && lstn != NULL ; lstn = lstn->next) { + if(FD_ISSET(lstn->sock, &readfds)) { + processSocket(pThrd, lstn, &frominetPrev, &bIsPermitted); --nfds; /* indicate we have processed one descriptor */ } } @@ -677,7 +727,7 @@ CODESTARTactivateCnfPrePrivDrop addListner(inst); } /* if we could not set up any listners, there is no point in running... */ - if(udpLstnSocks == NULL) { + if(lcnfRoot == NULL) { errmsg.LogError(0, NO_ERRCODE, "imudp: no listeners could be started, " "input not activated.\n"); ABORT_FINALIZE(RS_RET_NO_RUN); @@ -702,7 +752,7 @@ CODESTARTfreeCnf ENDfreeCnf /* This function is called to gather input. - * Note that udpLstnSocks must be non-NULL because otherwise we would not have + * Note that sock must be non-NULL because otherwise we would not have * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02 */ BEGINrunInput @@ -720,15 +770,18 @@ ENDwillRun BEGINafterRun + struct lstn_s *lstn, *lstnDel; CODESTARTafterRun /* do cleanup here */ net.clearAllowedSenders((uchar*)"UDP"); - if(udpLstnSocks != NULL) { - net.closeUDPListenSockets(udpLstnSocks); - udpLstnSocks = NULL; - free(udpRulesets); - udpRulesets = NULL; + for(lstn = lcnfRoot ; lstn != NULL ; ) { + statsobj.Destruct(&(lstn->stats)); + close(lstn->sock); + lstnDel = lstn; + lstn = lstn->next; + free(lstnDel); } + lcnfRoot = lcnfLast = NULL; if(pRcvBuf != NULL) { free(pRcvBuf); pRcvBuf = NULL; @@ -741,8 +794,6 @@ CODESTARTmodExit if(pInputName != NULL) prop.Destruct(&pInputName); - statsobj.Destruct(&modStats); - /* release what we no longer need */ objRelease(errmsg, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); @@ -821,13 +872,6 @@ CODEmodInit_QueryRegCFSLineHdlr NULL, &cs.iTimeRequery, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); - - /* support statistics gathering */ - CHKiRet(statsobj.Construct(&modStats)); - CHKiRet(statsobj.SetName(modStats, UCHAR_CONSTANT("imudp"))); - CHKiRet(statsobj.AddCounter(modStats, UCHAR_CONSTANT("submitted"), - ctrType_IntCtr, &ctrSubmit)); - CHKiRet(statsobj.ConstructFinalize(modStats)); ENDmodInit /* vim:set ai: */ diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index c5d6b1f1..22ead0cf 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -158,7 +158,7 @@ static int sd_fds = 0; /* number of systemd activated sockets */ /* config vars for legacy config system */ #define DFLT_bCreatePath 0 -#define DFLT_ratelimitInterval 5 +#define DFLT_ratelimitInterval 0 #define DFLT_ratelimitBurst 200 #define DFLT_ratelimitSeverity 1 /* do not rate-limit emergency messages */ static struct configSettings_s { @@ -799,7 +799,6 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim lenRcv = toffs + 1; } - /* we now create our own message object and submit it to the queue */ CHKiRet(msgConstructWithTime(&pMsg, &st, tt)); MsgSetRawMsg(pMsg, (char*)pRcv, lenRcv); @@ -820,8 +819,6 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim * datestamp or not .. and advance the parse pointer accordingly. */ datetime.ParseTIMESTAMP3164(&dummyTS, &parse, &lenMsg); - parse += 16; /* just skip timestamp */ - lenMsg -= 16; } else { if(datetime.ParseTIMESTAMP3164(&(pMsg->tTIMESTAMP), &parse, &lenMsg) != RS_RET_OK) { DBGPRINTF("we have a problem, invalid timestamp in msg!\n"); @@ -1161,7 +1158,6 @@ CODESTARTafterRun /* Clean-up files. */ for(i = startIndexUxLocalSockets; i < nfd; i++) if (listeners[i].sockName && listeners[i].fd != -1) { - /* If systemd passed us a socket it is systemd's job to clean it up. * Do not unlink it -- we will get same socket (node) from systemd * e.g. on restart again. diff --git a/plugins/omelasticsearch/Makefile.am b/plugins/omelasticsearch/Makefile.am new file mode 100644 index 00000000..a574c72f --- /dev/null +++ b/plugins/omelasticsearch/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = omelasticsearch.la + +omelasticsearch_la_SOURCES = omelasticsearch.c +omelasticsearch_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) +omelasticsearch_la_LDFLAGS = -module -avoid-version +omelasticsearch_la_LIBADD = $(CURL_LIBS) + +EXTRA_DIST = diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c new file mode 100644 index 00000000..3bec1838 --- /dev/null +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -0,0 +1,270 @@ +/* omelasticsearch.c + * This is the http://www.elasticsearch.org/ output module. + * + * NOTE: read comments in module-template.h for more specifics! + * + * Copyright 2011 Nathan Scott. + * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * -or- + * see COPYING.ASL20 in the source distribution + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <curl/curl.h> +#include <curl/types.h> +#include <curl/easy.h> +#include <assert.h> +#include <signal.h> +#include <errno.h> +#include <time.h> +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "statsobj.h" +#include "cfsysline.h" + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP + +/* internal structures */ +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) +DEFobjCurrIf(statsobj) + +statsobj_t *indexStats; +STATSCOUNTER_DEF(indexConFail, mutIndexConFail) +STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit) +STATSCOUNTER_DEF(indexFailed, mutIndexFailed) +STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) + +/* REST API for elasticsearch hits this URL: + * http://<hostName>:<restPort>/<searchIndex>/<searchType> + */ +typedef struct curl_slist HEADER; +typedef struct _instanceData { + CURL *curlHandle; /* libcurl session handle */ + HEADER *postHeader; /* json POST request info */ +} instanceData; + +/* config variables */ +static int restPort = 9200; +static char *hostName = "localhost"; +static char *searchIndex = "system"; +static char *searchType = "events"; + +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + +BEGINfreeInstance +CODESTARTfreeInstance + if (pData->postHeader) { + curl_slist_free_all(pData->postHeader); + pData->postHeader = NULL; + } + if (pData->curlHandle) { + curl_easy_cleanup(pData->curlHandle); + pData->curlHandle = NULL; + } +ENDfreeInstance + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo +ENDdbgPrintInstInfo + +BEGINtryResume +CODESTARTtryResume +ENDtryResume + +rsRetVal +curlPost(instanceData *instance, uchar *message) +{ + CURLcode code; + CURL *curl = instance->curlHandle; + int length = strlen((char *)message); + + curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, length); + code = curl_easy_perform(curl); + switch (code) { + case CURLE_COULDNT_RESOLVE_HOST: + case CURLE_COULDNT_RESOLVE_PROXY: + case CURLE_COULDNT_CONNECT: + case CURLE_WRITE_ERROR: + STATSCOUNTER_INC(indexConFail, mutIndexConFail); + return RS_RET_SUSPENDED; + default: + STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); + return RS_RET_OK; + } +} + +BEGINdoAction +CODESTARTdoAction + CHKiRet(curlPost(pData, ppString[0])); +finalize_it: +ENDdoAction + +/* elasticsearch POST result string ... useful for debugging */ +size_t +curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) +{ + unsigned int i; + char *p = (char *)ptr; + char *jsonData = (char *)userdata; + static char ok[] = "{\"ok\":true,"; + + ASSERT(size == 1); + + if (size == 1 && + nmemb > sizeof(ok)-1 && + strncmp(p, ok, sizeof(ok)-1) == 0) { + STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); + } else { + STATSCOUNTER_INC(indexFailed, mutIndexFailed); + if (Debug) { + DBGPRINTF("omelasticsearch request: %s\n", jsonData); + DBGPRINTF("omelasticsearch result: "); + for (i = 0; i < nmemb; i++) + DBGPRINTF("%c", p[i]); + DBGPRINTF("\n"); + } + } + return size * nmemb; +} + +static rsRetVal +curlSetup(instanceData *instance) +{ + char restURL[2048]; /* libcurl makes a copy, using the stack here is OK */ + HEADER *header; + CURL *handle; + + handle = curl_easy_init(); + if (handle == NULL) { + return RS_RET_OBJ_CREATION_FAILED; + } + + snprintf(restURL, sizeof(restURL)-1, "http://%s:%d/%s/%s", + hostName, restPort, searchIndex, searchType); + header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); + + curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); + curl_easy_setopt(handle, CURLOPT_URL, restURL); + curl_easy_setopt(handle, CURLOPT_POST, 1); + + instance->curlHandle = handle; + instance->postHeader = header; + + DBGPRINTF("omelasticsearch setup, using REST URL: %s\n", restURL); + return RS_RET_OK; +} + +BEGINparseSelectorAct +CODESTARTparseSelectorAct +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(strncmp((char*) p, ":omelasticsearch:", sizeof(":omelasticsearch:") - 1)) { + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); + } + p += sizeof(":omelasticsearch:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ + CHKiRet(createInstance(&pData)); + + /* check if a non-standard template is to be applied */ + if(*(p-1) == ';') + --p; + CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) " StdJSONFmt")); + + /* all good, we can now initialise our private data */ + CHKiRet(curlSetup(pData)); +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + +BEGINmodExit +CODESTARTmodExit + curl_global_cleanup(); + statsobj.Destruct(&indexStats); + objRelease(errmsg, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); +ENDmodExit + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +ENDqueryEtryPt + +static rsRetVal +resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + DEFiRet; + restPort = 9200; + hostName = "localhost"; + searchIndex = "system"; + searchType = "events"; + RETiRet; +} + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); + + /* register config file handlers */ + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchindex", 0, eCmdHdlrGetWord, NULL, &searchIndex, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchtype", 0, eCmdHdlrGetWord, NULL, &searchType, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchhost", 0, eCmdHdlrGetWord, NULL, &hostName, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"elasticsearchport", 0, eCmdHdlrInt, NULL, &restPort, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); + + if (curl_global_init(CURL_GLOBAL_ALL) != 0) { + errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled"); + ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED); + } + + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&indexStats)); + CHKiRet(statsobj.SetName(indexStats, (uchar *)"elasticsearch")); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"connfail", + ctrType_IntCtr, &indexConFail)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submits", + ctrType_IntCtr, &indexSubmit)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed", + ctrType_IntCtr, &indexFailed)); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"success", + ctrType_IntCtr, &indexSuccess)); + CHKiRet(statsobj.ConstructFinalize(indexStats)); +ENDmodInit + +/* vi:set ai: + */ |