summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-14 19:08:43 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-14 19:08:43 +0000
commit75b645f16b930f142b777b00b529fb726ef10243 (patch)
tree45ad552d5683c633e573e09801dd8a2a0f56acc2 /queue.c
parentec27ea55f38d78fd97c80f8554870693a96be332 (diff)
downloadrsyslog-75b645f16b930f142b777b00b529fb726ef10243.tar.gz
rsyslog-75b645f16b930f142b777b00b529fb726ef10243.tar.xz
rsyslog-75b645f16b930f142b777b00b529fb726ef10243.zip
some more work on disk assisted mode (still not complete)
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c50
1 files changed, 48 insertions, 2 deletions
diff --git a/queue.c b/queue.c
index 87cea59d..10a560d2 100644
--- a/queue.c
+++ b/queue.c
@@ -183,6 +183,17 @@ queueDAConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr)
dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, pThis->iQueueSize);
CHKiRet(queueEnqObj(pThis->pqDA, pUsr));
+ if(pThis->iQueueSize == pThis->iLowWtrMrk) {
+ dbgprintf("Queue 0x%lx: %d entries - passed low water mark in DA mode, sleeping\n",
+ queueGetID(pThis), pThis->iQueueSize);
+ pthread_mutex_lock(&pThis->mutDA);
+dbgprintf("mutex locked (think about CLEANUP!)\n");
+ pthread_cond_wait(&pThis->condDA, &pThis->mutDA);
+dbgprintf("condition returned\n");
+ pthread_mutex_unlock(&pThis->mutDA);
+dbgprintf("mutex unlocked (think about CLEANUP!)\n");
+ }
+
finalize_it:
return iRet;
}
@@ -197,15 +208,43 @@ queueChkStrtDA(queue_t *pThis)
DEFiRet;
ISOBJ_TYPE_assert(pThis, queue);
- if(pThis->iQueueSize < pThis->iHighWtrMrk || pThis->bRunsDA)
+
+
+ /* if we do not hit the high water mark, we have nothing to do */
+ if(pThis->iQueueSize != pThis->iHighWtrMrk)
ABORT_FINALIZE(RS_RET_OK);
+ if(pThis->bRunsDA) {
+ /* then we need to signal that we are at the high water mark again. The DA
+ * consumer shall check if it needs to restart. Note that we may pass the
+ * high water mark while we drain the queue.
+ * TODO: problem here? (condition signalled on drain...)
+ */
+ dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n",
+ queueGetID(pThis), pThis->iQueueSize);
+ pthread_mutex_lock(&pThis->mutDA);
+ pthread_cond_signal(&pThis->condDA);
+ pthread_mutex_unlock(&pThis->mutDA);
+ FINALIZE;
+ }
+
dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n",
queueGetID(pThis), pThis->iQueueSize);
+ /* set up sync objects for low water mark algo */
+ pthread_mutex_init(&pThis->mutDA, NULL);
+ pthread_cond_init(&pThis->condDA, NULL);
+
/* create message queue */
CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
+ /* as the created queue is the same object class, we take the
+ * liberty to access its properties directly.
+ */
+ pThis->pqDA->condSignalOnEmpty = &pThis->condDA;
+ pThis->pqDA->mutSignalOnEmpty = &pThis->mutDA;
+ pThis->pqDA->bSignalOnEmpty = 1;
+
CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
@@ -692,7 +731,7 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout)
pthread_mutex_lock(pThis->mut);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
- dbgprintf("Queue 0x%lx: waiting %ld ms on worker thread termination, %d still running\n",
+ dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n",
queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd);
if(pthread_cond_timedwait(&pThis->condThrdTrm, pThis->mut, &t) != 0) {
@@ -879,6 +918,13 @@ queueWorker(void *arg)
while (pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) {
dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
queueGetID(pThis), iMyThrdIndx);
+ if(pThis->bSignalOnEmpty) {
+ /* we need to signal our parent queue that we are empty */
+dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx);
+ pthread_mutex_lock(pThis->mutSignalOnEmpty);
+ pthread_cond_signal(pThis->condSignalOnEmpty);
+ pthread_mutex_unlock(pThis->mutSignalOnEmpty);
+ }
pthread_cond_wait(pThis->notEmpty, pThis->mut);
}
if(pThis->iQueueSize > 0) {