summaryrefslogtreecommitdiffstats
path: root/plugins/imptcp/imptcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imptcp/imptcp.c')
-rw-r--r--plugins/imptcp/imptcp.c267
1 files changed, 202 insertions, 65 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 91bdf8b3..92383f90 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -83,7 +83,8 @@ DEFobjCurrIf(datetime)
DEFobjCurrIf(errmsg)
DEFobjCurrIf(ruleset)
-
+/* forward references */
+static void * wrkr(void *myself);
/* config settings */
typedef struct configSettings_s {
@@ -92,6 +93,7 @@ typedef struct configSettings_s {
uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
uchar *lstnIP; /* which IP we should listen on? */
ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ int wrkrMax; /* max number of workers (actually "helper workers") */
} configSettings_t;
static configSettings_t cs;
@@ -117,6 +119,7 @@ struct ptcpsrv_s {
ruleset_t *pRuleset;
ptcplstn_t *pLstn; /* root of our listeners */
ptcpsess_t *pSess; /* root of our sessions */
+ pthread_mutex_t mutSessLst;
};
/* the ptcp session object. Describes a single active session.
@@ -154,6 +157,20 @@ struct ptcplstn_s {
};
+/* The following structure controls the worker threads. Global data is
+ * needed for their access.
+ */
+static struct wrkrInfo_s {
+ pthread_t tid; /* the worker's thread ID */
+ pthread_cond_t run;
+ struct epoll_event *event; /* event == NULL -> idle */
+ long long unsigned numCalled; /* how often was this called */
+} wrkrInfo[16];
+static pthread_mutex_t wrkrMut;
+static pthread_cond_t wrkrIdle;
+static int wrkrRunning;
+
+
/* type of object stored in epoll descriptor */
typedef enum {
epolld_lstn,
@@ -171,20 +188,10 @@ struct epolld_s {
/* global data */
-//static permittedPeers_t *pPermPeersRoot = NULL;
+pthread_attr_t wrkrThrdAttr; /* Attribute for session threads; read only after startup */
static ptcpsrv_t *pSrvRoot = NULL;
static int epollfd = -1; /* (sole) descriptor for epoll */
static int iMaxLine; /* maximum size of a single message */
-/* we use a single static receive buffer, as this module is not multi-threaded. Keeping
- * the buffer in the data segment is probably a little bit more efficient than on the stack
- * (but at least I can't believe it will ever be less efficient ;) -- rgerhards, 2010-08-10
- * Note that we do NOT (yet?) provide a config setting to set the buffer size. For usual
- * syslog traffic, it should be large enough. Also keep in mind that we run under a virtual
- * memory system, so if we do not use large parts of the buffer, that's no issue at
- * all -- it'll just use up address space. On the other hand, it would be silly to page in
- * or page out some data just to get space for the IO buffer.
- */
-static char rcvBuf[128*1024];
/* forward definitions */
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal);
@@ -209,6 +216,7 @@ static void
destructSrv(ptcpsrv_t *pSrv)
{
prop.Destruct(&pSrv->pInputName);
+ pthread_mutex_destroy(&pSrv->mutSessLst);
free(pSrv->port);
free(pSrv);
}
@@ -678,6 +686,7 @@ static inline void
initConfigSettings(void)
{
cs.bEmitMsgOnClose = 0;
+ cs.wrkrMax = 2;
cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
cs.pszInputName = NULL;
cs.pRuleset = NULL;
@@ -790,10 +799,12 @@ addSess(ptcpsrv_t *pSrv, int sock, prop_t *peerName, prop_t *peerIP)
/* add to start of server's listener list */
pSess->prev = NULL;
+ pthread_mutex_lock(&pSrv->mutSessLst);
pSess->next = pSrv->pSess;
if(pSrv->pSess != NULL)
pSrv->pSess->prev = pSess;
pSrv->pSess = pSess;
+ pthread_mutex_unlock(&pSrv->mutSessLst);
iRet = addEPollSock(epolld_sess, pSess, sock, &pSess->epd);
@@ -816,10 +827,8 @@ closeSess(ptcpsess_t *pSess)
CHKiRet(removeEPollSock(sock, pSess->epd));
close(sock);
+ pthread_mutex_lock(&pSess->pSrv->mutSessLst);
/* finally unlink session from structures */
-//fprintf(stderr, "closing session %d next %p, prev %p\n", pSess->sock, pSess->next, pSess->prev);
-//DBGPRINTF("imptcp: pSess->next %p\n", pSess->next);
-//DBGPRINTF("imptcp: pSess->prev %p\n", pSess->prev);
if(pSess->next != NULL)
pSess->next->prev = pSess->prev;
if(pSess->prev == NULL) {
@@ -828,6 +837,7 @@ closeSess(ptcpsess_t *pSess)
} else {
pSess->prev->next = pSess->next;
}
+ pthread_mutex_unlock(&pSess->pSrv->mutSessLst);
/* unlinked, now remove structure */
destructSess(pSess);
@@ -838,21 +848,6 @@ finalize_it:
}
-#if 0
-/* set permitted peer -- rgerhards, 2008-05-19
- */
-static rsRetVal
-setPermittedPeer(void __attribute__((unused)) *pVal, uchar *pszID)
-{
- DEFiRet;
- CHKiRet(net.AddPermittedPeer(&pPermPeersRoot, pszID));
- free(pszID); /* no longer needed, but we need to free as of interface def */
-finalize_it:
- RETiRet;
-}
-#endif
-
-
/* accept a new ruleset to bind. Checks if it exists and complains, if not */
static rsRetVal setRuleset(void __attribute__((unused)) *pVal, uchar *pszName)
{
@@ -880,6 +875,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa
ptcpsrv_t *pSrv;
CHKmalloc(pSrv = malloc(sizeof(ptcpsrv_t)));
+ pthread_mutex_init(&pSrv->mutSessLst, NULL);
pSrv->pSess = NULL;
pSrv->pLstn = NULL;
pSrv->bEmitMsgOnClose = cs.bEmitMsgOnClose;
@@ -911,6 +907,46 @@ finalize_it:
}
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static inline void
+startWorkerPool(void)
+{
+ int i;
+ wrkrRunning = 0;
+ if(cs.wrkrMax > 16)
+ cs.wrkrMax = 16; /* TODO: make dynamic? */
+ pthread_mutex_init(&wrkrMut, NULL);
+ pthread_cond_init(&wrkrIdle, NULL);
+ for(i = 0 ; i < cs.wrkrMax ; ++i) {
+ /* init worker info structure! */
+ pthread_cond_init(&wrkrInfo[i].run, NULL);
+ wrkrInfo[i].event = NULL;
+ wrkrInfo[i].numCalled = 0;
+ pthread_create(&wrkrInfo[i].tid, &wrkrThrdAttr, wrkr, &(wrkrInfo[i]));
+ }
+
+}
+
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static inline void
+stopWorkerPool(void)
+{
+ int i;
+ for(i = 0 ; i < cs.wrkrMax ; ++i) {
+ pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */
+ pthread_join(wrkrInfo[i].tid, NULL);
+ DBGPRINTF("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled);
+ pthread_cond_destroy(&wrkrInfo[i].run);
+ }
+ pthread_cond_destroy(&wrkrIdle);
+ pthread_mutex_destroy(&wrkrMut);
+
+}
+
+
+
/* start up all listeners
* This is a one-time stop once the module is set to start.
*/
@@ -922,7 +958,7 @@ startupServers()
pSrv = pSrvRoot;
while(pSrv != NULL) {
- DBGPRINTF("Starting up ptcp server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName);
+ DBGPRINTF("imptcp: starting up server for port %s, name '%s'\n", pSrv->port, pSrv->pszInputName);
startupSrv(pSrv);
pSrv = pSrv->pNext;
}
@@ -944,9 +980,9 @@ lstnActivity(ptcplstn_t *pLstn)
DEFiRet;
DBGPRINTF("imptcp: new connection on listen socket %d\n", pLstn->sock);
- while(1) {
+ while(glbl.GetGlobalInputTermState() == 0) {
localRet = AcceptConnReq(pLstn->sock, &newSock, &peerName, &peerIP);
- if(localRet == RS_RET_NO_MORE_DATA)
+ if(localRet == RS_RET_NO_MORE_DATA || glbl.GetGlobalInputTermState() == 1)
break;
CHKiRet(localRet);
CHKiRet(addSess(pLstn->pSrv, newSock, peerName, peerIP));
@@ -965,6 +1001,7 @@ sessActivity(ptcpsess_t *pSess)
{
int lenRcv;
int lenBuf;
+ char rcvBuf[128*1024];
DEFiRet;
DBGPRINTF("imptcp: new activity on session socket %d\n", pSess->sock);
@@ -1002,35 +1039,127 @@ finalize_it:
}
+/* This function is called to process a single request. This may
+ * be carried out by the main worker or a helper. It can be run
+ * concurrently.
+ */
+static inline void
+processWorkItem(struct epoll_event *event)
+{
+ epolld_t *epd;
+
+ epd = (epolld_t*) event->data.ptr;
+ switch(epd->typ) {
+ case epolld_lstn:
+ lstnActivity((ptcplstn_t *) epd->ptr);
+ break;
+ case epolld_sess:
+ sessActivity((ptcpsess_t *) epd->ptr);
+ break;
+ default:
+ errmsg.LogError(0, RS_RET_INTERNAL_ERROR,
+ "error: invalid epolld_type_t %d after epoll", epd->typ);
+ break;
+ }
+}
+
+
+/* This function is called to process a complete workset, that
+ * is a set of events returned from epoll.
+ */
+static inline void
+processWorkSet(int nEvents, struct epoll_event events[])
+{
+ int iEvt;
+ int i;
+ int remainEvents;
+
+ remainEvents = nEvents;
+ for(iEvt = 0 ; (iEvt < nEvents) && (glbl.GetGlobalInputTermState() == 0) ; ++iEvt) {
+ if(remainEvents == 1) {
+ /* process self, save context switch */
+ processWorkItem(events+iEvt);
+ } else {
+ pthread_mutex_lock(&wrkrMut);
+ /* check if there is a free worker */
+ for(i = 0 ; (i < cs.wrkrMax) && (wrkrInfo[i].event != NULL) ; ++i)
+ /*do search*/;
+ if(i < cs.wrkrMax) {
+ /* worker free -> use it! */
+ wrkrInfo[i].event = events+iEvt;
+ ++wrkrRunning;
+ pthread_cond_signal(&wrkrInfo[i].run);
+ pthread_mutex_unlock(&wrkrMut);
+ } else {
+ pthread_mutex_unlock(&wrkrMut);
+ /* no free worker, so we process this one ourselfs */
+ processWorkItem(events+iEvt);
+ }
+ }
+ --remainEvents;
+ }
+
+ if(nEvents > 1) {
+ /* we now need to wait until all workers finish. This is because the
+ * rest of this module can not handle the concurrency introduced
+ * by workers running during the epoll call.
+ */
+ pthread_mutex_lock(&wrkrMut);
+ while(wrkrRunning > 0) {
+ pthread_cond_wait(&wrkrIdle, &wrkrMut);
+ }
+ pthread_mutex_unlock(&wrkrMut);
+ }
+
+}
+
+
+/* worker to process incoming requests
+ */
+static void *
+wrkr(void *myself)
+{
+ struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself;
+
+ pthread_mutex_lock(&wrkrMut);
+ while(1) {
+ while(me->event == NULL && glbl.GetGlobalInputTermState() == 0) {
+ pthread_cond_wait(&me->run, &wrkrMut);
+ }
+ if(glbl.GetGlobalInputTermState() == 1)
+ break;
+ pthread_mutex_unlock(&wrkrMut);
+
+ ++me->numCalled;
+ processWorkItem(me->event);
+
+ pthread_mutex_lock(&wrkrMut);
+ me->event = NULL; /* indicate we are free again */
+ --wrkrRunning;
+ pthread_cond_signal(&wrkrIdle);
+ }
+ pthread_mutex_unlock(&wrkrMut);
+
+ return NULL;
+}
+
+
/* This function is called to gather input.
*/
BEGINrunInput
- int i;
- int nfds;
- struct epoll_event events[1];
- epolld_t *epd;
+ int nEvents;
+ struct epoll_event events[128];
CODESTARTrunInput
- DBGPRINTF("imptcp now beginning to process input data\n");
- /* v5 TODO: consentual termination mode */
- while(1) {
+ startWorkerPool();
+ DBGPRINTF("imptcp: now beginning to process input data\n");
+ while(glbl.GetGlobalInputTermState() == 0) {
DBGPRINTF("imptcp going on epoll_wait\n");
- nfds = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1);
- for(i = 0 ; i < nfds ; ++i) { /* support for larger batches (later, TODO) */
- epd = (epolld_t*) events[i].data.ptr;
- switch(epd->typ) {
- case epolld_lstn:
- lstnActivity((ptcplstn_t *) epd->ptr);
- break;
- case epolld_sess:
- sessActivity((ptcpsess_t *) epd->ptr);
- break;
- default:
- errmsg.LogError(0, RS_RET_INTERNAL_ERROR,
- "error: invalid epolld_type_t %d after epoll", epd->typ);
- break;
- }
- }
+ nEvents = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1);
+ DBGPRINTF("imptcp: epoll returned %d events\n", nEvents);
+ processWorkSet(nEvents, events);
}
+ DBGPRINTF("imptcp: successfully terminated\n");
+ /* we stop the worker pool in AfterRun, in case we get cancelled for some reason (old Interface) */
ENDrunInput
@@ -1038,7 +1167,6 @@ ENDrunInput
BEGINwillRun
CODESTARTwillRun
/* first apply some config settings */
- //net.PrintAllowedSenders(2); /* TCP */
iMaxLine = glbl.GetMaxLine(); /* get maximum size we currently support */
if(pSrvRoot == NULL) {
@@ -1107,8 +1235,8 @@ shutdownSrv(ptcpsrv_t *pSrv)
BEGINafterRun
ptcpsrv_t *pSrv, *srvDel;
CODESTARTafterRun
- /* do cleanup here */
- //net.clearAllowedSenders(UCHAR_CONSTANT("TCP"));
+ stopWorkerPool();
+
/* we need to close everything that is still open */
pSrv = pSrvRoot;
while(pSrv != NULL) {
@@ -1124,12 +1252,7 @@ ENDafterRun
BEGINmodExit
CODESTARTmodExit
-#if 0
- if(pPermPeersRoot != NULL) {
- net.DestructPermittedPeers(&pPermPeersRoot);
- }
-#endif
-
+ pthread_attr_destroy(&wrkrThrdAttr);
/* release objects we used */
objRelease(glbl, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
@@ -1144,6 +1267,7 @@ static rsRetVal
resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
{
cs.bEmitMsgOnClose = 0;
+ cs.wrkrMax = 2;
cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
free(cs.pszInputName);
cs.pszInputName = NULL;
@@ -1153,10 +1277,17 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus
}
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt
@@ -1173,6 +1304,10 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(ruleset, CORE_COMPONENT));
+ /* initialize "read-only" thread attributes */
+ pthread_attr_init(&wrkrThrdAttr);
+ pthread_attr_setstacksize(&wrkrThrdAttr, 2048*1024);
+
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord,
addTCPListener, NULL, STD_LOADABLE_MODULE_ID));
@@ -1180,6 +1315,8 @@ CODEmodInit_QueryRegCFSLineHdlr
eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt,
NULL, &cs.iAddtlFrameDelim, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverhelperthreads"), 0, eCmdHdlrInt,
+ NULL, &cs.wrkrMax, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverinputname"), 0,
eCmdHdlrGetWord, NULL, &cs.pszInputName, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverlistenip"), 0,