From da53802c96a59a990859706219398dce709ba1b3 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 26 Oct 2009 12:21:07 +0100 Subject: implemented solution for cancel at shutdown/unprocessed entries We do now enqueue those objects that are left unprocessed. This enables us to delete the full batch, what is exactly what we need to do. --- runtime/batch.h | 2 +- runtime/queue.c | 58 +++++++++++++++++++++++---------------------------------- 2 files changed, 24 insertions(+), 36 deletions(-) (limited to 'runtime') diff --git a/runtime/batch.h b/runtime/batch.h index 031718a7..2b3aa83e 100644 --- a/runtime/batch.h +++ b/runtime/batch.h @@ -34,7 +34,7 @@ typedef enum { BATCH_STATE_RDY = 0, /* object ready for processing */ BATCH_STATE_BAD = 1, /* unrecoverable failure while processing, do NOT resubmit to same action */ - BATCH_STATE_SUB = 2, /* message submitted for processing, outcome yet unkonwn */ + BATCH_STATE_SUB = 2, /* message submitted for processing, outcome yet unknown */ BATCH_STATE_COMM = 3, /* message successfully commited */ BATCH_STATE_DISC = 4, /* discarded - processed OK, but do not submit to any other action */ } batch_state_t; diff --git a/runtime/queue.c b/runtime/queue.c index d9dc599a..d1eefde6 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -70,6 +70,7 @@ DEFobjCurrIf(strm) DEFobjCurrIf(errmsg) /* forward-definitions */ +static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr); static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); @@ -1396,10 +1397,8 @@ finalize_it: /* Delete a batch of processed user objects from the queue, which includes - * destructing the objects themself. It is assumed that batches - * are processed in sequential order, that is if we find one unprocessed entry, - * that indicates the end of the delete operation. Note that this function MUST - * be called only for non-empty batches! + * destructing the objects themself. Any entries not marked as finally + * processed are enqueued again. The new enqueue is necessary because we have a * rgerhards, 2009-05-13 */ static inline rsRetVal @@ -1407,19 +1406,34 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) { int i; void *pUsr; + int nEnqueued = 0; + rsRetVal localRet; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state); - for(i = 0 ; i < (pBatch->nElem) && (pBatch->pElem[i].state != BATCH_STATE_RDY); ++i) { + for(i = 0 ; i < pBatch->nElem ; ++i) { dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state); pUsr = pBatch->pElem[i].pUsrp; + if( pBatch->pElem[i].state == BATCH_STATE_RDY + || pBatch->pElem[i].state == BATCH_STATE_SUB) { +RUNLOG_STR("we need to requeue the entry"); + localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, + (obj_t*)MsgAddRef((msg_t*) pUsr)); + ++nEnqueued; + if(localRet != RS_RET_OK) { + DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); + } + } objDestruct(pUsr); } -dbgprintf("we deleted %d objects\n", i); +dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); + + if(nEnqueued > 0) + qqueueChkPersist(pThis, nEnqueued); if(i > 0) iRet = DeleteBatchFromQStore(pThis, pBatch, i); @@ -1735,7 +1749,9 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) * rgerhards, 2009-05-28 */ dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg); - CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); + CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, + (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); + pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ } /* now we are done, but need to re-aquire the mutex */ @@ -1749,12 +1765,6 @@ finalize_it: /* must only be called when the queue mutex is locked, else results * are not stable! - * If we are a child, we have done our duty when the queue is empty. In that case, - * we can terminate. - * Version for the DA worker thread. NOTE: the pThis->bRunsDA is different from - * the DA queue. - * If our queue is in destruction, we drain to the DA queue and so we shall not terminate - * until we have done so. */ static rsRetVal qqueueChkStopWrkrDA(qqueue_t *pThis) @@ -1763,28 +1773,6 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) if(pThis->bEnqOnly) { iRet = RS_RET_TERMINATE_WHEN_IDLE; -#if 0 - } else { - if(pThis->bRunsDA) { - ASSERT(pThis->pqDA != NULL); - if( pThis->pqDA->bEnqOnly - && pThis->pqDA->sizeOnDiskMax > 0 - && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { - /* this queue can never grow, so we can give up... */ - iRet = RS_RET_TERMINATE_NOW; - } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { -dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk); - iRet = RS_RET_TERMINATE_NOW; -RUNLOG_STR("XXX: re-start reg worker"); -if(!pThis->bShutdownImmediate) - qqueueAdviseMaxWorkers(pThis); -RUNLOG_STR("XXX: done re-start reg worker"); - } - } else { - // experimental iRet = RS_RET_TERMINATE_NOW; - ; - } -#endif } RETiRet; -- cgit