diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2011-01-31 09:14:06 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2011-01-31 09:14:06 +0100 |
commit | 298c3fa835ed5ad241246fe90c6039fc7781625c (patch) | |
tree | 8b7a0404eb83f6c0ec514b11372cc61820f7173b /plugins | |
parent | 48b6f191fea317fd2bb031769a574c47accd2923 (diff) | |
download | rsyslog-298c3fa835ed5ad241246fe90c6039fc7781625c.tar.gz rsyslog-298c3fa835ed5ad241246fe90c6039fc7781625c.tar.xz rsyslog-298c3fa835ed5ad241246fe90c6039fc7781625c.zip |
cleanup of imptcp; doc added; config statement for workers added
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/imptcp/imptcp.c | 40 |
1 files changed, 17 insertions, 23 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 6e3a67ab..cb7e7ab8 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -93,6 +93,7 @@ typedef struct configSettings_s { uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */ uchar *lstnIP; /* which IP we should listen on? */ ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + int wrkrMax; /* max number of workers (actually "helper workers") */ } configSettings_t; static configSettings_t cs; @@ -166,7 +167,6 @@ static struct wrkrInfo_s { } wrkrInfo[4]; static pthread_mutex_t wrkrMut; static pthread_cond_t wrkrIdle; -static int wrkrMax = 4; static int wrkrRunning; @@ -187,7 +187,7 @@ struct epolld_s { /* global data */ -//static permittedPeers_t *pPermPeersRoot = NULL; +pthread_attr_t wrkrThrdAttr; /* Attribute for session threads; read only after startup */ static ptcpsrv_t *pSrvRoot = NULL; static int epollfd = -1; /* (sole) descriptor for epoll */ static int iMaxLine; /* maximum size of a single message */ @@ -684,6 +684,7 @@ static inline void initConfigSettings(void) { cs.bEmitMsgOnClose = 0; + cs.wrkrMax = 2; cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; cs.pszInputName = NULL; cs.pRuleset = NULL; @@ -823,9 +824,6 @@ closeSess(ptcpsess_t *pSess) close(sock); /* finally unlink session from structures */ -//fprintf(stderr, "closing session %d next %p, prev %p\n", pSess->sock, pSess->next, pSess->prev); -//DBGPRINTF("imptcp: pSess->next %p\n", pSess->next); -//DBGPRINTF("imptcp: pSess->prev %p\n", pSess->prev); if(pSess->next != NULL) pSess->next->prev = pSess->prev; if(pSess->prev == NULL) { @@ -911,12 +909,12 @@ startWorkerPool(void) wrkrRunning = 0; pthread_mutex_init(&wrkrMut, NULL); pthread_cond_init(&wrkrIdle, NULL); - for(i = 0 ; i < wrkrMax ; ++i) { + for(i = 0 ; i < cs.wrkrMax ; ++i) { /* init worker info structure! */ pthread_cond_init(&wrkrInfo[i].run, NULL); wrkrInfo[i].event = NULL; wrkrInfo[i].numCalled = 0; - pthread_create(&wrkrInfo[i].tid, NULL, wrkr, &(wrkrInfo[i])); + pthread_create(&wrkrInfo[i].tid, &wrkrThrdAttr, wrkr, &(wrkrInfo[i])); } } @@ -927,11 +925,10 @@ static inline void stopWorkerPool(void) { int i; - for(i = 0 ; i < wrkrMax ; ++i) { + for(i = 0 ; i < cs.wrkrMax ; ++i) { pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */ pthread_join(wrkrInfo[i].tid, NULL); DBGPRINTF("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); - printf("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); pthread_cond_destroy(&wrkrInfo[i].run); } pthread_cond_destroy(&wrkrIdle); @@ -952,7 +949,7 @@ startupServers() pSrv = pSrvRoot; while(pSrv != NULL) { - DBGPRINTF("Starting up ptcp server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName); + DBGPRINTF("imptcp: starting up server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName); startupSrv(pSrv); pSrv = pSrv->pNext; } @@ -1070,26 +1067,20 @@ processWorkSet(int nEvents, struct epoll_event events[]) remainEvents = nEvents; for(iEvt = 0 ; (iEvt < nEvents) && (glbl.GetGlobalInputTermState() == 0) ; ++iEvt) { -dbgprintf("XXX: remain entries during processing %d\n", remainEvents); if(remainEvents == 1) { -dbgprintf("XXX: processWorkset 1\n"); /* process self, save context switch */ processWorkItem(events+iEvt); } else { -dbgprintf("XXX: processWorkset 2\n"); pthread_mutex_lock(&wrkrMut); /* check if there is a free worker */ - for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].event != NULL) ; ++i) -dbgprintf("XXX: wrkrinfo[%d].event %p\n", i, wrkrInfo[i].event); + for(i = 0 ; (i < cs.wrkrMax) && (wrkrInfo[i].event != NULL) ; ++i) /*do search*/; - if(i < wrkrMax) { -dbgprintf("XXX: processWorkset 2.1, event=%p, wrkrRunnig %d, max %d\n", events+iEvt, wrkrRunning, wrkrMax); + if(i < cs.wrkrMax) { /* worker free -> use it! */ wrkrInfo[i].event = events+iEvt; pthread_cond_signal(&wrkrInfo[i].run); pthread_mutex_unlock(&wrkrMut); } else { -dbgprintf("XXX: processWorkset 2.2\n"); pthread_mutex_unlock(&wrkrMut); /* no free worker, so we process this one ourselfs */ processWorkItem(events+iEvt); @@ -1104,11 +1095,8 @@ dbgprintf("XXX: processWorkset 2.2\n"); * by workers running during the epoll call. */ pthread_mutex_lock(&wrkrMut); -dbgprintf("XXX: processWorkset: waiting for all workers to idle, curr = %d\n", wrkrRunning); while(wrkrRunning > 0) { -dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, curr = %d\n", wrkrRunning); pthread_cond_wait(&wrkrIdle, &wrkrMut); -dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, after wait, curr = %d\n", wrkrRunning); } pthread_mutex_unlock(&wrkrMut); } @@ -1133,7 +1121,6 @@ wrkr(void *myself) ++wrkrRunning; pthread_mutex_unlock(&wrkrMut); -dbgprintf("XXX: worker %p activated\n", pthread_self()); ++me->numCalled; processWorkItem(me->event); @@ -1141,7 +1128,6 @@ dbgprintf("XXX: worker %p activated\n", pthread_self()); me->event = NULL; /* indicate we are free again */ --wrkrRunning; pthread_cond_signal(&wrkrIdle); -dbgprintf("XXX: worker %p idling, currently running %d\n", pthread_self(), wrkrRunning); } pthread_mutex_unlock(&wrkrMut); @@ -1254,6 +1240,7 @@ ENDafterRun BEGINmodExit CODESTARTmodExit + pthread_attr_destroy(&wrkrThrdAttr); /* release objects we used */ objRelease(glbl, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); @@ -1268,6 +1255,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { cs.bEmitMsgOnClose = 0; + cs.wrkrMax = 2; cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; free(cs.pszInputName); cs.pszInputName = NULL; @@ -1304,6 +1292,10 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(ruleset, CORE_COMPONENT)); + /* initialize "read-only" thread attributes */ + pthread_attr_init(&wrkrThrdAttr); + pthread_attr_setstacksize(&wrkrThrdAttr, 2048*1024); + /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord, addTCPListener, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); @@ -1311,6 +1303,8 @@ CODEmodInit_QueryRegCFSLineHdlr eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt, NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverhelperthreads"), 0, eCmdHdlrInt, + NULL, &cs.wrkrMax, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverinputname"), 0, eCmdHdlrGetWord, NULL, &cs.pszInputName, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverlistenip"), 0, |