summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c41
1 files changed, 39 insertions, 2 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index d437d590..5e9c67ca 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -841,6 +841,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
batch_obj_t batchObj;
DEFiRet;
+ //TODO: init batchObj (states _OK and new fields -- CHECK)
ASSERT(pThis != NULL);
/* calling the consumer is quite different here than it is from a worker thread */
@@ -861,6 +862,26 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
RETiRet;
}
+/*** EXPERIMENTAL ***/
+rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ /* calling the consumer is quite different here than it is from a worker thread */
+ /* we need to provide the consumer's return value back to the caller because in direct
+ * mode the consumer probably has a lot to convey (which get's lost in the other modes
+ * because they are asynchronous. But direct mode is deliberately synchronous.
+ * rgerhards, 2008-02-12
+ * We use our knowledge about the batch_t structure below, but without that, we
+ * pay a too-large performance toll... -- rgerhards, 2009-04-22
+ */
+ iRet = pThis->pConsumer(pThis->pUsr, pBatch, &pThis->bShutdownImmediate);
+
+ RETiRet;
+}
+
static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
{
@@ -1364,10 +1385,10 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
assert(pBatch != NULL);
for(i = 0 ; i < pBatch->nElem ; ++i) {
-dbgprintf("XXX: enqueueing data element %d of %d\n", i, pBatch->nElem);
pUsr = pBatch->pElem[i].pUsrp;
if( pBatch->pElem[i].state == BATCH_STATE_RDY
|| pBatch->pElem[i].state == BATCH_STATE_SUB) {
+dbgprintf("XXX: DeleteProcessedBatch re-enqueue %d of %d, state %d\n", i, pBatch->nElem, pBatch->pElem[i].state);
localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY,
(obj_t*)MsgAddRef((msg_t*) pUsr));
++nEnqueued;
@@ -1385,7 +1406,7 @@ dbgprintf("XXX: enqueueing data element %d of %d\n", i, pBatch->nElem);
iRet = DeleteBatchFromQStore(pThis, pBatch);
- pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */
+ pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ // TODO: more fine init, new fields! 2010-06-14
RETiRet;
}
@@ -1430,6 +1451,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
/* all well, use this element */
pWti->batch.pElem[nDequeued].pUsrp = pUsr;
pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
+ pWti->batch.pElem[nDequeued].bFilterOK = 1; // TODO: think again if we can handle that with more performance
++nDequeued;
}
@@ -2273,6 +2295,21 @@ finalize_it:
/* ------------------------------ END multi-enqueue functions ------------------------------ */
+/* enqueue a new user data element in direct mode
+ * NOTE/TODO: This is a TESTER/EXPERIEMENTAL, to be changed to better
+ * code later on (like multi submit!) 2010-06-10
+ * Enqueues the new element and awakes worker thread.
+ */
+rsRetVal
+qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr)
+{
+ DEFiRet;
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ iRet = qAddDirect(pThis, pUsr);
+ RETiRet;
+}
+
+
/* enqueue a new user data element
* Enqueues the new element and awakes worker thread.
*/