summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-05 13:39:40 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-05 13:39:40 +0000
commit62154cdde95ad579d4b2b98f59fac4817be8a0f4 (patch)
tree29a65e0ece3080baa9015436ce0704da83d8f06e /queue.c
parente055d4921b9a53e9dfedc56bbff3a9b12400d34d (diff)
downloadrsyslog-62154cdde95ad579d4b2b98f59fac4817be8a0f4.tar.gz
rsyslog-62154cdde95ad579d4b2b98f59fac4817be8a0f4.tar.xz
rsyslog-62154cdde95ad579d4b2b98f59fac4817be8a0f4.zip
added the "direct" queueing mode to queue class (no queing at all)
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c89
1 files changed, 71 insertions, 18 deletions
diff --git a/queue.c b/queue.c
index 73bbf168..0b18b6db 100644
--- a/queue.c
+++ b/queue.c
@@ -252,6 +252,45 @@ rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
return iRet;
}
+/* -------------------- direct (no queueing) -------------------- */
+rsRetVal qConstructDirect(queue_t __attribute__((unused)) *pThis)
+{
+ return RS_RET_OK;
+}
+
+
+rsRetVal qDestructDirect(queue_t __attribute__((unused)) *pThis)
+{
+ return RS_RET_OK;
+}
+
+rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
+{
+ DEFiRet;
+ rsRetVal iRetLocal;
+
+ assert(pThis != NULL);
+
+ /* TODO: calling the consumer should go into its own function! -- rgerhards, 2008-01-05*/
+ iRetLocal = pThis->pConsumer(pUsr);
+ if(iRetLocal != RS_RET_OK)
+ dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n",
+ (unsigned long) pThis, iRetLocal);
+ --pThis->iQueueSize; /* this is kind of a hack, but its the smartest thing we can do given
+ * the somewhat astonishing fact that this queue type does not actually
+ * queue anything ;)
+ */
+
+ return iRet;
+}
+
+rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
+{
+ return RS_RET_OK;
+}
+
+
+
/* --------------- end type-specific handlers -------------------- */
@@ -409,6 +448,12 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize,
pThis->qAdd = qAddDisk;
pThis->qDel = qDelDisk;
break;
+ case QUEUETYPE_DIRECT:
+ pThis->qConstruct = qConstructDirect;
+ pThis->qDestruct = qDestructDirect;
+ pThis->qAdd = qAddDirect;
+ pThis->qDel = qDelDirect;
+ break;
}
/* call type-specific constructor */
@@ -433,11 +478,13 @@ rsRetVal queueStart(queue_t *pThis)
{
int i;
- /* fire up the worker thread */
- pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */
- i = pthread_create(&pThis->thrdWorker, NULL, queueWorker, (void*) pThis);
- dbgprintf("Worker thread for queue 0x%lx, type %d started with state %d.\n",
- (unsigned long) pThis, (int) pThis->qType, i);
+ if(pThis->qType != QUEUETYPE_DIRECT) {
+ /* fire up the worker thread */
+ pThis->bDoRun = 1; /* we are NOT done (else worker would immediately terminate) */
+ i = pthread_create(&pThis->thrdWorker, NULL, queueWorker, (void*) pThis);
+ dbgprintf("Worker thread for queue 0x%lx, type %d started with state %d.\n",
+ (unsigned long) pThis, (int) pThis->qType, i);
+ }
return RS_RET_OK;
}
@@ -449,15 +496,17 @@ rsRetVal queueDestruct(queue_t *pThis)
assert(pThis != NULL);
- /* first stop the worker thread */
- dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis);
- pThis->bDoRun = 0;
- /* It's actually not "not empty" below but awaking the worker. The worker
- * then finds out that it shall terminate and does so.
- */
- pthread_cond_signal(pThis->notEmpty);
- pthread_join(pThis->thrdWorker, NULL);
- dbgprintf("Worker thread for queue 0x%lx terminated.\n", (unsigned long) pThis);
+ if(pThis->qType != QUEUETYPE_DIRECT) {
+ /* first stop the worker thread */
+ dbgprintf("Initiating worker thread shutdown sequence for queue 0x%lx...\n", (unsigned long) pThis);
+ pThis->bDoRun = 0;
+ /* It's actually not "not empty" below but awaking the worker. The worker
+ * then finds out that it shall terminate and does so.
+ */
+ pthread_cond_signal(pThis->notEmpty);
+ pthread_join(pThis->thrdWorker, NULL);
+ dbgprintf("Worker thread for queue 0x%lx terminated.\n", (unsigned long) pThis);
+ }
/* ... then free resources */
pthread_mutex_destroy(pThis->mut);
@@ -492,7 +541,8 @@ queueEnqObj(queue_t *pThis, void *pUsr)
assert(pThis != NULL);
- pthread_mutex_lock(pThis->mut);
+ if(pThis->qType != QUEUETYPE_DIRECT)
+ pthread_mutex_lock(pThis->mut);
while(pThis->iQueueSize >= pThis->iMaxQueueSize) {
dbgprintf("enqueueMsg: queue 0x%lx FULL.\n", (unsigned long) pThis);
@@ -511,9 +561,12 @@ queueEnqObj(queue_t *pThis, void *pUsr)
finalize_it:
/* now activate the worker thread */
- pthread_mutex_unlock(pThis->mut);
- i = pthread_cond_signal(pThis->notEmpty);
- dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i);
+ if(pThis->qType != QUEUETYPE_DIRECT) {
+ pthread_mutex_unlock(pThis->mut);
+ i = pthread_cond_signal(pThis->notEmpty);
+ dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i);
+ }
+
return iRet;
}
/*