summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2011-01-31 09:14:06 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2011-01-31 09:14:06 +0100
commit298c3fa835ed5ad241246fe90c6039fc7781625c (patch)
tree8b7a0404eb83f6c0ec514b11372cc61820f7173b
parent48b6f191fea317fd2bb031769a574c47accd2923 (diff)
downloadrsyslog-298c3fa835ed5ad241246fe90c6039fc7781625c.tar.gz
rsyslog-298c3fa835ed5ad241246fe90c6039fc7781625c.tar.xz
rsyslog-298c3fa835ed5ad241246fe90c6039fc7781625c.zip
cleanup of imptcp; doc added; config statement for workers added
-rw-r--r--ChangeLog2
-rw-r--r--doc/imptcp.html5
-rw-r--r--plugins/imptcp/imptcp.c40
3 files changed, 24 insertions, 23 deletions
diff --git a/ChangeLog b/ChangeLog
index 3c0312c3..46cc2d58 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -7,6 +7,8 @@ Version 6.1.3 [DEVEL] (rgerhards), 2010-12-??
Now 128, what should result in performance improvement (less API
calls) on busy systems. Most importantly affects imtcp.
- imptcp now supports non-cancel termination mode, a plus in stability
+- imptcp speedup: multiple worker threads can now be used to read data
+- new directive $InputIMPTcpHelperThreads added
- bugfix: fixed build problems on some platforms
namely those that have 32bit atomic operations but not 64 bit ones
- bugfix: local hostname was pulled too-early, so that some config
diff --git a/doc/imptcp.html b/doc/imptcp.html
index d4228185..c7a0e599 100644
--- a/doc/imptcp.html
+++ b/doc/imptcp.html
@@ -53,6 +53,11 @@ name is not strictly necessary, but can be useful to apply filtering based on wh
the message was received from.
<li>$InputPTCPServerBindRuleset &lt;name&gt;<br>
Binds specified ruleset to next server defined.
+<li>$InputPTCPHelperThreads &lt;number&gt;<br>
+Number of helper worker threads to process incoming messages. These
+threads are utilized to pull data off the network. On a busy system, additional
+helper threads (but not more than there are CPUs/Cores) can help improving
+performance. The default value is two.
<li>$InputPTCPServerListenIP &lt;name&gt;<br>
On multi-homed machines, specifies to which local address the next listerner should
be bound.
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 6e3a67ab..cb7e7ab8 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -93,6 +93,7 @@ typedef struct configSettings_s {
uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
uchar *lstnIP; /* which IP we should listen on? */
ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ int wrkrMax; /* max number of workers (actually "helper workers") */
} configSettings_t;
static configSettings_t cs;
@@ -166,7 +167,6 @@ static struct wrkrInfo_s {
} wrkrInfo[4];
static pthread_mutex_t wrkrMut;
static pthread_cond_t wrkrIdle;
-static int wrkrMax = 4;
static int wrkrRunning;
@@ -187,7 +187,7 @@ struct epolld_s {
/* global data */
-//static permittedPeers_t *pPermPeersRoot = NULL;
+pthread_attr_t wrkrThrdAttr; /* Attribute for session threads; read only after startup */
static ptcpsrv_t *pSrvRoot = NULL;
static int epollfd = -1; /* (sole) descriptor for epoll */
static int iMaxLine; /* maximum size of a single message */
@@ -684,6 +684,7 @@ static inline void
initConfigSettings(void)
{
cs.bEmitMsgOnClose = 0;
+ cs.wrkrMax = 2;
cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
cs.pszInputName = NULL;
cs.pRuleset = NULL;
@@ -823,9 +824,6 @@ closeSess(ptcpsess_t *pSess)
close(sock);
/* finally unlink session from structures */
-//fprintf(stderr, "closing session %d next %p, prev %p\n", pSess->sock, pSess->next, pSess->prev);
-//DBGPRINTF("imptcp: pSess->next %p\n", pSess->next);
-//DBGPRINTF("imptcp: pSess->prev %p\n", pSess->prev);
if(pSess->next != NULL)
pSess->next->prev = pSess->prev;
if(pSess->prev == NULL) {
@@ -911,12 +909,12 @@ startWorkerPool(void)
wrkrRunning = 0;
pthread_mutex_init(&wrkrMut, NULL);
pthread_cond_init(&wrkrIdle, NULL);
- for(i = 0 ; i < wrkrMax ; ++i) {
+ for(i = 0 ; i < cs.wrkrMax ; ++i) {
/* init worker info structure! */
pthread_cond_init(&wrkrInfo[i].run, NULL);
wrkrInfo[i].event = NULL;
wrkrInfo[i].numCalled = 0;
- pthread_create(&wrkrInfo[i].tid, NULL, wrkr, &(wrkrInfo[i]));
+ pthread_create(&wrkrInfo[i].tid, &wrkrThrdAttr, wrkr, &(wrkrInfo[i]));
}
}
@@ -927,11 +925,10 @@ static inline void
stopWorkerPool(void)
{
int i;
- for(i = 0 ; i < wrkrMax ; ++i) {
+ for(i = 0 ; i < cs.wrkrMax ; ++i) {
pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */
pthread_join(wrkrInfo[i].tid, NULL);
DBGPRINTF("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled);
- printf("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled);
pthread_cond_destroy(&wrkrInfo[i].run);
}
pthread_cond_destroy(&wrkrIdle);
@@ -952,7 +949,7 @@ startupServers()
pSrv = pSrvRoot;
while(pSrv != NULL) {
- DBGPRINTF("Starting up ptcp server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName);
+ DBGPRINTF("imptcp: starting up server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName);
startupSrv(pSrv);
pSrv = pSrv->pNext;
}
@@ -1070,26 +1067,20 @@ processWorkSet(int nEvents, struct epoll_event events[])
remainEvents = nEvents;
for(iEvt = 0 ; (iEvt < nEvents) && (glbl.GetGlobalInputTermState() == 0) ; ++iEvt) {
-dbgprintf("XXX: remain entries during processing %d\n", remainEvents);
if(remainEvents == 1) {
-dbgprintf("XXX: processWorkset 1\n");
/* process self, save context switch */
processWorkItem(events+iEvt);
} else {
-dbgprintf("XXX: processWorkset 2\n");
pthread_mutex_lock(&wrkrMut);
/* check if there is a free worker */
- for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].event != NULL) ; ++i)
-dbgprintf("XXX: wrkrinfo[%d].event %p\n", i, wrkrInfo[i].event);
+ for(i = 0 ; (i < cs.wrkrMax) && (wrkrInfo[i].event != NULL) ; ++i)
/*do search*/;
- if(i < wrkrMax) {
-dbgprintf("XXX: processWorkset 2.1, event=%p, wrkrRunnig %d, max %d\n", events+iEvt, wrkrRunning, wrkrMax);
+ if(i < cs.wrkrMax) {
/* worker free -> use it! */
wrkrInfo[i].event = events+iEvt;
pthread_cond_signal(&wrkrInfo[i].run);
pthread_mutex_unlock(&wrkrMut);
} else {
-dbgprintf("XXX: processWorkset 2.2\n");
pthread_mutex_unlock(&wrkrMut);
/* no free worker, so we process this one ourselfs */
processWorkItem(events+iEvt);
@@ -1104,11 +1095,8 @@ dbgprintf("XXX: processWorkset 2.2\n");
* by workers running during the epoll call.
*/
pthread_mutex_lock(&wrkrMut);
-dbgprintf("XXX: processWorkset: waiting for all workers to idle, curr = %d\n", wrkrRunning);
while(wrkrRunning > 0) {
-dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, curr = %d\n", wrkrRunning);
pthread_cond_wait(&wrkrIdle, &wrkrMut);
-dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, after wait, curr = %d\n", wrkrRunning);
}
pthread_mutex_unlock(&wrkrMut);
}
@@ -1133,7 +1121,6 @@ wrkr(void *myself)
++wrkrRunning;
pthread_mutex_unlock(&wrkrMut);
-dbgprintf("XXX: worker %p activated\n", pthread_self());
++me->numCalled;
processWorkItem(me->event);
@@ -1141,7 +1128,6 @@ dbgprintf("XXX: worker %p activated\n", pthread_self());
me->event = NULL; /* indicate we are free again */
--wrkrRunning;
pthread_cond_signal(&wrkrIdle);
-dbgprintf("XXX: worker %p idling, currently running %d\n", pthread_self(), wrkrRunning);
}
pthread_mutex_unlock(&wrkrMut);
@@ -1254,6 +1240,7 @@ ENDafterRun
BEGINmodExit
CODESTARTmodExit
+ pthread_attr_destroy(&wrkrThrdAttr);
/* release objects we used */
objRelease(glbl, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
@@ -1268,6 +1255,7 @@ static rsRetVal
resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
{
cs.bEmitMsgOnClose = 0;
+ cs.wrkrMax = 2;
cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
free(cs.pszInputName);
cs.pszInputName = NULL;
@@ -1304,6 +1292,10 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(ruleset, CORE_COMPONENT));
+ /* initialize "read-only" thread attributes */
+ pthread_attr_init(&wrkrThrdAttr);
+ pthread_attr_setstacksize(&wrkrThrdAttr, 2048*1024);
+
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord,
addTCPListener, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
@@ -1311,6 +1303,8 @@ CODEmodInit_QueryRegCFSLineHdlr
eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt,
NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverhelperthreads"), 0, eCmdHdlrInt,
+ NULL, &cs.wrkrMax, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverinputname"), 0,
eCmdHdlrGetWord, NULL, &cs.pszInputName, STD_LOADABLE_MODULE_ID, eConfObjGlobal));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverlistenip"), 0,