diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 132 |
1 files changed, 69 insertions, 63 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index bb9ea060..0cd33701 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -232,12 +232,16 @@ getQueueTypeName(queueType_t t) switch(t) { case QUEUETYPE_FIXED_ARRAY: r = "FixedArray"; + break; case QUEUETYPE_LINKEDLIST: r = "LinkedList"; + break; case QUEUETYPE_DISK: r = "Disk"; + break; case QUEUETYPE_DIRECT: r = "Direct"; + break; } return r; } @@ -976,6 +980,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) { batch_t singleBatch; batch_obj_t batchObj; + sbool active = 1; int i; DEFiRet; @@ -994,9 +999,9 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) memset(&singleBatch, 0, sizeof(batch_t)); batchObj.state = BATCH_STATE_RDY; batchObj.pUsrp = (obj_t*) pUsr; - batchObj.bFilterOK = 1; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; + singleBatch.active = &active; iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate); /* delete the batch string params: TODO: create its own "class" for this */ for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { @@ -1596,7 +1601,6 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz /* all well, use this element */ pWti->batch.pElem[nDequeued].pUsrp = pUsr; pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; - pWti->batch.pElem[nDequeued].bFilterOK = 1; // TODO: think again if we can handle that with more performance ++nDequeued; } @@ -2075,6 +2079,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->pqParent == NULL ? 0 : 1, pThis->iFullDlyMrk, pThis->iLightDlyMrk, pThis->iDeqBatchSize); + pThis->bQueueStarted = 1; if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ @@ -2105,7 +2110,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ * the case when a disk queue has been loaded. If we did not start it here, it would never start. */ qqueueAdviseMaxWorkers(pThis); - pThis->bQueueStarted = 1; /* support statistics gathering */ qName = obj.GetName((obj_t*)pThis); @@ -2303,73 +2307,75 @@ DoSaveOnShutdown(qqueue_t *pThis) /* destructor for the queue object */ BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(qqueue) - /* shut down all workers - * We do not need to shutdown workers when we are in enqueue-only mode or we are a - * direct queue - because in both cases we have none... ;) - * with a child! -- rgerhards, 2008-01-28 - */ - if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL - && pThis->pWtpReg != NULL) - ShutdownWorkers(pThis); + if(pThis->bQueueStarted) { + /* shut down all workers + * We do not need to shutdown workers when we are in enqueue-only mode or we are a + * direct queue - because in both cases we have none... ;) + * with a child! -- rgerhards, 2008-01-28 + */ + if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL + && pThis->pWtpReg != NULL) + ShutdownWorkers(pThis); - if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { - CHKiRet(DoSaveOnShutdown(pThis)); - } + if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { + CHKiRet(DoSaveOnShutdown(pThis)); + } - /* finally destruct our (regular) worker thread pool - * Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen, - * e.g. when they are not created in enqueue-only mode. We already check the condition - * as this may otherwise be very hard to find once we optimize (and have long forgotten - * about this condition here ;) - * rgerhards, 2008-01-25 - */ - if(pThis->qType != QUEUETYPE_DIRECT && pThis->pWtpReg != NULL) { - wtpDestruct(&pThis->pWtpReg); - } + /* finally destruct our (regular) worker thread pool + * Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen, + * e.g. when they are not created in enqueue-only mode. We already check the condition + * as this may otherwise be very hard to find once we optimize (and have long forgotten + * about this condition here ;) + * rgerhards, 2008-01-25 + */ + if(pThis->qType != QUEUETYPE_DIRECT && pThis->pWtpReg != NULL) { + wtpDestruct(&pThis->pWtpReg); + } - /* Now check if we actually have a DA queue and, if so, destruct it. - * Note that the wtp must be destructed first, it may be in cancel cleanup handler - * *right now* and actually *need* to access the queue object to persist some final - * data (re-queueing case). So we need to destruct the wtp first, which will make - * sure all workers have terminated. Please note that this also generates a situation - * where it is possible that the DA queue has a parent pointer but the parent has - * no WtpDA associated with it - which is perfectly legal thanks to this code here. - */ - if(pThis->pWtpDA != NULL) { - wtpDestruct(&pThis->pWtpDA); - } - if(pThis->pqDA != NULL) { - qqueueDestruct(&pThis->pqDA); - } + /* Now check if we actually have a DA queue and, if so, destruct it. + * Note that the wtp must be destructed first, it may be in cancel cleanup handler + * *right now* and actually *need* to access the queue object to persist some final + * data (re-queueing case). So we need to destruct the wtp first, which will make + * sure all workers have terminated. Please note that this also generates a situation + * where it is possible that the DA queue has a parent pointer but the parent has + * no WtpDA associated with it - which is perfectly legal thanks to this code here. + */ + if(pThis->pWtpDA != NULL) { + wtpDestruct(&pThis->pWtpDA); + } + if(pThis->pqDA != NULL) { + qqueueDestruct(&pThis->pqDA); + } - /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) - * This handler is most important for disk queues, it will finally persist the necessary - * on-disk structures. In theory, other queueing modes may implement their other (non-DA) - * methods of persisting a queue between runs, but in practice all of this is done via - * disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here - * if need arises (what I doubt...) -- rgerhards, 2008-01-25 - */ - CHKiRet_Hdlr(qqueuePersist(pThis, QUEUE_NO_CHECKPOINT)) { - DBGOPRINT((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet); - } + /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) + * This handler is most important for disk queues, it will finally persist the necessary + * on-disk structures. In theory, other queueing modes may implement their other (non-DA) + * methods of persisting a queue between runs, but in practice all of this is done via + * disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here + * if need arises (what I doubt...) -- rgerhards, 2008-01-25 + */ + CHKiRet_Hdlr(qqueuePersist(pThis, QUEUE_NO_CHECKPOINT)) { + DBGOPRINT((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet); + } - /* finally, clean up some simple things... */ - if(pThis->pqParent == NULL) { - /* if we are not a child, we allocated our own mutex, which we now need to destroy */ - pthread_mutex_destroy(pThis->mut); - free(pThis->mut); - } - pthread_mutex_destroy(&pThis->mutThrdMgmt); - pthread_cond_destroy(&pThis->notFull); - pthread_cond_destroy(&pThis->notEmpty); - pthread_cond_destroy(&pThis->belowFullDlyWtrMrk); - pthread_cond_destroy(&pThis->belowLightDlyWtrMrk); + /* finally, clean up some simple things... */ + if(pThis->pqParent == NULL) { + /* if we are not a child, we allocated our own mutex, which we now need to destroy */ + pthread_mutex_destroy(pThis->mut); + free(pThis->mut); + } + pthread_mutex_destroy(&pThis->mutThrdMgmt); + pthread_cond_destroy(&pThis->notFull); + pthread_cond_destroy(&pThis->notEmpty); + pthread_cond_destroy(&pThis->belowFullDlyWtrMrk); + pthread_cond_destroy(&pThis->belowLightDlyWtrMrk); - DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize); - DESTROY_ATOMIC_HELPER_MUT(pThis->mutLogDeq); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutLogDeq); - /* type-specific destructor */ - iRet = pThis->qDestruct(pThis); + /* type-specific destructor */ + iRet = pThis->qDestruct(pThis); + } free(pThis->pszFilePrefix); free(pThis->pszSpoolDir); |