diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2007-12-14 16:51:34 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2007-12-14 16:51:34 +0000 |
commit | 6c0c26dc96544aa5814d00045b3d559c99fc1b2e (patch) | |
tree | 20d558391ba1d881ab3b27f7fbcc636dcfbc67ad | |
parent | 6a80d9ee504b57e2b815c91698785d4fcd06f62e (diff) | |
download | rsyslog-6c0c26dc96544aa5814d00045b3d559c99fc1b2e.tar.gz rsyslog-6c0c26dc96544aa5814d00045b3d559c99fc1b2e.tar.xz rsyslog-6c0c26dc96544aa5814d00045b3d559c99fc1b2e.zip |
on the way to a real input module interface and threading class...
-rw-r--r-- | module-template.h | 19 | ||||
-rw-r--r-- | modules.c | 12 | ||||
-rw-r--r-- | modules.h | 5 | ||||
-rw-r--r-- | plugins/immark/immark.c | 16 | ||||
-rw-r--r-- | syslogd.c | 39 | ||||
-rw-r--r-- | threads.c | 96 | ||||
-rw-r--r-- | threads.h | 15 |
7 files changed, 189 insertions, 13 deletions
diff --git a/module-template.h b/module-template.h index d54f5f83..4bce7f8d 100644 --- a/module-template.h +++ b/module-template.h @@ -28,6 +28,7 @@ #include "modules.h" #include "objomsr.h" +#include "threads.h" /* macro to define standard output-module static data members */ @@ -445,6 +446,24 @@ static rsRetVal modExit(void)\ return iRet;\ } + +/* runInput() + * This is the main function for input modules. It is used to gather data from the + * input source and submit it to the message queue. Each runInput() instance has its own + * thread. This is handled by the rsyslog engine. It needs to spawn off new threads only + * if there is a module-internal need to do so. + */ +#define BEGINrunInput \ +static rsRetVal runInput(void)\ +{\ + DEFiRet; + +#define CODESTARTrunInput + +#define ENDrunInput \ + return iRet;\ +} + /* * vi:set ai: */ @@ -160,13 +160,19 @@ modInfo_t *modGetNxt(modInfo_t *pThis) /* this function is like modGetNxt(), but it returns pointers to - * output modules only. As we currently deal just with output modules, + * modules of specific type only. As we currently deal just with output modules, * it is a dummy, to be filled with real code later. * rgerhards, 2007-07-24 */ -modInfo_t *omodGetNxt(modInfo_t *pThis) +modInfo_t *modGetNxtType(modInfo_t *pThis, eModType_t rqtdType) { - return(modGetNxt(pThis)); + modInfo_t *pMod = pThis; + + do { + pMod = modGetNxt(pMod); + } while(!(pMod == NULL || pMod->eType == rqtdType)); /* warning: do ... while() */ + + return pMod; } @@ -73,6 +73,9 @@ typedef struct moduleInfo { * can allocate instance memory in this call. */ rsRetVal (*createInstance)(); + /* input module specific members */ + /* TODO: do a union with members, pass pointer to msg submit function to IM rger, 2007-12-14 */ + rsRetVal (*runInput)(void); /* function to gather input and submit to queue */ union { struct {/* data for input modules */ /* input modules come after output modules are finished, I am @@ -91,7 +94,7 @@ typedef struct moduleInfo { /* prototypes */ rsRetVal doModInit(rsRetVal (*modInit)(), uchar *name, void *pModHdlr); -modInfo_t *omodGetNxt(modInfo_t *pThis); +modInfo_t *modGetNxtType(modInfo_t *pThis, eModType_t rqtdType); uchar *modGetName(modInfo_t *pThis); uchar *modGetStateName(modInfo_t *pThis); void modPrintList(void); diff --git a/plugins/immark/immark.c b/plugins/immark/immark.c index d70069c5..a43ea61e 100644 --- a/plugins/immark/immark.c +++ b/plugins/immark/immark.c @@ -39,10 +39,16 @@ #include <pthread.h> #include "rsyslog.h" #include "syslogd.h" +#include "cfsysline.h" #include "module-template.h" MODULE_TYPE_INPUT +/* Module static data */ +/* TODO: this needs a lot of work ;) */ +DEF_OMOD_STATIC_DATA +static int bDoMarkMessages = 1; + typedef struct _instanceData { } instanceData; @@ -58,9 +64,8 @@ typedef struct _instanceData { * (and pre 1.20.2 releases of rsyslog) did in mark procesing. They simply * do not belong here. */ -rsRetVal -immark_runInput(void) -{ +BEGINrunInput +CODESTARTrunInput struct timeval tvSelectTimeout; sigset_t sigSet; sigfillset(&sigSet); @@ -81,7 +86,8 @@ dbgprintf("immark post select, doing mark, bFinished: %d\n", bFinished); } fprintf(stderr, "immark: finished!\n"); return RS_RET_OK; -} +ENDrunInput + BEGINfreeInstance CODESTARTfreeInstance @@ -107,6 +113,8 @@ ENDqueryEtryPt BEGINmodInit() CODESTARTmodInit *ipIFVersProvided = 1; /* so far, we only support the initial definition */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(omsdRegCFSLineHdlr((uchar *)"markmessages", 0, eCmdHdlrBinary, NULL, &bDoMarkMessages, STD_LOADABLE_MODULE_ID)); ENDmodInit #endif /* #if 0 */ /* @@ -4227,10 +4227,34 @@ finalize_it: } +/* Start the input modules. This function will probably undergo big changes + * while we implement the input module interface. For now, it does the most + * important thing to get at least my poor initial input modules up and + * running. Almost no config option is taken. + * rgerhards, 2007-12-14 + */ +static rsRetVal +startInputModules(void) +{ + DEFiRet; + modInfo_t *pMod; + + /* loop through all modules and activate them (brr...) */ + pMod = modGetNxtType(NULL, eMOD_IN); + while(pMod != NULL) { + /* activate here */ + pMod = modGetNxtType(pMod, eMOD_IN); + } + + return iRet; +} + + /* INIT -- Initialize syslogd from configuration table * init() is called at initial startup AND each time syslogd is HUPed */ -static void init(void) +static void +init(void) { DEFiRet; register int i; @@ -4422,6 +4446,13 @@ static void init(void) Initialized = 1; + /* the output part and the queue is now ready to run. So it is a good time + * now to start the inputs. Please note that the net code above should be + * shuffled to down here once we have everything in input modules. + * rgerhards, 2007-12-14 + */ + startInputModules(); + if(Debug) { dbgPrintInitInfo(); } @@ -4449,7 +4480,7 @@ static void init(void) sigAct.sa_handler = sighup_handler; sigaction(SIGHUP, &sigAct, NULL); - dbgprintf(" restarted.\n"); + dbgprintf(" (re)started.\n"); } @@ -5058,7 +5089,7 @@ static rsRetVal cflineDoAction(uchar **p, action_t **ppAction) assert(ppAction != NULL); /* loop through all modules and see if one picks up the line */ - pMod = omodGetNxt(NULL); + pMod = modGetNxtType(NULL, eMOD_OUT); while(pMod != NULL) { iRet = pMod->mod.om.parseSelectorAct(p, &pModData, &pOMSR); dbgprintf("tried selector action for %s: %d\n", modGetName(pMod), iRet); @@ -5085,7 +5116,7 @@ static rsRetVal cflineDoAction(uchar **p, action_t **ppAction) dbgprintf("error %d parsing config line\n", (int) iRet); break; } - pMod = omodGetNxt(pMod); + pMod = modGetNxtType(pMod, eMOD_OUT); } *ppAction = pAction; @@ -27,17 +27,111 @@ #include "config.h" #include "rsyslog.h" -#include <pthread.h> #include <stdlib.h> #include <string.h> +#include <signal.h> +#include <pthread.h> #include <assert.h> #include "syslogd.h" +#include "linkedlist.h" #include "threads.h" +/* static data */ int iMainMsgQueueSize; msgQueue *pMsgQueue = NULL; +/* linked list of currently-known threads */ +static linkedList_t llThrds; + +/* methods */ + +/* Construct a new thread object + */ +static rsRetVal thrdConstruct(thrdInfo_t **pThis) +{ + thrdInfo_t *pNew; + + if((pNew = calloc(1, sizeof(thrdInfo_t))) == NULL) + return RS_RET_OUT_OF_MEMORY; + + /* OK, we got the element, now initialize members that should + * not be zero-filled. + */ + + *pThis = pNew; + return RS_RET_OK; +} + + +/* Destructs a thread object. The object must not be linked to the + * linked list of threads. Please note that the thread should have been + * stopped before. If not, we try to do it. + */ +static rsRetVal thrdDestruct(thrdInfo_t *pThis) +{ + assert(pThis != NULL); + + if(pThis->bIsActive == 1) { + thrdTerminate(pThis); + } + free(pThis); + + return RS_RET_OK; +} + + +/* terminate a thread gracefully. It's termination sync state is taken into + * account. + */ +rsRetVal thrdTerminate(thrdInfo_t *pThis) +{ + assert(pThis != NULL); + + if(pThis->eTermTool == eTermSync_SIGNAL) { + pthread_kill(pThis->thrdID, SIGUSR2); + pthread_join(pThis->thrdID, NULL); + /* TODO: TIMEOUT! */ + } else if(pThis->eTermTool == eTermSync_NONE) { + pthread_cancel(pThis->thrdID); + } + pThis->bIsActive = 0; + + return RS_RET_OK; +} + + +/* initialize the thread-support subsystem + * must be called once at the start of the program + */ +rsRetVal thrdInit(void) +{ + DEFiRet; + + iRet = llInit(&llThrds, thrdDestruct, NULL, NULL); + + return iRet; +} + + +/* de-initialize the thread subsystem + * must be called once at the end of the program + */ +rsRetVal thrdExit(void) +{ + DEFiRet; + + iRet = llDestroy(&llThrds); + + return iRet; +} + + + +/* queue functions (may be migrated to some other file...) + */ + + msgQueue *queueInit (void) { msgQueue *q; @@ -23,6 +23,20 @@ #ifndef THREADS_H_INCLUDED #define THREADS_H_INCLUDED + +/* type of sync tools for terminating the thread */ +typedef enum eTermSyncType { + eTermSync_NONE = 0, /* no cleanup necessary, just cancel thread */ + eTermSync_SIGNAL /* termination via pthread_kill() */ +} eTermSyncType_t; + +/* the thread object */ +typedef struct thrdInfo { + eTermSyncType_t eTermTool; + int bIsActive; /* Is thread running? */ + pthread_t thrdID; +} thrdInfo_t; + /* this is the first approach to a queue, this time with static * memory. */ @@ -35,6 +49,7 @@ typedef struct { } msgQueue; /* prototypes */ +rsRetVal thrdTerminate(thrdInfo_t *pThis); msgQueue *queueInit (void); void queueDelete (msgQueue *q); void queueAdd (msgQueue *q, void* in); |