summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-05-18 17:28:34 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-05-18 17:28:34 +0200
commit93f873277bfe5ebb309ff5e92f5dc7244ebd9f1a (patch)
tree0a1431d8ce318552e83cce38273015c5206155a3 /runtime/queue.c
parentb81311ac70e4de0bd5c0b0286413ff1b527ef906 (diff)
downloadrsyslog-93f873277bfe5ebb309ff5e92f5dc7244ebd9f1a.tar.gz
rsyslog-93f873277bfe5ebb309ff5e92f5dc7244ebd9f1a.tar.xz
rsyslog-93f873277bfe5ebb309ff5e92f5dc7244ebd9f1a.zip
t-delete list implemented, queue store drivers updated...
... on the way to the ultra-reliable queue modes (redesign doc). This version does not really work, but is a good commit point. Next comes queue size calculation. DA mode does not yet work.
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c353
1 files changed, 306 insertions, 47 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 6bea338a..dc399066 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -79,6 +79,93 @@ static rsRetVal UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
#define QUEUE_CHECKPOINT 1
#define QUEUE_NO_CHECKPOINT 0
+/***********************************************************************
+ * we need a private data structure, the "to-delete" list. As C does
+ * not provide any partly private data structures, we implement this
+ * structure right here inside the module.
+ * Note that this list must always be kept sorted based on a unique
+ * dequeue ID (which is monotonically increasing).
+ * rgerhards, 2009-05-18
+ ***********************************************************************/
+
+/* generate next uniqueue dequeue ID. Note that uniqueness is only required
+ * on a per-queue basis and while this instance runs. So a stricly monotonically
+ * increasing counter is sufficient (if enough bits are used).
+ */
+static inline qDeqID getNextDeqID(qqueue_t *pQueue)
+{
+ ISOBJ_TYPE_assert(pQueue, qqueue);
+ return pQueue->deqIDAdd++;
+}
+
+
+/* return the top element of the to-delete list or NULL, if the
+ * list is empty.
+ */
+static inline toDeleteLst_t *tdlPeek(qqueue_t *pQueue)
+{
+ ISOBJ_TYPE_assert(pQueue, qqueue);
+ return pQueue->toDeleteLst;
+}
+
+
+/* remove the top element of the to-delete list. Nothing but the
+ * element itself is destroyed. Must not be called when the list
+ * is empty.
+ */
+static inline rsRetVal tdlPop(qqueue_t *pQueue)
+{
+ toDeleteLst_t *pRemove;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pQueue, qqueue);
+ assert(pQueue->toDeleteLst != NULL);
+
+ pRemove = pQueue->toDeleteLst;
+ pQueue->toDeleteLst = pQueue->toDeleteLst->pNext;
+ free(pRemove);
+
+ RETiRet;
+}
+
+
+/* Add a new to-delete list entry. The function allocates the data
+ * structure, populates it with the values provided and links the new
+ * element into the correct place inside the list.
+ */
+static inline rsRetVal tdlAdd(qqueue_t *pQueue, qDeqID deqID, int nElem)
+{
+ toDeleteLst_t *pNew;
+ toDeleteLst_t *pPrev;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pQueue, qqueue);
+ assert(pQueue->toDeleteLst != NULL);
+
+ CHKmalloc(pNew = malloc(sizeof(toDeleteLst_t)));
+ pNew->deqID = deqID;
+ pNew->nElem = nElem;
+
+ /* now find right spot */
+ for( pPrev = pQueue->toDeleteLst
+ ; pPrev != NULL && deqID > pPrev->deqID
+ ; pPrev = pPrev->pNext) {
+ /*JUST SEARCH*/;
+ }
+
+ if(pPrev == NULL) {
+ pNew->pNext = pQueue->toDeleteLst;
+ pQueue->toDeleteLst = pNew;
+ } else {
+ pNew->pNext = pPrev->pNext;
+ pPrev->pNext = pNew;
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
/* methods */
@@ -114,7 +201,9 @@ static inline void queueDrain(qqueue_t *pThis)
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
while(pThis->iQueueSize-- > 0) {
- pThis->qDel(pThis, &pUsr);
+ pThis->qDeq(pThis, &pUsr);
+// TODO: ULTRA
+ //pThis->qDel(pThis, &pUsr);
if(pUsr != NULL) {
objDestruct(pUsr);
}
@@ -472,6 +561,7 @@ static rsRetVal qConstructFixedArray(qqueue_t *pThis)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
+ pThis->tVars.farray.deqhead = 0;
pThis->tVars.farray.head = 0;
pThis->tVars.farray.tail = 0;
@@ -508,13 +598,29 @@ static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in)
RETiRet;
}
-static rsRetVal qDelFixedArray(qqueue_t *pThis, void **out)
+
+static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out)
{
DEFiRet;
ASSERT(pThis != NULL);
- *out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.head];
+ *out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.deqhead];
+//MULTIdbgprintf("ULTRA qDeqFA, deqhead=%d head=%d, tail=%d\n", pThis->tVars.farray.deqhead, pThis->tVars.farray.head, pThis->tVars.farray.tail);
+ pThis->tVars.farray.deqhead++;
+ if (pThis->tVars.farray.deqhead == pThis->iMaxQueueSize)
+ pThis->tVars.farray.deqhead = 0;
+
+ RETiRet;
+}
+
+static rsRetVal qDelFixedArray(qqueue_t *pThis)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+//MULTIdbgprintf("ULTRA qDelFA, deqhead=%d head=%d, tail=%d\n", pThis->tVars.farray.deqhead, pThis->tVars.farray.head, pThis->tVars.farray.tail);
pThis->tVars.farray.head++;
if (pThis->tVars.farray.head == pThis->iMaxQueueSize)
pThis->tVars.farray.head = 0;
@@ -529,15 +635,13 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis, void **out)
static inline rsRetVal qqueueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr)
{
- DEFiRet;
qLinkedList_t *pEntry;
+ DEFiRet;
ASSERT(ppRoot != NULL);
ASSERT(ppLast != NULL);
- if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
+ CHKmalloc((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))));
pEntry->pNext = NULL;
pEntry->pUsr = pUsr;
@@ -586,8 +690,9 @@ static rsRetVal qConstructLinkedList(qqueue_t *pThis)
ASSERT(pThis != NULL);
- pThis->tVars.linklist.pRoot = 0;
- pThis->tVars.linklist.pLast = 0;
+ pThis->tVars.linklist.pDeqRoot = NULL;
+ pThis->tVars.linklist.pDelRoot = NULL;
+ pThis->tVars.linklist.pLast = NULL;
qqueueChkIsDA(pThis);
@@ -610,15 +715,60 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis)
static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
{
+ qLinkedList_t *pEntry;
+ DEFiRet;
+
+ CHKmalloc((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))));
+
+ pEntry->pNext = NULL;
+ pEntry->pUsr = pUsr;
+
+ if(pThis->tVars.linklist.pDelRoot == NULL) {
+ pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry;
+ } else {
+ pThis->tVars.linklist.pLast->pNext = pEntry;
+ pThis->tVars.linklist.pLast = pEntry;
+ }
+
+ if(pThis->tVars.linklist.pDeqRoot == NULL) {
+ pThis->tVars.linklist.pDeqRoot = pEntry;
+ }
+RUNLOG_VAR("%p", pThis->tVars.linklist.pDeqRoot);
+
+finalize_it:
+ RETiRet;
+}
+
+
+static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr)
+{
+ qLinkedList_t *pEntry;
DEFiRet;
- iRet = qqueueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr);
+
+RUNLOG_VAR("%p", pThis->tVars.linklist.pDeqRoot);
+ pEntry = pThis->tVars.linklist.pDeqRoot;
+ *ppUsr = pEntry->pUsr;
+ pThis->tVars.linklist.pDeqRoot = pEntry->pNext;
+
RETiRet;
}
-static rsRetVal qDelLinkedList(qqueue_t *pThis, obj_t **ppUsr)
+
+static rsRetVal qDelLinkedList(qqueue_t *pThis)
{
+ qLinkedList_t *pEntry;
DEFiRet;
- iRet = qqueueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr);
+
+ pEntry = pThis->tVars.linklist.pDelRoot;
+
+ if(pThis->tVars.linklist.pDelRoot == pThis->tVars.linklist.pLast) {
+ pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = NULL;
+ } else {
+ pThis->tVars.linklist.pDelRoot = pEntry->pNext;
+ }
+
+ free(pEntry);
+
RETiRet;
}
@@ -732,11 +882,29 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
/* and now the stream objects (some order as when persisted!) */
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF,
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
- CHKiRet(obj.Deserialize(&pThis->tVars.disk.pRead, (uchar*) "strm", psQIF,
+ CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF,
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite));
- CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead));
+ CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDel));
+
+ /* we now need to take care of the Deq handle. It is not persisted, so we can create
+ * a virgin copy based on pReadDel. // TODO duplicat code, same as blow - single function!
+ */
+
+ CHKiRet(strmConstruct(&pThis->tVars.disk.pReadDeq));
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0));
+ CHKiRet(strmSetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000));
+ CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ));
+ CHKiRet(strmSetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR));
+ CHKiRet(strmConstructFinalize(pThis->tVars.disk.pReadDeq));
+
+ /* TODO: dirty, need stream methods --> */
+ pThis->tVars.disk.pReadDeq->iCurrFNum = pThis->tVars.disk.pReadDel->iCurrFNum;
+ pThis->tVars.disk.pReadDeq->iCurrOffs = pThis->tVars.disk.pReadDel->iCurrOffs;
+ /* <-- dirty, need stream methods :TODO */
+ CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDeq));
/* OK, we could successfully read the file, so we now can request that it be
* deleted when we are done with the persisted information.
@@ -787,17 +955,25 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR));
CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite));
- CHKiRet(strmConstruct(&pThis->tVars.disk.pRead));
- CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
- CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
- CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000));
- CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ));
- CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR));
- CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead));
-
-
- CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix));
- CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(strmConstruct(&pThis->tVars.disk.pReadDeq));
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0));
+ CHKiRet(strmSetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000));
+ CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ));
+ CHKiRet(strmSetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR));
+ CHKiRet(strmConstructFinalize(pThis->tVars.disk.pReadDeq));
+
+ CHKiRet(strmConstruct(&pThis->tVars.disk.pReadDel));
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1));
+ CHKiRet(strmSetDir(pThis->tVars.disk.pReadDel, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pReadDel, 10000000));
+ CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pReadDel, STREAMMODE_READ));
+ CHKiRet(strmSetsType(pThis->tVars.disk.pReadDel, STREAMTYPE_FILE_CIRCULAR));
+ CHKiRet(strmConstructFinalize(pThis->tVars.disk.pReadDel));
+
+ CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(strmSetFName(pThis->tVars.disk.pReadDeq, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(strmSetFName(pThis->tVars.disk.pReadDel, pThis->pszFilePrefix, pThis->lenFilePrefix));
}
/* now we set (and overwrite in case of a persisted restart) some parameters which
@@ -806,7 +982,8 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
* ability to read existing queue files. -- rgerhards, 2008-01-12
*/
CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize));
- CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize));
+ CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pReadDeq, pThis->iMaxFileSize));
+ CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pReadDel, pThis->iMaxFileSize));
finalize_it:
RETiRet;
@@ -820,7 +997,8 @@ static rsRetVal qDestructDisk(qqueue_t *pThis)
ASSERT(pThis != NULL);
strmDestruct(&pThis->tVars.disk.pWrite);
- strmDestruct(&pThis->tVars.disk.pRead);
+ strmDestruct(&pThis->tVars.disk.pReadDeq);
+ strmDestruct(&pThis->tVars.disk.pReadDel);
RETiRet;
}
@@ -852,16 +1030,30 @@ finalize_it:
RETiRet;
}
-static rsRetVal qDelDisk(qqueue_t *pThis, void **ppUsr)
+
+static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr)
{
DEFiRet;
+ CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL));
+
+finalize_it:
+ RETiRet;
+}
+
+
+static rsRetVal qDelDisk(qqueue_t *pThis)
+{
+ obj_t *pDummyObj; /* we need to deserialize it... */
+ DEFiRet;
+
int64 offsIn;
int64 offsOut;
- CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsIn));
- CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pRead, NULL, NULL));
- CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsOut));
+ CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn));
+ CHKiRet(obj.Deserialize(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, NULL, NULL));
+ objDestruct(pDummyObj);
+ CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut));
/* This time it is a bit tricky: we free disk space only upon file deletion. So we need
* to keep track of what we have read until we get an out-offset that is lower than the
@@ -882,6 +1074,7 @@ finalize_it:
RETiRet;
}
+
/* -------------------- direct (no queueing) -------------------- */
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis)
{
@@ -920,7 +1113,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
}
-static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
+static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
{
return RS_RET_OK;
}
@@ -1023,7 +1216,8 @@ qqueueDel(qqueue_t *pThis, void *pUsr)
if(pThis->iUngottenObjs > 0) {
iRet = GetUngottenObj(pThis, (obj_t**) pUsr);
} else {
- iRet = pThis->qDel(pThis, pUsr);
+ iRet = pThis->qDeq(pThis, pUsr);
+ // TODO: ULTRA iRet = pThis->qDel(pThis, pUsr);
ATOMIC_DEC(pThis->iQueueSize);
}
@@ -1290,18 +1484,21 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qConstruct = qConstructFixedArray;
pThis->qDestruct = qDestructFixedArray;
pThis->qAdd = qAddFixedArray;
+ pThis->qDeq = qDeqFixedArray;
pThis->qDel = qDelFixedArray;
break;
case QUEUETYPE_LINKEDLIST:
pThis->qConstruct = qConstructLinkedList;
pThis->qDestruct = qDestructLinkedList;
pThis->qAdd = qAddLinkedList;
- pThis->qDel = (rsRetVal (*)(qqueue_t*,void**)) qDelLinkedList;
+ pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
+ pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
break;
case QUEUETYPE_DISK:
pThis->qConstruct = qConstructDisk;
pThis->qDestruct = qDestructDisk;
pThis->qAdd = qAddDisk;
+ pThis->qDeq = qDeqDisk;
pThis->qDel = qDelDisk;
/* special handling */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
@@ -1390,8 +1587,68 @@ finalize_it:
}
+/* Finally remove n elements from the queue store.
+ */
+static inline rsRetVal
+DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
+{
+ int i;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+
+ /* now send delete request to storage driver */
+ for(i = 0 ; i < nElem ; ++i) {
+ pThis->qDel(pThis);
+ }
+
+ ++pThis->deqIDDel; /* one more batch dequeued */
+
+ RETiRet;
+}
+
+
+/* remove messages from the physical queue store that are fully processed. This is
+ * controlled via the to-delete list. We can only delete those elements, that are
+ * at the current physical tail of the queue. If the batch is from another position,
+ * we schedule it for deletion, but actual deletion will happen at a later call
+ * of this function here. We always delete as much as possible, which includes
+ * picking up things from the to-delete list.
+ */
+static inline rsRetVal
+DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
+{
+ toDeleteLst_t *pTdl;
+ qDeqID deqIDDel;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ assert(pBatch != NULL);
+
+ pTdl = tdlPeek(pThis);
+ if(pTdl == NULL) {
+ DoDeleteBatchFromQStore(pThis, pBatch->nElem);
+ } else if(pBatch->deqID == pThis->deqIDDel) {
+ deqIDDel = pThis->deqIDDel;
+ pTdl = tdlPeek(pThis);
+ while(pTdl != NULL && deqIDDel == pTdl->deqID) {
+ DoDeleteBatchFromQStore(pThis, pTdl->nElem);
+ tdlPop(pThis);
+ ++deqIDDel;
+ pTdl = tdlPeek(pThis);
+ }
+ } else {
+ /* can not delete, insert into to-delete list */
+ CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem));
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
/* Delete a batch of processed user objects from the queue, which includes
- * destructing the objects themself. The pointer piRemainingQueu
+ * destructing the objects themself.
* rgerhards, 2009-05-13
*/
static inline rsRetVal
@@ -1401,30 +1658,31 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
void *pUsr;
DEFiRet;
- /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */
-
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
+// TODO: ULTRA: lock qaueue mutex if instructed to do so
- /* if the queue runs in DA mode, the DA worker already deleted the message. But
- * in regular mode, we need to do it ourselfs. We differentiate between the two cases,
- * because it is actually the easiest way to handle the destruct-Problem in a simple
- * and pUsrp-Type agnostic way (else we would need an objAddRef() generic function).
+ /* if the queue runs in DA mode, the DA worker already deleted the in-memory representation
+ * of the message. But in regular mode, we need to do it ourselfs. We differentiate between
+ * the two cases, because it is actually the easiest way to handle the destruct-Problem in
+ * a simple and pUsrp-Type agnostic way (else we would need an objAddRef() generic function).
*/
if(!pThis->bRunsDA) {
for(i = 0 ; i < pBatch->nElem ; ++i) {
- /* TODO: pull msgs off the queue (not yet necessary) */
pUsr = pBatch->pElem[i].pUsrp;
objDestruct(pUsr);
}
}
- pBatch->nElem = 0;
+
+ iRet = DeleteBatchFromQStore(pThis, pBatch);
+
+ pBatch->nElem = 0; /* reset batch */
RETiRet;
}
-/* dequeue as many user points as are available, until we hit the configured
+/* dequeue as many user pointers as are available, until we hit the configured
* upper limit of pointers.
* This must only be called when the queue mutex is LOOKED, otherwise serious
* malfunction will happen.
@@ -1463,6 +1721,7 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */
pWti->batch.nElem = nDequeued;
+ pWti->batch.deqID = getNextDeqID(pThis);
*piRemainingQueueSize = iQueueSize;
finalize_it:
@@ -1957,7 +2216,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
pThis->bNeedDelQIF = 0;
}
/* indicate spool file needs to be deleted */
- CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1));
FINALIZE; /* nothing left to do, so be happy */
}
@@ -1992,13 +2251,13 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
/* now persist the stream info */
CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF));
- CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF));
+ CHKiRet(strmSerialize(pThis->tVars.disk.pReadDel, psQIF));
/* tell the input file object that it must not delete the file on close if the queue
* is non-empty - but only if we are not during a simple checkpoint
*/
if(bIsCheckpoint != QUEUE_CHECKPOINT) {
- CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0));
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDel, 0));
}
/* we have persisted the queue object. So whenever it comes to an empty queue,