From 298c3fa835ed5ad241246fe90c6039fc7781625c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 31 Jan 2011 09:14:06 +0100 Subject: cleanup of imptcp; doc added; config statement for workers added --- ChangeLog | 2 ++ doc/imptcp.html | 5 +++++ plugins/imptcp/imptcp.c | 40 +++++++++++++++++----------------------- 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/ChangeLog b/ChangeLog index 3c0312c3..46cc2d58 100644 --- a/ChangeLog +++ b/ChangeLog @@ -7,6 +7,8 @@ Version 6.1.3 [DEVEL] (rgerhards), 2010-12-?? Now 128, what should result in performance improvement (less API calls) on busy systems. Most importantly affects imtcp. - imptcp now supports non-cancel termination mode, a plus in stability +- imptcp speedup: multiple worker threads can now be used to read data +- new directive $InputIMPTcpHelperThreads added - bugfix: fixed build problems on some platforms namely those that have 32bit atomic operations but not 64 bit ones - bugfix: local hostname was pulled too-early, so that some config diff --git a/doc/imptcp.html b/doc/imptcp.html index d4228185..c7a0e599 100644 --- a/doc/imptcp.html +++ b/doc/imptcp.html @@ -53,6 +53,11 @@ name is not strictly necessary, but can be useful to apply filtering based on wh the message was received from.
  • $InputPTCPServerBindRuleset <name>
    Binds specified ruleset to next server defined. +
  • $InputPTCPHelperThreads <number>
    +Number of helper worker threads to process incoming messages. These +threads are utilized to pull data off the network. On a busy system, additional +helper threads (but not more than there are CPUs/Cores) can help improving +performance. The default value is two.
  • $InputPTCPServerListenIP <name>
    On multi-homed machines, specifies to which local address the next listerner should be bound. diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 6e3a67ab..cb7e7ab8 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -93,6 +93,7 @@ typedef struct configSettings_s { uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */ uchar *lstnIP; /* which IP we should listen on? */ ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + int wrkrMax; /* max number of workers (actually "helper workers") */ } configSettings_t; static configSettings_t cs; @@ -166,7 +167,6 @@ static struct wrkrInfo_s { } wrkrInfo[4]; static pthread_mutex_t wrkrMut; static pthread_cond_t wrkrIdle; -static int wrkrMax = 4; static int wrkrRunning; @@ -187,7 +187,7 @@ struct epolld_s { /* global data */ -//static permittedPeers_t *pPermPeersRoot = NULL; +pthread_attr_t wrkrThrdAttr; /* Attribute for session threads; read only after startup */ static ptcpsrv_t *pSrvRoot = NULL; static int epollfd = -1; /* (sole) descriptor for epoll */ static int iMaxLine; /* maximum size of a single message */ @@ -684,6 +684,7 @@ static inline void initConfigSettings(void) { cs.bEmitMsgOnClose = 0; + cs.wrkrMax = 2; cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; cs.pszInputName = NULL; cs.pRuleset = NULL; @@ -823,9 +824,6 @@ closeSess(ptcpsess_t *pSess) close(sock); /* finally unlink session from structures */ -//fprintf(stderr, "closing session %d next %p, prev %p\n", pSess->sock, pSess->next, pSess->prev); -//DBGPRINTF("imptcp: pSess->next %p\n", pSess->next); -//DBGPRINTF("imptcp: pSess->prev %p\n", pSess->prev); if(pSess->next != NULL) pSess->next->prev = pSess->prev; if(pSess->prev == NULL) { @@ -911,12 +909,12 @@ startWorkerPool(void) wrkrRunning = 0; pthread_mutex_init(&wrkrMut, NULL); pthread_cond_init(&wrkrIdle, NULL); - for(i = 0 ; i < wrkrMax ; ++i) { + for(i = 0 ; i < cs.wrkrMax ; ++i) { /* init worker info structure! */ pthread_cond_init(&wrkrInfo[i].run, NULL); wrkrInfo[i].event = NULL; wrkrInfo[i].numCalled = 0; - pthread_create(&wrkrInfo[i].tid, NULL, wrkr, &(wrkrInfo[i])); + pthread_create(&wrkrInfo[i].tid, &wrkrThrdAttr, wrkr, &(wrkrInfo[i])); } } @@ -927,11 +925,10 @@ static inline void stopWorkerPool(void) { int i; - for(i = 0 ; i < wrkrMax ; ++i) { + for(i = 0 ; i < cs.wrkrMax ; ++i) { pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */ pthread_join(wrkrInfo[i].tid, NULL); DBGPRINTF("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); - printf("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); pthread_cond_destroy(&wrkrInfo[i].run); } pthread_cond_destroy(&wrkrIdle); @@ -952,7 +949,7 @@ startupServers() pSrv = pSrvRoot; while(pSrv != NULL) { - DBGPRINTF("Starting up ptcp server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName); + DBGPRINTF("imptcp: starting up server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName); startupSrv(pSrv); pSrv = pSrv->pNext; } @@ -1070,26 +1067,20 @@ processWorkSet(int nEvents, struct epoll_event events[]) remainEvents = nEvents; for(iEvt = 0 ; (iEvt < nEvents) && (glbl.GetGlobalInputTermState() == 0) ; ++iEvt) { -dbgprintf("XXX: remain entries during processing %d\n", remainEvents); if(remainEvents == 1) { -dbgprintf("XXX: processWorkset 1\n"); /* process self, save context switch */ processWorkItem(events+iEvt); } else { -dbgprintf("XXX: processWorkset 2\n"); pthread_mutex_lock(&wrkrMut); /* check if there is a free worker */ - for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].event != NULL) ; ++i) -dbgprintf("XXX: wrkrinfo[%d].event %p\n", i, wrkrInfo[i].event); + for(i = 0 ; (i < cs.wrkrMax) && (wrkrInfo[i].event != NULL) ; ++i) /*do search*/; - if(i < wrkrMax) { -dbgprintf("XXX: processWorkset 2.1, event=%p, wrkrRunnig %d, max %d\n", events+iEvt, wrkrRunning, wrkrMax); + if(i < cs.wrkrMax) { /* worker free -> use it! */ wrkrInfo[i].event = events+iEvt; pthread_cond_signal(&wrkrInfo[i].run); pthread_mutex_unlock(&wrkrMut); } else { -dbgprintf("XXX: processWorkset 2.2\n"); pthread_mutex_unlock(&wrkrMut); /* no free worker, so we process this one ourselfs */ processWorkItem(events+iEvt); @@ -1104,11 +1095,8 @@ dbgprintf("XXX: processWorkset 2.2\n"); * by workers running during the epoll call. */ pthread_mutex_lock(&wrkrMut); -dbgprintf("XXX: processWorkset: waiting for all workers to idle, curr = %d\n", wrkrRunning); while(wrkrRunning > 0) { -dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, curr = %d\n", wrkrRunning); pthread_cond_wait(&wrkrIdle, &wrkrMut); -dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, after wait, curr = %d\n", wrkrRunning); } pthread_mutex_unlock(&wrkrMut); } @@ -1133,7 +1121,6 @@ wrkr(void *myself) ++wrkrRunning; pthread_mutex_unlock(&wrkrMut); -dbgprintf("XXX: worker %p activated\n", pthread_self()); ++me->numCalled; processWorkItem(me->event); @@ -1141,7 +1128,6 @@ dbgprintf("XXX: worker %p activated\n", pthread_self()); me->event = NULL; /* indicate we are free again */ --wrkrRunning; pthread_cond_signal(&wrkrIdle); -dbgprintf("XXX: worker %p idling, currently running %d\n", pthread_self(), wrkrRunning); } pthread_mutex_unlock(&wrkrMut); @@ -1254,6 +1240,7 @@ ENDafterRun BEGINmodExit CODESTARTmodExit + pthread_attr_destroy(&wrkrThrdAttr); /* release objects we used */ objRelease(glbl, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); @@ -1268,6 +1255,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { cs.bEmitMsgOnClose = 0; + cs.wrkrMax = 2; cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; free(cs.pszInputName); cs.pszInputName = NULL; @@ -1304,6 +1292,10 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(ruleset, CORE_COMPONENT)); + /* initialize "read-only" thread attributes */ + pthread_attr_init(&wrkrThrdAttr); + pthread_attr_setstacksize(&wrkrThrdAttr, 2048*1024); + /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord, addTCPListener, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); @@ -1311,6 +1303,8 @@ CODEmodInit_QueryRegCFSLineHdlr eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt, NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverhelperthreads"), 0, eCmdHdlrInt, + NULL, &cs.wrkrMax, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverinputname"), 0, eCmdHdlrGetWord, NULL, &cs.pszInputName, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverlistenip"), 0, -- cgit