diff options
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 55 |
1 files changed, 49 insertions, 6 deletions
@@ -1,3 +1,5 @@ +// TODO: DA worker must not wait eternal on shutdown when in enqueue only mode!:w +// // TODO: we need to implement peek(), without it (today!) we lose one message upon // worker cancellation! -- rgerhards, 2008-01-14 // TODO: think about mutDA - I think it's no longer needed @@ -154,7 +156,12 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis) RUNLOG_VAR("%d", pThis->bEnqOnly); if(!pThis->bEnqOnly) { if(pThis->bRunsDA) { - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ + /* if we have not yet reached the high water mark, there is no need to start a + * worker. -- rgerhards, 2008-01-26 + */ + // WRONG: if(pThis->iQueueSize >= pThis->iHighWtrMrk) { + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ + //} } else { if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { iMaxWorkers = 1; @@ -262,6 +269,7 @@ dbgprintf("Queue %p: queueStartDA pre start\n", pThis); wtpWakeupWrkr(pThis->pWtpReg); /* awake all workers, but not ourselves ;) */ pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */ + pThis->bChildIsDone = 0;/* set to 1 when child's worker detect queue is finished */ pthread_cond_signal(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n", @@ -980,11 +988,17 @@ RUNLOG_VAR("%d", pThis->iQueueSize); /* ... and now the DA queue, if it exists (should always be after the primary one) */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) { + //TODO: use right mutex! + //old: if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) { +if(pThis->pqDA != NULL) { +RUNLOG_VAR("%p", pThis->pqDA->pWtpReg); +RUNLOG_VAR("%d", pThis->pqDA->pWtpReg->iCurNumWrkThrd); +} + if(pThis->pqDA != NULL && pThis->pqDA->pWtpReg->iCurNumWrkThrd > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the DA queue\n", queueGetID(pThis)); - iRetLocal = wtpCancelAll(pThis->pWtpReg); + iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg); if(iRetLocal != RS_RET_OK) { dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel DA queue worker " "threads, continuing, but results are unpredictable\n", @@ -1078,23 +1092,28 @@ finalize_it: /* cancellation cleanup handler for queueWorker () * Updates admin structure and frees ressources. + * Params: + * arg1 - user pointer (in this case a queue_t) + * arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!]) * rgerhards, 2008-01-16 */ static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2) { + DEFiRet; + queue_t *pThis = (queue_t*) arg1; - wti_t *pWti = (wti_t*) arg2; + obj_t *pUsr = (obj_t*) arg2; ISOBJ_TYPE_assert(pThis, queue); - ISOBJ_TYPE_assert(pWti, wti); + ISOBJ_assert(pUsr); dbgprintf("Queue 0x%lx: cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n", queueGetID(pThis)); /* TODO: re-enqueue the data element! */ - return RS_RET_OK; + RETiRet; } @@ -1308,8 +1327,13 @@ queueRegOnWrkrShutdown(queue_t *pThis) { DEFiRet; + ISOBJ_TYPE_assert(pThis, queue); + if(pThis->pqParent != NULL) { +RUNLOG_VAR("%p", pThis->pqParent); +RUNLOG_VAR("%p", pThis->pqParent->pWtpDA); assert(pThis->pqParent->pWtpDA != NULL); + pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ } @@ -1317,6 +1341,24 @@ queueRegOnWrkrShutdown(queue_t *pThis) } +/* The following function is called when a regular queue worker starts up. We need this + * hook to indicate in the parent queue (if we are a child) that we are not done yet. + */ +static rsRetVal +queueRegOnWrkrStartup(queue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + + if(pThis->pqParent != NULL) { + pThis->pqParent->bChildIsDone = 0; + } + + RETiRet; +} + + /* start up the queue - it must have been constructed and parameters defined * before. */ @@ -1368,6 +1410,7 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, queueIsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, queueConsumerReg)); CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, queueConsumerCancelCleanup)); + CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, queueRegOnWrkrStartup)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, queueRegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); |