diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2012-08-20 12:58:48 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2012-08-20 12:58:48 +0200 |
commit | 0f52727043736b1e411bc60309ed18d41a4f1234 (patch) | |
tree | 97fd6c1defe93abc97717c4ae3d598166f7ad402 /tcpsrv.c | |
parent | 7c1ad1c77cc3884931bd2c9e2d2f45db892a15a0 (diff) | |
parent | 4a24d8e1b63a27d8c20f2d3d614fcdc2bcb52a01 (diff) | |
download | rsyslog-0f52727043736b1e411bc60309ed18d41a4f1234.tar.gz rsyslog-0f52727043736b1e411bc60309ed18d41a4f1234.tar.xz rsyslog-0f52727043736b1e411bc60309ed18d41a4f1234.zip |
Merge branch 'beta' into v6-stable
Diffstat (limited to 'tcpsrv.c')
-rw-r--r-- | tcpsrv.c | 112 |
1 files changed, 90 insertions, 22 deletions
@@ -39,8 +39,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #include "config.h" +#include <stdio.h> #include <stdlib.h> #include <assert.h> #include <string.h> @@ -95,7 +95,9 @@ DEFobjCurrIf(netstrm) DEFobjCurrIf(nssel) DEFobjCurrIf(nspoll) DEFobjCurrIf(prop) +DEFobjCurrIf(statsobj) +static void startWorkerPool(void); /* The following structure controls the worker threads. Global data is * needed for their access. @@ -107,8 +109,10 @@ static struct wrkrInfo_s { tcpsrv_t *pSrv; /* pSrv == NULL -> idle */ nspoll_t *pPoll; void *pUsr; + sbool enabled; long long unsigned numCalled; /* how often was this called */ } wrkrInfo[4]; +static sbool bWrkrRunning; /* are the worker threads running? */ static pthread_mutex_t wrkrMut; static pthread_cond_t wrkrIdle; static int wrkrMax = 4; @@ -118,9 +122,10 @@ static int wrkrRunning; * rgerhards, 2009-05-21 */ static inline rsRetVal -addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort) +addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort, int bSuppOctetFram) { tcpLstnPortList_t *pEntry; + uchar statname[64]; DEFiRet; ISOBJ_TYPE_assert(pThis, tcpsrv); @@ -130,6 +135,7 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort) pEntry->pszPort = pszPort; pEntry->pSrv = pThis; pEntry->pRuleset = pThis->pRuleset; + pEntry->bSuppOctetFram = bSuppOctetFram; /* we need to create a property */ CHKiRet(prop.Construct(&pEntry->pInputName)); @@ -140,6 +146,16 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort) pEntry->pNext = pThis->pLstnPorts; pThis->pLstnPorts = pEntry; + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&(pEntry->stats))); + snprintf((char*)statname, sizeof(statname), "%s(%s)", pThis->pszInputName, pszPort); + statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(statsobj.SetName(pEntry->stats, statname)); + STATSCOUNTER_INIT(pEntry->ctrSubmit, pEntry->mutCtrSubmit); + CHKiRet(statsobj.AddCounter(pEntry->stats, UCHAR_CONSTANT("submitted"), + ctrType_IntCtr, &(pEntry->ctrSubmit))); + CHKiRet(statsobj.ConstructFinalize(pEntry->stats)); + finalize_it: RETiRet; } @@ -150,7 +166,7 @@ finalize_it: * rgerhards, 2008-03-20 */ static rsRetVal -configureTCPListen(tcpsrv_t *pThis, uchar *pszPort) +configureTCPListen(tcpsrv_t *pThis, uchar *pszPort, int bSuppOctetFram) { int i; uchar *pPort = pszPort; @@ -166,7 +182,7 @@ configureTCPListen(tcpsrv_t *pThis, uchar *pszPort) } if(i >= 0 && i <= 65535) { - CHKiRet(addNewLstnPort(pThis, pszPort)); + CHKiRet(addNewLstnPort(pThis, pszPort, bSuppOctetFram)); } else { errmsg.LogError(0, NO_ERRCODE, "Invalid TCP listen port %s - ignored.\n", pszPort); } @@ -418,6 +434,10 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess, ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED); } + if(pThis->bUseKeepAlive) { + CHKiRet(netstrm.EnableKeepAlive(pNewStrm)); + } + /* we found a free spot and can construct our session object */ CHKiRet(tcps_sess.Construct(&pSess)); CHKiRet(tcps_sess.SetTcpsrv(pSess, pThis)); @@ -533,9 +553,7 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n", (*ppSess)->pStrm, pszPeer); } - //pthread_mutex_lock(&mut); CHKiRet(closeSess(pThis, ppSess, pPoll)); - //pthread_mutex_unlock(&mut); break; case RS_RET_RETRY: /* we simply ignore retry - this is not an error, but we also have not received anything */ @@ -569,16 +587,18 @@ finalize_it: static inline rsRetVal processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) { - tcps_sess_t *pNewSess; + tcps_sess_t *pNewSess = NULL; DEFiRet; - dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr); + DBGPRINTF("tcpsrv: processing item %d, pUsr %p, bAbortConn\n", idx, pUsr); if(pUsr == pThis->ppLstn) { DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]); iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); if(iRet == RS_RET_OK) { - if(pPoll != NULL) + if(pPoll != NULL) { + dbgprintf("XXXXXX: processWorksetItem trying nspoll.ctl\n"); CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); + } DBGPRINTF("New session created with NSD %p.\n", pNewSess); } else { DBGPRINTF("tcpsrv: error %d during accept\n", iRet); @@ -608,8 +628,10 @@ wrkr(void *myself) while(me->pSrv == NULL && glbl.GetGlobalInputTermState() == 0) { pthread_cond_wait(&me->run, &wrkrMut); } - if(glbl.GetGlobalInputTermState() == 1) + if(glbl.GetGlobalInputTermState() == 1) { + --wrkrRunning; break; + } pthread_mutex_unlock(&wrkrMut); ++me->numCalled; @@ -620,6 +642,7 @@ wrkr(void *myself) --wrkrRunning; pthread_cond_signal(&wrkrIdle); } + me->enabled = 0; /* indicate we are no longer available */ pthread_mutex_unlock(&wrkrMut); return NULL; @@ -649,7 +672,7 @@ processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t } else { pthread_mutex_lock(&wrkrMut); /* check if there is a free worker */ - for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i) + for(i = 0 ; (i < wrkrMax) && ((wrkrInfo[i].pSrv != NULL) || (wrkrInfo[i].enabled == 0)) ; ++i) /*do search*/; if(i < wrkrMax) { /* worker free -> use it! */ @@ -819,6 +842,16 @@ Run(tcpsrv_t *pThis) ISOBJ_TYPE_assert(pThis, tcpsrv); + /* check if we need to start the worker pool. Once it is running, all is + * well. Shutdown is done on modExit. + */ + d_pthread_mutex_lock(&wrkrMut); + if(!bWrkrRunning) { + bWrkrRunning = 1; + startWorkerPool(); + } + d_pthread_mutex_unlock(&wrkrMut); + /* this is an endless loop - it is terminated by the framework canelling * this thread. Thus, we also need to instantiate a cancel cleanup handler * to prevent us from leaking anything. -- rgerhards, 20080-04-24 @@ -1028,6 +1061,15 @@ SetUsrP(tcpsrv_t *pThis, void *pUsr) } static rsRetVal +SetKeepAlive(tcpsrv_t *pThis, int iVal) +{ + DEFiRet; + dbgprintf("tcpsrv: keep-alive set to %d\n", iVal); + pThis->bUseKeepAlive = iVal; + RETiRet; +} + +static rsRetVal SetOnMsgReceive(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int)) { DEFiRet; @@ -1206,6 +1248,7 @@ CODESTARTobjQueryInterface(tcpsrv) pIf->create_tcp_socket = create_tcp_socket; pIf->Run = Run; + pIf->SetKeepAlive = SetKeepAlive; pIf->SetUsrP = SetUsrP; pIf->SetInputName = SetInputName; pIf->SetAddtlFrameDelim = SetAddtlFrameDelim; @@ -1243,6 +1286,7 @@ CODESTARTObjClassExit(tcpsrv) objRelease(tcps_sess, DONT_LOAD_LIB); objRelease(conf, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); objRelease(ruleset, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); objRelease(errmsg, CORE_COMPONENT); @@ -1269,6 +1313,7 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE CHKiRet(objUse(conf, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(ruleset, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); /* set our own handlers */ @@ -1277,28 +1322,42 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE ENDObjClassInit(tcpsrv) -/* destroy worker pool structures and wait for workers to terminate +/* start worker threads + * Important: if we fork, this MUST be done AFTER forking */ -static inline void +static void startWorkerPool(void) { int i; + int r; + pthread_attr_t sessThrdAttr; + wrkrRunning = 0; - pthread_mutex_init(&wrkrMut, NULL); pthread_cond_init(&wrkrIdle, NULL); + pthread_attr_init(&sessThrdAttr); + pthread_attr_setstacksize(&sessThrdAttr, 4096*1024); for(i = 0 ; i < wrkrMax ; ++i) { /* init worker info structure! */ pthread_cond_init(&wrkrInfo[i].run, NULL); wrkrInfo[i].pSrv = NULL; wrkrInfo[i].numCalled = 0; - pthread_create(&wrkrInfo[i].tid, NULL, wrkr, &(wrkrInfo[i])); + r = pthread_create(&wrkrInfo[i].tid, &sessThrdAttr, wrkr, &(wrkrInfo[i])); + if(r == 0) { + wrkrInfo[i].enabled = 1; + } else { + char errStr[1024]; + wrkrInfo[i].enabled = 0; + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(0, NO_ERRCODE, "tcpsrv error creating thread %d: " + "%s", i, errStr); + } } - + pthread_attr_destroy(&sessThrdAttr); } /* destroy worker pool structures and wait for workers to terminate */ -static inline void +static void stopWorkerPool(void) { int i; @@ -1318,9 +1377,7 @@ stopWorkerPool(void) BEGINmodExit CODESTARTmodExit -dbgprintf("tcpsrv: modExit\n"); stopWorkerPool(); - /* de-init in reverse order! */ tcpsrvClassExit(); tcps_sessClassExit(); @@ -1336,13 +1393,24 @@ ENDqueryEtryPt BEGINmodInit() CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ + /* we just init the worker mutex, but do not start the workers themselves. This is deferred + * to the first call of Run(). Reasons for this: + * 1. depending on load order, tcpsrv gets loaded during rsyslog startup BEFORE + * it forks, in which case the workers would be running in the then-killed parent, + * leading to a defuncnt child (we actually had this bug). + * 2. depending on circumstances, Run() would possibly never be called, in which case + * the worker threads would be totally useless. + * Note that in order to guarantee a non-racy worker start, we need to guard the + * startup sequence by a mutex, which is why we init it here (no problem with fork() + * in this case as the mutex is a pure-memory structure). + * rgerhards, 2012-05-18 + */ + pthread_mutex_init(&wrkrMut, NULL); + bWrkrRunning = 0; /* Initialize all classes that are in our module - this includes ourselfs */ CHKiRet(tcps_sessClassInit(pModInfo)); CHKiRet(tcpsrvClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */ - - startWorkerPool(); - ENDmodInit /* vim:set ai: |