diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2012-01-18 13:48:39 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2012-01-18 13:48:39 +0100 |
commit | ef34821a2737799f48c3032b9616418e4f7fa34f (patch) | |
tree | 5b6bbd23c028cd77724d920f67ab0b3f1ab3a326 /plugins | |
parent | 37e4aab1ecc63db2b5e72fd8a5df5e4976ae82c8 (diff) | |
parent | b41fc354ede3baec9740cb7e6a2f609824269836 (diff) | |
download | rsyslog-ef34821a2737799f48c3032b9616418e4f7fa34f.tar.gz rsyslog-ef34821a2737799f48c3032b9616418e4f7fa34f.tar.xz rsyslog-ef34821a2737799f48c3032b9616418e4f7fa34f.zip |
Merge branch 'v5-stable-newstats' into v5-devel
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/imptcp/imptcp.c | 56 |
1 files changed, 41 insertions, 15 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index d5855879..a735b0b2 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -66,6 +66,7 @@ #include "datetime.h" #include "ruleset.h" #include "msg.h" +#include "statsobj.h" #include "net.h" /* for permittedPeers, may be removed when this is removed */ /* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */ @@ -83,7 +84,7 @@ DEFobjCurrIf(prop) DEFobjCurrIf(datetime) DEFobjCurrIf(errmsg) DEFobjCurrIf(ruleset) - +DEFobjCurrIf(statsobj) /* config settings */ @@ -132,7 +133,8 @@ struct ptcpsrv_s { * includes support for doubly-linked list. */ struct ptcpsess_s { - ptcpsrv_t *pSrv; /* our server */ +// ptcpsrv_t *pSrv; /* our server TODO: check remove! */ + ptcplstn_t *pLstn; /* our listener */ ptcpsess_t *prev, *next; int sock; epolld_t *epd; @@ -160,6 +162,8 @@ struct ptcplstn_s { ptcplstn_t *prev, *next; int sock; epolld_t *epd; + statsobj_t *stats; /* listener stats */ + STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) }; @@ -197,7 +201,7 @@ static char rcvBuf[128*1024]; /* forward definitions */ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal); -static rsRetVal addLstn(ptcpsrv_t *pSrv, int sock); +static rsRetVal addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6); /* some simple constructors/destructors */ @@ -242,6 +246,7 @@ startupSrv(ptcpsrv_t *pSrv) int sockflags; struct addrinfo hints, *res = NULL, *r; uchar *lstnIP; + int isIPv6 = 0; lstnIP = pSrv->lstnIP == NULL ? UCHAR_CONSTANT("") : pSrv->lstnIP; @@ -274,8 +279,9 @@ startupSrv(ptcpsrv_t *pSrv) continue; } -#ifdef IPV6_V6ONLY if(r->ai_family == AF_INET6) { + isIPv6 = 1; +#ifdef IPV6_V6ONLY int iOn = 1; if(setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&iOn, sizeof (iOn)) < 0) { @@ -283,8 +289,8 @@ startupSrv(ptcpsrv_t *pSrv) sock = -1; continue; } - } #endif + } if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)) < 0 ) { DBGPRINTF("error %d setting tcp socket option\n", errno); close(sock); @@ -346,7 +352,7 @@ startupSrv(ptcpsrv_t *pSrv) /* if we reach this point, we were able to obtain a valid socket, so we can * create our listener object. -- rgerhards, 2010-08-10 */ - CHKiRet(addLstn(pSrv, sock)); + CHKiRet(addLstn(pSrv, sock, isIPv6)); ++numSocks; } @@ -573,22 +579,25 @@ static rsRetVal doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub) { msg_t *pMsg; + ptcpsrv_t *pSrv; DEFiRet; if(pThis->iMsg == 0) { DBGPRINTF("discarding zero-sized message\n"); FINALIZE; } + pSrv = pThis->pLstn->pSrv; /* we now create our own message object and submit it to the queue */ CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); MsgSetRawMsg(pMsg, (char*)pThis->pMsg, pThis->iMsg); - MsgSetInputName(pMsg, pThis->pSrv->pInputName); + MsgSetInputName(pMsg, pSrv->pInputName); MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; MsgSetRcvFrom(pMsg, pThis->peerName); CHKiRet(MsgSetRcvFromIP(pMsg, pThis->peerIP)); - MsgSetRuleset(pMsg, pThis->pSrv->pRuleset); + MsgSetRuleset(pMsg, pSrv->pRuleset); + STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit); if(pMultiSub == NULL) { CHKiRet(submitMsg(pMsg)); @@ -670,7 +679,8 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG } if(( (c == '\n') - || ((pThis->pSrv->iAddtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->iAddtlFrameDelim)) + || ((pThis->pLstn->pSrv->iAddtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) + && (c == pThis->pLstn->pSrv->iAddtlFrameDelim)) ) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */ doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub); pThis->inputState = eAtStrtFram; @@ -828,14 +838,25 @@ finalize_it: /* add a listener to the server */ static rsRetVal -addLstn(ptcpsrv_t *pSrv, int sock) +addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6) { DEFiRet; ptcplstn_t *pLstn; + uchar statname[64]; CHKmalloc(pLstn = malloc(sizeof(ptcplstn_t))); pLstn->pSrv = pSrv; pLstn->sock = sock; + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&(pLstn->stats))); + snprintf((char*)statname, sizeof(statname), "imptcp(%s/%s/%s)", + (pSrv->lstnIP == NULL) ? "*" : (char*)pSrv->lstnIP, pSrv->port, + isIPv6 ? "IPv6" : "IPv4"); + statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(statsobj.SetName(pLstn->stats, statname)); + CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("submitted"), + ctrType_IntCtr, &(pLstn->ctrSubmit))); + CHKiRet(statsobj.ConstructFinalize(pLstn->stats)); /* add to start of server's listener list */ pLstn->prev = NULL; @@ -854,14 +875,15 @@ finalize_it: /* add a session to the server */ static rsRetVal -addSess(ptcpsrv_t *pSrv, int sock, prop_t *peerName, prop_t *peerIP) +addSess(ptcplstn_t *pLstn, int sock, prop_t *peerName, prop_t *peerIP) { DEFiRet; ptcpsess_t *pSess = NULL; + ptcpsrv_t *pSrv = pLstn->pSrv; CHKmalloc(pSess = malloc(sizeof(ptcpsess_t))); CHKmalloc(pSess->pMsg = malloc(iMaxLine * sizeof(uchar))); - pSess->pSrv = pSrv; + pSess->pLstn = pLstn; pSess->sock = sock; pSess->inputState = eAtStrtFram; pSess->iMsg = 0; @@ -905,7 +927,7 @@ closeSess(ptcpsess_t *pSess) pSess->next->prev = pSess->prev; if(pSess->prev == NULL) { /* need to update root! */ - pSess->pSrv->pSess = pSess->next; + pSess->pLstn->pSrv->pSess = pSess->next; } else { pSess->prev->next = pSess->next; } @@ -1034,7 +1056,7 @@ lstnActivity(ptcplstn_t *pLstn) if(localRet == RS_RET_NO_MORE_DATA) break; CHKiRet(localRet); - CHKiRet(addSess(pLstn->pSrv, newSock, peerName, peerIP)); + CHKiRet(addSess(pLstn, newSock, peerName, peerIP)); } finalize_it: @@ -1064,7 +1086,7 @@ sessActivity(ptcpsess_t *pSess) CHKiRet(DataRcvd(pSess, rcvBuf, lenRcv)); } else if (lenRcv == 0) { /* session was closed, do clean-up */ - if(pSess->pSrv->bEmitMsgOnClose) { + if(pSess->pLstn->pSrv->bEmitMsgOnClose) { uchar *peerName; int lenPeer; prop.GetString(pSess->peerName, &peerName, &lenPeer); @@ -1170,6 +1192,8 @@ shutdownSrv(ptcpsrv_t *pSrv) pLstn = pSrv->pLstn; while(pLstn != NULL) { close(pLstn->sock); + statsobj.Destruct(&(pLstn->stats)); + /* now unlink listner */ lstnDel = pLstn; pLstn = pLstn->next; DBGPRINTF("imptcp shutdown listen socket %d\n", lstnDel->sock); @@ -1217,6 +1241,7 @@ CODESTARTmodExit /* release objects we used */ objRelease(glbl, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); objRelease(net, LM_NET_FILENAME); objRelease(datetime, CORE_COMPONENT); @@ -1256,6 +1281,7 @@ CODEmodInit_QueryRegCFSLineHdlr initConfigSettings(); /* request objects we use */ CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); CHKiRet(objUse(net, LM_NET_FILENAME)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); |