From 22b9dc1af11c3fdfdf9218fb48e15aedf9a342b3 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 16 Jan 2008 09:24:38 +0000 Subject: queue is now able to restore persisted state on startup (but still some fine tuning to be done) --- queue.c | 216 +++++++++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 174 insertions(+), 42 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index 02346075..88e41168 100644 --- a/queue.c +++ b/queue.c @@ -52,10 +52,33 @@ DEFobjStaticHelpers /* forward-definitions */ rsRetVal queueChkPersist(queue_t *pThis); static void *queueWorker(void *arg); +static rsRetVal queueGetQueueSize(queue_t *pThis, int *piQueueSize); /* methods */ +/* send a command to a specific active thread. If the thread is not + * active, the command is not sent. + */ +static inline rsRetVal +queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads); + + if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRDCMD_RUN) { + dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx); + pThis->pWrkThrds[iIdx].tCurrCmd = tCmd; + } else { + dbgprintf("Queue 0x%lx: command %d NOT sent to inactive thread %d\n", queueGetID(pThis), tCmd, iIdx); + } + + return iRet; +} + /* send a command to a specific thread + * TODO: check if we can run into trouble with inactive threads */ static inline rsRetVal queueTellWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd) @@ -114,10 +137,34 @@ queueStrtWrkThrd(queue_t *pThis, int i) } +/* send a command to all active worker threads. A start index can be + * given. Usually, this is 0 or 1. Thread 0 is reserved to disk-assisted + * mode and this start index take care of the special handling it needs to + * receive. -- rgerhards, 2008-01-16 + */ +static inline rsRetVal +queueTellActWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd) +{ + DEFiRet; + int i; + + ISOBJ_TYPE_assert(pThis, queue); + assert(iStartIdx == 0 || iStartIdx == 1); + + /* tell the workers our request */ + for(i = iStartIdx ; i <= pThis->iNumWorkerThreads ; ++i) + if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRDCMD_TERMINATED) + queueTellActWrkThrd(pThis, i, tCmd); + + return iRet; +} + + /* send a command to all worker threads. A start index can be * given. Usually, this is 0 or 1. Thread 0 is reserved to disk-assisted * mode and this start index take care of the special handling it needs to * receive. + * TODO: check if we run into trouble with inactive worker threads */ static inline rsRetVal queueTellWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd) @@ -173,6 +220,27 @@ queueTimeoutComp(struct timespec *pt, int iTimeout) } +/* wake up all worker threads. Param bWithDAWrk tells if the DA worker + * is to be awaken, too. It needs special handling because it waits on + * two different conditions depending on processing state. + * rgerhards, 2008-01-16 + */ +static inline rsRetVal +queueWakeupWrkThrds(queue_t *pThis, int bWithDAWrk) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + + pthread_cond_broadcast(pThis->notEmpty); + if(bWithDAWrk && pThis->qRunsDA != QRUNS_REGULAR) { + /* if running disk-assisted, workers may wait on that condition, too */ + pthread_cond_broadcast(&pThis->condDA); + } + + return iRet; +} + /* --------------- code for disk-assisted (DA) queue modes -------------------- */ @@ -201,10 +269,9 @@ queueTurnOffDAMode(queue_t *pThis) queueStrtAllWrkThrds(pThis); /* restore our regular worker threads */ pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */ - /* note: a disk queue alsways has a single worker and it alwas has the ID 1 */ - queueTellWrkThrd(pThis->pqDA, 1, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* tell the DA worker to terminate... */ - pthread_mutex_unlock(&pThis->mutDA); /* ... permit it to run ... */ - queueJoinWrkThrd(pThis->pqDA, 1); /* ... and wait for the shutdown to happen */ + /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, + * this will be quick. + */ queueDestruct(pThis->pqDA); /* and now we are ready to destruct the DA queue */ pThis->pqDA = NULL; @@ -288,43 +355,48 @@ queueChkIsDA(queue_t *pThis) * rgerhards, 2008-01-14 */ static inline rsRetVal -queueDAConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr) +queueDAConsumer(queue_t *pThis, int iMyThrdIndx, int iQueueSize, void *pUsr) { DEFiRet; int iCancelStateSave; + int iSizeDAQueue; ISOBJ_TYPE_assert(pThis, queue); ISOBJ_assert(pUsr); assert(pThis->qRunsDA != QRUNS_REGULAR); -dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, pThis->iQueueSize); +dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, iQueueSize); CHKiRet(queueEnqObj(pThis->pqDA, pUsr)); - /* we check if we reached the low water mark, but only if we are not in shutdown mode */ - if(pThis->iQueueSize == pThis->iLowWtrMrk && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) { + /* We check if we reached the low water mark (but only if we are not in shutdown mode) + * Note that the child queue now in almost all cases is non-empty, because we just enqueued + * a message. + */ + if(iQueueSize <= pThis->iLowWtrMrk && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) { dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n", - queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize); + queueGetID(pThis), iMyThrdIndx, iQueueSize); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); dbgprintf("pre mutex lock (think about CLEANUP!)\n"); pthread_mutex_lock(&pThis->mutDA); dbgprintf("mutex locked (think about CLEANUP!)\n"); + /* wait for either passing the high water mark or the child disk queue drain */ pthread_cond_wait(&pThis->condDA, &pThis->mutDA); dbgprintf("condition returned\n"); - /* we are back. either we have passed the high water mark or the child disk queue - * is empty. We check for the later. If that is the case, we switch back to - * non-DA mode - */ - if(pThis->pqDA->iQueueSize == 0) { - dbgprintf("Queue 0x%lx/w%d: %d entries - disk assisted child queue signaled it is empty\n", - queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize); - CHKiRet(queueTurnOffDAMode(pThis)); /* this also unlocks the mutex! */ - } else { - pthread_mutex_unlock(&pThis->mutDA); - } + pthread_mutex_unlock(&pThis->mutDA); dbgprintf("mutex unlocked (think about CLEANUP!)\n"); pthread_setcancelstate(iCancelStateSave, NULL); } + /* now check if the DA queue is empty. If so, we can turn off DA mode. Note that we must + * use queueGetQueueSize() in order to avoid a race on child iQueueSize. -- rgerhards, 2008-01-16 + */ + CHKiRet(queueGetQueueSize(pThis->pqDA, &iSizeDAQueue)); + +dbgprintf("Queue %p/w%d: DA queue size now %d\n", pThis, iMyThrdIndx, iSizeDAQueue); + if(iSizeDAQueue == 0) { + CHKiRet(queueTurnOffDAMode(pThis)); /* this also unlocks the mutex! */ + } + finalize_it: dbgprintf("DAConsumer returns with iRet %d\n", iRet); return iRet; @@ -358,7 +430,8 @@ queueStrtDA(queue_t *pThis) */ pThis->pqDA->condSignalOnEmpty = &pThis->condDA; pThis->pqDA->mutSignalOnEmpty = &pThis->mutDA; - pThis->pqDA->bSignalOnEmpty = 1; + pThis->pqDA->condSignalOnEmpty2 = pThis->notEmpty; + pThis->pqDA->bSignalOnEmpty = 2; CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); @@ -397,7 +470,7 @@ queueStrtDA(queue_t *pThis) * on one. So even if the scheduler plays badly with us, things should be * quite well. -- rgerhards, 2008-01-15 */ - pthread_cond_broadcast(pThis->notEmpty); + queueWakeupWrkThrds(pThis, 0); /* awake all workers, but not ourselves ;) */ pThis->qRunsDA = QRUNS_DA; /* we are now in DA mode! */ @@ -934,6 +1007,28 @@ queueDel(queue_t *pThis, void *pUsr) } +/* Send a shutdown command to all workers and awake them. This function + * does NOT wait for them to terminate. Set bIncludeDAWRk to send the + * termination command to the DA worker, too (else this does not happen). + * rgerhards, 2008-01-16 + */ +static inline rsRetVal +queueWrkThrdReqTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int bIncludeDAWrk) +{ + DEFiRet; + + if(bIncludeDAWrk) { + queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */ + queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ + } else { + queueTellActWrkThrds(pThis, 1, tShutdownCmd);/* first tell the workers our request */ + queueWakeupWrkThrds(pThis, 0); /* awake all workers but not DA-worker */ + } + + return iRet; +} + + /* Send a shutdown command to all workers and see if they terminate. * A timeout may be specified. * rgerhards, 2008-01-14 @@ -945,16 +1040,9 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) int bTimedOut; struct timespec t; - /* first tell the workers our request */ - queueTellWrkThrds(pThis, 0, tShutdownCmd); - - /* awake them... */ - pthread_cond_broadcast(pThis->notEmpty); - if(pThis->qRunsDA != QRUNS_REGULAR) /* if running disk-assisted, workers may wait on that condition, too */ - pthread_cond_broadcast(&pThis->condDA); - - /* get timeout */ - queueTimeoutComp(&t, iTimeout); + queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */ + queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ + queueTimeoutComp(&t, iTimeout);/* get timeout */ /* and wait for their termination */ pthread_mutex_lock(pThis->mut); @@ -989,9 +1077,7 @@ queueWrkThrdCancel(queue_t *pThis) // worker cancellation! -- rgerhards, 2008-01-14 /* awake the workers one more time, just to be sure */ - pthread_cond_broadcast(pThis->notEmpty); - if(pThis->qRunsDA != QRUNS_REGULAR) /* if running disk-assisted, workers may wait on that condition, too */ - pthread_cond_broadcast(&pThis->condDA); + queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ /* first tell the workers our request */ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) @@ -1079,6 +1165,7 @@ queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave) DEFiRet; rsRetVal iRetLocal; int iSeverity; + int iQueueSize; void *pUsr; int qRunsDA; @@ -1094,6 +1181,7 @@ queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave) iRet = queueDel(pThis, &pUsr); queueChkPersist(pThis); // when we support peek(), we must do this down after the del! qRunsDA = pThis->qRunsDA; /* do a local copy so that we prevent a race after mutex release */ + iQueueSize = pThis->iQueueSize; /* ... and the same for this property */ pthread_mutex_unlock(pThis->mut); pthread_cond_signal(pThis->notFull); pthread_setcancelstate(iCancelStateSave, NULL); @@ -1107,25 +1195,26 @@ queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave) FINALIZE; if(qRunsDA == QRUNS_DA) { - queueDAConsumer(pThis, iMyThrdIndx, pUsr); + queueDAConsumer(pThis, iMyThrdIndx, iQueueSize, pUsr); } else { /* we are running in normal, non-disk-assisted mode */ /* do a quick check if we need to drain the queue */ - if(pThis->iDiscardMrk > 0 && pThis->iQueueSize >= pThis->iDiscardMrk) { + if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) { iRetLocal = objGetSeverity(pUsr, &iSeverity); if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { dbgprintf("Queue 0x%lx/w%d: dequeue/queue nearly full (%d entries), " "discarded severity %d message\n", - queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize, iSeverity); + queueGetID(pThis), iMyThrdIndx, iQueueSize, iSeverity); objDestruct(pUsr); } } else { dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n", queueGetID(pThis), iMyThrdIndx); iRetLocal = pThis->pConsumer(pUsr); - if(iRetLocal != RS_RET_OK) + if(iRetLocal != RS_RET_OK) { dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n", queueGetID(pThis), iMyThrdIndx, iRetLocal); + } } } @@ -1193,7 +1282,7 @@ queueWorker(void *arg) while(pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) { dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", queueGetID(pThis), iMyThrdIndx); - if(pThis->bSignalOnEmpty) { + if(pThis->bSignalOnEmpty > 0) { /* we need to signal our parent queue that we are empty */ dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx); pthread_mutex_lock(pThis->mutSignalOnEmpty); @@ -1204,7 +1293,12 @@ dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx); * run. This is important because the parent may have changed our * state. So we simply go back to the begin of the loop. */ - continue; + //continue; + } + if(pThis->bSignalOnEmpty > 1) { + /* no mutex associated with this condition, it's just a try (but needed + * to wakeup a parent worker if e.g. the queue was restarted from disk) */ + pthread_cond_signal(pThis->condSignalOnEmpty2); } /* If we arrive here, we have the regular case, where we can safely assume that * iQueueSize and tCmd have not changed since the while(). @@ -1517,13 +1611,24 @@ rsRetVal queueDestruct(queue_t *pThis) assert(pThis != NULL); - /* first, terminate worker threads */ + /* if running DA, tell the DA workers to shut down. This saves us some CPU cycles which + * we can use to persist the remaining in-memory data to disk quicker. -- rgerhads, 2008-01-16 + */ + if(pThis->qRunsDA != QRUNS_REGULAR) + queueWrkThrdReqTrm(pThis->pqDA, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, 0); + + /* then, terminate our own worker threads */ if(pThis->pWrkThrds != NULL) { queueShutdownWorkers(pThis); free(pThis->pWrkThrds); pThis->pWrkThrds = NULL; } + /* of we have now data left in in-memory queues, this data will be lost if we do not + * persist it to a disk queue. + * TODO: implement code rgerhards, 2008-01-16 + */ + /* if running DA, terminate disk queue */ if(pThis->qRunsDA != QRUNS_REGULAR) queueDestruct(pThis->pqDA); @@ -1689,6 +1794,33 @@ DEFpropSetMeth(queue, iDiscardSeverity, int); DEFpropSetMeth(queue, bIsDA, int); +/* get the size of this queue. The important thing about this get method is that it + * is synchronized via the queue mutex. So it provides the information back without + * any chance of race. Obviously, this causes quite some overhead, so this + * function should only be called in situations where a race needs to be avoided. + * rgerhards, 2008-01-16 + */ +static rsRetVal +queueGetQueueSize(queue_t *pThis, int *piQueueSize) +{ + DEFiRet; + int iCancelStateSave; + + ISOBJ_TYPE_assert(pThis, queue); + assert(piQueueSize != NULL); + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + pthread_mutex_lock(pThis->mut); + + *piQueueSize = pThis->iQueueSize; /* tell the world there is one more worker */ + + pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + + return iRet; +} + + /* This function can be used as a generic way to set properties. Only the subset * of properties required to read persisted property bags is supported. This * functions shall only be called by the property bag reader, thus it is static. -- cgit