summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--queue.c226
-rw-r--r--queue.h3
2 files changed, 181 insertions, 48 deletions
diff --git a/queue.c b/queue.c
index c670c8ce..864f4711 100644
--- a/queue.c
+++ b/queue.c
@@ -56,6 +56,22 @@
/* static data */
DEFobjStaticHelpers
+/* debug aides */
+#if 1
+#define d_pthread_mutex_lock(x) {dbgprintf("mutex %p lock file %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \
+ pthread_mutex_lock(x); \
+ if(0)dbgprintf("mutex %p lock aquired file %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \
+ }
+#define d_pthread_mutex_unlock(x) {dbgprintf("mutex %p UNlock file %s, %s(), line %d\n", x ,__FILE__, __func__, __LINE__);\
+ pthread_mutex_unlock(x); \
+ if(0)dbgprintf("mutex %p UNlock done file %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \
+ }
+#else
+#define d_pthread_mutex_lock(x) pthread_mutex_lock(x)
+#define d_pthread_mutex_unlock(x) pthread_mutex_unlock(x)
+#endif
+
+
/* forward-definitions */
rsRetVal queueChkPersist(queue_t *pThis);
static void *queueWorker(void *arg);
@@ -72,10 +88,38 @@ static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly);
static void queueMutexCleanup(void *arg)
{
assert(arg != NULL);
- pthread_mutex_unlock((pthread_mutex_t*) arg);
+ d_pthread_mutex_unlock((pthread_mutex_t*) arg);
}
+void test1(void) {}
+/* method to lock a mutex in a cancel-safe way. A cancel handler
+ * is pushed that unlocks the mutex it when thread is cancelled.
+ * rgerhards, 2008-01-19
+ */
+#define queueMutexCnclSaveLock(pMut) \
+do { \
+ int iCancelStateSave; \
+ \
+ assert(pMut != NULL); \
+ \
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); \
+ d_pthread_mutex_lock(pMut); \
+ pthread_cleanup_push(queueMutexCleanup, pMut); \
+ pthread_setcancelstate(iCancelStateSave, NULL); \
+} while(0)\
+
+void test(void) {}
+/* method to unlock a mutex that was locked via queueMutexWithCnclSaveLock ()
+ * note that a push/pop interface is used, so they must be called in the respective
+ * order. -- rgerhards, 2008-01-19
+ */
+#define queueMutexWithCnclSaveUnlock() \
+do { \
+ pthread_cleanup_pop(1); \
+} while(0) \
+
+
/* get the current worker state. For simplicity and speed, we have
* NOT used our regular calling interface this time. I hope that won't
* bite in the long term... -- rgerhards, 2008-01-17
@@ -88,6 +132,43 @@ qWrkrGetState(qWrkThrd_t *pThis)
}
+/* indicate worker thread startup
+ * (it would be best if we could do this with an atomic operation)
+ * rgerhards, 2008-01-19
+ */
+static inline void
+queueWrkrThrdStartupIndication(queue_t *pThis)
+{
+ int iCancelStateSave;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(&pThis->mutThrdMgmt);
+ pThis->iCurNumWrkThrd++;
+ d_pthread_mutex_unlock(&pThis->mutThrdMgmt);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+}
+
+
+/* indicate worker thread shutdown
+ * (it would be best if we could do this with an atomic operation)
+ * rgerhards, 2008-01-19
+ */
+static inline void
+queueWrkrThrdShutdownIndication(queue_t *pThis)
+{
+ int iCancelStateSave;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(&pThis->mutThrdMgmt);
+ pThis->iCurNumWrkThrd--;
+ pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
+ d_pthread_mutex_unlock(&pThis->mutThrdMgmt);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+}
+
+
/* send a command to a specific thread
*/
static rsRetVal
@@ -193,14 +274,14 @@ qWrkrWaitStartup(qWrkThrd_t *pThis)
assert(pThis != NULL);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- pthread_mutex_lock(pThis->pQueue->mut);
+ d_pthread_mutex_lock(pThis->pQueue->mut);
if((pThis->tCurrCmd == eWRKTHRD_RUN_CREATED) || (pThis->tCurrCmd == eWRKTHRD_RUN_CREATED)) {
dbgprintf("Queue 0x%lx: waiting on worker thread %d startup\n", queueGetID(pThis->pQueue),
pThis->iThrd);
pthread_cond_wait(&pThis->condInitDone, pThis->pQueue->mut);
dbgprintf("worker startup done!\n");
}
- pthread_mutex_unlock(pThis->pQueue->mut);
+ d_pthread_mutex_unlock(pThis->pQueue->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
return RS_RET_OK;
@@ -313,7 +394,7 @@ queueStrtDAWrkr(queue_t *pThis)
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
dbgprintf("Queue %p: DAWrkr thread mutex lock\n", pThis);
- pthread_mutex_lock(&pThis->pWrkThrds[0].mut);
+ d_pthread_mutex_lock(&pThis->pWrkThrds[0].mut);
pthread_cleanup_push(queueMutexCleanup, &pThis->pWrkThrds[0].mut);
pthread_setcancelstate(iCancelStateSave, NULL);
if(pThis->pWrkThrds[0].tCurrCmd == eWRKTHRD_STOPPED) {
@@ -447,6 +528,7 @@ static inline rsRetVal
queueChkAndStrtWrk(queue_t *pThis)
{
DEFiRet;
+ int iCancelStateSave;
ISOBJ_TYPE_assert(pThis, queue);
@@ -456,6 +538,11 @@ queueChkAndStrtWrk(queue_t *pThis)
if(pThis->bEnqOnly == 1)
FINALIZE; /* in enqueue-only mode we have no workers */
+ //queueMutexCnclSaveLock(&pThis->mutThrdMgmt);
+pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+d_pthread_mutex_lock(&pThis->mutThrdMgmt);
+pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt);
+pthread_setcancelstate(iCancelStateSave, NULL);
/* check if we need to start up another worker */
if(pThis->qRunsDA == QRUNS_REGULAR) {
if(pThis->iCurNumWrkThrd < pThis->iNumWorkerThreads) {
@@ -477,6 +564,8 @@ dbgprintf("Queue %p: DA worker is no longer running, restarting, qsize %d, worke
iRet = queueStrtDAWrkr(pThis);
}
}
+ //queueMutexCnclSaveUnlock(&pThis->mutThrdMgmt); // TODO: move to finalize_it, but needs a conditon
+pthread_cleanup_pop(1);
finalize_it:
return iRet;
@@ -517,6 +606,7 @@ queueTurnOffDAMode(queue_t *pThis)
dbgprintf("Queue 0x%lx: disk-assistance being been turned off, bEnqOnly %d, bQueInDestr %d, NumWrkd %d\n",
queueGetID(pThis),
pThis->bEnqOnly,pThis->bQueueInDestruction,pThis->iCurNumWrkThrd);
+ // TODO: think about this code - there is a race
if(pThis->bEnqOnly == 0 && pThis->bQueueInDestruction == 0 && pThis->iCurNumWrkThrd < 2)
queueStrtNewWrkThrd(pThis);
pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */
@@ -619,8 +709,10 @@ queueStrtDA(queue_t *pThis)
pthread_cond_init(&pThis->condDA, NULL);
/* create message queue */
+dbgprintf("Queue %p: queueSTrtDA pre child queue construct,\n", pThis);
CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
+dbgprintf("Queue %p: queueSTrtDA after child queue construct, q %p\n", pThis, pThis->pqDA);
/* as the created queue is the same object class, we take the
* liberty to access its properties directly.
*/
@@ -629,18 +721,25 @@ queueStrtDA(queue_t *pThis)
pThis->pqDA->condSignalOnEmpty2 = pThis->notEmpty;
pThis->pqDA->bSignalOnEmpty = 2;
pThis->pqDA->pqParent = pThis;
+dbgprintf("Queue %p: queueSTrtDA after assign\n", pThis);
CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
+dbgprintf("Queue %p: queueSTrtDA 10\n", pThis);
CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly));
+dbgprintf("Queue %p: queueSTrtDA 15\n", pThis);
CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
+dbgprintf("Queue %p: queueSTrtDA 20\n", pThis);
CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
+dbgprintf("Queue %p: queueSTrtDA 25\n", pThis);
if(pThis->toQShutdown == 0) {
+dbgprintf("Queue %p: queueSTrtDA 30a\n", pThis);
CHKiRet(queueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
} else {
+dbgprintf("Queue %p: queueSTrtDA 30b\n", pThis);
/* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND
* have an obviously large backlog, we can't finish it in any case. So there is no point
* in holding shutdown longer than necessary. -- rgerhards, 2008-01-15
@@ -648,6 +747,7 @@ queueStrtDA(queue_t *pThis)
CHKiRet(queueSettoQShutdown(pThis->pqDA, 1));
}
+dbgprintf("Queue %p: queueSTrtDA pre start\n", pThis);
iRet = queueStart(pThis->pqDA);
/* file not found is expected, that means it is no previous QIF available */
if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
@@ -737,9 +837,9 @@ queueChkStrtDA(queue_t *pThis)
dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n",
queueGetID(pThis), pThis->iQueueSize);
//pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- //pthread_mutex_lock(&pThis->mutDA);
+ //d_pthread_mutex_lock(&pThis->mutDA);
pthread_cond_signal(&pThis->condDA);
- //pthread_mutex_unlock(&pThis->mutDA);
+ //d_pthread_mutex_unlock(&pThis->mutDA);
//pthread_setcancelstate(iCancelStateSave, NULL);
queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */
}
@@ -1235,6 +1335,7 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout)
DEFiRet;
int bTimedOut;
struct timespec t;
+ int iCancelStateSave;
queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */
queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
@@ -1242,18 +1343,24 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout)
queueTimeoutComp(&t, iTimeout);/* get timeout */
/* and wait for their termination */
- pthread_mutex_lock(pThis->mut);
+dbgprintf("Queue %p: waiting for mutex %p\n", pThis, &pThis->mutThrdMgmt);
+ //queueMutexCnclSaveLock(&pThis->mutThrdMgmt);
+pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+d_pthread_mutex_lock(&pThis->mutThrdMgmt);
+pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt);
+pthread_setcancelstate(iCancelStateSave, NULL);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n",
queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd);
- if(pthread_cond_timedwait(&pThis->condThrdTrm, pThis->mut, &t) != 0) {
+ if(pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutThrdMgmt, &t) != 0) {
dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", queueGetID(pThis));
bTimedOut = 1; /* we exit the loop on timeout */
}
}
- pthread_mutex_unlock(pThis->mut);
+// queueMutexCnclSaveUnlock(&pThis->mutThrdMgmt);
+pthread_cleanup_pop(1);
if(bTimedOut)
iRet = RS_RET_TIMED_OUT;
@@ -1374,7 +1481,7 @@ dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, pWrkrInst->iT
*/
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
dbgprintf("pre mutex lock (think about CLEANUP!)\n");
- pthread_mutex_lock(pThis->mut);
+ d_pthread_mutex_lock(pThis->mut);
pthread_cleanup_push(queueMutexCleanup, pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
dbgprintf("mutex locked (think about CLEANUP!)\n");
@@ -1420,7 +1527,7 @@ queueWorkerChkAndCallConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst, int iCancel
iQueueSize = pThis->iQueueSize; /* cache this for after mutex release */
pWrkrInst->pUsr = pUsr; /* save it for the cancel cleanup handler */
qRunsDA = pThis->qRunsDA;
- pthread_mutex_unlock(pThis->mut);
+ d_pthread_mutex_unlock(pThis->mut);
pthread_cond_signal(pThis->notFull);
pthread_setcancelstate(iCancelStateSave, NULL);
/* WE ARE NO LONGER PROTECTED FROM THE MUTEX */
@@ -1488,14 +1595,14 @@ static void queueWorkerCancelCleanup(void *arg)
dbgprintf("Queue 0x%lx/w%d: cancelation cleanup handler called (NOT FULLY IMPLEMENTED, one msgs lost!)\n",
queueGetID(pThis), pWrkrInst->iThrd);
- pThis->iCurNumWrkThrd--; /* one worker less... */
- pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */
qWrkrSetState(&pThis->pWrkThrds[pWrkrInst->iThrd], eWRKTHRD_TERMINATING);
pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
/* TODO: re-enqueue the data element! */
dbgprintf("Queue 0x%lx/w%d: thread CANCELED with %d entries left in queue, %d workers running.\n",
- queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize, pThis->iCurNumWrkThrd);
+ queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize, pThis->iCurNumWrkThrd - 1);
+
+ queueWrkrThrdShutdownIndication(pThis); // TODO: move above debug message into this function!
}
@@ -1555,16 +1662,17 @@ queueWorker(void *arg)
sigfillset(&sigSet);
pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
+ queueWrkrThrdStartupIndication(pThis);
+
/* do some one-time initialization */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- pthread_mutex_lock(pThis->mut);
+ d_pthread_mutex_lock(pThis->mut);
/* initialize our thread instance descriptor */
qWrkrInit(&pWrkrInst, pThis);
iMyThrdIndx = pWrkrInst->iThrd;
- pThis->iCurNumWrkThrd++; /* tell the world there is one more worker */
dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx);
dbgprintf("qRunsDA %d, check against %d\n", pThis->qRunsDA, QRUNS_DA);
@@ -1584,7 +1692,7 @@ dbgprintf("qRunsDA %d, check against %d\n", pThis->qRunsDA, QRUNS_DA);
pthread_cleanup_push(queueWorkerCancelCleanup, pWrkrInst);
- pthread_mutex_unlock(pThis->mut);
+ d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
/* end one-time stuff */
@@ -1595,13 +1703,13 @@ dbgprintf("qRunsDA %d, check against %d\n", pThis->qRunsDA, QRUNS_DA);
dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n",
queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- pthread_mutex_lock(pThis->mut);
+ d_pthread_mutex_lock(pThis->mut);
/* process any pending thread requests */
queueChkWrkThrdChanges(pThis);
if((bContinueRun = queueWorkerRemainActive(pThis, pWrkrInst)) == 0) {
- pthread_mutex_unlock(pThis->mut);
+ d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
continue; /* and break loop */
}
@@ -1623,11 +1731,7 @@ dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n",
if(pThis->bSignalOnEmpty > 0) {
/* we need to signal our parent queue that we are empty */
dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx);
- //pthread_mutex_lock(pThis->mutSignalOnEmpty); // TODO: this was commented out
- pthread_mutex_lock(pThis->pqParent->mut); // TODO: this was commented out
pthread_cond_signal(pThis->condSignalOnEmpty);
- //pthread_mutex_unlock(pThis->mutSignalOnEmpty); // TODO: this was commented out
- pthread_mutex_unlock(pThis->pqParent->mut); // TODO: this was commented out
dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx);
}
if(pThis->bSignalOnEmpty > 1) {
@@ -1656,7 +1760,7 @@ dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n",
}
}
dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx);
- pthread_mutex_unlock(pThis->mut);
+ d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
pthread_testcancel(); /* see big comment below */
pthread_yield(); /* see big comment below */
@@ -1692,16 +1796,15 @@ dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n",
/* indicate termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- pthread_mutex_lock(pThis->mut);
+dbgprintf("Queue %p: worker waiting for mutex\n", pThis);
+ d_pthread_mutex_lock(&pThis->mutThrdMgmt);
/* check if we are the DA worker and, if so, switch back to regular mode */
if(pWrkrInst->iThrd == 0) {
queueTurnOffDAMode(pThis);
}
-
- pThis->iCurNumWrkThrd--;
- pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */
pthread_cleanup_pop(0); /* remove cleanup handler */
+ pThis->iCurNumWrkThrd--; /* one less ;) */
/* if we ever need finalize_it, here would be the place for it! */
if(qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN ||
qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN_IMMEDIATE ||
@@ -1714,7 +1817,8 @@ dbgprintf("Queue 0x%lx/w%d: setting termination state\n", queueGetID(pThis), iMy
qWrkrSetState(pWrkrInst, eWRKTHRD_TERMINATING);
}
pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
- pthread_mutex_unlock(pThis->mut);
+ pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
+ d_pthread_mutex_unlock(&pThis->mutThrdMgmt);
pthread_setcancelstate(iCancelStateSave, NULL);
pthread_exit(0);
@@ -1750,12 +1854,6 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
pThis->iQueueSize = 0;
pThis->iMaxQueueSize = iMaxQueueSize;
pThis->pConsumer = pConsumer;
- pThis->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t));
- pthread_mutex_init (pThis->mut, NULL);
- pThis->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
- pthread_cond_init (pThis->notFull, NULL);
- pThis->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
- pthread_cond_init (pThis->notEmpty, NULL);
pThis->iNumWorkerThreads = iWorkerThreads;
pThis->pszFilePrefix = NULL;
@@ -1809,6 +1907,26 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
assert(pThis != NULL);
+ /* finalize some initializations that could not yet be done because it is
+ * influenced by properties which might have been set after queueConstruct ()
+ */
+ if(pThis->pqParent == NULL) {
+dbgprintf("Queue %p: no parent, alloc mutex\n", pThis);
+ pThis->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t));
+ pthread_mutex_init (pThis->mut, NULL);
+ } else {
+ /* child queue, we need to use parent's mutex */
+ pThis->mut = pThis->pqParent->mut;
+dbgprintf("Queue %p: I am child, use mutex %p\n", pThis, pThis->pqParent->mut);
+ }
+
+ pthread_mutex_init(&pThis->mutThrdMgmt, NULL);
+ pThis->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
+ pthread_cond_init (pThis->notFull, NULL);
+ pThis->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
+ pthread_cond_init (pThis->notEmpty, NULL);
+dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
+
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
@@ -1997,13 +2115,13 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
// We need to startup a worker if we are in non-DA mode and the queue is not empty and not in enque-only mode */
dbgprintf("Queue %p: queueDestruct probing if any regular workers need to be started, CurWrkr %d, qsize %d, qRunsDA %d\n",
pThis, pThis->iCurNumWrkThrd, pThis->iQueueSize, pThis->qRunsDA);
- pthread_mutex_lock(pThis->mut);
+ d_pthread_mutex_lock(pThis->mut);
dbgprintf("queueDestruct mutex locked\n");
if(pThis->iCurNumWrkThrd == 0 && pThis->iQueueSize > 0 && !pThis->bEnqOnly) {
dbgprintf("Queue %p: queueDestruct must start regular workers!\n", pThis);
queueStrtNewWrkThrd(pThis);
}
- pthread_mutex_unlock(pThis->mut);
+ d_pthread_mutex_unlock(pThis->mut);
dbgprintf("queueDestruct mutex unlocked\n");
/* wait again in case a new worker was started */
@@ -2029,8 +2147,13 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
free(pThis->pWrkThrds);
pThis->pWrkThrds = NULL;
}
- pthread_mutex_destroy(pThis->mut);
- free(pThis->mut);
+
+ if(pThis->pqParent == NULL) {
+ /* if we are not a child, we allocated our own mutex, which we now need to destroy */
+ pthread_mutex_destroy(pThis->mut);
+ free(pThis->mut);
+ }
+ pthread_mutex_destroy(&pThis->mutThrdMgmt);
pthread_cond_destroy(pThis->notFull);
free(pThis->notFull);
pthread_cond_destroy(pThis->notEmpty);
@@ -2122,7 +2245,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
*/
if(pThis->pWrkThrds != NULL) {
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- pthread_mutex_lock(pThis->mut);
+ d_pthread_mutex_lock(pThis->mut);
}
/* process any pending thread requests */
@@ -2166,7 +2289,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
finalize_it:
/* now awake sleeping worker threads */
if(pThis->pWrkThrds != NULL) {
- pthread_mutex_unlock(pThis->mut);
+ d_pthread_mutex_unlock(pThis->mut);
i = pthread_cond_signal(pThis->notEmpty);
dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -2178,6 +2301,11 @@ finalize_it:
/* set queue mode to enqueue only or not
+ * There is one subtle issue: this method may be called during queue
+ * construction or while it is running. In the former case, the queue
+ * mutex does not yet exist (it is NULL), while in the later case it
+ * must be locked. The function detects the state and operates as
+ * required.
* rgerhards, 2008-01-16
*/
static rsRetVal
@@ -2191,8 +2319,10 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly)
/* for simplicity, we do one big mutex lock. This method is extremely seldom
* called, so that doesn't matter... -- rgerhards, 2008-01-16
*/
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- pthread_mutex_lock(pThis->mut);
+ if(pThis->mut != NULL) {
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(pThis->mut);
+ }
if(bEnqOnly == pThis->bEnqOnly)
FINALIZE; /* no change, nothing to do */
@@ -2214,8 +2344,10 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly)
pThis->bEnqOnly = bEnqOnly;
finalize_it:
- pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
+ if(pThis->mut != NULL) {
+ d_pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ }
return iRet;
}
@@ -2250,11 +2382,11 @@ queueGetQueueSize(queue_t *pThis, int *piQueueSize)
assert(piQueueSize != NULL);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- pthread_mutex_lock(pThis->mut);
+ d_pthread_mutex_lock(pThis->mut);
*piQueueSize = pThis->iQueueSize; /* tell the world there is one more worker */
- pthread_mutex_unlock(pThis->mut);
+ d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
return iRet;
diff --git a/queue.h b/queue.h
index 481c5aa1..3b53eda3 100644
--- a/queue.h
+++ b/queue.h
@@ -115,7 +115,8 @@ typedef struct queue_s {
rsRetVal (*qDel)(struct queue_s *pThis, void **ppUsr);
/* end type-specific handler */
/* synchronization variables */
- pthread_mutex_t *mut;
+ pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
+ pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */
pthread_cond_t *notFull, *notEmpty;
pthread_cond_t condThrdTrm;/* signalled when threads terminate */
pthread_cond_t *condSignalOnEmpty;/* caller-provided condition to be signalled when queue is empty (DA mode!) */