diff options
-rw-r--r-- | queue.c | 42 | ||||
-rw-r--r-- | queue.h | 6 | ||||
-rw-r--r-- | syslogd.c | 15 |
3 files changed, 46 insertions, 17 deletions
@@ -1,3 +1,6 @@ +// TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in +// call consumer state. Facilitates retaining messages in queue until action could +// be called! /* queue.c * * This file implements the queue object and its several queueing methods. @@ -404,13 +407,15 @@ queueWorker(void *arg) * is done by queueStart(). The reason is that we want to give the caller a chance * to modify some parameters before the queue is actually started. */ -rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, rsRetVal (*pConsumer)(void*)) +rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, + int iMaxQueueSize, rsRetVal (*pConsumer)(void*)) { DEFiRet; queue_t *pThis; assert(ppThis != NULL); assert(pConsumer != NULL); + assert(iWorkerThreads >= 0); if((pThis = (queue_t *)calloc(1, sizeof(queue_t))) == NULL) { ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); @@ -426,6 +431,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, pthread_cond_init (pThis->notFull, NULL); pThis->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t)); pthread_cond_init (pThis->notEmpty, NULL); + pThis->iNumWorkerThreads = iWorkerThreads; pThis->qType = qType; /* set type-specific handlers */ @@ -476,36 +482,48 @@ finalize_it: */ rsRetVal queueStart(queue_t *pThis) { + DEFiRet; + int iState; int i; if(pThis->qType != QUEUETYPE_DIRECT) { + if((pThis->pWorkerThreads = calloc(pThis->iNumWorkerThreads, sizeof(pthread_t))) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + /* fire up the worker thread */ pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */ - i = pthread_create(&pThis->thrdWorker, NULL, queueWorker, (void*) pThis); - dbgprintf("Worker thread for queue 0x%lx, type %d started with state %d.\n", - (unsigned long) pThis, (int) pThis->qType, i); + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { + iState = pthread_create(&(pThis->pWorkerThreads[i]), NULL, queueWorker, (void*) pThis); + dbgprintf("Worker thread %d for queue 0x%lx, type %d started with state %d.\n", + i, (unsigned long) pThis, (int) pThis->qType, iState); + } } - return RS_RET_OK; +finalize_it: + return iRet; } /* destructor for the queue object */ rsRetVal queueDestruct(queue_t *pThis) { DEFiRet; + int i; assert(pThis != NULL); - if(pThis->qType != QUEUETYPE_DIRECT) { + if(pThis->pWorkerThreads != NULL) { /* first stop the worker thread */ dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis); pThis->bDoRun = 0; - /* It's actually not "not empty" below but awaking the worker. The worker - * then finds out that it shall terminate and does so. + /* It's actually not "not empty" below but awaking the workers. They + * then find out that they shall terminate and do so. */ - pthread_cond_signal(pThis->notEmpty); - pthread_join(pThis->thrdWorker, NULL); - dbgprintf("Worker thread for queue 0x%lx terminated.\n", (unsigned long) pThis); + pthread_cond_broadcast(pThis->notEmpty); + /* end then wait for all worker threads to terminate */ + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { + pthread_join(pThis->pWorkerThreads[i], NULL); + } + dbgprintf("Worker threads for queue 0x%lx terminated.\n", (unsigned long) pThis); } /* ... then free resources */ @@ -561,7 +579,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) finalize_it: /* now activate the worker thread */ - if(pThis->qType != QUEUETYPE_DIRECT) { + if(pThis->pWorkerThreads != NULL) { pthread_mutex_unlock(pThis->mut); i = pthread_cond_signal(pThis->notEmpty); dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i); @@ -45,7 +45,8 @@ typedef struct queue_s { queueType_t qType; int iQueueSize; /* Current number of elements in the queue */ int iMaxQueueSize; /* how large can the queue grow? */ - pthread_t thrdWorker; /* ID of the worker thread associated with this queue */ + int iNumWorkerThreads;/* number of worker threads to use */ + pthread_t *pWorkerThreads;/* array with ID of the worker thread(s) associated with this queue */ int bDoRun; /* 1 - run queue, 0 - shutdown of queue requested */ rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dequeued messages */ /* type-specific handlers (set during construction) */ @@ -84,7 +85,8 @@ typedef struct queue_s { /* prototypes */ rsRetVal queueDestruct(queue_t *pThis); rsRetVal queueEnqObj(queue_t *pThis, void *pUsr); -rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, rsRetVal (*pConsumer)(void*)); rsRetVal queueStart(queue_t *pThis); +rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, + int iMaxQueueSize, rsRetVal (*pConsumer)(void*)); #endif /* #ifndef QUEUE_H_INCLUDED */ @@ -420,6 +420,7 @@ extern int errno; /* main message queue and its configuration parameters */ static queue_t *pMsgQueue = NULL; /* the main message queue */ static int iMainMsgQueueSize = 10000; /* size of the main message queue above */ +static int iMainMsgQueueNumWorkers = 1; /* number of worker threads for the mm queue above */ static queueType_t MainMsgQueType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */ @@ -522,6 +523,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a pszMainMsgQFilePrefix = NULL; } iMainMsgQueueSize = 10000; + iMainMsgQueueNumWorkers = 1; MainMsgQueType = QUEUETYPE_FIXED_ARRAY; return RS_RET_OK; @@ -3134,6 +3136,7 @@ static void dbgPrintInitInfo(void) cCCEscapeChar); dbgprintf("Main queue size %d messages.\n", iMainMsgQueueSize); + dbgprintf("Main queue worker threads: %d\n", iMainMsgQueueNumWorkers); dbgprintf("Spool Directory: '%s'.\n", pszSpoolDirectory); } @@ -3355,14 +3358,19 @@ init(void) pDfltProgNameCmp = NULL; } + /* some checks */ + if(iMainMsgQueueNumWorkers < 1) { + logerror("$MainMsgQueueNumWorkers must be at least 1! Set to 1.\n"); + iMainMsgQueueNumWorkers = 1; + } + /* switch the message object to threaded operation, if necessary */ - // TODO: add check for nbr of workers once we have that! - if(MainMsgQueType == QUEUETYPE_DIRECT) { + if(MainMsgQueType == QUEUETYPE_DIRECT || iMainMsgQueueNumWorkers > 1) { MsgEnableThreadSafety(); } /* create message queue */ - CHKiRet_Hdlr(queueConstruct(&pMsgQueue, MainMsgQueType, iMainMsgQueueSize, msgConsumer)) { + CHKiRet_Hdlr(queueConstruct(&pMsgQueue, MainMsgQueType, iMainMsgQueueNumWorkers, iMainMsgQueueSize, msgConsumer)) { /* no queue is fatal, we need to give up in that case... */ fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet); exit(1); @@ -4501,6 +4509,7 @@ static rsRetVal loadBuildInModules(void) CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuefileprefix", 0, eCmdHdlrGetWord, NULL, &pszMainMsgQFilePrefix, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesize", 0, eCmdHdlrInt, NULL, &iMainMsgQueueSize, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetype", 0, eCmdHdlrGetWord, setMainMsgQueType, NULL, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iMainMsgQueueNumWorkers, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeinterval", 0, eCmdHdlrInt, setActionResumeInterval, NULL, NULL)); |