summaryrefslogtreecommitdiffstats
path: root/tcpsrv.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2012-08-20 12:58:48 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2012-08-20 12:58:48 +0200
commit0f52727043736b1e411bc60309ed18d41a4f1234 (patch)
tree97fd6c1defe93abc97717c4ae3d598166f7ad402 /tcpsrv.c
parent7c1ad1c77cc3884931bd2c9e2d2f45db892a15a0 (diff)
parent4a24d8e1b63a27d8c20f2d3d614fcdc2bcb52a01 (diff)
downloadrsyslog-0f52727043736b1e411bc60309ed18d41a4f1234.tar.gz
rsyslog-0f52727043736b1e411bc60309ed18d41a4f1234.tar.xz
rsyslog-0f52727043736b1e411bc60309ed18d41a4f1234.zip
Merge branch 'beta' into v6-stable
Diffstat (limited to 'tcpsrv.c')
-rw-r--r--tcpsrv.c112
1 files changed, 90 insertions, 22 deletions
diff --git a/tcpsrv.c b/tcpsrv.c
index 5d68dc3c..e5fe0d71 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -39,8 +39,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
#include "config.h"
+#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
@@ -95,7 +95,9 @@ DEFobjCurrIf(netstrm)
DEFobjCurrIf(nssel)
DEFobjCurrIf(nspoll)
DEFobjCurrIf(prop)
+DEFobjCurrIf(statsobj)
+static void startWorkerPool(void);
/* The following structure controls the worker threads. Global data is
* needed for their access.
@@ -107,8 +109,10 @@ static struct wrkrInfo_s {
tcpsrv_t *pSrv; /* pSrv == NULL -> idle */
nspoll_t *pPoll;
void *pUsr;
+ sbool enabled;
long long unsigned numCalled; /* how often was this called */
} wrkrInfo[4];
+static sbool bWrkrRunning; /* are the worker threads running? */
static pthread_mutex_t wrkrMut;
static pthread_cond_t wrkrIdle;
static int wrkrMax = 4;
@@ -118,9 +122,10 @@ static int wrkrRunning;
* rgerhards, 2009-05-21
*/
static inline rsRetVal
-addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort)
+addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort, int bSuppOctetFram)
{
tcpLstnPortList_t *pEntry;
+ uchar statname[64];
DEFiRet;
ISOBJ_TYPE_assert(pThis, tcpsrv);
@@ -130,6 +135,7 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort)
pEntry->pszPort = pszPort;
pEntry->pSrv = pThis;
pEntry->pRuleset = pThis->pRuleset;
+ pEntry->bSuppOctetFram = bSuppOctetFram;
/* we need to create a property */
CHKiRet(prop.Construct(&pEntry->pInputName));
@@ -140,6 +146,16 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort)
pEntry->pNext = pThis->pLstnPorts;
pThis->pLstnPorts = pEntry;
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&(pEntry->stats)));
+ snprintf((char*)statname, sizeof(statname), "%s(%s)", pThis->pszInputName, pszPort);
+ statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
+ CHKiRet(statsobj.SetName(pEntry->stats, statname));
+ STATSCOUNTER_INIT(pEntry->ctrSubmit, pEntry->mutCtrSubmit);
+ CHKiRet(statsobj.AddCounter(pEntry->stats, UCHAR_CONSTANT("submitted"),
+ ctrType_IntCtr, &(pEntry->ctrSubmit)));
+ CHKiRet(statsobj.ConstructFinalize(pEntry->stats));
+
finalize_it:
RETiRet;
}
@@ -150,7 +166,7 @@ finalize_it:
* rgerhards, 2008-03-20
*/
static rsRetVal
-configureTCPListen(tcpsrv_t *pThis, uchar *pszPort)
+configureTCPListen(tcpsrv_t *pThis, uchar *pszPort, int bSuppOctetFram)
{
int i;
uchar *pPort = pszPort;
@@ -166,7 +182,7 @@ configureTCPListen(tcpsrv_t *pThis, uchar *pszPort)
}
if(i >= 0 && i <= 65535) {
- CHKiRet(addNewLstnPort(pThis, pszPort));
+ CHKiRet(addNewLstnPort(pThis, pszPort, bSuppOctetFram));
} else {
errmsg.LogError(0, NO_ERRCODE, "Invalid TCP listen port %s - ignored.\n", pszPort);
}
@@ -418,6 +434,10 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess,
ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED);
}
+ if(pThis->bUseKeepAlive) {
+ CHKiRet(netstrm.EnableKeepAlive(pNewStrm));
+ }
+
/* we found a free spot and can construct our session object */
CHKiRet(tcps_sess.Construct(&pSess));
CHKiRet(tcps_sess.SetTcpsrv(pSess, pThis));
@@ -533,9 +553,7 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n",
(*ppSess)->pStrm, pszPeer);
}
- //pthread_mutex_lock(&mut);
CHKiRet(closeSess(pThis, ppSess, pPoll));
- //pthread_mutex_unlock(&mut);
break;
case RS_RET_RETRY:
/* we simply ignore retry - this is not an error, but we also have not received anything */
@@ -569,16 +587,18 @@ finalize_it:
static inline rsRetVal
processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr)
{
- tcps_sess_t *pNewSess;
+ tcps_sess_t *pNewSess = NULL;
DEFiRet;
- dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr);
+ DBGPRINTF("tcpsrv: processing item %d, pUsr %p, bAbortConn\n", idx, pUsr);
if(pUsr == pThis->ppLstn) {
DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]);
iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]);
if(iRet == RS_RET_OK) {
- if(pPoll != NULL)
+ if(pPoll != NULL) {
+ dbgprintf("XXXXXX: processWorksetItem trying nspoll.ctl\n");
CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
+ }
DBGPRINTF("New session created with NSD %p.\n", pNewSess);
} else {
DBGPRINTF("tcpsrv: error %d during accept\n", iRet);
@@ -608,8 +628,10 @@ wrkr(void *myself)
while(me->pSrv == NULL && glbl.GetGlobalInputTermState() == 0) {
pthread_cond_wait(&me->run, &wrkrMut);
}
- if(glbl.GetGlobalInputTermState() == 1)
+ if(glbl.GetGlobalInputTermState() == 1) {
+ --wrkrRunning;
break;
+ }
pthread_mutex_unlock(&wrkrMut);
++me->numCalled;
@@ -620,6 +642,7 @@ wrkr(void *myself)
--wrkrRunning;
pthread_cond_signal(&wrkrIdle);
}
+ me->enabled = 0; /* indicate we are no longer available */
pthread_mutex_unlock(&wrkrMut);
return NULL;
@@ -649,7 +672,7 @@ processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t
} else {
pthread_mutex_lock(&wrkrMut);
/* check if there is a free worker */
- for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i)
+ for(i = 0 ; (i < wrkrMax) && ((wrkrInfo[i].pSrv != NULL) || (wrkrInfo[i].enabled == 0)) ; ++i)
/*do search*/;
if(i < wrkrMax) {
/* worker free -> use it! */
@@ -819,6 +842,16 @@ Run(tcpsrv_t *pThis)
ISOBJ_TYPE_assert(pThis, tcpsrv);
+ /* check if we need to start the worker pool. Once it is running, all is
+ * well. Shutdown is done on modExit.
+ */
+ d_pthread_mutex_lock(&wrkrMut);
+ if(!bWrkrRunning) {
+ bWrkrRunning = 1;
+ startWorkerPool();
+ }
+ d_pthread_mutex_unlock(&wrkrMut);
+
/* this is an endless loop - it is terminated by the framework canelling
* this thread. Thus, we also need to instantiate a cancel cleanup handler
* to prevent us from leaking anything. -- rgerhards, 20080-04-24
@@ -1028,6 +1061,15 @@ SetUsrP(tcpsrv_t *pThis, void *pUsr)
}
static rsRetVal
+SetKeepAlive(tcpsrv_t *pThis, int iVal)
+{
+ DEFiRet;
+ dbgprintf("tcpsrv: keep-alive set to %d\n", iVal);
+ pThis->bUseKeepAlive = iVal;
+ RETiRet;
+}
+
+static rsRetVal
SetOnMsgReceive(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int))
{
DEFiRet;
@@ -1206,6 +1248,7 @@ CODESTARTobjQueryInterface(tcpsrv)
pIf->create_tcp_socket = create_tcp_socket;
pIf->Run = Run;
+ pIf->SetKeepAlive = SetKeepAlive;
pIf->SetUsrP = SetUsrP;
pIf->SetInputName = SetInputName;
pIf->SetAddtlFrameDelim = SetAddtlFrameDelim;
@@ -1243,6 +1286,7 @@ CODESTARTObjClassExit(tcpsrv)
objRelease(tcps_sess, DONT_LOAD_LIB);
objRelease(conf, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
+ objRelease(statsobj, CORE_COMPONENT);
objRelease(ruleset, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
objRelease(errmsg, CORE_COMPONENT);
@@ -1269,6 +1313,7 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE
CHKiRet(objUse(conf, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(ruleset, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
/* set our own handlers */
@@ -1277,28 +1322,42 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE
ENDObjClassInit(tcpsrv)
-/* destroy worker pool structures and wait for workers to terminate
+/* start worker threads
+ * Important: if we fork, this MUST be done AFTER forking
*/
-static inline void
+static void
startWorkerPool(void)
{
int i;
+ int r;
+ pthread_attr_t sessThrdAttr;
+
wrkrRunning = 0;
- pthread_mutex_init(&wrkrMut, NULL);
pthread_cond_init(&wrkrIdle, NULL);
+ pthread_attr_init(&sessThrdAttr);
+ pthread_attr_setstacksize(&sessThrdAttr, 4096*1024);
for(i = 0 ; i < wrkrMax ; ++i) {
/* init worker info structure! */
pthread_cond_init(&wrkrInfo[i].run, NULL);
wrkrInfo[i].pSrv = NULL;
wrkrInfo[i].numCalled = 0;
- pthread_create(&wrkrInfo[i].tid, NULL, wrkr, &(wrkrInfo[i]));
+ r = pthread_create(&wrkrInfo[i].tid, &sessThrdAttr, wrkr, &(wrkrInfo[i]));
+ if(r == 0) {
+ wrkrInfo[i].enabled = 1;
+ } else {
+ char errStr[1024];
+ wrkrInfo[i].enabled = 0;
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ errmsg.LogError(0, NO_ERRCODE, "tcpsrv error creating thread %d: "
+ "%s", i, errStr);
+ }
}
-
+ pthread_attr_destroy(&sessThrdAttr);
}
/* destroy worker pool structures and wait for workers to terminate
*/
-static inline void
+static void
stopWorkerPool(void)
{
int i;
@@ -1318,9 +1377,7 @@ stopWorkerPool(void)
BEGINmodExit
CODESTARTmodExit
-dbgprintf("tcpsrv: modExit\n");
stopWorkerPool();
-
/* de-init in reverse order! */
tcpsrvClassExit();
tcps_sessClassExit();
@@ -1336,13 +1393,24 @@ ENDqueryEtryPt
BEGINmodInit()
CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+ /* we just init the worker mutex, but do not start the workers themselves. This is deferred
+ * to the first call of Run(). Reasons for this:
+ * 1. depending on load order, tcpsrv gets loaded during rsyslog startup BEFORE
+ * it forks, in which case the workers would be running in the then-killed parent,
+ * leading to a defuncnt child (we actually had this bug).
+ * 2. depending on circumstances, Run() would possibly never be called, in which case
+ * the worker threads would be totally useless.
+ * Note that in order to guarantee a non-racy worker start, we need to guard the
+ * startup sequence by a mutex, which is why we init it here (no problem with fork()
+ * in this case as the mutex is a pure-memory structure).
+ * rgerhards, 2012-05-18
+ */
+ pthread_mutex_init(&wrkrMut, NULL);
+ bWrkrRunning = 0;
/* Initialize all classes that are in our module - this includes ourselfs */
CHKiRet(tcps_sessClassInit(pModInfo));
CHKiRet(tcpsrvClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */
-
- startWorkerPool();
-
ENDmodInit
/* vim:set ai: