From 87f0e9b5f91407418a43a06f39831febfbd4e3ad Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 25 Jan 2008 19:25:46 +0000 Subject: disk-assisted queue mode finally begins to look good ;) --- queue.c | 220 ++++++++++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 146 insertions(+), 74 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index 320b3385..421fd651 100644 --- a/queue.c +++ b/queue.c @@ -62,7 +62,7 @@ DEFobjStaticHelpers /* forward-definitions */ rsRetVal queueChkPersist(queue_t *pThis); -static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly); +static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex); static int queueChkStopWrkrDA(queue_t *pThis); static int queueIsIdleDA(queue_t *pThis); static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave); @@ -81,9 +81,11 @@ static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2); * do it at some later time, because we need to destruct the DA queue. That, * however, can not be done in a thread that has been signalled * This is to be called when we revert back to our own queue. + * This function must be called with the queue mutex locked (the wti + * class ensures this). * rgerhards, 2008-01-15 */ -static inline rsRetVal +static rsRetVal queueTurnOffDAMode(queue_t *pThis) { DEFiRet; @@ -91,31 +93,69 @@ queueTurnOffDAMode(queue_t *pThis) ISOBJ_TYPE_assert(pThis, queue); assert(pThis->bRunsDA); + /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need + * to wait for its startup... -- rgerhards, 2008-01-25 + */ + while(pThis->bRunsDA != 2) { + d_pthread_cond_wait(&pThis->condDAReady, pThis->mut); + } /* if we need to pull any data that we still need from the (child) disk queue, * now would be the time to do so. At present, we do not need this, but I'd like to * keep that comment if future need arises. */ - /* we start at least one worker thread. If no new messages come in, this will - * be the only one for the time being. I am not yet sure if that is acceptable. - * To solve that issue, queueWorker () would need to check if it needs to fire - * up addtl ones. I am not yet sure if that is justified. After all, if no new - * messages come into the queue, we may be well off with a single worker. - * rgerhards, 2008-01-16 + /* we need to check if the DA queue is empty because the DA worker may simply have + * terminated do to no new messages arriving. That does not, however, mean that the + * DA queue is empty. If there is still data in that queue, we do nothing and leave + * that for a later incarnation of this function (it will be called multiple times + * during the lifetime of DA-mode, depending on how often the DA worker receives an + * inactivity timeout. -- rgerhards, 2008-01-25 */ +RUNLOG_VAR("%p", pThis->pqDA); +RUNLOG_VAR("%d", pThis->pqDA->iQueueSize); + if(pThis->pqDA->iQueueSize == 0) { dbgprintf("Queue 0x%lx: disk-assistance being been turned off, bEnqOnly %d, bQueInDestr %d, NumWrkd %d\n", queueGetID(pThis), pThis->bEnqOnly,pThis->bQueueInDestruction,pThis->iCurNumWrkThrd); - // TODO: mutex? - pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ - /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, - * this will be quick. - */ - queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ + pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ + /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, + * this will be quick. + */ + queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ + dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n", + queueGetID(pThis), iRet); + } + + RETiRet; +} + + + +/* returns the number of workers that should be advised at + * this point in time. The mutex must be locked when + * ths function is called. -- rgerhards, 2008-01-25 + */ +static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis) +{ + DEFiRet; + int iMaxWorkers; - dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n", - queueGetID(pThis), iRet); + ISOBJ_TYPE_assert(pThis, queue); + +RUNLOG_VAR("%d", pThis->bEnqOnly); + if(!pThis->bEnqOnly) { + if(pThis->bRunsDA) { + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ + } else { + if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { + iMaxWorkers = 1; + } else { + iMaxWorkers = pThis->iQueueSize / pThis->iMinMsgsPerWrkr + 1; + } + wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ + } + } RETiRet; } @@ -151,7 +191,10 @@ RUNLOG_VAR("%s", pThis->pszFilePrefix); * chore of this function is to create the DA queue object. If that function fails, * the DA worker should return with an appropriate state, which in turn should lead to * a re-set to non-DA mode in the Enq process. The queue mutex must be locked when this - * function is called, else a race on pThis->bRunsDA may happen. + * function is called, else a number of races will happen. + * Please note that this function may be called *while* we in DA mode. This is due to the + * fact that the DA worker calls it and the DA worker may be suspended (and restarted) due + * to inactivity timeouts. * rgerhards, 2008-01-15 */ static rsRetVal @@ -161,6 +204,9 @@ queueStartDA(queue_t *pThis) ISOBJ_TYPE_assert(pThis, queue); + if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */ + FINALIZE; /* ... then we are already done! */ + /* set up sync objects */ pthread_mutex_init(&pThis->mutDA, NULL); pthread_cond_init(&pThis->condDA, NULL); @@ -180,7 +226,7 @@ dbgprintf("Queue %p: queueSTrtDA after child queue construct, q %p\n", pThis, pT CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq)); - CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly)); + CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0)); CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0)); if(pThis->toQShutdown == 0) { @@ -207,7 +253,8 @@ dbgprintf("Queue %p: queueStartDA pre start\n", pThis); */ wtpWakeupWrkr(pThis->pWtpReg); /* awake all workers, but not ourselves ;) */ - pThis->bRunsDA = 1; /* we are now in DA mode! */ + pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */ + pthread_cond_signal(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n", queueGetID(pThis), queueGetID(pThis->pqDA)); @@ -248,7 +295,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) * rgerhards, 2008-01-24 */ if(pThis->pWtpDA == NULL) { - lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx/DA", (unsigned long) pThis); + lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx:DA", (unsigned long) pThis); CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, queueChkStopWrkrDA)); @@ -273,8 +320,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) * that will also start one up. If we forgot that step, everything would be stalled * until the next enqueue request. */ - if(pThis->bEnqOnly == 0) - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* one worker only for disk queues! */ + queueAdviseMaxWorkers(pThis); finalize_it: END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -283,7 +329,9 @@ finalize_it: /* check if we need to start disk assisted mode and send some signals to - * keep it running if we are already in it. + * keep it running if we are already in it. It also checks if DA mode is + * partially initialized, in which case it waits for initialization to + * complete. * rgerhards, 2008-01-14 */ static inline rsRetVal @@ -310,7 +358,7 @@ dbgprintf("Queue %p: chkStartDA\n", pThis); */ dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n", queueGetID(pThis), pThis->iQueueSize); - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* run again [see comment above] ;) */ + queueAdviseMaxWorkers(pThis); } else { /* this is the case when we are currently not running in DA mode. So it is time * to turn it back on. @@ -779,7 +827,6 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) { DEFiRet; DEFVARS_mutexProtection; - int i; struct timespec tTimeout; rsRetVal iRetLocal; @@ -788,15 +835,20 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis)); // TODO: reminder, delte after testing: do we need to modify the high wtr mark? I dont' think so 2008-01-25 + /* we reduce the low water mark in any case. This is not absolutely necessary, but + * it is useful because we enable DA mode at several spots below and so we do not need + * to think about the low water mark each time. + */ + pThis->iLowWtrMrk = 0; + /* first try to shutdown the queue within the regular shutdown period */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ if(pThis->iQueueSize > 0) { if(pThis->bRunsDA) { - /* worker threads may be inactive after reaching low water - * mark. Lower the mark and react workers. + /* We may have waited on the low water mark. As it may have changed, we + * see if we reactivate the worker. */ - pThis->iLowWtrMrk = 0; - wtpAdviseMaxWorkers(pThis->pWtpReg, 1); + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); } } END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -843,17 +895,22 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) } /* when we reach this point, both queues are either empty or the regular queue shutdown timeout - * has expired. Now we need to check if we areconfigured to not loose messages. If so, we need + * has expired. Now we need to check if we are configured to not loose messages. If so, we need * to persist the queue to disk (this is only possible if the queue is DA-enabled). */ +// TODO: what about pure disk queues and bSaveOnShutdown? BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ /* optimize parameters for shutdown of DA-enabled queues */ +RUNLOG_VAR("%d", pThis->bSaveOnShutdown); +RUNLOG_VAR("%d", pThis->bIsDA); +RUNLOG_VAR("%d", pThis->iQueueSize); if(pThis->bIsDA && pThis->iQueueSize > 0 && pThis->bSaveOnShutdown) { +RUNLOG; /* switch to enqueue-only mode so that no more actions happen */ if(pThis->bRunsDA == 0) { queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ } else { - queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* switch to enqueue-only mode */ + queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */ } END_MTX_PROTECTED_OPERATIONS(pThis->mut); /* make sure we do not timeout before we are done */ @@ -870,10 +927,12 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) END_MTX_PROTECTED_OPERATIONS(pThis->mut); } +RUNLOG; /* now the primary queue is either empty, persisted to disk - or set to loose messages. So we * can now request immediate shutdown of any remaining workers. */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ +RUNLOG_VAR("%d", pThis->iQueueSize); if(pThis->iQueueSize > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); @@ -895,9 +954,10 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) * thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25 */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ +RUNLOG_VAR("%d", pThis->iQueueSize); if(pThis->iQueueSize > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); - dbgprintf("Queue 0x%lx: primary queue worker threads could not be shutdown, now canceling them\n", + dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the primary queue\n", queueGetID(pThis)); iRetLocal = wtpCancelAll(pThis->pWtpReg); if(iRetLocal != RS_RET_OK) { @@ -913,7 +973,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); - dbgprintf("Queue 0x%lx: DA worker threads could not be shutdown, now canceling them\n", + dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the DA queue\n", queueGetID(pThis)); iRetLocal = wtpCancelAll(pThis->pWtpReg); if(iRetLocal != RS_RET_OK) { @@ -948,8 +1008,6 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, DEFiRet; queue_t *pThis; -int *pBoom = NULL; -//*pBoom = 'A'; assert(ppThis != NULL); assert(pConsumer != NULL); assert(iWorkerThreads >= 0); @@ -1196,7 +1254,11 @@ static int queueIsIdleDA(queue_t *pThis) { /* remember: iQueueSize is the DA queue size, not the main queue! */ - return (pThis->iQueueSize == 0); +RUNLOG_VAR("%d", pThis->iLowWtrMrk); +dbgprintf("queueIsIdleDA(%p) returns %d, qsize %d\n", pThis, pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk), pThis->iQueueSize); + //// TODO: I think we need just a single function... + //return (pThis->iQueueSize == 0); + return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk)); } /* must only be called when the queue mutex is locked, else results * are not stable! Regular queue version @@ -1235,6 +1297,7 @@ dbgprintf("Queue %p: I am child, use mutex %p\n", pThis, pThis->pqParent->mut); } pthread_mutex_init(&pThis->mutThrdMgmt, NULL); + pthread_cond_init (&pThis->condDAReady, NULL); pthread_cond_init (&pThis->notFull, NULL); pthread_cond_init (&pThis->notEmpty, NULL); dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); @@ -1242,8 +1305,8 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); /* call type-specific constructor */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ - dbgprintf("Queue 0x%lx: type %d, enq-only %d, disk assisted %d, maxFileSz %ld starting\n", queueGetID(pThis), - pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize); + dbgprintf("Queue 0x%lx: type %d, enq-only %d, disk assisted %d, maxFileSz %ld, qsize %d starting\n", queueGetID(pThis), + pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, pThis->iQueueSize); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ @@ -1251,7 +1314,7 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); /* create worker thread pools for regular operation. The DA pool is created on an as-needed * basis, which potentially means never under most circumstances. */ - lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx/Reg", (unsigned long) pThis); + lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx:Reg", (unsigned long) pThis); CHKiRet(wtpConstruct (&pThis->pWtpReg)); CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, queueChkStopWrkrReg)); @@ -1265,20 +1328,18 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); CHKiRet(wtpConstructFinalize (pThis->pWtpReg)); /* initialize worker thread instances */ +RUNLOG_VAR("%d", pThis->bIsDA); if(pThis->bIsDA) { /* If we are disk-assisted, we need to check if there is a QIF file * which we need to load. -- rgerhards, 2008-01-15 */ iRetLocal = queueHaveQIF(pThis); +RUNLOG_VAR("%d", iRetLocal); if(iRetLocal == RS_RET_OK) { dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n", queueGetID(pThis)); - +RUNLOG; queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ - /* we need to start the DA worker thread so that messages will be processed. So - * we advise the worker pool there is at least one needed. The wtp does the rest... - */ - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); bInitialized = 1; /* we are done */ } else { // TODO: use logerror? -- rgerhards, 2008-01-16 @@ -1287,11 +1348,17 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); } } +RUNLOG_VAR("%d", bInitialized); if(!bInitialized) { - dbgprintf("Queue 0x%lx: queue starts up without (loading) any disk state\n", queueGetID(pThis)); - /* we do not fire up any worker threads here, this happens automatically when they are needed */ - // TODO: preforked workers? queueStrtAllWrkThrds(pThis); + dbgprintf("Queue 0x%lx: queue starts up without (loading) any DA disk state (this is normal for the DA " + "queue itself!)\n", queueGetID(pThis)); } + + /* if the queue already contains data, we need to start the correct number of worker threads. This can be + * the case when a disk queue has been loaded. If we did not start it here, it would never start. + */ + queueAdviseMaxWorkers(pThis); + pThis->bQueueStarted = 1; finalize_it: @@ -1411,12 +1478,21 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ /* shut down all workers (handles *all* of the persistence logic) */ - queueShutdownWorkers(pThis); + if(!pThis->bEnqOnly) /* in enque-only mode, we have no worker pool! */ + queueShutdownWorkers(pThis); +RUNLOG; - /* finally destruct our (regular) worker thread pool */ - if(pThis->qType != QUEUETYPE_DIRECT) { + /* finally destruct our (regular) worker thread pool + * Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen, + * e.g. when they are not created in enqueue-only mode. We already check the condition + * as this may otherwise be very hard to find once we optimize (and have long forgotten + * about this condition here ;) + * rgerhards, 2008-01-25 + */ + if(pThis->qType != QUEUETYPE_DIRECT && pThis->pWtpReg != NULL) { wtpDestruct(&pThis->pWtpReg); } +RUNLOG; /* Now check if we actually have a DA queue and, if so, destruct it. * Note that the wtp must be destructed first, it may be in cancel cleanup handler @@ -1424,10 +1500,16 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove * data (re-queueing case). So we need to destruct the wtp first, which will make * sure all workers have terminated. */ +RUNLOG_VAR("%p", pThis->pWtpDA); if(pThis->pWtpDA != NULL) { +RUNLOG; wtpDestruct(&pThis->pWtpDA); +RUNLOG_VAR("%p", pThis->pqDA); + } + if(pThis->pqDA != NULL) { queueDestruct(&pThis->pqDA); } +RUNLOG; /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) * This handler is most important for disk queues, it will finally persist the necessary @@ -1447,6 +1529,7 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove free(pThis->mut); } pthread_mutex_destroy(&pThis->mutThrdMgmt); + pthread_cond_destroy(&pThis->condDAReady); pthread_cond_destroy(&pThis->notFull); pthread_cond_destroy(&pThis->notEmpty); @@ -1522,7 +1605,6 @@ queueEnqObj(queue_t *pThis, void *pUsr) { DEFiRet; int iCancelStateSave; - int iMaxWorkers; int i; struct timespec t; @@ -1530,6 +1612,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) // TODO: check if queue is terminating and if so either discard message or enqeue it to the DA queue *directly* dbgprintf("Queue %p: EnqObj() 1\n", pThis); +RUNLOG_VAR("%d", pThis->bRunsDA); /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE * during its execution. If that is not done, race conditions occur if the @@ -1544,25 +1627,10 @@ dbgprintf("Queue %p: EnqObj() 1\n", pThis); /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); -dbgprintf("Queue %p: EnqObj() 10\n", pThis); /* then check if we need to add an assistance disk queue */ if(pThis->bIsDA) CHKiRet(queueChkStrtDA(pThis)); -RUNLOG_VAR("%d", pThis->bIsDA); - /* make sure at least one worker is running. */ - if(pThis->bRunsDA) { -RUNLOG; - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ - } else { - if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { - iMaxWorkers = 1; - } else { - iMaxWorkers = pThis->iQueueSize / pThis->iMinMsgsPerWrkr + 1; - } -RUNLOG; - wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); - } /* wait for the queue to be ready... */ while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) { @@ -1576,6 +1644,8 @@ RUNLOG; } /* and finally enqueue the message */ +RUNLOG_VAR("%p", pThis); +RUNLOG_VAR("%d", pThis->bRunsDA); CHKiRet(queueAdd(pThis, pUsr)); queueChkPersist(pThis); @@ -1587,6 +1657,8 @@ finalize_it: pthread_setcancelstate(iCancelStateSave, NULL); } + /* make sure at least one worker is running. */ + queueAdviseMaxWorkers(pThis); RETiRet; } @@ -1601,10 +1673,10 @@ finalize_it: * rgerhards, 2008-01-16 */ static rsRetVal -queueSetEnqOnly(queue_t *pThis, int bEnqOnly) +queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex) { DEFiRet; - int iCancelStateSave; + DEFVARS_mutexProtection; ISOBJ_TYPE_assert(pThis, queue); @@ -1612,8 +1684,7 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly) * called, so that doesn't matter... -- rgerhards, 2008-01-16 */ if(pThis->mut != NULL) { - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(pThis->mut); + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex); } if(bEnqOnly == pThis->bEnqOnly) @@ -1626,8 +1697,10 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly) /* this means we need to terminate all workers - that's it... */ dbgprintf("Queue 0x%lx: switching to enqueue-only mode, terminating all worker threads\n", queueGetID(pThis)); - wtpWakeupAllWrkr(pThis->pWtpDA); - wtpWakeupAllWrkr(pThis->pWtpReg); + if(pThis->pWtpReg != NULL) + wtpWakeupAllWrkr(pThis->pWtpReg); + if(pThis->pWtpDA != NULL) + wtpWakeupAllWrkr(pThis->pWtpDA); } else { /* switch back to regular mode */ ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */ @@ -1638,8 +1711,7 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly) finalize_it: if(pThis->mut != NULL) { - d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); + END_MTX_PROTECTED_OPERATIONS(pThis->mut); } RETiRet; } -- cgit