summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2007-12-14 16:51:34 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2007-12-14 16:51:34 +0000
commit6c0c26dc96544aa5814d00045b3d559c99fc1b2e (patch)
tree20d558391ba1d881ab3b27f7fbcc636dcfbc67ad
parent6a80d9ee504b57e2b815c91698785d4fcd06f62e (diff)
downloadrsyslog-6c0c26dc96544aa5814d00045b3d559c99fc1b2e.tar.gz
rsyslog-6c0c26dc96544aa5814d00045b3d559c99fc1b2e.tar.xz
rsyslog-6c0c26dc96544aa5814d00045b3d559c99fc1b2e.zip
on the way to a real input module interface and threading class...
-rw-r--r--module-template.h19
-rw-r--r--modules.c12
-rw-r--r--modules.h5
-rw-r--r--plugins/immark/immark.c16
-rw-r--r--syslogd.c39
-rw-r--r--threads.c96
-rw-r--r--threads.h15
7 files changed, 189 insertions, 13 deletions
diff --git a/module-template.h b/module-template.h
index d54f5f83..4bce7f8d 100644
--- a/module-template.h
+++ b/module-template.h
@@ -28,6 +28,7 @@
#include "modules.h"
#include "objomsr.h"
+#include "threads.h"
/* macro to define standard output-module static data members
*/
@@ -445,6 +446,24 @@ static rsRetVal modExit(void)\
return iRet;\
}
+
+/* runInput()
+ * This is the main function for input modules. It is used to gather data from the
+ * input source and submit it to the message queue. Each runInput() instance has its own
+ * thread. This is handled by the rsyslog engine. It needs to spawn off new threads only
+ * if there is a module-internal need to do so.
+ */
+#define BEGINrunInput \
+static rsRetVal runInput(void)\
+{\
+ DEFiRet;
+
+#define CODESTARTrunInput
+
+#define ENDrunInput \
+ return iRet;\
+}
+
/*
* vi:set ai:
*/
diff --git a/modules.c b/modules.c
index b7ca71eb..6869ede6 100644
--- a/modules.c
+++ b/modules.c
@@ -160,13 +160,19 @@ modInfo_t *modGetNxt(modInfo_t *pThis)
/* this function is like modGetNxt(), but it returns pointers to
- * output modules only. As we currently deal just with output modules,
+ * modules of specific type only. As we currently deal just with output modules,
* it is a dummy, to be filled with real code later.
* rgerhards, 2007-07-24
*/
-modInfo_t *omodGetNxt(modInfo_t *pThis)
+modInfo_t *modGetNxtType(modInfo_t *pThis, eModType_t rqtdType)
{
- return(modGetNxt(pThis));
+ modInfo_t *pMod = pThis;
+
+ do {
+ pMod = modGetNxt(pMod);
+ } while(!(pMod == NULL || pMod->eType == rqtdType)); /* warning: do ... while() */
+
+ return pMod;
}
diff --git a/modules.h b/modules.h
index 5b745047..914a65fd 100644
--- a/modules.h
+++ b/modules.h
@@ -73,6 +73,9 @@ typedef struct moduleInfo {
* can allocate instance memory in this call.
*/
rsRetVal (*createInstance)();
+ /* input module specific members */
+ /* TODO: do a union with members, pass pointer to msg submit function to IM rger, 2007-12-14 */
+ rsRetVal (*runInput)(void); /* function to gather input and submit to queue */
union {
struct {/* data for input modules */
/* input modules come after output modules are finished, I am
@@ -91,7 +94,7 @@ typedef struct moduleInfo {
/* prototypes */
rsRetVal doModInit(rsRetVal (*modInit)(), uchar *name, void *pModHdlr);
-modInfo_t *omodGetNxt(modInfo_t *pThis);
+modInfo_t *modGetNxtType(modInfo_t *pThis, eModType_t rqtdType);
uchar *modGetName(modInfo_t *pThis);
uchar *modGetStateName(modInfo_t *pThis);
void modPrintList(void);
diff --git a/plugins/immark/immark.c b/plugins/immark/immark.c
index d70069c5..a43ea61e 100644
--- a/plugins/immark/immark.c
+++ b/plugins/immark/immark.c
@@ -39,10 +39,16 @@
#include <pthread.h>
#include "rsyslog.h"
#include "syslogd.h"
+#include "cfsysline.h"
#include "module-template.h"
MODULE_TYPE_INPUT
+/* Module static data */
+/* TODO: this needs a lot of work ;) */
+DEF_OMOD_STATIC_DATA
+static int bDoMarkMessages = 1;
+
typedef struct _instanceData {
} instanceData;
@@ -58,9 +64,8 @@ typedef struct _instanceData {
* (and pre 1.20.2 releases of rsyslog) did in mark procesing. They simply
* do not belong here.
*/
-rsRetVal
-immark_runInput(void)
-{
+BEGINrunInput
+CODESTARTrunInput
struct timeval tvSelectTimeout;
sigset_t sigSet;
sigfillset(&sigSet);
@@ -81,7 +86,8 @@ dbgprintf("immark post select, doing mark, bFinished: %d\n", bFinished);
}
fprintf(stderr, "immark: finished!\n");
return RS_RET_OK;
-}
+ENDrunInput
+
BEGINfreeInstance
CODESTARTfreeInstance
@@ -107,6 +113,8 @@ ENDqueryEtryPt
BEGINmodInit()
CODESTARTmodInit
*ipIFVersProvided = 1; /* so far, we only support the initial definition */
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"markmessages", 0, eCmdHdlrBinary, NULL, &bDoMarkMessages, STD_LOADABLE_MODULE_ID));
ENDmodInit
#endif /* #if 0 */
/*
diff --git a/syslogd.c b/syslogd.c
index 1a09ed3b..1da475e5 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -4227,10 +4227,34 @@ finalize_it:
}
+/* Start the input modules. This function will probably undergo big changes
+ * while we implement the input module interface. For now, it does the most
+ * important thing to get at least my poor initial input modules up and
+ * running. Almost no config option is taken.
+ * rgerhards, 2007-12-14
+ */
+static rsRetVal
+startInputModules(void)
+{
+ DEFiRet;
+ modInfo_t *pMod;
+
+ /* loop through all modules and activate them (brr...) */
+ pMod = modGetNxtType(NULL, eMOD_IN);
+ while(pMod != NULL) {
+ /* activate here */
+ pMod = modGetNxtType(pMod, eMOD_IN);
+ }
+
+ return iRet;
+}
+
+
/* INIT -- Initialize syslogd from configuration table
* init() is called at initial startup AND each time syslogd is HUPed
*/
-static void init(void)
+static void
+init(void)
{
DEFiRet;
register int i;
@@ -4422,6 +4446,13 @@ static void init(void)
Initialized = 1;
+ /* the output part and the queue is now ready to run. So it is a good time
+ * now to start the inputs. Please note that the net code above should be
+ * shuffled to down here once we have everything in input modules.
+ * rgerhards, 2007-12-14
+ */
+ startInputModules();
+
if(Debug) {
dbgPrintInitInfo();
}
@@ -4449,7 +4480,7 @@ static void init(void)
sigAct.sa_handler = sighup_handler;
sigaction(SIGHUP, &sigAct, NULL);
- dbgprintf(" restarted.\n");
+ dbgprintf(" (re)started.\n");
}
@@ -5058,7 +5089,7 @@ static rsRetVal cflineDoAction(uchar **p, action_t **ppAction)
assert(ppAction != NULL);
/* loop through all modules and see if one picks up the line */
- pMod = omodGetNxt(NULL);
+ pMod = modGetNxtType(NULL, eMOD_OUT);
while(pMod != NULL) {
iRet = pMod->mod.om.parseSelectorAct(p, &pModData, &pOMSR);
dbgprintf("tried selector action for %s: %d\n", modGetName(pMod), iRet);
@@ -5085,7 +5116,7 @@ static rsRetVal cflineDoAction(uchar **p, action_t **ppAction)
dbgprintf("error %d parsing config line\n", (int) iRet);
break;
}
- pMod = omodGetNxt(pMod);
+ pMod = modGetNxtType(pMod, eMOD_OUT);
}
*ppAction = pAction;
diff --git a/threads.c b/threads.c
index c3aee4b8..d5168d02 100644
--- a/threads.c
+++ b/threads.c
@@ -27,17 +27,111 @@
#include "config.h"
#include "rsyslog.h"
-#include <pthread.h>
#include <stdlib.h>
#include <string.h>
+#include <signal.h>
+#include <pthread.h>
#include <assert.h>
#include "syslogd.h"
+#include "linkedlist.h"
#include "threads.h"
+/* static data */
int iMainMsgQueueSize;
msgQueue *pMsgQueue = NULL;
+/* linked list of currently-known threads */
+static linkedList_t llThrds;
+
+/* methods */
+
+/* Construct a new thread object
+ */
+static rsRetVal thrdConstruct(thrdInfo_t **pThis)
+{
+ thrdInfo_t *pNew;
+
+ if((pNew = calloc(1, sizeof(thrdInfo_t))) == NULL)
+ return RS_RET_OUT_OF_MEMORY;
+
+ /* OK, we got the element, now initialize members that should
+ * not be zero-filled.
+ */
+
+ *pThis = pNew;
+ return RS_RET_OK;
+}
+
+
+/* Destructs a thread object. The object must not be linked to the
+ * linked list of threads. Please note that the thread should have been
+ * stopped before. If not, we try to do it.
+ */
+static rsRetVal thrdDestruct(thrdInfo_t *pThis)
+{
+ assert(pThis != NULL);
+
+ if(pThis->bIsActive == 1) {
+ thrdTerminate(pThis);
+ }
+ free(pThis);
+
+ return RS_RET_OK;
+}
+
+
+/* terminate a thread gracefully. It's termination sync state is taken into
+ * account.
+ */
+rsRetVal thrdTerminate(thrdInfo_t *pThis)
+{
+ assert(pThis != NULL);
+
+ if(pThis->eTermTool == eTermSync_SIGNAL) {
+ pthread_kill(pThis->thrdID, SIGUSR2);
+ pthread_join(pThis->thrdID, NULL);
+ /* TODO: TIMEOUT! */
+ } else if(pThis->eTermTool == eTermSync_NONE) {
+ pthread_cancel(pThis->thrdID);
+ }
+ pThis->bIsActive = 0;
+
+ return RS_RET_OK;
+}
+
+
+/* initialize the thread-support subsystem
+ * must be called once at the start of the program
+ */
+rsRetVal thrdInit(void)
+{
+ DEFiRet;
+
+ iRet = llInit(&llThrds, thrdDestruct, NULL, NULL);
+
+ return iRet;
+}
+
+
+/* de-initialize the thread subsystem
+ * must be called once at the end of the program
+ */
+rsRetVal thrdExit(void)
+{
+ DEFiRet;
+
+ iRet = llDestroy(&llThrds);
+
+ return iRet;
+}
+
+
+
+/* queue functions (may be migrated to some other file...)
+ */
+
+
msgQueue *queueInit (void)
{
msgQueue *q;
diff --git a/threads.h b/threads.h
index 5f3faab7..dd2b17f7 100644
--- a/threads.h
+++ b/threads.h
@@ -23,6 +23,20 @@
#ifndef THREADS_H_INCLUDED
#define THREADS_H_INCLUDED
+
+/* type of sync tools for terminating the thread */
+typedef enum eTermSyncType {
+ eTermSync_NONE = 0, /* no cleanup necessary, just cancel thread */
+ eTermSync_SIGNAL /* termination via pthread_kill() */
+} eTermSyncType_t;
+
+/* the thread object */
+typedef struct thrdInfo {
+ eTermSyncType_t eTermTool;
+ int bIsActive; /* Is thread running? */
+ pthread_t thrdID;
+} thrdInfo_t;
+
/* this is the first approach to a queue, this time with static
* memory.
*/
@@ -35,6 +49,7 @@ typedef struct {
} msgQueue;
/* prototypes */
+rsRetVal thrdTerminate(thrdInfo_t *pThis);
msgQueue *queueInit (void);
void queueDelete (msgQueue *q);
void queueAdd (msgQueue *q, void* in);