diff options
-rw-r--r-- | queue.c | 72 | ||||
-rw-r--r-- | wtp.c | 25 | ||||
-rw-r--r-- | wtp.h | 1 |
3 files changed, 65 insertions, 33 deletions
@@ -889,6 +889,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) /* first calculate absolute timeout - we need the absolute value here, because we need to coordinate * shutdown of both the regular and DA queue on *the same* timeout. */ +RUNLOG_VAR("%d", pThis->toQShutdown); timeoutComp(&tTimeout, pThis->toQShutdown); iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { @@ -926,11 +927,11 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) // TODO: what about pure disk queues and bSaveOnShutdown? BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ /* optimize parameters for shutdown of DA-enabled queues */ -RUNLOG_VAR("%d", pThis->bSaveOnShutdown); -RUNLOG_VAR("%d", pThis->bIsDA); -RUNLOG_VAR("%d", pThis->iQueueSize); +//RUNLOG_VAR("%d", pThis->bSaveOnShutdown); +//RUNLOG_VAR("%d", pThis->bIsDA); +//RUNLOG_VAR("%d", pThis->iQueueSize); if(pThis->bIsDA && pThis->iQueueSize > 0 && pThis->bSaveOnShutdown) { -RUNLOG; +//RUNLOG; /* switch to enqueue-only mode so that no more actions happen */ if(pThis->bRunsDA == 0) { queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ @@ -958,13 +959,18 @@ RUNLOG; * the queue is now empty. If regular workers are still running, and try to pull the next message, * they will automatically terminate as there no longer is any message left to process. */ + // TODO: use pWtp mutex? - guess so! BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ RUNLOG_VAR("%d", pThis->iQueueSize); - if(pThis->iQueueSize > 0) { + //old: if(pThis->iQueueSize > 0) { + if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); timeoutComp(&tTimeout, pThis->toActShutdown); iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); - if(iRetLocal != RS_RET_OK) { + if(iRetLocal == RS_RET_TIMED_OUT) { + dbgprintf("Queue 0x%lx: immediate shutdown timed out on primary queue (this is acceptable and " + "triggers cancellation)\n", queueGetID(pThis)); + } else if(iRetLocal != RS_RET_OK) { dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the primary queue " "in disk save mode. Continuing, but results are unpredictable\n", queueGetID(pThis), iRetLocal); @@ -980,42 +986,44 @@ RUNLOG_VAR("%d", pThis->iQueueSize); * function is still needed (what is no problem as we do not yet destroy the queue - but I * thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25 */ - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ -RUNLOG_VAR("%d", pThis->iQueueSize); - if(pThis->iQueueSize > 0) { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); - dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the primary queue\n", - queueGetID(pThis)); - iRetLocal = wtpCancelAll(pThis->pWtpReg); - if(iRetLocal != RS_RET_OK) { - dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel primary queue worker " - "threads, continuing, but results are unpredictable\n", - queueGetID(pThis), iRetLocal); - } - } else { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the primary queue\n", + queueGetID(pThis)); + iRetLocal = wtpCancelAll(pThis->pWtpReg); /* returns immediately if all threads already have terminated */ + if(iRetLocal != RS_RET_OK) { + dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel primary queue worker " + "threads, continuing, but results are unpredictable\n", + queueGetID(pThis), iRetLocal); } + + /* TODO: + * If we cancelled some regular workers above, we need to think about where any "ungotten()" pUsr + * data elements need to go to. We need to make sure they are persisted. But this will be kept open + * until we finally code that part of the logic. + * To provide an early idea: the ungetObj() call should be a pointer. If running DA, it shall point + * to the DA queues ungetObj() and if we are running regular, it should point to the parent queues. The + * idea behind that logic is that if something is to be ungotten, it should normally go back to the top + * of the queue, which in that case is inside the DA queue... - but that idea needs to be verified once + * we reached that point. + * rgerhards, 2008-01-27 + */ + + + /* TODO: think: do we really need to do this here? Can't it happen on DA queue destruction? If we + * disable it, we get an assertion... I think this is OK, as we need to have a certain order and + * canceling the DA workers here ensures that order. But in any instant, we may have a look at this + * code after we have reaced the milestone. -- rgerhards, 2008-01-27 + */ /* ... and now the DA queue, if it exists (should always be after the primary one) */ - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - //TODO: use right mutex! - // was used: if(pThis->pqDA != NULL && pThis->pqDA->pWtpReg->iCurNumWrkThrd > 0) { -if(pThis->pqDA != NULL) { -RUNLOG_VAR("%p", pThis->pqDA->pWtpReg); -RUNLOG_VAR("%d", pThis->pqDA->pWtpReg->iCurNumWrkThrd); -} - if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); + if(pThis->pqDA != NULL) { dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the DA queue\n", queueGetID(pThis)); - iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg); + iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg); /* returns immediately if all threads already have terminated */ if(iRetLocal != RS_RET_OK) { dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel DA queue worker " "threads, continuing, but results are unpredictable\n", queueGetID(pThis), iRetLocal); } - } else { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); } /* ... finally ... all worker threads have terminated :-) @@ -286,7 +286,6 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); /* and wait for their termination */ -dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pThis->mut); pthread_cleanup_push(mutexCancelCleanup, &pThis->mut); @@ -582,6 +581,30 @@ DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*)); DEFpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*)); +/* return the current number of worker threads. + * TODO: atomic operation would bring a nice performance + * enhancemcent + * rgerhards, 2008-01-27 + */ +int +wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex) +{ + DEFVARS_mutexProtection; + int iNumWrkr; + + BEGINfunc + ISOBJ_TYPE_assert(pThis, wtp); + + BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); + iNumWrkr = pThis->iCurNumWrkThrd; + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + +RUNLOG_VAR("%d", iNumWrkr); + ENDfunc + return iNumWrkr; +} + + /* set the debug header message * The passed-in string is duplicated. So if the caller does not need * it any longer, it must free it. Must be called only before object is finalized. @@ -96,6 +96,7 @@ rsRetVal wtpCancelAll(wtp_t *pThis); rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg); rsRetVal wtpSignalWrkrTermination(wtp_t *pWtp); rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout); +int wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex); PROTOTYPEObjClassInit(wtp); PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)); PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)); |