summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--msg.c2
-rw-r--r--queue.c89
-rw-r--r--queue.h3
-rw-r--r--syslogd.c17
4 files changed, 89 insertions, 22 deletions
diff --git a/msg.c b/msg.c
index 3fd8949b..e8d253f0 100644
--- a/msg.c
+++ b/msg.c
@@ -157,12 +157,14 @@ static void MsgPrepareEnqueueLockingCase(msg_t *pThis)
/* ... and now the locking and unlocking implementations: */
static void MsgLockLockingCase(msg_t *pThis)
{
+ /* DEV debug only! dbgprintf("MsgLock(0x%lx)\n", (unsigned long) pThis); */
assert(pThis != NULL);
pthread_mutex_lock(&pThis->mut);
}
static void MsgUnlockLockingCase(msg_t *pThis)
{
+ /* DEV debug only! dbgprintf("MsgUnlock(0x%lx)\n", (unsigned long) pThis); */
assert(pThis != NULL);
pthread_mutex_unlock(&pThis->mut);
}
diff --git a/queue.c b/queue.c
index 73bbf168..0b18b6db 100644
--- a/queue.c
+++ b/queue.c
@@ -252,6 +252,45 @@ rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
return iRet;
}
+/* -------------------- direct (no queueing) -------------------- */
+rsRetVal qConstructDirect(queue_t __attribute__((unused)) *pThis)
+{
+ return RS_RET_OK;
+}
+
+
+rsRetVal qDestructDirect(queue_t __attribute__((unused)) *pThis)
+{
+ return RS_RET_OK;
+}
+
+rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
+{
+ DEFiRet;
+ rsRetVal iRetLocal;
+
+ assert(pThis != NULL);
+
+ /* TODO: calling the consumer should go into its own function! -- rgerhards, 2008-01-05*/
+ iRetLocal = pThis->pConsumer(pUsr);
+ if(iRetLocal != RS_RET_OK)
+ dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n",
+ (unsigned long) pThis, iRetLocal);
+ --pThis->iQueueSize; /* this is kind of a hack, but its the smartest thing we can do given
+ * the somewhat astonishing fact that this queue type does not actually
+ * queue anything ;)
+ */
+
+ return iRet;
+}
+
+rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
+{
+ return RS_RET_OK;
+}
+
+
+
/* --------------- end type-specific handlers -------------------- */
@@ -409,6 +448,12 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize,
pThis->qAdd = qAddDisk;
pThis->qDel = qDelDisk;
break;
+ case QUEUETYPE_DIRECT:
+ pThis->qConstruct = qConstructDirect;
+ pThis->qDestruct = qDestructDirect;
+ pThis->qAdd = qAddDirect;
+ pThis->qDel = qDelDirect;
+ break;
}
/* call type-specific constructor */
@@ -433,11 +478,13 @@ rsRetVal queueStart(queue_t *pThis)
{
int i;
- /* fire up the worker thread */
- pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */
- i = pthread_create(&pThis->thrdWorker, NULL, queueWorker, (void*) pThis);
- dbgprintf("Worker thread for queue 0x%lx, type %d started with state %d.\n",
- (unsigned long) pThis, (int) pThis->qType, i);
+ if(pThis->qType != QUEUETYPE_DIRECT) {
+ /* fire up the worker thread */
+ pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */
+ i = pthread_create(&pThis->thrdWorker, NULL, queueWorker, (void*) pThis);
+ dbgprintf("Worker thread for queue 0x%lx, type %d started with state %d.\n",
+ (unsigned long) pThis, (int) pThis->qType, i);
+ }
return RS_RET_OK;
}
@@ -449,15 +496,17 @@ rsRetVal queueDestruct(queue_t *pThis)
assert(pThis != NULL);
- /* first stop the worker thread */
- dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis);
- pThis->bDoRun = 0;
- /* It's actually not "not empty" below but awaking the worker. The worker
- * then finds out that it shall terminate and does so.
- */
- pthread_cond_signal(pThis->notEmpty);
- pthread_join(pThis->thrdWorker, NULL);
- dbgprintf("Worker thread for queue 0x%lx terminated.\n", (unsigned long) pThis);
+ if(pThis->qType != QUEUETYPE_DIRECT) {
+ /* first stop the worker thread */
+ dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis);
+ pThis->bDoRun = 0;
+ /* It's actually not "not empty" below but awaking the worker. The worker
+ * then finds out that it shall terminate and does so.
+ */
+ pthread_cond_signal(pThis->notEmpty);
+ pthread_join(pThis->thrdWorker, NULL);
+ dbgprintf("Worker thread for queue 0x%lx terminated.\n", (unsigned long) pThis);
+ }
/* ... then free resources */
pthread_mutex_destroy(pThis->mut);
@@ -492,7 +541,8 @@ queueEnqObj(queue_t *pThis, void *pUsr)
assert(pThis != NULL);
- pthread_mutex_lock(pThis->mut);
+ if(pThis->qType != QUEUETYPE_DIRECT)
+ pthread_mutex_lock(pThis->mut);
while(pThis->iQueueSize >= pThis->iMaxQueueSize) {
dbgprintf("enqueueMsg: queue 0x%lx FULL.\n", (unsigned long) pThis);
@@ -511,9 +561,12 @@ queueEnqObj(queue_t *pThis, void *pUsr)
finalize_it:
/* now activate the worker thread */
- pthread_mutex_unlock(pThis->mut);
- i = pthread_cond_signal(pThis->notEmpty);
- dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i);
+ if(pThis->qType != QUEUETYPE_DIRECT) {
+ pthread_mutex_unlock(pThis->mut);
+ i = pthread_cond_signal(pThis->notEmpty);
+ dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i);
+ }
+
return iRet;
}
/*
diff --git a/queue.h b/queue.h
index 7ad9b1e1..a01f2496 100644
--- a/queue.h
+++ b/queue.h
@@ -30,7 +30,8 @@
typedef enum {
QUEUETYPE_FIXED_ARRAY = 0,/* a simple queue made out of a fixed (initially malloced) array fast but memoryhog */
QUEUETYPE_LINKEDLIST = 1, /* linked list used as buffer, lower fixed memory overhead but slower */
- QUEUETYPE_DISK = 2 /* disk files used as buffer */
+ QUEUETYPE_DISK = 2, /* disk files used as buffer */
+ QUEUETYPE_DIRECT = 3 /* no queuing happens, consumer is directly called */
} queueType_t;
/* list member definition for linked list types of queues: */
diff --git a/syslogd.c b/syslogd.c
index 28060db9..b7e51b55 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -410,6 +410,10 @@ static int Initialized = 0; /* set when we have initialized ourselves
* such a case.
* read-only after startup, but modified during restart
*/
+static int bHaveMainQueue = 0;/* set to 1 if the main queue - in queueing mode - is available
+ * If the main queue is either not yet ready or not running in
+ * queueing mode (mode DIRECT!), then this is set to 0.
+ */
extern int errno;
@@ -1531,7 +1535,7 @@ logmsgInternal(int pri, char *msg, int flags)
getCurrTime(&(pMsg->tTIMESTAMP)); /* use the current time! */
flags |= INTERNAL_MSG;
- if(Initialized == 0) { /* not yet in queued mode */
+ if(bHaveMainQueue == 0) { /* not yet in queued mode */
iminternalAddMsg(pri, pMsg, flags);
} else {
/* we have the queue, so we can simply provide the
@@ -3046,6 +3050,7 @@ static void freeSelectors(void)
/* Reflect the deletion of the selectors linked list. */
Files = NULL;
Initialized = 0;
+ bHaveMainQueue = 0;
}
}
@@ -3351,8 +3356,10 @@ init(void)
}
/* switch the message object to threaded operation, if necessary */
- // TODO: handle the "if" part above ;)
- MsgEnableThreadSafety();
+ // TODO: add check for nbr of workers once we have that!
+ if(MainMsgQueType == QUEUETYPE_DIRECT) {
+ MsgEnableThreadSafety();
+ }
/* create message queue */
CHKiRet_Hdlr(queueConstruct(&pMsgQueue, MainMsgQueType, iMainMsgQueueSize, msgConsumer)) {
@@ -3367,6 +3374,7 @@ init(void)
}
Initialized = 1;
+ bHaveMainQueue = (MainMsgQueType == QUEUETYPE_DIRECT) ? 0 : 1;
/* the output part and the queue is now ready to run. So it is a good time
* to start the inputs. Please note that the net code above should be
@@ -4215,6 +4223,9 @@ static rsRetVal setMainMsgQueType(void __attribute__((unused)) *pVal, uchar *psz
} else if (!strcasecmp((char *) pszType, "disk")) {
MainMsgQueType = QUEUETYPE_DISK;
dbgprintf("main message queue type set to DISK\n");
+ } else if (!strcasecmp((char *) pszType, "direct")) {
+ MainMsgQueType = QUEUETYPE_DIRECT;
+ dbgprintf("main message queue type set to DIRECT (no queueing at all)\n");
} else {
logerrorSz("unknown mainmessagequeuetype parameter: %s", (char *) pszType);
iRet = RS_RET_INVALID_PARAMS;