summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/batch.h2
-rw-r--r--runtime/queue.c74
2 files changed, 38 insertions, 38 deletions
diff --git a/runtime/batch.h b/runtime/batch.h
index 031718a7..2b3aa83e 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -34,7 +34,7 @@
typedef enum {
BATCH_STATE_RDY = 0, /* object ready for processing */
BATCH_STATE_BAD = 1, /* unrecoverable failure while processing, do NOT resubmit to same action */
- BATCH_STATE_SUB = 2, /* message submitted for processing, outcome yet unkonwn */
+ BATCH_STATE_SUB = 2, /* message submitted for processing, outcome yet unknown */
BATCH_STATE_COMM = 3, /* message successfully commited */
BATCH_STATE_DISC = 4, /* discarded - processed OK, but do not submit to any other action */
} batch_state_t;
diff --git a/runtime/queue.c b/runtime/queue.c
index 67bc40c2..b4f00446 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -70,6 +70,7 @@ DEFobjCurrIf(strm)
DEFobjCurrIf(errmsg)
/* forward-definitions */
+static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr);
static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
@@ -1362,7 +1363,7 @@ dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQue
* picking up things from the to-delete list.
*/
static inline rsRetVal
-DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
+DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted)
{
toDeleteLst_t *pTdl;
qDeqID deqIDDel;
@@ -1370,10 +1371,11 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
+ assert(nDeleted > 0);
pTdl = tdlPeek(pThis); /* get current head element */
if(pTdl == NULL) { /* to-delete list empty */
- DoDeleteBatchFromQStore(pThis, pBatch->nElemDeq);
+ DoDeleteBatchFromQStore(pThis, nDeleted);
} else if(pBatch->deqID == pThis->deqIDDel) {
deqIDDel = pThis->deqIDDel;
pTdl = tdlPeek(pThis);
@@ -1386,7 +1388,7 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
} else {
/* can not delete, insert into to-delete list */
dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID);
- CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElemDeq));
+ CHKiRet(tdlAdd(pThis, pBatch->deqID, nDeleted));
}
finalize_it:
@@ -1395,7 +1397,8 @@ finalize_it:
/* Delete a batch of processed user objects from the queue, which includes
- * destructing the objects themself.
+ * destructing the objects themself. Any entries not marked as finally
+ * processed are enqueued again. The new enqueue is necessary because we have a
* rgerhards, 2009-05-13
*/
static inline rsRetVal
@@ -1403,18 +1406,37 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
{
int i;
void *pUsr;
+ int nEnqueued = 0;
+ rsRetVal localRet;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
+dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state);
for(i = 0 ; i < pBatch->nElem ; ++i) {
dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state);
pUsr = pBatch->pElem[i].pUsrp;
+ if( pBatch->pElem[i].state == BATCH_STATE_RDY
+ || pBatch->pElem[i].state == BATCH_STATE_SUB) {
+RUNLOG_STR("we need to requeue the entry");
+ localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY,
+ (obj_t*)MsgAddRef((msg_t*) pUsr));
+ ++nEnqueued;
+ if(localRet != RS_RET_OK) {
+ DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet);
+ }
+ }
objDestruct(pUsr);
}
- iRet = DeleteBatchFromQStore(pThis, pBatch);
+dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued);
+
+ if(nEnqueued > 0)
+ qqueueChkPersist(pThis, nEnqueued);
+
+ if(i > 0)
+ iRet = DeleteBatchFromQStore(pThis, pBatch, i);
pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */
@@ -1423,7 +1445,11 @@ dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch
/* dequeue as many user pointers as are available, until we hit the configured
- * upper limit of pointers.
+ * upper limit of pointers. Note that this function also deletes all processed
+ * objects from the previous batch. However, it is perfectly valid that the
+ * previous batch contained NO objects at all. For example, this happens
+ * immediately after system startup or when a queue was exhausted and the queue
+ * worker needed to wait for new data.
* This must only be called when the queue mutex is LOOKED, otherwise serious
* malfunction will happen.
*/
@@ -1720,14 +1746,16 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
d_pthread_mutex_unlock(pThis->mut);
/* iterate over returned results and enqueue them in DA queue */
- //for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
- for(i = 0 ; i < pWti->batch.nElem ; i++) {
+ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
+ //for(i = 0 ; i < pWti->batch.nElem ; i++) {
/* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
* the message. So far, we simply assume we always have msg_t, what currently is always the case.
* rgerhards, 2009-05-28
*/
dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg);
- CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
+ CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY,
+ (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
+ pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
}
/* now we are done, but need to re-aquire the mutex */
@@ -1741,12 +1769,6 @@ finalize_it:
/* must only be called when the queue mutex is locked, else results
* are not stable!
- * If we are a child, we have done our duty when the queue is empty. In that case,
- * we can terminate.
- * Version for the DA worker thread. NOTE: the pThis->bRunsDA is different from
- * the DA queue.
- * If our queue is in destruction, we drain to the DA queue and so we shall not terminate
- * until we have done so.
*/
static rsRetVal
qqueueChkStopWrkrDA(qqueue_t *pThis)
@@ -1755,28 +1777,6 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
if(pThis->bEnqOnly) {
iRet = RS_RET_TERMINATE_WHEN_IDLE;
-#if 0
- } else {
- if(pThis->bRunsDA) {
- ASSERT(pThis->pqDA != NULL);
- if( pThis->pqDA->bEnqOnly
- && pThis->pqDA->sizeOnDiskMax > 0
- && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
- /* this queue can never grow, so we can give up... */
- iRet = RS_RET_TERMINATE_NOW;
- } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
-dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk);
- iRet = RS_RET_TERMINATE_NOW;
-RUNLOG_STR("XXX: re-start reg worker");
-if(!pThis->bShutdownImmediate)
- qqueueAdviseMaxWorkers(pThis);
-RUNLOG_STR("XXX: done re-start reg worker");
- }
- } else {
- // experimental iRet = RS_RET_TERMINATE_NOW;
- ;
- }
-#endif
}
RETiRet;