summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c178
1 files changed, 62 insertions, 116 deletions
diff --git a/queue.c b/queue.c
index 864f4711..c77ef447 100644
--- a/queue.c
+++ b/queue.c
@@ -58,13 +58,13 @@ DEFobjStaticHelpers
/* debug aides */
#if 1
-#define d_pthread_mutex_lock(x) {dbgprintf("mutex %p lock file %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \
+#define d_pthread_mutex_lock(x) {dbgprintf("mutex %p lock %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \
pthread_mutex_lock(x); \
- if(0)dbgprintf("mutex %p lock aquired file %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \
+ if(1)dbgprintf("mutex %p lock aquired %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \
}
-#define d_pthread_mutex_unlock(x) {dbgprintf("mutex %p UNlock file %s, %s(), line %d\n", x ,__FILE__, __func__, __LINE__);\
+#define d_pthread_mutex_unlock(x) {dbgprintf("mutex %p UNlock %s, %s(), line %d\n", x ,__FILE__, __func__, __LINE__);\
pthread_mutex_unlock(x); \
- if(0)dbgprintf("mutex %p UNlock done file %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \
+ if(1)dbgprintf("mutex %p UNlock done %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \
}
#else
#define d_pthread_mutex_lock(x) pthread_mutex_lock(x)
@@ -75,7 +75,6 @@ DEFobjStaticHelpers
/* forward-definitions */
rsRetVal queueChkPersist(queue_t *pThis);
static void *queueWorker(void *arg);
-static rsRetVal queueGetQueueSize(queue_t *pThis, int *piQueueSize);
static rsRetVal queueChkWrkThrdChanges(queue_t *pThis);
static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly);
@@ -92,34 +91,6 @@ static void queueMutexCleanup(void *arg)
}
-void test1(void) {}
-/* method to lock a mutex in a cancel-safe way. A cancel handler
- * is pushed that unlocks the mutex it when thread is cancelled.
- * rgerhards, 2008-01-19
- */
-#define queueMutexCnclSaveLock(pMut) \
-do { \
- int iCancelStateSave; \
- \
- assert(pMut != NULL); \
- \
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); \
- d_pthread_mutex_lock(pMut); \
- pthread_cleanup_push(queueMutexCleanup, pMut); \
- pthread_setcancelstate(iCancelStateSave, NULL); \
-} while(0)\
-
-void test(void) {}
-/* method to unlock a mutex that was locked via queueMutexWithCnclSaveLock ()
- * note that a push/pop interface is used, so they must be called in the respective
- * order. -- rgerhards, 2008-01-19
- */
-#define queueMutexWithCnclSaveUnlock() \
-do { \
- pthread_cleanup_pop(1); \
-} while(0) \
-
-
/* get the current worker state. For simplicity and speed, we have
* NOT used our regular calling interface this time. I hope that won't
* bite in the long term... -- rgerhards, 2008-01-17
@@ -136,7 +107,7 @@ qWrkrGetState(qWrkThrd_t *pThis)
* (it would be best if we could do this with an atomic operation)
* rgerhards, 2008-01-19
*/
-static inline void
+static void
queueWrkrThrdStartupIndication(queue_t *pThis)
{
int iCancelStateSave;
@@ -154,7 +125,7 @@ queueWrkrThrdStartupIndication(queue_t *pThis)
* (it would be best if we could do this with an atomic operation)
* rgerhards, 2008-01-19
*/
-static inline void
+static void
queueWrkrThrdShutdownIndication(queue_t *pThis)
{
int iCancelStateSave;
@@ -393,15 +364,12 @@ queueStrtDAWrkr(queue_t *pThis)
ISOBJ_TYPE_assert(pThis, queue);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
-dbgprintf("Queue %p: DAWrkr thread mutex lock\n", pThis);
d_pthread_mutex_lock(&pThis->pWrkThrds[0].mut);
- pthread_cleanup_push(queueMutexCleanup, &pThis->pWrkThrds[0].mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
if(pThis->pWrkThrds[0].tCurrCmd == eWRKTHRD_STOPPED) {
iRet = queueStrtWrkThrd(pThis, 0);
}
- pthread_cleanup_pop(1);
-dbgprintf("Queue %p: DAWrkr thread mutex unlock\n", pThis);
+ d_pthread_mutex_unlock(&pThis->pWrkThrds[0].mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
return iRet;
}
@@ -508,7 +476,7 @@ queueWakeupWrkThrds(queue_t *pThis, int bWithDAWrk)
ISOBJ_TYPE_assert(pThis, queue);
- pthread_cond_broadcast(pThis->notEmpty);
+ pthread_cond_broadcast(&pThis->notEmpty);
if(bWithDAWrk && pThis->qRunsDA != QRUNS_REGULAR) {
/* if running disk-assisted, workers may wait on that condition, too */
pthread_cond_broadcast(&pThis->condDA);
@@ -538,11 +506,10 @@ queueChkAndStrtWrk(queue_t *pThis)
if(pThis->bEnqOnly == 1)
FINALIZE; /* in enqueue-only mode we have no workers */
- //queueMutexCnclSaveLock(&pThis->mutThrdMgmt);
-pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
-d_pthread_mutex_lock(&pThis->mutThrdMgmt);
-pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt);
-pthread_setcancelstate(iCancelStateSave, NULL);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(&pThis->mutThrdMgmt);
+ pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt);
+ pthread_setcancelstate(iCancelStateSave, NULL);
/* check if we need to start up another worker */
if(pThis->qRunsDA == QRUNS_REGULAR) {
if(pThis->iCurNumWrkThrd < pThis->iNumWorkerThreads) {
@@ -564,8 +531,7 @@ dbgprintf("Queue %p: DA worker is no longer running, restarting, qsize %d, worke
iRet = queueStrtDAWrkr(pThis);
}
}
- //queueMutexCnclSaveUnlock(&pThis->mutThrdMgmt); // TODO: move to finalize_it, but needs a conditon
-pthread_cleanup_pop(1);
+ pthread_cleanup_pop(1);
finalize_it:
return iRet;
@@ -718,7 +684,7 @@ dbgprintf("Queue %p: queueSTrtDA after child queue construct, q %p\n", pThis, pT
*/
pThis->pqDA->condSignalOnEmpty = &pThis->condDA;
pThis->pqDA->mutSignalOnEmpty = &pThis->mutDA;
- pThis->pqDA->condSignalOnEmpty2 = pThis->notEmpty;
+ pThis->pqDA->condSignalOnEmpty2 = &pThis->notEmpty;
pThis->pqDA->bSignalOnEmpty = 2;
pThis->pqDA->pqParent = pThis;
dbgprintf("Queue %p: queueSTrtDA after assign\n", pThis);
@@ -821,7 +787,6 @@ static rsRetVal
queueChkStrtDA(queue_t *pThis)
{
DEFiRet;
- int iCancelStateSave;
ISOBJ_TYPE_assert(pThis, queue);
@@ -837,6 +802,7 @@ queueChkStrtDA(queue_t *pThis)
dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n",
queueGetID(pThis), pThis->iQueueSize);
//pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ // TODO: mutex call order check! this must aquire the queue mutex
//d_pthread_mutex_lock(&pThis->mutDA);
pthread_cond_signal(&pThis->condDA);
//d_pthread_mutex_unlock(&pThis->mutDA);
@@ -1344,11 +1310,10 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout)
/* and wait for their termination */
dbgprintf("Queue %p: waiting for mutex %p\n", pThis, &pThis->mutThrdMgmt);
- //queueMutexCnclSaveLock(&pThis->mutThrdMgmt);
-pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
-d_pthread_mutex_lock(&pThis->mutThrdMgmt);
-pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt);
-pthread_setcancelstate(iCancelStateSave, NULL);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(&pThis->mutThrdMgmt);
+ pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt);
+ pthread_setcancelstate(iCancelStateSave, NULL);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n",
@@ -1359,8 +1324,7 @@ pthread_setcancelstate(iCancelStateSave, NULL);
bTimedOut = 1; /* we exit the loop on timeout */
}
}
-// queueMutexCnclSaveUnlock(&pThis->mutThrdMgmt);
-pthread_cleanup_pop(1);
+ pthread_cleanup_pop(1);
if(bTimedOut)
iRet = RS_RET_TIMED_OUT;
@@ -1528,7 +1492,7 @@ queueWorkerChkAndCallConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst, int iCancel
pWrkrInst->pUsr = pUsr; /* save it for the cancel cleanup handler */
qRunsDA = pThis->qRunsDA;
d_pthread_mutex_unlock(pThis->mut);
- pthread_cond_signal(pThis->notFull);
+ pthread_cond_signal(&pThis->notFull);
pthread_setcancelstate(iCancelStateSave, NULL);
/* WE ARE NO LONGER PROTECTED FROM THE MUTEX */
@@ -1587,22 +1551,29 @@ static void queueWorkerCancelCleanup(void *arg)
{
qWrkThrd_t *pWrkrInst = (qWrkThrd_t*) arg;
queue_t *pThis;
+ int iCancelStateSave;
assert(pWrkrInst != NULL);
ISOBJ_TYPE_assert(pWrkrInst->pQueue, queue);
pThis = pWrkrInst->pQueue;
- dbgprintf("Queue 0x%lx/w%d: cancelation cleanup handler called (NOT FULLY IMPLEMENTED, one msgs lost!)\n",
+ dbgprintf("Queue 0x%lx/w%d: cancelation cleanup handler called (NOT FULLY IMPLEMENTED, one msg lost!)\n",
queueGetID(pThis), pWrkrInst->iThrd);
+ /* TODO: re-enqueue the data element! */
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(&pThis->mutThrdMgmt);
qWrkrSetState(&pThis->pWrkThrds[pWrkrInst->iThrd], eWRKTHRD_TERMINATING);
pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
- /* TODO: re-enqueue the data element! */
dbgprintf("Queue 0x%lx/w%d: thread CANCELED with %d entries left in queue, %d workers running.\n",
queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize, pThis->iCurNumWrkThrd - 1);
- queueWrkrThrdShutdownIndication(pThis); // TODO: move above debug message into this function!
+ pThis->iCurNumWrkThrd--;
+ pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
+ d_pthread_mutex_unlock(&pThis->mutThrdMgmt);
+ pthread_setcancelstate(iCancelStateSave, NULL);
}
@@ -1621,15 +1592,15 @@ static inline int
queueWorkerRemainActive(queue_t *pThis, qWrkThrd_t *pWrkrInst)
{
register int b; /* this is a boolean! */
- int iSizeDAQueue;
/* first check the usual termination condition that applies to all workers */
b = ( (qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING)
|| ((qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && (pThis->iQueueSize > 0)));
-dbgprintf("Queue %p/w%d: chk 1 pre empty queue, qsize %d, cont run: %d, cmd %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize, b, qWrkrGetState(pWrkrInst));
+dbgprintf("Queue %p/w%d: chk 1 pre empty queue, qsize %d (high wtr %d), cont run: %d, cmd %d, DA qsize %d\n", pThis,
+ pWrkrInst->iThrd,
+ pThis->iQueueSize, pThis->iHighWtrMrk, b, qWrkrGetState(pWrkrInst),
+ (pThis->pqDA == NULL) ? -1 : pThis->pqDA->iQueueSize);
if(b && pWrkrInst->iThrd == 0 && pThis->qRunsDA == QRUNS_DA) {
-// queueGetQueueSize(pThis->pqDA, &iSizeDAQueue);
-// b = pThis->iQueueSize >= pThis->iHighWtrMrk || iSizeDAQueue != 0;
b = pThis->iQueueSize >= pThis->iHighWtrMrk || pThis->pqDA->iQueueSize != 0;
}
@@ -1662,11 +1633,9 @@ queueWorker(void *arg)
sigfillset(&sigSet);
pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
- queueWrkrThrdStartupIndication(pThis);
-
- /* do some one-time initialization */
+ /* do some one-time thread initialization */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(pThis->mut);
+ d_pthread_mutex_lock(&pThis->mutThrdMgmt);
/* initialize our thread instance descriptor */
qWrkrInit(&pWrkrInst, pThis);
@@ -1687,12 +1656,13 @@ dbgprintf("qRunsDA %d, check against %d\n", pThis->qRunsDA, QRUNS_DA);
* because someone may have requested us to shut down even before we got a chance to do
* our init. That would be a bad race... -- rgerhards, 2008-01-16
*/
+ pThis->iCurNumWrkThrd++;
if(qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT)
qWrkrSetState(pWrkrInst, eWRKTHRD_RUNNING); /* we are running now! */
pthread_cleanup_push(queueWorkerCancelCleanup, pWrkrInst);
- d_pthread_mutex_unlock(pThis->mut);
+ d_pthread_mutex_unlock(&pThis->mutThrdMgmt);
pthread_setcancelstate(iCancelStateSave, NULL);
/* end one-time stuff */
@@ -1702,19 +1672,23 @@ dbgprintf("qRunsDA %d, check against %d\n", pThis->qRunsDA, QRUNS_DA);
while(bContinueRun) {
dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n",
queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd);
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(pThis->mut);
-
/* process any pending thread requests */
queueChkWrkThrdChanges(pThis);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(pThis->mut);
+dbgprintf("pthis 2: %p\n", pThis);
+
if((bContinueRun = queueWorkerRemainActive(pThis, pWrkrInst)) == 0) {
+dbgprintf("pthis 2a: %p\n", pThis);
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
+dbgprintf("pthis 2b: %p\n", pThis);
continue; /* and break loop */
}
/* if we reach this point, we are still protected by the mutex */
+dbgprintf("pthis 3: %p\n", pThis);
if(pThis->iQueueSize == 0) {
dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
@@ -1745,12 +1719,13 @@ dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n",
dbgprintf("Queue %p/w%d: pre condwait ->notEmpty, worker shutdown %d\n", pThis, iMyThrdIndx, pThis->toWrkShutdown);
/* DA worker and first worker never have an inactivity timeout */
if(pWrkrInst->iThrd < 2 || pThis->toWrkShutdown == -1) {
+ //xxx if(pThis->toWrkShutdown == -1) {
dbgprintf("worker never times out!\n");
/* never shut down any started worker */
- pthread_cond_wait(pThis->notEmpty, pThis->mut);
+ pthread_cond_wait(&pThis->notEmpty, pThis->mut);
} else {
queueTimeoutComp(&t, pThis->toWrkShutdown);/* get absolute timeout */
- if(pthread_cond_timedwait (pThis->notEmpty, pThis->mut, &t) != 0) {
+ if(pthread_cond_timedwait(&pThis->notEmpty, pThis->mut, &t) != 0) {
dbgprintf("Queue 0x%lx/w%d: inactivity timeout, worker terminating...\n",
queueGetID(pThis), iMyThrdIndx);
/* we use SHUTDOWN (and not SHUTDOWN_IMMEDIATE) so that the worker
@@ -1921,10 +1896,8 @@ dbgprintf("Queue %p: I am child, use mutex %p\n", pThis, pThis->pqParent->mut);
}
pthread_mutex_init(&pThis->mutThrdMgmt, NULL);
- pThis->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
- pthread_cond_init (pThis->notFull, NULL);
- pThis->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
- pthread_cond_init (pThis->notEmpty, NULL);
+ pthread_cond_init (&pThis->notFull, NULL);
+ pthread_cond_init (&pThis->notEmpty, NULL);
dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
/* call type-specific constructor */
@@ -1966,7 +1939,8 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
if(!bInitialized) {
dbgprintf("Queue 0x%lx: queue starts up without (loading) any disk state\n", queueGetID(pThis));
/* fire up the worker threads */
- queueStrtNewWrkThrd(pThis);
+ if(pThis->bEnqOnly == 0)
+ queueStrtNewWrkThrd(pThis);
// TODO: preforked workers! queueStrtAllWrkThrds(pThis);
}
pThis->bQueueStarted = 1;
@@ -2119,6 +2093,7 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
dbgprintf("queueDestruct mutex locked\n");
if(pThis->iCurNumWrkThrd == 0 && pThis->iQueueSize > 0 && !pThis->bEnqOnly) {
dbgprintf("Queue %p: queueDestruct must start regular workers!\n", pThis);
+ // TODO check mutex call order - doies function aquire mutex?
queueStrtNewWrkThrd(pThis);
}
d_pthread_mutex_unlock(pThis->mut);
@@ -2154,10 +2129,8 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
free(pThis->mut);
}
pthread_mutex_destroy(&pThis->mutThrdMgmt);
- pthread_cond_destroy(pThis->notFull);
- free(pThis->notFull);
- pthread_cond_destroy(pThis->notEmpty);
- free(pThis->notEmpty);
+ pthread_cond_destroy(&pThis->notFull);
+ pthread_cond_destroy(&pThis->notEmpty);
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
@@ -2237,6 +2210,9 @@ queueEnqObj(queue_t *pThis, void *pUsr)
ISOBJ_TYPE_assert(pThis, queue);
+ /* process any pending thread requests */
+ queueChkWrkThrdChanges(pThis);
+
/* Please note that this function is not cancel-safe and consequently
* sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
* during its execution. If that is not done, race conditions occur if the
@@ -2248,9 +2224,6 @@ queueEnqObj(queue_t *pThis, void *pUsr)
d_pthread_mutex_lock(pThis->mut);
}
- /* process any pending thread requests */
- queueChkWrkThrdChanges(pThis);
-
/* first check if we can discard anything */
if(pThis->iDiscardMrk > 0 && pThis->iQueueSize >= pThis->iDiscardMrk) {
iRetLocal = objGetSeverity(pUsr, &iSeverity);
@@ -2277,7 +2250,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) {
dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", queueGetID(pThis));
queueTimeoutComp(&t, pThis->toEnq);
- if(pthread_cond_timedwait (pThis->notFull, pThis->mut, &t) != 0) {
+ if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", queueGetID(pThis));
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
@@ -2290,7 +2263,7 @@ finalize_it:
/* now awake sleeping worker threads */
if(pThis->pWrkThrds != NULL) {
d_pthread_mutex_unlock(pThis->mut);
- i = pthread_cond_signal(pThis->notEmpty);
+ i = pthread_cond_signal(&pThis->notEmpty);
dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i);
pthread_setcancelstate(iCancelStateSave, NULL);
}
@@ -2366,33 +2339,6 @@ DEFpropSetMeth(queue, bIsDA, int);
DEFpropSetMeth(queue, iMinMsgsPerWrkr, int);
-/* get the size of this queue. The important thing about this get method is that it
- * is synchronized via the queue mutex. So it provides the information back without
- * any chance of race. Obviously, this causes quite some overhead, so this
- * function should only be called in situations where a race needs to be avoided.
- * rgerhards, 2008-01-16
- */
-static rsRetVal
-queueGetQueueSize(queue_t *pThis, int *piQueueSize)
-{
- DEFiRet;
- int iCancelStateSave;
-
- ISOBJ_TYPE_assert(pThis, queue);
- assert(piQueueSize != NULL);
-
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(pThis->mut);
-
- *piQueueSize = pThis->iQueueSize; /* tell the world there is one more worker */
-
- d_pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
-
- return iRet;
-}
-
-
/* This function can be used as a generic way to set properties. Only the subset
* of properties required to read persisted property bags is supported. This
* functions shall only be called by the property bag reader, thus it is static.