diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-05 13:39:40 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-05 13:39:40 +0000 |
commit | 62154cdde95ad579d4b2b98f59fac4817be8a0f4 (patch) | |
tree | 29a65e0ece3080baa9015436ce0704da83d8f06e | |
parent | e055d4921b9a53e9dfedc56bbff3a9b12400d34d (diff) | |
download | rsyslog-62154cdde95ad579d4b2b98f59fac4817be8a0f4.tar.gz rsyslog-62154cdde95ad579d4b2b98f59fac4817be8a0f4.tar.xz rsyslog-62154cdde95ad579d4b2b98f59fac4817be8a0f4.zip |
added the "direct" queueing mode to queue class (no queing at all)
-rw-r--r-- | msg.c | 2 | ||||
-rw-r--r-- | queue.c | 89 | ||||
-rw-r--r-- | queue.h | 3 | ||||
-rw-r--r-- | syslogd.c | 17 |
4 files changed, 89 insertions, 22 deletions
@@ -157,12 +157,14 @@ static void MsgPrepareEnqueueLockingCase(msg_t *pThis) /* ... and now the locking and unlocking implementations: */ static void MsgLockLockingCase(msg_t *pThis) { + /* DEV debug only! dbgprintf("MsgLock(0x%lx)\n", (unsigned long) pThis); */ assert(pThis != NULL); pthread_mutex_lock(&pThis->mut); } static void MsgUnlockLockingCase(msg_t *pThis) { + /* DEV debug only! dbgprintf("MsgUnlock(0x%lx)\n", (unsigned long) pThis); */ assert(pThis != NULL); pthread_mutex_unlock(&pThis->mut); } @@ -252,6 +252,45 @@ rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) return iRet; } +/* -------------------- direct (no queueing) -------------------- */ +rsRetVal qConstructDirect(queue_t __attribute__((unused)) *pThis) +{ + return RS_RET_OK; +} + + +rsRetVal qDestructDirect(queue_t __attribute__((unused)) *pThis) +{ + return RS_RET_OK; +} + +rsRetVal qAddDirect(queue_t *pThis, void* pUsr) +{ + DEFiRet; + rsRetVal iRetLocal; + + assert(pThis != NULL); + + /* TODO: calling the consumer should go into its own function! -- rgerhards, 2008-01-05*/ + iRetLocal = pThis->pConsumer(pUsr); + if(iRetLocal != RS_RET_OK) + dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n", + (unsigned long) pThis, iRetLocal); + --pThis->iQueueSize; /* this is kind of a hack, but its the smartest thing we can do given + * the somewhat astonishing fact that this queue type does not actually + * queue anything ;) + */ + + return iRet; +} + +rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) +{ + return RS_RET_OK; +} + + + /* --------------- end type-specific handlers -------------------- */ @@ -409,6 +448,12 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, pThis->qAdd = qAddDisk; pThis->qDel = qDelDisk; break; + case QUEUETYPE_DIRECT: + pThis->qConstruct = qConstructDirect; + pThis->qDestruct = qDestructDirect; + pThis->qAdd = qAddDirect; + pThis->qDel = qDelDirect; + break; } /* call type-specific constructor */ @@ -433,11 +478,13 @@ rsRetVal queueStart(queue_t *pThis) { int i; - /* fire up the worker thread */ - pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */ - i = pthread_create(&pThis->thrdWorker, NULL, queueWorker, (void*) pThis); - dbgprintf("Worker thread for queue 0x%lx, type %d started with state %d.\n", - (unsigned long) pThis, (int) pThis->qType, i); + if(pThis->qType != QUEUETYPE_DIRECT) { + /* fire up the worker thread */ + pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */ + i = pthread_create(&pThis->thrdWorker, NULL, queueWorker, (void*) pThis); + dbgprintf("Worker thread for queue 0x%lx, type %d started with state %d.\n", + (unsigned long) pThis, (int) pThis->qType, i); + } return RS_RET_OK; } @@ -449,15 +496,17 @@ rsRetVal queueDestruct(queue_t *pThis) assert(pThis != NULL); - /* first stop the worker thread */ - dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis); - pThis->bDoRun = 0; - /* It's actually not "not empty" below but awaking the worker. The worker - * then finds out that it shall terminate and does so. - */ - pthread_cond_signal(pThis->notEmpty); - pthread_join(pThis->thrdWorker, NULL); - dbgprintf("Worker thread for queue 0x%lx terminated.\n", (unsigned long) pThis); + if(pThis->qType != QUEUETYPE_DIRECT) { + /* first stop the worker thread */ + dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis); + pThis->bDoRun = 0; + /* It's actually not "not empty" below but awaking the worker. The worker + * then finds out that it shall terminate and does so. + */ + pthread_cond_signal(pThis->notEmpty); + pthread_join(pThis->thrdWorker, NULL); + dbgprintf("Worker thread for queue 0x%lx terminated.\n", (unsigned long) pThis); + } /* ... then free resources */ pthread_mutex_destroy(pThis->mut); @@ -492,7 +541,8 @@ queueEnqObj(queue_t *pThis, void *pUsr) assert(pThis != NULL); - pthread_mutex_lock(pThis->mut); + if(pThis->qType != QUEUETYPE_DIRECT) + pthread_mutex_lock(pThis->mut); while(pThis->iQueueSize >= pThis->iMaxQueueSize) { dbgprintf("enqueueMsg: queue 0x%lx FULL.\n", (unsigned long) pThis); @@ -511,9 +561,12 @@ queueEnqObj(queue_t *pThis, void *pUsr) finalize_it: /* now activate the worker thread */ - pthread_mutex_unlock(pThis->mut); - i = pthread_cond_signal(pThis->notEmpty); - dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i); + if(pThis->qType != QUEUETYPE_DIRECT) { + pthread_mutex_unlock(pThis->mut); + i = pthread_cond_signal(pThis->notEmpty); + dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i); + } + return iRet; } /* @@ -30,7 +30,8 @@ typedef enum { QUEUETYPE_FIXED_ARRAY = 0,/* a simple queue made out of a fixed (initially malloced) array fast but memoryhog */ QUEUETYPE_LINKEDLIST = 1, /* linked list used as buffer, lower fixed memory overhead but slower */ - QUEUETYPE_DISK = 2 /* disk files used as buffer */ + QUEUETYPE_DISK = 2, /* disk files used as buffer */ + QUEUETYPE_DIRECT = 3 /* no queuing happens, consumer is directly called */ } queueType_t; /* list member definition for linked list types of queues: */ @@ -410,6 +410,10 @@ static int Initialized = 0; /* set when we have initialized ourselves * such a case. * read-only after startup, but modified during restart */ +static int bHaveMainQueue = 0;/* set to 1 if the main queue - in queueing mode - is available + * If the main queue is either not yet ready or not running in + * queueing mode (mode DIRECT!), then this is set to 0. + */ extern int errno; @@ -1531,7 +1535,7 @@ logmsgInternal(int pri, char *msg, int flags) getCurrTime(&(pMsg->tTIMESTAMP)); /* use the current time! */ flags |= INTERNAL_MSG; - if(Initialized == 0) { /* not yet in queued mode */ + if(bHaveMainQueue == 0) { /* not yet in queued mode */ iminternalAddMsg(pri, pMsg, flags); } else { /* we have the queue, so we can simply provide the @@ -3046,6 +3050,7 @@ static void freeSelectors(void) /* Reflect the deletion of the selectors linked list. */ Files = NULL; Initialized = 0; + bHaveMainQueue = 0; } } @@ -3351,8 +3356,10 @@ init(void) } /* switch the message object to threaded operation, if necessary */ - // TODO: handle the "if" part above ;) - MsgEnableThreadSafety(); + // TODO: add check for nbr of workers once we have that! + if(MainMsgQueType == QUEUETYPE_DIRECT) { + MsgEnableThreadSafety(); + } /* create message queue */ CHKiRet_Hdlr(queueConstruct(&pMsgQueue, MainMsgQueType, iMainMsgQueueSize, msgConsumer)) { @@ -3367,6 +3374,7 @@ init(void) } Initialized = 1; + bHaveMainQueue = (MainMsgQueType == QUEUETYPE_DIRECT) ? 0 : 1; /* the output part and the queue is now ready to run. So it is a good time * to start the inputs. Please note that the net code above should be @@ -4215,6 +4223,9 @@ static rsRetVal setMainMsgQueType(void __attribute__((unused)) *pVal, uchar *psz } else if (!strcasecmp((char *) pszType, "disk")) { MainMsgQueType = QUEUETYPE_DISK; dbgprintf("main message queue type set to DISK\n"); + } else if (!strcasecmp((char *) pszType, "direct")) { + MainMsgQueType = QUEUETYPE_DIRECT; + dbgprintf("main message queue type set to DIRECT (no queueing at all)\n"); } else { logerrorSz("unknown mainmessagequeuetype parameter: %s", (char *) pszType); iRet = RS_RET_INVALID_PARAMS; |