summaryrefslogtreecommitdiffstats
path: root/plugins/imptcp/imptcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imptcp/imptcp.c')
-rw-r--r--plugins/imptcp/imptcp.c229
1 files changed, 182 insertions, 47 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 33bfa81c..6e3a67ab 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -10,7 +10,7 @@
*
* File begun on 2010-08-10 by RGerhards
*
- * Copyright 2007-2010 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -83,7 +83,8 @@ DEFobjCurrIf(datetime)
DEFobjCurrIf(errmsg)
DEFobjCurrIf(ruleset)
-
+/* forward references */
+static void * wrkr(void *myself);
/* config settings */
typedef struct configSettings_s {
@@ -154,6 +155,21 @@ struct ptcplstn_s {
};
+/* The following structure controls the worker threads. Global data is
+ * needed for their access.
+ */
+static struct wrkrInfo_s {
+ pthread_t tid; /* the worker's thread ID */
+ pthread_cond_t run;
+ struct epoll_event *event; /* event == NULL -> idle */
+ long long unsigned numCalled; /* how often was this called */
+} wrkrInfo[4];
+static pthread_mutex_t wrkrMut;
+static pthread_cond_t wrkrIdle;
+static int wrkrMax = 4;
+static int wrkrRunning;
+
+
/* type of object stored in epoll descriptor */
typedef enum {
epolld_lstn,
@@ -175,16 +191,6 @@ struct epolld_s {
static ptcpsrv_t *pSrvRoot = NULL;
static int epollfd = -1; /* (sole) descriptor for epoll */
static int iMaxLine; /* maximum size of a single message */
-/* we use a single static receive buffer, as this module is not multi-threaded. Keeping
- * the buffer in the data segment is probably a little bit more efficient than on the stack
- * (but at least I can't believe it will ever be less efficient ;) -- rgerhards, 2010-08-10
- * Note that we do NOT (yet?) provide a config setting to set the buffer size. For usual
- * syslog traffic, it should be large enough. Also keep in mind that we run under a virtual
- * memory system, so if we do not use large parts of the buffer, that's no issue at
- * all -- it'll just use up address space. On the other hand, it would be silly to page in
- * or page out some data just to get space for the IO buffer.
- */
-static char rcvBuf[128*1024];
/* forward definitions */
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal);
@@ -838,21 +844,6 @@ finalize_it:
}
-#if 0
-/* set permitted peer -- rgerhards, 2008-05-19
- */
-static rsRetVal
-setPermittedPeer(void __attribute__((unused)) *pVal, uchar *pszID)
-{
- DEFiRet;
- CHKiRet(net.AddPermittedPeer(&pPermPeersRoot, pszID));
- free(pszID); /* no longer needed, but we need to free as of interface def */
-finalize_it:
- RETiRet;
-}
-#endif
-
-
/* accept a new ruleset to bind. Checks if it exists and complains, if not */
static rsRetVal setRuleset(void __attribute__((unused)) *pVal, uchar *pszName)
{
@@ -911,6 +902,45 @@ finalize_it:
}
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static inline void
+startWorkerPool(void)
+{
+ int i;
+ wrkrRunning = 0;
+ pthread_mutex_init(&wrkrMut, NULL);
+ pthread_cond_init(&wrkrIdle, NULL);
+ for(i = 0 ; i < wrkrMax ; ++i) {
+ /* init worker info structure! */
+ pthread_cond_init(&wrkrInfo[i].run, NULL);
+ wrkrInfo[i].event = NULL;
+ wrkrInfo[i].numCalled = 0;
+ pthread_create(&wrkrInfo[i].tid, NULL, wrkr, &(wrkrInfo[i]));
+ }
+
+}
+
+/* destroy worker pool structures and wait for workers to terminate
+ */
+static inline void
+stopWorkerPool(void)
+{
+ int i;
+ for(i = 0 ; i < wrkrMax ; ++i) {
+ pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */
+ pthread_join(wrkrInfo[i].tid, NULL);
+ DBGPRINTF("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled);
+ printf("imptcp: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled);
+ pthread_cond_destroy(&wrkrInfo[i].run);
+ }
+ pthread_cond_destroy(&wrkrIdle);
+ pthread_mutex_destroy(&wrkrMut);
+
+}
+
+
+
/* start up all listeners
* This is a one-time stop once the module is set to start.
*/
@@ -965,6 +995,7 @@ sessActivity(ptcpsess_t *pSess)
{
int lenRcv;
int lenBuf;
+ char rcvBuf[128*1024];
DEFiRet;
DBGPRINTF("imptcp: new activity on session socket %d\n", pSess->sock);
@@ -1002,36 +1033,138 @@ finalize_it:
}
+/* This function is called to process a single request. This may
+ * be carried out by the main worker or a helper. It can be run
+ * concurrently.
+ */
+static inline void
+processWorkItem(struct epoll_event *event)
+{
+ epolld_t *epd;
+
+ epd = (epolld_t*) event->data.ptr;
+ switch(epd->typ) {
+ case epolld_lstn:
+ lstnActivity((ptcplstn_t *) epd->ptr);
+ break;
+ case epolld_sess:
+ sessActivity((ptcpsess_t *) epd->ptr);
+ break;
+ default:
+ errmsg.LogError(0, RS_RET_INTERNAL_ERROR,
+ "error: invalid epolld_type_t %d after epoll", epd->typ);
+ break;
+ }
+}
+
+
+/* This function is called to process a complete workset, that
+ * is a set of events returned from epoll.
+ */
+static inline void
+processWorkSet(int nEvents, struct epoll_event events[])
+{
+ int iEvt;
+ int i;
+ int remainEvents;
+
+ remainEvents = nEvents;
+ for(iEvt = 0 ; (iEvt < nEvents) && (glbl.GetGlobalInputTermState() == 0) ; ++iEvt) {
+dbgprintf("XXX: remain entries during processing %d\n", remainEvents);
+ if(remainEvents == 1) {
+dbgprintf("XXX: processWorkset 1\n");
+ /* process self, save context switch */
+ processWorkItem(events+iEvt);
+ } else {
+dbgprintf("XXX: processWorkset 2\n");
+ pthread_mutex_lock(&wrkrMut);
+ /* check if there is a free worker */
+ for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].event != NULL) ; ++i)
+dbgprintf("XXX: wrkrinfo[%d].event %p\n", i, wrkrInfo[i].event);
+ /*do search*/;
+ if(i < wrkrMax) {
+dbgprintf("XXX: processWorkset 2.1, event=%p, wrkrRunnig %d, max %d\n", events+iEvt, wrkrRunning, wrkrMax);
+ /* worker free -> use it! */
+ wrkrInfo[i].event = events+iEvt;
+ pthread_cond_signal(&wrkrInfo[i].run);
+ pthread_mutex_unlock(&wrkrMut);
+ } else {
+dbgprintf("XXX: processWorkset 2.2\n");
+ pthread_mutex_unlock(&wrkrMut);
+ /* no free worker, so we process this one ourselfs */
+ processWorkItem(events+iEvt);
+ }
+ }
+ --remainEvents;
+ }
+
+ if(nEvents > 1) {
+ /* we now need to wait until all workers finish. This is because the
+ * rest of this module can not handle the concurrency introduced
+ * by workers running during the epoll call.
+ */
+ pthread_mutex_lock(&wrkrMut);
+dbgprintf("XXX: processWorkset: waiting for all workers to idle, curr = %d\n", wrkrRunning);
+ while(wrkrRunning > 0) {
+dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, curr = %d\n", wrkrRunning);
+ pthread_cond_wait(&wrkrIdle, &wrkrMut);
+dbgprintf("XXX: processWorkset: loop-waiting for all workers to idle, after wait, curr = %d\n", wrkrRunning);
+ }
+ pthread_mutex_unlock(&wrkrMut);
+ }
+
+}
+
+
+/* worker to process incoming requests
+ */
+static void *
+wrkr(void *myself)
+{
+ struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself;
+
+ pthread_mutex_lock(&wrkrMut);
+ while(1) {
+ while(me->event == NULL && glbl.GetGlobalInputTermState() == 0) {
+ pthread_cond_wait(&me->run, &wrkrMut);
+ }
+ if(glbl.GetGlobalInputTermState() == 1)
+ break;
+ ++wrkrRunning;
+ pthread_mutex_unlock(&wrkrMut);
+
+dbgprintf("XXX: worker %p activated\n", pthread_self());
+ ++me->numCalled;
+ processWorkItem(me->event);
+
+ pthread_mutex_lock(&wrkrMut);
+ me->event = NULL; /* indicate we are free again */
+ --wrkrRunning;
+ pthread_cond_signal(&wrkrIdle);
+dbgprintf("XXX: worker %p idling, currently running %d\n", pthread_self(), wrkrRunning);
+ }
+ pthread_mutex_unlock(&wrkrMut);
+
+ return NULL;
+}
+
+
/* This function is called to gather input.
*/
BEGINrunInput
- int i;
- int nfds;
+ int nEvents;
struct epoll_event events[128];
- epolld_t *epd;
CODESTARTrunInput
+ startWorkerPool();
DBGPRINTF("imptcp: now beginning to process input data\n");
while(glbl.GetGlobalInputTermState() == 0) {
DBGPRINTF("imptcp going on epoll_wait\n");
- nfds = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1);
- DBGPRINTF("imptcp: epoll returned %d events\n", nfds);
- for(i = 0 ; (i < nfds) && (glbl.GetGlobalInputTermState() == 0) ; ++i) { /* support for larger batches (later, TODO) */
- epd = (epolld_t*) events[i].data.ptr;
- switch(epd->typ) {
- case epolld_lstn:
- lstnActivity((ptcplstn_t *) epd->ptr);
- break;
- case epolld_sess:
- sessActivity((ptcpsess_t *) epd->ptr);
- break;
- default:
- errmsg.LogError(0, RS_RET_INTERNAL_ERROR,
- "error: invalid epolld_type_t %d after epoll", epd->typ);
- break;
- }
- }
+ nEvents = epoll_wait(epollfd, events, sizeof(events)/sizeof(struct epoll_event), -1);
+ DBGPRINTF("imptcp: epoll returned %d events\n", nEvents);
+ processWorkSet(nEvents, events);
}
DBGPRINTF("imptcp: successfully terminated\n");
+ /* we stop the worker pool in AfterRun, in case we get cancelled for some reason (old Interface) */
ENDrunInput
@@ -1104,6 +1237,8 @@ shutdownSrv(ptcpsrv_t *pSrv)
BEGINafterRun
ptcpsrv_t *pSrv, *srvDel;
CODESTARTafterRun
+ stopWorkerPool();
+
/* we need to close everything that is still open */
pSrv = pSrvRoot;
while(pSrv != NULL) {