diff options
Diffstat (limited to 'plugins/imptcp/imptcp.c')
-rw-r--r-- | plugins/imptcp/imptcp.c | 596 |
1 files changed, 492 insertions, 104 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 3973041b..e712dc58 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -49,6 +49,7 @@ #include <sys/types.h> #include <sys/socket.h> #include <sys/epoll.h> +#include <netinet/tcp.h> #if HAVE_FCNTL_H #include <fcntl.h> #endif @@ -74,6 +75,7 @@ MODULE_TYPE_INPUT MODULE_TYPE_NOKEEP +MODULE_CNFNAME("imptcp") /* static data */ DEF_IMOD_STATIC_DATA @@ -85,19 +87,52 @@ DEFobjCurrIf(errmsg) DEFobjCurrIf(ruleset) DEFobjCurrIf(statsobj) +/* forward references */ +static void * wrkr(void *myself); /* config settings */ typedef struct configSettings_s { + int bKeepAlive; /* support keep-alive packets */ + int iKeepAliveIntvl; + int iKeepAliveProbes; + int iKeepAliveTime; int bEmitMsgOnClose; /* emit an informational message on close by remote peer */ int bSuppOctetFram; /* support octet-counted framing? */ int iAddtlFrameDelim; /* addtl frame delimiter, e.g. for netscreen, default none */ uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */ uchar *lstnIP; /* which IP we should listen on? */ - ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + uchar *pszBindRuleset; + int wrkrMax; /* max number of workers (actually "helper workers") */ } configSettings_t; - static configSettings_t cs; +struct instanceConf_s { + int bKeepAlive; /* support keep-alive packets */ + int iKeepAliveIntvl; + int iKeepAliveProbes; + int iKeepAliveTime; + int bEmitMsgOnClose; + int bSuppOctetFram; /* support octet-counted framing? */ + int iAddtlFrameDelim; + uchar *pszBindPort; /* port to bind to */ + uchar *pszBindAddr; /* IP to bind socket to */ + uchar *pszBindRuleset; /* name of ruleset to bind to */ + uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */ + ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + struct instanceConf_s *next; +}; + + +struct modConfData_s { + rsconf_t *pConf; /* our overall config object */ + instanceConf_t *root, *tail; + int wrkrMax; +}; + +static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */ +static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */ + +#include "im-helper.h" /* must be included AFTER the type definitions! */ /* data elements describing our running config */ typedef struct ptcpsrv_s ptcpsrv_t; typedef struct ptcplstn_s ptcplstn_t; @@ -112,14 +147,19 @@ struct ptcpsrv_s { ptcpsrv_t *pNext; /* linked list maintenance */ uchar *port; /* Port to listen to */ uchar *lstnIP; /* which IP we should listen on? */ - sbool bEmitMsgOnClose; sbool bSuppOctetFram; int iAddtlFrameDelim; + int iKeepAliveIntvl; + int iKeepAliveProbes; + int iKeepAliveTime; uchar *pszInputName; prop_t *pInputName; /* InputName in (fast to process) property format */ ruleset_t *pRuleset; ptcplstn_t *pLstn; /* root of our listeners */ ptcpsess_t *pSess; /* root of our sessions */ + pthread_mutex_t mutSessLst; + sbool bKeepAlive; /* support keep-alive packets */ + sbool bEmitMsgOnClose; }; /* the ptcp session object. Describes a single active session. @@ -162,6 +202,20 @@ struct ptcplstn_s { }; +/* The following structure controls the worker threads. Global data is + * needed for their access. + */ +static struct wrkrInfo_s { + pthread_t tid; /* the worker's thread ID */ + pthread_cond_t run; + struct epoll_event *event; /* event == NULL -> idle */ + long long unsigned numCalled; /* how often was this called */ +} wrkrInfo[16]; +static pthread_mutex_t wrkrMut; +static pthread_cond_t wrkrIdle; +static int wrkrRunning; + + /* type of object stored in epoll descriptor */ typedef enum { epolld_lstn, @@ -179,20 +233,10 @@ struct epolld_s { /* global data */ -//static permittedPeers_t *pPermPeersRoot = NULL; +pthread_attr_t wrkrThrdAttr; /* Attribute for session threads; read only after startup */ static ptcpsrv_t *pSrvRoot = NULL; static int epollfd = -1; /* (sole) descriptor for epoll */ static int iMaxLine; /* maximum size of a single message */ -/* we use a single static receive buffer, as this module is not multi-threaded. Keeping - * the buffer in the data segment is probably a little bit more efficient than on the stack - * (but at least I can't believe it will ever be less efficient ;) -- rgerhards, 2010-08-10 - * Note that we do NOT (yet?) provide a config setting to set the buffer size. For usual - * syslog traffic, it should be large enough. Also keep in mind that we run under a virtual - * memory system, so if we do not use large parts of the buffer, that's no issue at - * all -- it'll just use up address space. On the other hand, it would be silly to page in - * or page out some data just to get space for the IO buffer. - */ -static char rcvBuf[128*1024]; /* forward definitions */ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal); @@ -217,6 +261,7 @@ static void destructSrv(ptcpsrv_t *pSrv) { prop.Destruct(&pSrv->pInputName); + pthread_mutex_destroy(&pSrv->mutSessLst); free(pSrv->port); free(pSrv); } @@ -245,7 +290,7 @@ startupSrv(ptcpsrv_t *pSrv) lstnIP = pSrv->lstnIP == NULL ? UCHAR_CONSTANT("") : pSrv->lstnIP; - DBGPRINTF("imptcp creating listen socket on server '%s', port %s\n", lstnIP, pSrv->port); + DBGPRINTF("imptcp: creating listen socket on server '%s', port %s\n", lstnIP, pSrv->port); memset(&hints, 0, sizeof(hints)); hints.ai_flags = AI_PASSIVE; @@ -440,12 +485,80 @@ finalize_it: } +/* Enable KEEPALIVE handling on the socket. */ +static inline rsRetVal +EnableKeepAlive(ptcplstn_t *pLstn, int sock) +{ + int ret; + int optval; + socklen_t optlen; + DEFiRet; + + optval = 1; + optlen = sizeof(optval); + ret = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen); + if(ret < 0) { + dbgprintf("EnableKeepAlive socket call returns error %d\n", ret); + ABORT_FINALIZE(RS_RET_ERR); + } + +# if defined(TCP_KEEPCNT) + if(pLstn->pSrv->iKeepAliveProbes > 0) { + optval = pLstn->pSrv->iKeepAliveProbes; + optlen = sizeof(optval); + ret = setsockopt(sock, SOL_TCP, TCP_KEEPCNT, &optval, optlen); + } else { + ret = 0; + } +# else + ret = -1; +# endif + if(ret < 0) { + errmsg.LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive probes - ignored"); + } + +# if defined(TCP_KEEPCNT) + if(pLstn->pSrv->iKeepAliveTime > 0) { + optval = pLstn->pSrv->iKeepAliveTime; + optlen = sizeof(optval); + ret = setsockopt(sock, SOL_TCP, TCP_KEEPIDLE, &optval, optlen); + } else { + ret = 0; + } +# else + ret = -1; +# endif + if(ret < 0) { + errmsg.LogError(ret, NO_ERRCODE, "imptcp cannot set keepalive time - ignored"); + } + +# if defined(TCP_KEEPCNT) + if(pLstn->pSrv->iKeepAliveIntvl > 0) { + optval = pLstn->pSrv->iKeepAliveIntvl; + optlen = sizeof(optval); + ret = setsockopt(sock, SOL_TCP, TCP_KEEPINTVL, &optval, optlen); + } else { + ret = 0; + } +# else + ret = -1; +# endif + if(ret < 0) { + errmsg.LogError(errno, NO_ERRCODE, "imptcp cannot set keepalive intvl - ignored"); + } + + dbgprintf("KEEPALIVE enabled for socket %d\n", sock); + +finalize_it: + RETiRet; +} + /* accept an incoming connection request * rgerhards, 2008-04-22 */ static rsRetVal -AcceptConnReq(int sock, int *newSock, prop_t **peerName, prop_t **peerIP) +AcceptConnReq(ptcplstn_t *pLstn, int *newSock, prop_t **peerName, prop_t **peerIP) { int sockflags; struct sockaddr_storage addr; @@ -454,13 +567,17 @@ AcceptConnReq(int sock, int *newSock, prop_t **peerName, prop_t **peerIP) DEFiRet; - iNewSock = accept(sock, (struct sockaddr*) &addr, &addrlen); + iNewSock = accept(pLstn->sock, (struct sockaddr*) &addr, &addrlen); if(iNewSock < 0) { if(errno == EAGAIN || errno == EWOULDBLOCK) ABORT_FINALIZE(RS_RET_NO_MORE_DATA); ABORT_FINALIZE(RS_RET_ACCEPT_ERR); } + if(pLstn->pSrv->bKeepAlive) + EnableKeepAlive(pLstn, iNewSock);/* we ignore errors, best to do! */ + + CHKiRet(getPeerNames(peerName, peerIP, (struct sockaddr*) &addr)); /* set the new socket to non-blocking IO */ @@ -694,10 +811,12 @@ static inline void initConfigSettings(void) { cs.bEmitMsgOnClose = 0; + cs.wrkrMax = 2; cs.bSuppOctetFram = 1; cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; cs.pszInputName = NULL; - cs.pRuleset = NULL; + cs.pszBindRuleset = NULL; + cs.pszInputName = NULL; cs.lstnIP = NULL; } @@ -822,10 +941,12 @@ addSess(ptcplstn_t *pLstn, int sock, prop_t *peerName, prop_t *peerIP) /* add to start of server's listener list */ pSess->prev = NULL; + pthread_mutex_lock(&pSrv->mutSessLst); pSess->next = pSrv->pSess; if(pSrv->pSess != NULL) pSrv->pSess->prev = pSess; pSrv->pSess = pSess; + pthread_mutex_unlock(&pSrv->mutSessLst); iRet = addEPollSock(epolld_sess, pSess, sock, &pSess->epd); @@ -848,10 +969,8 @@ closeSess(ptcpsess_t *pSess) CHKiRet(removeEPollSock(sock, pSess->epd)); close(sock); + pthread_mutex_lock(&pSess->pLstn->pSrv->mutSessLst); /* finally unlink session from structures */ -//fprintf(stderr, "closing session %d next %p, prev %p\n", pSess->sock, pSess->next, pSess->prev); -//DBGPRINTF("imptcp: pSess->next %p\n", pSess->next); -//DBGPRINTF("imptcp: pSess->prev %p\n", pSess->prev); if(pSess->next != NULL) pSess->next->prev = pSess->prev; if(pSess->prev == NULL) { @@ -860,6 +979,7 @@ closeSess(ptcpsess_t *pSess) } else { pSess->prev->next = pSess->next; } + pthread_mutex_unlock(&pSess->pLstn->pSrv->mutSessLst); /* unlinked, now remove structure */ destructSess(pSess); @@ -870,59 +990,88 @@ finalize_it: } -#if 0 -/* set permitted peer -- rgerhards, 2008-05-19 +/* This function is called when a new listener instace shall be added to + * the current config object via the legacy config system. It just shuffles + * all parameters to the listener in-memory instance. */ -static rsRetVal -setPermittedPeer(void __attribute__((unused)) *pVal, uchar *pszID) -{ - DEFiRet; - CHKiRet(net.AddPermittedPeer(&pPermPeersRoot, pszID)); - free(pszID); /* no longer needed, but we need to free as of interface def */ -finalize_it: - RETiRet; -} -#endif - - -/* accept a new ruleset to bind. Checks if it exists and complains, if not */ -static rsRetVal setRuleset(void __attribute__((unused)) *pVal, uchar *pszName) +static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal) { - ruleset_t *pRuleset; - rsRetVal localRet; + instanceConf_t *inst; DEFiRet; - localRet = ruleset.GetRuleset(&pRuleset, pszName); - if(localRet == RS_RET_NOT_FOUND) { - errmsg.LogError(0, NO_ERRCODE, "error: ruleset '%s' not found - ignored", pszName); + CHKmalloc(inst = MALLOC(sizeof(instanceConf_t))); + if(pNewVal == NULL || *pNewVal == '\0') { + errmsg.LogError(0, NO_ERRCODE, "imptcp: port number must be specified, listener ignored"); + } + if((pNewVal == NULL) || (pNewVal == '\0')) { + inst->pszBindPort = NULL; + } else { + CHKmalloc(inst->pszBindPort = ustrdup(pNewVal)); + } + if((cs.lstnIP == NULL) || (cs.lstnIP[0] == '\0')) { + inst->pszBindAddr = NULL; + } else { + CHKmalloc(inst->pszBindAddr = ustrdup(cs.lstnIP)); + } + if((cs.pszBindRuleset == NULL) || (cs.pszBindRuleset[0] == '\0')) { + inst->pszBindRuleset = NULL; + } else { + CHKmalloc(inst->pszBindRuleset = ustrdup(cs.pszBindRuleset)); + } + if((cs.pszInputName == NULL) || (cs.pszInputName[0] == '\0')) { + inst->pszInputName = NULL; + } else { + CHKmalloc(inst->pszInputName = ustrdup(cs.pszInputName)); + } + inst->pBindRuleset = NULL; + inst->bSuppOctetFram = cs.bSuppOctetFram; + inst->bKeepAlive = cs.bKeepAlive; + inst->iKeepAliveIntvl = cs.iKeepAliveTime; + inst->iKeepAliveProbes = cs.iKeepAliveProbes; + inst->iKeepAliveTime = cs.iKeepAliveTime; + inst->bEmitMsgOnClose = cs.bEmitMsgOnClose; + inst->iAddtlFrameDelim = cs.iAddtlFrameDelim; + inst->next = NULL; + + /* node created, let's add to config */ + if(loadModConf->tail == NULL) { + loadModConf->tail = loadModConf->root = inst; + } else { + loadModConf->tail->next = inst; + loadModConf->tail = inst; } - CHKiRet(localRet); - cs.pRuleset = pRuleset; - DBGPRINTF("imptcp current bind ruleset %p: '%s'\n", pRuleset, pszName); finalize_it: - free(pszName); /* no longer needed */ + free(pNewVal); RETiRet; } -static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVal) +static inline rsRetVal +addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst) { DEFiRet; ptcpsrv_t *pSrv; - CHKmalloc(pSrv = malloc(sizeof(ptcpsrv_t))); + CHKmalloc(pSrv = MALLOC(sizeof(ptcpsrv_t))); + pthread_mutex_init(&pSrv->mutSessLst, NULL); pSrv->pSess = NULL; pSrv->pLstn = NULL; - pSrv->bSuppOctetFram = cs.bSuppOctetFram; - pSrv->bEmitMsgOnClose = cs.bEmitMsgOnClose; - pSrv->port = pNewVal; - pSrv->iAddtlFrameDelim = cs.iAddtlFrameDelim; - cs.pszInputName = NULL; /* moved over to pSrv, we do not own */ - pSrv->lstnIP = cs.lstnIP; - cs.lstnIP = NULL; /* moved over to pSrv, we do not own */ - pSrv->pRuleset = cs.pRuleset; - pSrv->pszInputName = (cs.pszInputName == NULL) ? UCHAR_CONSTANT("imptcp") : cs.pszInputName; + pSrv->bSuppOctetFram = inst->bSuppOctetFram; + pSrv->bKeepAlive = inst->bKeepAlive; + pSrv->iKeepAliveIntvl = inst->iKeepAliveTime; + pSrv->iKeepAliveProbes = inst->iKeepAliveProbes; + pSrv->iKeepAliveTime = inst->iKeepAliveTime; + pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose; + CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort)); + pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim; + if(inst->pszBindAddr == NULL) + pSrv->lstnIP = NULL; + else { + CHKmalloc(pSrv->lstnIP = ustrdup(inst->pszBindAddr)); + } + pSrv->pRuleset = inst->pBindRuleset; + pSrv->pszInputName = (inst->pszInputName == NULL) ? UCHAR_CONSTANT("imptcp") : ustrdup(inst->pszInputName); CHKiRet(prop.Construct(&pSrv->pInputName)); CHKiRet(prop.SetString(pSrv->pInputName, pSrv->pszInputName, ustrlen(pSrv->pszInputName))); CHKiRet(prop.ConstructFinalize(pSrv->pInputName)); @@ -944,6 +1093,46 @@ finalize_it: } +/* destroy worker pool structures and wait for workers to terminate + */ +static inline void +startWorkerPool(void) +{ + int i; + wrkrRunning = 0; + if(runModConf->wrkrMax > 16) + runModConf->wrkrMax = 16; /* TODO: make dynamic? */ + pthread_mutex_init(&wrkrMut, NULL); + pthread_cond_init(&wrkrIdle, NULL); + for(i = 0 ; i < runModConf->wrkrMax ; ++i) { + /* init worker info structure! */ + pthread_cond_init(&wrkrInfo[i].run, NULL); + wrkrInfo[i].event = NULL; + wrkrInfo[i].numCalled = 0; + pthread_create(&wrkrInfo[i].tid, &wrkrThrdAttr, wrkr, &(wrkrInfo[i])); + } + +} + +/* destroy worker pool structures and wait for workers to terminate + */ +static inline void +stopWorkerPool(void) +{ + int i; + for(i = 0 ; i < runModConf->wrkrMax ; ++i) { + pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */ + pthread_join(wrkrInfo[i].tid, NULL); + DBGPRINTF("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); + pthread_cond_destroy(&wrkrInfo[i].run); + } + pthread_cond_destroy(&wrkrIdle); + pthread_mutex_destroy(&wrkrMut); + +} + + + /* start up all listeners * This is a one-time stop once the module is set to start. */ @@ -951,15 +1140,29 @@ static inline rsRetVal startupServers() { DEFiRet; + rsRetVal localRet, lastErr; + int iOK; + int iAll; ptcpsrv_t *pSrv; + iAll = iOK = 0; + lastErr = RS_RET_ERR; pSrv = pSrvRoot; while(pSrv != NULL) { - DBGPRINTF("Starting up ptcp server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName); - startupSrv(pSrv); + DBGPRINTF("imptcp: starting up server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName); + localRet = startupSrv(pSrv); + if(localRet == RS_RET_OK) + iOK++; + else + lastErr = localRet; + ++iAll; pSrv = pSrv->pNext; } + DBGPRINTF("imptcp: %d out of %d servers started successfully\n", iOK, iAll); + if(iOK == 0) /* iff all fails, we report an error */ + iRet = lastErr; + RETiRet; } @@ -977,9 +1180,9 @@ lstnActivity(ptcplstn_t *pLstn) DEFiRet; DBGPRINTF("imptcp: new connection on listen socket %d\n", pLstn->sock); - while(1) { - localRet = AcceptConnReq(pLstn->sock, &newSock, &peerName, &peerIP); - if(localRet == RS_RET_NO_MORE_DATA) + while(glbl.GetGlobalInputTermState() == 0) { + localRet = AcceptConnReq(pLstn, &newSock, &peerName, &peerIP); + if(localRet == RS_RET_NO_MORE_DATA || glbl.GetGlobalInputTermState() == 1) break; CHKiRet(localRet); CHKiRet(addSess(pLstn, newSock, peerName, peerIP)); @@ -998,6 +1201,7 @@ sessActivity(ptcpsess_t *pSess) { int lenRcv; int lenBuf; + char rcvBuf[128*1024]; DEFiRet; DBGPRINTF("imptcp: new activity on session socket %d\n", pSess->sock); @@ -1035,47 +1239,160 @@ finalize_it: } -/* This function is called to gather input. +/* This function is called to process a single request. This may + * be carried out by the main worker or a helper. It can be run + * concurrently. */ -BEGINrunInput - int i; - int nfds; - struct epoll_event events[1]; +static inline void +processWorkItem(struct epoll_event *event) +{ epolld_t *epd; -CODESTARTrunInput - DBGPRINTF("imptcp now beginning to process input data\n"); - /* v5 TODO: consentual termination mode */ - while(1) { - DBGPRINTF("imptcp going on epoll_wait\n"); - nfds = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1); - for(i = 0 ; i < nfds ; ++i) { /* support for larger batches (later, TODO) */ - epd = (epolld_t*) events[i].data.ptr; - switch(epd->typ) { - case epolld_lstn: - lstnActivity((ptcplstn_t *) epd->ptr); - break; - case epolld_sess: - sessActivity((ptcpsess_t *) epd->ptr); - break; - default: - errmsg.LogError(0, RS_RET_INTERNAL_ERROR, - "error: invalid epolld_type_t %d after epoll", epd->typ); - break; + + epd = (epolld_t*) event->data.ptr; + switch(epd->typ) { + case epolld_lstn: + lstnActivity((ptcplstn_t *) epd->ptr); + break; + case epolld_sess: + sessActivity((ptcpsess_t *) epd->ptr); + break; + default: + errmsg.LogError(0, RS_RET_INTERNAL_ERROR, + "error: invalid epolld_type_t %d after epoll", epd->typ); + break; + } +} + + +/* This function is called to process a complete workset, that + * is a set of events returned from epoll. + */ +static inline void +processWorkSet(int nEvents, struct epoll_event events[]) +{ + int iEvt; + int i; + int remainEvents; + + remainEvents = nEvents; + for(iEvt = 0 ; (iEvt < nEvents) && (glbl.GetGlobalInputTermState() == 0) ; ++iEvt) { + if(remainEvents == 1) { + /* process self, save context switch */ + processWorkItem(events+iEvt); + } else { + pthread_mutex_lock(&wrkrMut); + /* check if there is a free worker */ + for(i = 0 ; (i < runModConf->wrkrMax) && (wrkrInfo[i].event != NULL) ; ++i) + /*do search*/; + if(i < runModConf->wrkrMax) { + /* worker free -> use it! */ + wrkrInfo[i].event = events+iEvt; + ++wrkrRunning; + pthread_cond_signal(&wrkrInfo[i].run); + pthread_mutex_unlock(&wrkrMut); + } else { + pthread_mutex_unlock(&wrkrMut); + /* no free worker, so we process this one ourselfs */ + processWorkItem(events+iEvt); } } + --remainEvents; } -ENDrunInput + if(nEvents > 1) { + /* we now need to wait until all workers finish. This is because the + * rest of this module can not handle the concurrency introduced + * by workers running during the epoll call. + */ + pthread_mutex_lock(&wrkrMut); + while(wrkrRunning > 0) { + pthread_cond_wait(&wrkrIdle, &wrkrMut); + } + pthread_mutex_unlock(&wrkrMut); + } -/* initialize and return if will run or not */ -BEGINwillRun -CODESTARTwillRun - /* first apply some config settings */ - //net.PrintAllowedSenders(2); /* TCP */ +} + + +/* worker to process incoming requests + */ +static void * +wrkr(void *myself) +{ + struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself; + + pthread_mutex_lock(&wrkrMut); + while(1) { + while(me->event == NULL && glbl.GetGlobalInputTermState() == 0) { + pthread_cond_wait(&me->run, &wrkrMut); + } + if(glbl.GetGlobalInputTermState() == 1) + break; + pthread_mutex_unlock(&wrkrMut); + + ++me->numCalled; + processWorkItem(me->event); + + pthread_mutex_lock(&wrkrMut); + me->event = NULL; /* indicate we are free again */ + --wrkrRunning; + pthread_cond_signal(&wrkrIdle); + } + pthread_mutex_unlock(&wrkrMut); + + return NULL; +} + + +BEGINbeginCnfLoad +CODESTARTbeginCnfLoad + loadModConf = pModConf; + pModConf->pConf = pConf; + /* init legacy config vars */ + initConfigSettings(); +ENDbeginCnfLoad + + +BEGINendCnfLoad +CODESTARTendCnfLoad + /* persist module-specific settings from legacy config system */ + loadModConf->wrkrMax = cs.wrkrMax; + + loadModConf = NULL; /* done loading */ + /* free legacy config vars */ + free(cs.pszInputName); + free(cs.lstnIP); +ENDendCnfLoad + + +/* function to generate error message if framework does not find requested ruleset */ +static inline void +std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, instanceConf_t *inst) +{ + errmsg.LogError(0, NO_ERRCODE, "imptcp: ruleset '%s' for port %s not found - " + "using default ruleset instead", inst->pszBindRuleset, + inst->pszBindPort); +} +BEGINcheckCnf + instanceConf_t *inst; +CODESTARTcheckCnf + for(inst = pModConf->root ; inst != NULL ; inst = inst->next) { + std_checkRuleset(pModConf, inst); + } +ENDcheckCnf + + +BEGINactivateCnfPrePrivDrop + instanceConf_t *inst; +CODESTARTactivateCnfPrePrivDrop iMaxLine = glbl.GetMaxLine(); /* get maximum size we currently support */ + runModConf = pModConf; + for(inst = runModConf->root ; inst != NULL ; inst = inst->next) { + addListner(pModConf, inst); + } if(pSrvRoot == NULL) { - errmsg.LogError(0, RS_RET_NO_LSTN_DEFINED, "error: no ptcp server defined, module can not run."); + errmsg.LogError(0, RS_RET_NO_LSTN_DEFINED, "imptcp: no ptcp server defined, module can not run."); ABORT_FINALIZE(RS_RET_NO_RUN); } @@ -1102,6 +1419,52 @@ CODESTARTwillRun CHKiRet(startupServers()); DBGPRINTF("imptcp started up, but not yet receiving data\n"); finalize_it: +ENDactivateCnfPrePrivDrop + + +BEGINactivateCnf +CODESTARTactivateCnf + /* nothing to do, all done pre priv drop */ +ENDactivateCnf + + +BEGINfreeCnf + instanceConf_t *inst, *del; +CODESTARTfreeCnf + for(inst = pModConf->root ; inst != NULL ; ) { + free(inst->pszBindPort); + free(inst->pszBindAddr); + free(inst->pszBindRuleset); + free(inst->pszInputName); + del = inst; + inst = inst->next; + free(del); + } +ENDfreeCnf + + +/* This function is called to gather input. + */ +BEGINrunInput + int nEvents; + struct epoll_event events[128]; +CODESTARTrunInput + startWorkerPool(); + DBGPRINTF("imptcp: now beginning to process input data\n"); + while(glbl.GetGlobalInputTermState() == 0) { + DBGPRINTF("imptcp going on epoll_wait\n"); + nEvents = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1); + DBGPRINTF("imptcp: epoll returned %d events\n", nEvents); + processWorkSet(nEvents, events); + } + DBGPRINTF("imptcp: successfully terminated\n"); + /* we stop the worker pool in AfterRun, in case we get cancelled for some reason (old Interface) */ +ENDrunInput + + +/* initialize and return if will run or not */ +BEGINwillRun +CODESTARTwillRun ENDwillRun @@ -1142,8 +1505,8 @@ shutdownSrv(ptcpsrv_t *pSrv) BEGINafterRun ptcpsrv_t *pSrv, *srvDel; CODESTARTafterRun - /* do cleanup here */ - //net.clearAllowedSenders(UCHAR_CONSTANT("TCP")); + stopWorkerPool(); + /* we need to close everything that is still open */ pSrv = pSrvRoot; while(pSrv != NULL) { @@ -1159,12 +1522,7 @@ ENDafterRun BEGINmodExit CODESTARTmodExit -#if 0 - if(pPermPeersRoot != NULL) { - net.DestructPermittedPeers(&pPermPeersRoot); - } -#endif - + pthread_attr_destroy(&wrkrThrdAttr); /* release objects we used */ objRelease(glbl, CORE_COMPONENT); objRelease(statsobj, CORE_COMPONENT); @@ -1180,6 +1538,11 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { cs.bEmitMsgOnClose = 0; + cs.wrkrMax = 2; + cs.bKeepAlive = 0; + cs.iKeepAliveProbes = 0; + cs.iKeepAliveTime = 0; + cs.iKeepAliveIntvl = 0; cs.bSuppOctetFram = 1; cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER; free(cs.pszInputName); @@ -1190,10 +1553,19 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus } +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt @@ -1201,7 +1573,6 @@ BEGINmodInit() CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ CODEmodInit_QueryRegCFSLineHdlr - initConfigSettings(); /* request objects we use */ CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(statsobj, CORE_COMPONENT)); @@ -1211,21 +1582,38 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(ruleset, CORE_COMPONENT)); + /* initialize "read-only" thread attributes */ + pthread_attr_init(&wrkrThrdAttr); + pthread_attr_setstacksize(&wrkrThrdAttr, 2048*1024); + + /* init legacy config settings */ + initConfigSettings(); + /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord, - addTCPListener, NULL, STD_LOADABLE_MODULE_ID)); + addInstance, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive"), 0, eCmdHdlrBinary, + NULL, &cs.bKeepAlive, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_probes"), 0, eCmdHdlrInt, + NULL, &cs.iKeepAliveProbes, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_time"), 0, eCmdHdlrInt, + NULL, &cs.iKeepAliveTime, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverkeepalive_intvl"), 0, eCmdHdlrInt, + NULL, &cs.iKeepAliveIntvl, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserversupportoctetcountedframing"), 0, eCmdHdlrBinary, NULL, &cs.bSuppOctetFram, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpservernotifyonconnectionclose"), 0, eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt, NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverhelperthreads"), 0, eCmdHdlrInt, + NULL, &cs.wrkrMax, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverinputname"), 0, eCmdHdlrGetWord, NULL, &cs.pszInputName, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverlistenip"), 0, eCmdHdlrGetWord, NULL, &cs.lstnIP, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverbindruleset"), 0, - eCmdHdlrGetWord, setRuleset, NULL, STD_LOADABLE_MODULE_ID)); + eCmdHdlrGetWord, NULL, cs.pszBindRuleset, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("resetconfigvariables"), 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit |