diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 41 |
1 files changed, 39 insertions, 2 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index d437d590..5e9c67ca 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -841,6 +841,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) batch_obj_t batchObj; DEFiRet; + //TODO: init batchObj (states _OK and new fields -- CHECK) ASSERT(pThis != NULL); /* calling the consumer is quite different here than it is from a worker thread */ @@ -861,6 +862,26 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) RETiRet; } +/*** EXPERIMENTAL ***/ +rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) +{ + DEFiRet; + + ASSERT(pThis != NULL); + + /* calling the consumer is quite different here than it is from a worker thread */ + /* we need to provide the consumer's return value back to the caller because in direct + * mode the consumer probably has a lot to convey (which get's lost in the other modes + * because they are asynchronous. But direct mode is deliberately synchronous. + * rgerhards, 2008-02-12 + * We use our knowledge about the batch_t structure below, but without that, we + * pay a too-large performance toll... -- rgerhards, 2009-04-22 + */ + iRet = pThis->pConsumer(pThis->pUsr, pBatch, &pThis->bShutdownImmediate); + + RETiRet; +} + static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) { @@ -1364,10 +1385,10 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) assert(pBatch != NULL); for(i = 0 ; i < pBatch->nElem ; ++i) { -dbgprintf("XXX: enqueueing data element %d of %d\n", i, pBatch->nElem); pUsr = pBatch->pElem[i].pUsrp; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { +dbgprintf("XXX: DeleteProcessedBatch re-enqueue %d of %d, state %d\n", i, pBatch->nElem, pBatch->pElem[i].state); localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*) pUsr)); ++nEnqueued; @@ -1385,7 +1406,7 @@ dbgprintf("XXX: enqueueing data element %d of %d\n", i, pBatch->nElem); iRet = DeleteBatchFromQStore(pThis, pBatch); - pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ + pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ // TODO: more fine init, new fields! 2010-06-14 RETiRet; } @@ -1430,6 +1451,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz /* all well, use this element */ pWti->batch.pElem[nDequeued].pUsrp = pUsr; pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; + pWti->batch.pElem[nDequeued].bFilterOK = 1; // TODO: think again if we can handle that with more performance ++nDequeued; } @@ -2273,6 +2295,21 @@ finalize_it: /* ------------------------------ END multi-enqueue functions ------------------------------ */ +/* enqueue a new user data element in direct mode + * NOTE/TODO: This is a TESTER/EXPERIEMENTAL, to be changed to better + * code later on (like multi submit!) 2010-06-10 + * Enqueues the new element and awakes worker thread. + */ +rsRetVal +qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, qqueue); + iRet = qAddDirect(pThis, pUsr); + RETiRet; +} + + /* enqueue a new user data element * Enqueues the new element and awakes worker thread. */ |