summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-05 15:58:33 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-05 15:58:33 +0000
commit558003a7250d878c8cc52bf2d8863394b9baf835 (patch)
tree6564e5791397e428bf168a9a27125c980080bbd6
parent62154cdde95ad579d4b2b98f59fac4817be8a0f4 (diff)
downloadrsyslog-558003a7250d878c8cc52bf2d8863394b9baf835.tar.gz
rsyslog-558003a7250d878c8cc52bf2d8863394b9baf835.tar.xz
rsyslog-558003a7250d878c8cc52bf2d8863394b9baf835.zip
- added multiple worker thread capability to queue class
- implemented $MainMsgQueueWorkerThreads config directive
-rw-r--r--queue.c42
-rw-r--r--queue.h6
-rw-r--r--syslogd.c15
3 files changed, 46 insertions, 17 deletions
diff --git a/queue.c b/queue.c
index 0b18b6db..4c0a46c1 100644
--- a/queue.c
+++ b/queue.c
@@ -1,3 +1,6 @@
+// TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in
+// call consumer state. Facilitates retaining messages in queue until action could
+// be called!
/* queue.c
*
* This file implements the queue object and its several queueing methods.
@@ -404,13 +407,15 @@ queueWorker(void *arg)
* is done by queueStart(). The reason is that we want to give the caller a chance
* to modify some parameters before the queue is actually started.
*/
-rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, rsRetVal (*pConsumer)(void*))
+rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*))
{
DEFiRet;
queue_t *pThis;
assert(ppThis != NULL);
assert(pConsumer != NULL);
+ assert(iWorkerThreads >= 0);
if((pThis = (queue_t *)calloc(1, sizeof(queue_t))) == NULL) {
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -426,6 +431,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize,
pthread_cond_init (pThis->notFull, NULL);
pThis->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
pthread_cond_init (pThis->notEmpty, NULL);
+ pThis->iNumWorkerThreads = iWorkerThreads;
pThis->qType = qType;
/* set type-specific handlers */
@@ -476,36 +482,48 @@ finalize_it:
*/
rsRetVal queueStart(queue_t *pThis)
{
+ DEFiRet;
+ int iState;
int i;
if(pThis->qType != QUEUETYPE_DIRECT) {
+ if((pThis->pWorkerThreads = calloc(pThis->iNumWorkerThreads, sizeof(pthread_t))) == NULL)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+
/* fire up the worker thread */
pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */
- i = pthread_create(&pThis->thrdWorker, NULL, queueWorker, (void*) pThis);
- dbgprintf("Worker thread for queue 0x%lx, type %d started with state %d.\n",
- (unsigned long) pThis, (int) pThis->qType, i);
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+ iState = pthread_create(&(pThis->pWorkerThreads[i]), NULL, queueWorker, (void*) pThis);
+ dbgprintf("Worker thread %d for queue 0x%lx, type %d started with state %d.\n",
+ i, (unsigned long) pThis, (int) pThis->qType, iState);
+ }
}
- return RS_RET_OK;
+finalize_it:
+ return iRet;
}
/* destructor for the queue object */
rsRetVal queueDestruct(queue_t *pThis)
{
DEFiRet;
+ int i;
assert(pThis != NULL);
- if(pThis->qType != QUEUETYPE_DIRECT) {
+ if(pThis->pWorkerThreads != NULL) {
/* first stop the worker thread */
dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis);
pThis->bDoRun = 0;
- /* It's actually not "not empty" below but awaking the worker. The worker
- * then finds out that it shall terminate and does so.
+ /* It's actually not "not empty" below but awaking the workers. They
+ * then find out that they shall terminate and do so.
*/
- pthread_cond_signal(pThis->notEmpty);
- pthread_join(pThis->thrdWorker, NULL);
- dbgprintf("Worker thread for queue 0x%lx terminated.\n", (unsigned long) pThis);
+ pthread_cond_broadcast(pThis->notEmpty);
+ /* end then wait for all worker threads to terminate */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+ pthread_join(pThis->pWorkerThreads[i], NULL);
+ }
+ dbgprintf("Worker threads for queue 0x%lx terminated.\n", (unsigned long) pThis);
}
/* ... then free resources */
@@ -561,7 +579,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
finalize_it:
/* now activate the worker thread */
- if(pThis->qType != QUEUETYPE_DIRECT) {
+ if(pThis->pWorkerThreads != NULL) {
pthread_mutex_unlock(pThis->mut);
i = pthread_cond_signal(pThis->notEmpty);
dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i);
diff --git a/queue.h b/queue.h
index a01f2496..8c8782d3 100644
--- a/queue.h
+++ b/queue.h
@@ -45,7 +45,8 @@ typedef struct queue_s {
queueType_t qType;
int iQueueSize; /* Current number of elements in the queue */
int iMaxQueueSize; /* how large can the queue grow? */
- pthread_t thrdWorker; /* ID of the worker thread associated with this queue */
+ int iNumWorkerThreads;/* number of worker threads to use */
+ pthread_t *pWorkerThreads;/* array with ID of the worker thread(s) associated with this queue */
int bDoRun; /* 1 - run queue, 0 - shutdown of queue requested */
rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dequeued messages */
/* type-specific handlers (set during construction) */
@@ -84,7 +85,8 @@ typedef struct queue_s {
/* prototypes */
rsRetVal queueDestruct(queue_t *pThis);
rsRetVal queueEnqObj(queue_t *pThis, void *pUsr);
-rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, rsRetVal (*pConsumer)(void*));
rsRetVal queueStart(queue_t *pThis);
+rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*));
#endif /* #ifndef QUEUE_H_INCLUDED */
diff --git a/syslogd.c b/syslogd.c
index b7e51b55..618c3847 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -420,6 +420,7 @@ extern int errno;
/* main message queue and its configuration parameters */
static queue_t *pMsgQueue = NULL; /* the main message queue */
static int iMainMsgQueueSize = 10000; /* size of the main message queue above */
+static int iMainMsgQueueNumWorkers = 1; /* number of worker threads for the mm queue above */
static queueType_t MainMsgQueType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */
@@ -522,6 +523,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
pszMainMsgQFilePrefix = NULL;
}
iMainMsgQueueSize = 10000;
+ iMainMsgQueueNumWorkers = 1;
MainMsgQueType = QUEUETYPE_FIXED_ARRAY;
return RS_RET_OK;
@@ -3134,6 +3136,7 @@ static void dbgPrintInitInfo(void)
cCCEscapeChar);
dbgprintf("Main queue size %d messages.\n", iMainMsgQueueSize);
+ dbgprintf("Main queue worker threads: %d\n", iMainMsgQueueNumWorkers);
dbgprintf("Spool Directory: '%s'.\n", pszSpoolDirectory);
}
@@ -3355,14 +3358,19 @@ init(void)
pDfltProgNameCmp = NULL;
}
+ /* some checks */
+ if(iMainMsgQueueNumWorkers < 1) {
+ logerror("$MainMsgQueueNumWorkers must be at least 1! Set to 1.\n");
+ iMainMsgQueueNumWorkers = 1;
+ }
+
/* switch the message object to threaded operation, if necessary */
- // TODO: add check for nbr of workers once we have that!
- if(MainMsgQueType == QUEUETYPE_DIRECT) {
+ if(MainMsgQueType == QUEUETYPE_DIRECT || iMainMsgQueueNumWorkers > 1) {
MsgEnableThreadSafety();
}
/* create message queue */
- CHKiRet_Hdlr(queueConstruct(&pMsgQueue, MainMsgQueType, iMainMsgQueueSize, msgConsumer)) {
+ CHKiRet_Hdlr(queueConstruct(&pMsgQueue, MainMsgQueType, iMainMsgQueueNumWorkers, iMainMsgQueueSize, msgConsumer)) {
/* no queue is fatal, we need to give up in that case... */
fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet);
exit(1);
@@ -4501,6 +4509,7 @@ static rsRetVal loadBuildInModules(void)
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuefileprefix", 0, eCmdHdlrGetWord, NULL, &pszMainMsgQFilePrefix, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesize", 0, eCmdHdlrInt, NULL, &iMainMsgQueueSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetype", 0, eCmdHdlrGetWord, setMainMsgQueType, NULL, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iMainMsgQueueNumWorkers, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeinterval", 0, eCmdHdlrInt, setActionResumeInterval, NULL, NULL));