summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--queue.c72
-rw-r--r--wtp.c25
-rw-r--r--wtp.h1
3 files changed, 65 insertions, 33 deletions
diff --git a/queue.c b/queue.c
index d26a6e24..e89b35b8 100644
--- a/queue.c
+++ b/queue.c
@@ -889,6 +889,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* first calculate absolute timeout - we need the absolute value here, because we need to coordinate
* shutdown of both the regular and DA queue on *the same* timeout.
*/
+RUNLOG_VAR("%d", pThis->toQShutdown);
timeoutComp(&tTimeout, pThis->toQShutdown);
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
@@ -926,11 +927,11 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
// TODO: what about pure disk queues and bSaveOnShutdown?
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
/* optimize parameters for shutdown of DA-enabled queues */
-RUNLOG_VAR("%d", pThis->bSaveOnShutdown);
-RUNLOG_VAR("%d", pThis->bIsDA);
-RUNLOG_VAR("%d", pThis->iQueueSize);
+//RUNLOG_VAR("%d", pThis->bSaveOnShutdown);
+//RUNLOG_VAR("%d", pThis->bIsDA);
+//RUNLOG_VAR("%d", pThis->iQueueSize);
if(pThis->bIsDA && pThis->iQueueSize > 0 && pThis->bSaveOnShutdown) {
-RUNLOG;
+//RUNLOG;
/* switch to enqueue-only mode so that no more actions happen */
if(pThis->bRunsDA == 0) {
queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
@@ -958,13 +959,18 @@ RUNLOG;
* the queue is now empty. If regular workers are still running, and try to pull the next message,
* they will automatically terminate as there no longer is any message left to process.
*/
+ // TODO: use pWtp mutex? - guess so!
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
RUNLOG_VAR("%d", pThis->iQueueSize);
- if(pThis->iQueueSize > 0) {
+ //old: if(pThis->iQueueSize > 0) {
+ if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
timeoutComp(&tTimeout, pThis->toActShutdown);
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
- if(iRetLocal != RS_RET_OK) {
+ if(iRetLocal == RS_RET_TIMED_OUT) {
+ dbgprintf("Queue 0x%lx: immediate shutdown timed out on primary queue (this is acceptable and "
+ "triggers cancellation)\n", queueGetID(pThis));
+ } else if(iRetLocal != RS_RET_OK) {
dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the primary queue "
"in disk save mode. Continuing, but results are unpredictable\n",
queueGetID(pThis), iRetLocal);
@@ -980,42 +986,44 @@ RUNLOG_VAR("%d", pThis->iQueueSize);
* function is still needed (what is no problem as we do not yet destroy the queue - but I
* thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25
*/
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
-RUNLOG_VAR("%d", pThis->iQueueSize);
- if(pThis->iQueueSize > 0) {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the primary queue\n",
- queueGetID(pThis));
- iRetLocal = wtpCancelAll(pThis->pWtpReg);
- if(iRetLocal != RS_RET_OK) {
- dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel primary queue worker "
- "threads, continuing, but results are unpredictable\n",
- queueGetID(pThis), iRetLocal);
- }
- } else {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the primary queue\n",
+ queueGetID(pThis));
+ iRetLocal = wtpCancelAll(pThis->pWtpReg); /* returns immediately if all threads already have terminated */
+ if(iRetLocal != RS_RET_OK) {
+ dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel primary queue worker "
+ "threads, continuing, but results are unpredictable\n",
+ queueGetID(pThis), iRetLocal);
}
+
+ /* TODO:
+ * If we cancelled some regular workers above, we need to think about where any "ungotten()" pUsr
+ * data elements need to go to. We need to make sure they are persisted. But this will be kept open
+ * until we finally code that part of the logic.
+ * To provide an early idea: the ungetObj() call should be a pointer. If running DA, it shall point
+ * to the DA queues ungetObj() and if we are running regular, it should point to the parent queues. The
+ * idea behind that logic is that if something is to be ungotten, it should normally go back to the top
+ * of the queue, which in that case is inside the DA queue... - but that idea needs to be verified once
+ * we reached that point.
+ * rgerhards, 2008-01-27
+ */
+
+
+ /* TODO: think: do we really need to do this here? Can't it happen on DA queue destruction? If we
+ * disable it, we get an assertion... I think this is OK, as we need to have a certain order and
+ * canceling the DA workers here ensures that order. But in any instant, we may have a look at this
+ * code after we have reaced the milestone. -- rgerhards, 2008-01-27
+ */
/* ... and now the DA queue, if it exists (should always be after the primary one) */
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- //TODO: use right mutex!
- // was used: if(pThis->pqDA != NULL && pThis->pqDA->pWtpReg->iCurNumWrkThrd > 0) {
-if(pThis->pqDA != NULL) {
-RUNLOG_VAR("%p", pThis->pqDA->pWtpReg);
-RUNLOG_VAR("%d", pThis->pqDA->pWtpReg->iCurNumWrkThrd);
-}
- if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ if(pThis->pqDA != NULL) {
dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the DA queue\n",
queueGetID(pThis));
- iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg);
+ iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg); /* returns immediately if all threads already have terminated */
if(iRetLocal != RS_RET_OK) {
dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel DA queue worker "
"threads, continuing, but results are unpredictable\n",
queueGetID(pThis), iRetLocal);
}
- } else {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
/* ... finally ... all worker threads have terminated :-)
diff --git a/wtp.c b/wtp.c
index 3e5fb937..817204d8 100644
--- a/wtp.c
+++ b/wtp.c
@@ -286,7 +286,6 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
RUNLOG_VAR("%d", pThis->iCurNumWrkThrd);
/* and wait for their termination */
-dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pThis->mut);
pthread_cleanup_push(mutexCancelCleanup, &pThis->mut);
@@ -582,6 +581,30 @@ DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*));
DEFpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*));
+/* return the current number of worker threads.
+ * TODO: atomic operation would bring a nice performance
+ * enhancemcent
+ * rgerhards, 2008-01-27
+ */
+int
+wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex)
+{
+ DEFVARS_mutexProtection;
+ int iNumWrkr;
+
+ BEGINfunc
+ ISOBJ_TYPE_assert(pThis, wtp);
+
+ BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
+ iNumWrkr = pThis->iCurNumWrkThrd;
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+
+RUNLOG_VAR("%d", iNumWrkr);
+ ENDfunc
+ return iNumWrkr;
+}
+
+
/* set the debug header message
* The passed-in string is duplicated. So if the caller does not need
* it any longer, it must free it. Must be called only before object is finalized.
diff --git a/wtp.h b/wtp.h
index 7df3166b..6100fe52 100644
--- a/wtp.h
+++ b/wtp.h
@@ -96,6 +96,7 @@ rsRetVal wtpCancelAll(wtp_t *pThis);
rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg);
rsRetVal wtpSignalWrkrTermination(wtp_t *pWtp);
rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout);
+int wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex);
PROTOTYPEObjClassInit(wtp);
PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int));