From 75b645f16b930f142b777b00b529fb726ef10243 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 14 Jan 2008 19:08:43 +0000 Subject: some more work on disk assisted mode (still not complete) --- queue.c | 50 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 48 insertions(+), 2 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index 87cea59d..10a560d2 100644 --- a/queue.c +++ b/queue.c @@ -183,6 +183,17 @@ queueDAConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr) dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, pThis->iQueueSize); CHKiRet(queueEnqObj(pThis->pqDA, pUsr)); + if(pThis->iQueueSize == pThis->iLowWtrMrk) { + dbgprintf("Queue 0x%lx: %d entries - passed low water mark in DA mode, sleeping\n", + queueGetID(pThis), pThis->iQueueSize); + pthread_mutex_lock(&pThis->mutDA); +dbgprintf("mutex locked (think about CLEANUP!)\n"); + pthread_cond_wait(&pThis->condDA, &pThis->mutDA); +dbgprintf("condition returned\n"); + pthread_mutex_unlock(&pThis->mutDA); +dbgprintf("mutex unlocked (think about CLEANUP!)\n"); + } + finalize_it: return iRet; } @@ -197,15 +208,43 @@ queueChkStrtDA(queue_t *pThis) DEFiRet; ISOBJ_TYPE_assert(pThis, queue); - if(pThis->iQueueSize < pThis->iHighWtrMrk || pThis->bRunsDA) + + + /* if we do not hit the high water mark, we have nothing to do */ + if(pThis->iQueueSize != pThis->iHighWtrMrk) ABORT_FINALIZE(RS_RET_OK); + if(pThis->bRunsDA) { + /* then we need to signal that we are at the high water mark again. The DA + * consumer shall check if it needs to restart. Note that we may pass the + * high water mark while we drain the queue. + * TODO: problem here? (condition signalled on drain...) + */ + dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n", + queueGetID(pThis), pThis->iQueueSize); + pthread_mutex_lock(&pThis->mutDA); + pthread_cond_signal(&pThis->condDA); + pthread_mutex_unlock(&pThis->mutDA); + FINALIZE; + } + dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n", queueGetID(pThis), pThis->iQueueSize); + /* set up sync objects for low water mark algo */ + pthread_mutex_init(&pThis->mutDA, NULL); + pthread_cond_init(&pThis->condDA, NULL); + /* create message queue */ CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer)); + /* as the created queue is the same object class, we take the + * liberty to access its properties directly. + */ + pThis->pqDA->condSignalOnEmpty = &pThis->condDA; + pThis->pqDA->mutSignalOnEmpty = &pThis->mutDA; + pThis->pqDA->bSignalOnEmpty = 1; + CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); @@ -692,7 +731,7 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) pthread_mutex_lock(pThis->mut); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { - dbgprintf("Queue 0x%lx: waiting %ld ms on worker thread termination, %d still running\n", + dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n", queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd); if(pthread_cond_timedwait(&pThis->condThrdTrm, pThis->mut, &t) != 0) { @@ -879,6 +918,13 @@ queueWorker(void *arg) while (pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) { dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", queueGetID(pThis), iMyThrdIndx); + if(pThis->bSignalOnEmpty) { + /* we need to signal our parent queue that we are empty */ +dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx); + pthread_mutex_lock(pThis->mutSignalOnEmpty); + pthread_cond_signal(pThis->condSignalOnEmpty); + pthread_mutex_unlock(pThis->mutSignalOnEmpty); + } pthread_cond_wait(pThis->notEmpty, pThis->mut); } if(pThis->iQueueSize > 0) { -- cgit