summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c55
1 files changed, 49 insertions, 6 deletions
diff --git a/queue.c b/queue.c
index 45ce771d..ae0796c1 100644
--- a/queue.c
+++ b/queue.c
@@ -1,3 +1,5 @@
+// TODO: DA worker must not wait eternal on shutdown when in enqueue only mode!:w
+//
// TODO: we need to implement peek(), without it (today!) we lose one message upon
// worker cancellation! -- rgerhards, 2008-01-14
// TODO: think about mutDA - I think it's no longer needed
@@ -154,7 +156,12 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
RUNLOG_VAR("%d", pThis->bEnqOnly);
if(!pThis->bEnqOnly) {
if(pThis->bRunsDA) {
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
+ /* if we have not yet reached the high water mark, there is no need to start a
+ * worker. -- rgerhards, 2008-01-26
+ */
+ // WRONG: if(pThis->iQueueSize >= pThis->iHighWtrMrk) {
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
+ //}
} else {
if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
iMaxWorkers = 1;
@@ -262,6 +269,7 @@ dbgprintf("Queue %p: queueStartDA pre start\n", pThis);
wtpWakeupWrkr(pThis->pWtpReg); /* awake all workers, but not ourselves ;) */
pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */
+ pThis->bChildIsDone = 0;/* set to 1 when child's worker detect queue is finished */
pthread_cond_signal(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n",
@@ -980,11 +988,17 @@ RUNLOG_VAR("%d", pThis->iQueueSize);
/* ... and now the DA queue, if it exists (should always be after the primary one) */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) {
+ //TODO: use right mutex!
+ //old: if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) {
+if(pThis->pqDA != NULL) {
+RUNLOG_VAR("%p", pThis->pqDA->pWtpReg);
+RUNLOG_VAR("%d", pThis->pqDA->pWtpReg->iCurNumWrkThrd);
+}
+ if(pThis->pqDA != NULL && pThis->pqDA->pWtpReg->iCurNumWrkThrd > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the DA queue\n",
queueGetID(pThis));
- iRetLocal = wtpCancelAll(pThis->pWtpReg);
+ iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg);
if(iRetLocal != RS_RET_OK) {
dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel DA queue worker "
"threads, continuing, but results are unpredictable\n",
@@ -1078,23 +1092,28 @@ finalize_it:
/* cancellation cleanup handler for queueWorker ()
* Updates admin structure and frees ressources.
+ * Params:
+ * arg1 - user pointer (in this case a queue_t)
+ * arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!])
* rgerhards, 2008-01-16
*/
static rsRetVal
queueConsumerCancelCleanup(void *arg1, void *arg2)
{
+ DEFiRet;
+
queue_t *pThis = (queue_t*) arg1;
- wti_t *pWti = (wti_t*) arg2;
+ obj_t *pUsr = (obj_t*) arg2;
ISOBJ_TYPE_assert(pThis, queue);
- ISOBJ_TYPE_assert(pWti, wti);
+ ISOBJ_assert(pUsr);
dbgprintf("Queue 0x%lx: cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n",
queueGetID(pThis));
/* TODO: re-enqueue the data element! */
- return RS_RET_OK;
+ RETiRet;
}
@@ -1308,8 +1327,13 @@ queueRegOnWrkrShutdown(queue_t *pThis)
{
DEFiRet;
+ ISOBJ_TYPE_assert(pThis, queue);
+
if(pThis->pqParent != NULL) {
+RUNLOG_VAR("%p", pThis->pqParent);
+RUNLOG_VAR("%p", pThis->pqParent->pWtpDA);
assert(pThis->pqParent->pWtpDA != NULL);
+ pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
}
@@ -1317,6 +1341,24 @@ queueRegOnWrkrShutdown(queue_t *pThis)
}
+/* The following function is called when a regular queue worker starts up. We need this
+ * hook to indicate in the parent queue (if we are a child) that we are not done yet.
+ */
+static rsRetVal
+queueRegOnWrkrStartup(queue_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+
+ if(pThis->pqParent != NULL) {
+ pThis->pqParent->bChildIsDone = 0;
+ }
+
+ RETiRet;
+}
+
+
/* start up the queue - it must have been constructed and parameters defined
* before.
*/
@@ -1368,6 +1410,7 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, queueIsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, queueConsumerReg));
CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, queueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, queueRegOnWrkrStartup));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, queueRegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));