diff options
Diffstat (limited to 'plugins/imptcp/imptcp.c')
-rw-r--r-- | plugins/imptcp/imptcp.c | 283 |
1 files changed, 210 insertions, 73 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index e8cffbb0..33277148 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -10,7 +10,7 @@ * * File begun on 2010-08-10 by RGerhards * - * Copyright 2007-2010 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -84,7 +84,8 @@ DEFobjCurrIf(datetime) DEFobjCurrIf(errmsg) DEFobjCurrIf(ruleset) - +/* forward references */ +static void * wrkr(void *myself); /* config settings */ typedef struct configSettings_s { @@ -93,6 +94,7 @@ typedef struct configSettings_s { uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */ uchar *lstnIP; /* which IP we should listen on? */ ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + int wrkrMax; /* max number of workers (actually "helper workers") */ } configSettings_t; static configSettings_t cs; @@ -118,6 +120,7 @@ struct ptcpsrv_s { ruleset_t *pRuleset; ptcplstn_t *pLstn; /* root of our listeners */ ptcpsess_t *pSess; /* root of our sessions */ + pthread_mutex_t mutSessLst; }; /* the ptcp session object. Describes a single active session. @@ -155,6 +158,20 @@ struct ptcplstn_s { }; +/* The following structure controls the worker threads. Global data is + * needed for their access. + */ +static struct wrkrInfo_s { + pthread_t tid; /* the worker's thread ID */ + pthread_cond_t run; + struct epoll_event *event; /* event == NULL -> idle */ + long long unsigned numCalled; /* how often was this called */ +} wrkrInfo[16]; +static pthread_mutex_t wrkrMut; +static pthread_cond_t wrkrIdle; +static int wrkrRunning; + + /* type of object stored in epoll descriptor */ typedef enum { epolld_lstn, @@ -172,20 +189,10 @@ struct epolld_s { /* global data */ -//static permittedPeers_t *pPermPeersRoot = NULL; +pthread_attr_t wrkrThrdAttr; /* Attribute for session threads; read only after startup */ static ptcpsrv_t *pSrvRoot = NULL; static int epollfd = -1; /* (sole) descriptor for epoll */ static int iMaxLine; /* maximum size of a single message */ -/* we use a single static receive buffer, as this module is not multi-threaded. Keeping - * the buffer in the data segment is probably a little bit more efficient than on the stack - * (but at least I can't believe it will ever be less efficient ;) -- rgerhards, 2010-08-10 - * Note that we do NOT (yet?) provide a config setting to set the buffer size. For usual - * syslog traffic, it should be large enough. Also keep in mind that we run under a virtual - * memory system, so if we do not use large parts of the buffer, that's no issue at - * all -- it'll just use up address space. On the other hand, it would be silly to page in - * or page out some data just to get space for the IO buffer. - */ -static char rcvBuf[128*1024]; /* forward definitions */ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal); @@ -210,6 +217,7 @@ static void destructSrv(ptcpsrv_t *pSrv) { prop.Destruct(&pSrv->pInputName); + pthread_mutex_destroy(&pSrv->mutSessLst); free(pSrv->port); free(pSrv); } @@ -679,6 +687,7 @@ static inline void initConfigSettings(void) { cs.bEmitMsgOnClose = 0; + cs.wrkrMax = 2; cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; cs.pszInputName = NULL; cs.pRuleset = NULL; @@ -791,10 +800,12 @@ addSess(ptcpsrv_t *pSrv, int sock, prop_t *peerName, prop_t *peerIP) /* add to start of server's listener list */ pSess->prev = NULL; + pthread_mutex_lock(&pSrv->mutSessLst); pSess->next = pSrv->pSess; if(pSrv->pSess != NULL) pSrv->pSess->prev = pSess; pSrv->pSess = pSess; + pthread_mutex_unlock(&pSrv->mutSessLst); iRet = addEPollSock(epolld_sess, pSess, sock, &pSess->epd); @@ -817,10 +828,8 @@ closeSess(ptcpsess_t *pSess) CHKiRet(removeEPollSock(sock, pSess->epd)); close(sock); + pthread_mutex_lock(&pSess->pSrv->mutSessLst); /* finally unlink session from structures */ -//fprintf(stderr, "closing session %d next %p, prev %p\n", pSess->sock, pSess->next, pSess->prev); -//DBGPRINTF("imptcp: pSess->next %p\n", pSess->next); -//DBGPRINTF("imptcp: pSess->prev %p\n", pSess->prev); if(pSess->next != NULL) pSess->next->prev = pSess->prev; if(pSess->prev == NULL) { @@ -829,6 +838,7 @@ closeSess(ptcpsess_t *pSess) } else { pSess->prev->next = pSess->next; } + pthread_mutex_unlock(&pSess->pSrv->mutSessLst); /* unlinked, now remove structure */ destructSess(pSess); @@ -839,21 +849,6 @@ finalize_it: } -#if 0 -/* set permitted peer -- rgerhards, 2008-05-19 - */ -static rsRetVal -setPermittedPeer(void __attribute__((unused)) *pVal, uchar *pszID) -{ - DEFiRet; - CHKiRet(net.AddPermittedPeer(&pPermPeersRoot, pszID)); - free(pszID); /* no longer needed, but we need to free as of interface def */ -finalize_it: - RETiRet; -} -#endif - - /* accept a new ruleset to bind. Checks if it exists and complains, if not */ static rsRetVal setRuleset(void __attribute__((unused)) *pVal, uchar *pszName) { @@ -881,6 +876,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa ptcpsrv_t *pSrv; CHKmalloc(pSrv = malloc(sizeof(ptcpsrv_t))); + pthread_mutex_init(&pSrv->mutSessLst, NULL); pSrv->pSess = NULL; pSrv->pLstn = NULL; pSrv->bEmitMsgOnClose = cs.bEmitMsgOnClose; @@ -912,6 +908,46 @@ finalize_it: } +/* destroy worker pool structures and wait for workers to terminate + */ +static inline void +startWorkerPool(void) +{ + int i; + wrkrRunning = 0; + if(cs.wrkrMax > 16) + cs.wrkrMax = 16; /* TODO: make dynamic? */ + pthread_mutex_init(&wrkrMut, NULL); + pthread_cond_init(&wrkrIdle, NULL); + for(i = 0 ; i < cs.wrkrMax ; ++i) { + /* init worker info structure! */ + pthread_cond_init(&wrkrInfo[i].run, NULL); + wrkrInfo[i].event = NULL; + wrkrInfo[i].numCalled = 0; + pthread_create(&wrkrInfo[i].tid, &wrkrThrdAttr, wrkr, &(wrkrInfo[i])); + } + +} + +/* destroy worker pool structures and wait for workers to terminate + */ +static inline void +stopWorkerPool(void) +{ + int i; + for(i = 0 ; i < cs.wrkrMax ; ++i) { + pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */ + pthread_join(wrkrInfo[i].tid, NULL); + DBGPRINTF("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); + pthread_cond_destroy(&wrkrInfo[i].run); + } + pthread_cond_destroy(&wrkrIdle); + pthread_mutex_destroy(&wrkrMut); + +} + + + /* start up all listeners * This is a one-time stop once the module is set to start. */ @@ -923,7 +959,7 @@ startupServers() pSrv = pSrvRoot; while(pSrv != NULL) { - DBGPRINTF("Starting up ptcp server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName); + DBGPRINTF("imptcp: starting up server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName); startupSrv(pSrv); pSrv = pSrv->pNext; } @@ -945,9 +981,9 @@ lstnActivity(ptcplstn_t *pLstn) DEFiRet; DBGPRINTF("imptcp: new connection on listen socket %d\n", pLstn->sock); - while(1) { + while(glbl.GetGlobalInputTermState() == 0) { localRet = AcceptConnReq(pLstn->sock, &newSock, &peerName, &peerIP); - if(localRet == RS_RET_NO_MORE_DATA) + if(localRet == RS_RET_NO_MORE_DATA || glbl.GetGlobalInputTermState() == 1) break; CHKiRet(localRet); CHKiRet(addSess(pLstn->pSrv, newSock, peerName, peerIP)); @@ -966,6 +1002,7 @@ sessActivity(ptcpsess_t *pSess) { int lenRcv; int lenBuf; + char rcvBuf[128*1024]; DEFiRet; DBGPRINTF("imptcp: new activity on session socket %d\n", pSess->sock); @@ -1003,35 +1040,127 @@ finalize_it: } +/* This function is called to process a single request. This may + * be carried out by the main worker or a helper. It can be run + * concurrently. + */ +static inline void +processWorkItem(struct epoll_event *event) +{ + epolld_t *epd; + + epd = (epolld_t*) event->data.ptr; + switch(epd->typ) { + case epolld_lstn: + lstnActivity((ptcplstn_t *) epd->ptr); + break; + case epolld_sess: + sessActivity((ptcpsess_t *) epd->ptr); + break; + default: + errmsg.LogError(0, RS_RET_INTERNAL_ERROR, + "error: invalid epolld_type_t %d after epoll", epd->typ); + break; + } +} + + +/* This function is called to process a complete workset, that + * is a set of events returned from epoll. + */ +static inline void +processWorkSet(int nEvents, struct epoll_event events[]) +{ + int iEvt; + int i; + int remainEvents; + + remainEvents = nEvents; + for(iEvt = 0 ; (iEvt < nEvents) && (glbl.GetGlobalInputTermState() == 0) ; ++iEvt) { + if(remainEvents == 1) { + /* process self, save context switch */ + processWorkItem(events+iEvt); + } else { + pthread_mutex_lock(&wrkrMut); + /* check if there is a free worker */ + for(i = 0 ; (i < cs.wrkrMax) && (wrkrInfo[i].event != NULL) ; ++i) + /*do search*/; + if(i < cs.wrkrMax) { + /* worker free -> use it! */ + wrkrInfo[i].event = events+iEvt; + ++wrkrRunning; + pthread_cond_signal(&wrkrInfo[i].run); + pthread_mutex_unlock(&wrkrMut); + } else { + pthread_mutex_unlock(&wrkrMut); + /* no free worker, so we process this one ourselfs */ + processWorkItem(events+iEvt); + } + } + --remainEvents; + } + + if(nEvents > 1) { + /* we now need to wait until all workers finish. This is because the + * rest of this module can not handle the concurrency introduced + * by workers running during the epoll call. + */ + pthread_mutex_lock(&wrkrMut); + while(wrkrRunning > 0) { + pthread_cond_wait(&wrkrIdle, &wrkrMut); + } + pthread_mutex_unlock(&wrkrMut); + } + +} + + +/* worker to process incoming requests + */ +static void * +wrkr(void *myself) +{ + struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself; + + pthread_mutex_lock(&wrkrMut); + while(1) { + while(me->event == NULL && glbl.GetGlobalInputTermState() == 0) { + pthread_cond_wait(&me->run, &wrkrMut); + } + if(glbl.GetGlobalInputTermState() == 1) + break; + pthread_mutex_unlock(&wrkrMut); + + ++me->numCalled; + processWorkItem(me->event); + + pthread_mutex_lock(&wrkrMut); + me->event = NULL; /* indicate we are free again */ + --wrkrRunning; + pthread_cond_signal(&wrkrIdle); + } + pthread_mutex_unlock(&wrkrMut); + + return NULL; +} + + /* This function is called to gather input. */ BEGINrunInput - int i; - int nfds; - struct epoll_event events[1]; - epolld_t *epd; + int nEvents; + struct epoll_event events[128]; CODESTARTrunInput - DBGPRINTF("imptcp now beginning to process input data\n"); - /* v5 TODO: consentual termination mode */ - while(1) { + startWorkerPool(); + DBGPRINTF("imptcp: now beginning to process input data\n"); + while(glbl.GetGlobalInputTermState() == 0) { DBGPRINTF("imptcp going on epoll_wait\n"); - nfds = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1); - for(i = 0 ; i < nfds ; ++i) { /* support for larger batches (later, TODO) */ - epd = (epolld_t*) events[i].data.ptr; - switch(epd->typ) { - case epolld_lstn: - lstnActivity((ptcplstn_t *) epd->ptr); - break; - case epolld_sess: - sessActivity((ptcpsess_t *) epd->ptr); - break; - default: - errmsg.LogError(0, RS_RET_INTERNAL_ERROR, - "error: invalid epolld_type_t %d after epoll", epd->typ); - break; - } - } + nEvents = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1); + DBGPRINTF("imptcp: epoll returned %d events\n", nEvents); + processWorkSet(nEvents, events); } + DBGPRINTF("imptcp: successfully terminated\n"); + /* we stop the worker pool in AfterRun, in case we get cancelled for some reason (old Interface) */ ENDrunInput @@ -1039,7 +1168,6 @@ ENDrunInput BEGINwillRun CODESTARTwillRun /* first apply some config settings */ - //net.PrintAllowedSenders(2); /* TCP */ iMaxLine = glbl.GetMaxLine(); /* get maximum size we currently support */ if(pSrvRoot == NULL) { @@ -1108,8 +1236,8 @@ shutdownSrv(ptcpsrv_t *pSrv) BEGINafterRun ptcpsrv_t *pSrv, *srvDel; CODESTARTafterRun - /* do cleanup here */ - //net.clearAllowedSenders(UCHAR_CONSTANT("TCP")); + stopWorkerPool(); + /* we need to close everything that is still open */ pSrv = pSrvRoot; while(pSrv != NULL) { @@ -1125,12 +1253,7 @@ ENDafterRun BEGINmodExit CODESTARTmodExit -#if 0 - if(pPermPeersRoot != NULL) { - net.DestructPermittedPeers(&pPermPeersRoot); - } -#endif - + pthread_attr_destroy(&wrkrThrdAttr); /* release objects we used */ objRelease(glbl, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); @@ -1145,6 +1268,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { cs.bEmitMsgOnClose = 0; + cs.wrkrMax = 2; cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; free(cs.pszInputName); cs.pszInputName = NULL; @@ -1154,10 +1278,17 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus } +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt @@ -1174,21 +1305,27 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(ruleset, CORE_COMPONENT)); + /* initialize "read-only" thread attributes */ + pthread_attr_init(&wrkrThrdAttr); + pthread_attr_setstacksize(&wrkrThrdAttr, 2048*1024); + /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord, - addTCPListener, NULL, STD_LOADABLE_MODULE_ID)); + addTCPListener, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpservernotifyonconnectionclose"), 0, - eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID)); + eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt, - NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID)); + NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverhelperthreads"), 0, eCmdHdlrInt, + NULL, &cs.wrkrMax, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverinputname"), 0, - eCmdHdlrGetWord, NULL, &cs.pszInputName, STD_LOADABLE_MODULE_ID)); + eCmdHdlrGetWord, NULL, &cs.pszInputName, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverlistenip"), 0, - eCmdHdlrGetWord, NULL, &cs.lstnIP, STD_LOADABLE_MODULE_ID)); + eCmdHdlrGetWord, NULL, &cs.lstnIP, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverbindruleset"), 0, - eCmdHdlrGetWord, setRuleset, NULL, STD_LOADABLE_MODULE_ID)); + eCmdHdlrGetWord, setRuleset, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler, - resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); + resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID, eConfObjGlobal)); ENDmodInit |