diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-18 17:28:34 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-05-18 17:28:34 +0200 |
commit | 93f873277bfe5ebb309ff5e92f5dc7244ebd9f1a (patch) | |
tree | 0a1431d8ce318552e83cce38273015c5206155a3 /runtime/queue.c | |
parent | b81311ac70e4de0bd5c0b0286413ff1b527ef906 (diff) | |
download | rsyslog-93f873277bfe5ebb309ff5e92f5dc7244ebd9f1a.tar.gz rsyslog-93f873277bfe5ebb309ff5e92f5dc7244ebd9f1a.tar.xz rsyslog-93f873277bfe5ebb309ff5e92f5dc7244ebd9f1a.zip |
t-delete list implemented, queue store drivers updated...
... on the way to the ultra-reliable queue modes (redesign doc). This
version does not really work, but is a good commit point. Next comes
queue size calculation. DA mode does not yet work.
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 353 |
1 files changed, 306 insertions, 47 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 6bea338a..dc399066 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -79,6 +79,93 @@ static rsRetVal UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex); #define QUEUE_CHECKPOINT 1 #define QUEUE_NO_CHECKPOINT 0 +/*********************************************************************** + * we need a private data structure, the "to-delete" list. As C does + * not provide any partly private data structures, we implement this + * structure right here inside the module. + * Note that this list must always be kept sorted based on a unique + * dequeue ID (which is monotonically increasing). + * rgerhards, 2009-05-18 + ***********************************************************************/ + +/* generate next uniqueue dequeue ID. Note that uniqueness is only required + * on a per-queue basis and while this instance runs. So a stricly monotonically + * increasing counter is sufficient (if enough bits are used). + */ +static inline qDeqID getNextDeqID(qqueue_t *pQueue) +{ + ISOBJ_TYPE_assert(pQueue, qqueue); + return pQueue->deqIDAdd++; +} + + +/* return the top element of the to-delete list or NULL, if the + * list is empty. + */ +static inline toDeleteLst_t *tdlPeek(qqueue_t *pQueue) +{ + ISOBJ_TYPE_assert(pQueue, qqueue); + return pQueue->toDeleteLst; +} + + +/* remove the top element of the to-delete list. Nothing but the + * element itself is destroyed. Must not be called when the list + * is empty. + */ +static inline rsRetVal tdlPop(qqueue_t *pQueue) +{ + toDeleteLst_t *pRemove; + DEFiRet; + + ISOBJ_TYPE_assert(pQueue, qqueue); + assert(pQueue->toDeleteLst != NULL); + + pRemove = pQueue->toDeleteLst; + pQueue->toDeleteLst = pQueue->toDeleteLst->pNext; + free(pRemove); + + RETiRet; +} + + +/* Add a new to-delete list entry. The function allocates the data + * structure, populates it with the values provided and links the new + * element into the correct place inside the list. + */ +static inline rsRetVal tdlAdd(qqueue_t *pQueue, qDeqID deqID, int nElem) +{ + toDeleteLst_t *pNew; + toDeleteLst_t *pPrev; + DEFiRet; + + ISOBJ_TYPE_assert(pQueue, qqueue); + assert(pQueue->toDeleteLst != NULL); + + CHKmalloc(pNew = malloc(sizeof(toDeleteLst_t))); + pNew->deqID = deqID; + pNew->nElem = nElem; + + /* now find right spot */ + for( pPrev = pQueue->toDeleteLst + ; pPrev != NULL && deqID > pPrev->deqID + ; pPrev = pPrev->pNext) { + /*JUST SEARCH*/; + } + + if(pPrev == NULL) { + pNew->pNext = pQueue->toDeleteLst; + pQueue->toDeleteLst = pNew; + } else { + pNew->pNext = pPrev->pNext; + pPrev->pNext = pNew; + } + +finalize_it: + RETiRet; +} + + /* methods */ @@ -114,7 +201,9 @@ static inline void queueDrain(qqueue_t *pThis) /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(pThis->iQueueSize-- > 0) { - pThis->qDel(pThis, &pUsr); + pThis->qDeq(pThis, &pUsr); +// TODO: ULTRA + //pThis->qDel(pThis, &pUsr); if(pUsr != NULL) { objDestruct(pUsr); } @@ -472,6 +561,7 @@ static rsRetVal qConstructFixedArray(qqueue_t *pThis) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } + pThis->tVars.farray.deqhead = 0; pThis->tVars.farray.head = 0; pThis->tVars.farray.tail = 0; @@ -508,13 +598,29 @@ static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in) RETiRet; } -static rsRetVal qDelFixedArray(qqueue_t *pThis, void **out) + +static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out) { DEFiRet; ASSERT(pThis != NULL); - *out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.head]; + *out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.deqhead]; +//MULTIdbgprintf("ULTRA qDeqFA, deqhead=%d head=%d, tail=%d\n", pThis->tVars.farray.deqhead, pThis->tVars.farray.head, pThis->tVars.farray.tail); + pThis->tVars.farray.deqhead++; + if (pThis->tVars.farray.deqhead == pThis->iMaxQueueSize) + pThis->tVars.farray.deqhead = 0; + + RETiRet; +} + +static rsRetVal qDelFixedArray(qqueue_t *pThis) +{ + DEFiRet; + + ASSERT(pThis != NULL); + +//MULTIdbgprintf("ULTRA qDelFA, deqhead=%d head=%d, tail=%d\n", pThis->tVars.farray.deqhead, pThis->tVars.farray.head, pThis->tVars.farray.tail); pThis->tVars.farray.head++; if (pThis->tVars.farray.head == pThis->iMaxQueueSize) pThis->tVars.farray.head = 0; @@ -529,15 +635,13 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis, void **out) static inline rsRetVal qqueueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr) { - DEFiRet; qLinkedList_t *pEntry; + DEFiRet; ASSERT(ppRoot != NULL); ASSERT(ppLast != NULL); - if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) { - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } + CHKmalloc((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t)))); pEntry->pNext = NULL; pEntry->pUsr = pUsr; @@ -586,8 +690,9 @@ static rsRetVal qConstructLinkedList(qqueue_t *pThis) ASSERT(pThis != NULL); - pThis->tVars.linklist.pRoot = 0; - pThis->tVars.linklist.pLast = 0; + pThis->tVars.linklist.pDeqRoot = NULL; + pThis->tVars.linklist.pDelRoot = NULL; + pThis->tVars.linklist.pLast = NULL; qqueueChkIsDA(pThis); @@ -610,15 +715,60 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis) static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) { + qLinkedList_t *pEntry; + DEFiRet; + + CHKmalloc((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t)))); + + pEntry->pNext = NULL; + pEntry->pUsr = pUsr; + + if(pThis->tVars.linklist.pDelRoot == NULL) { + pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry; + } else { + pThis->tVars.linklist.pLast->pNext = pEntry; + pThis->tVars.linklist.pLast = pEntry; + } + + if(pThis->tVars.linklist.pDeqRoot == NULL) { + pThis->tVars.linklist.pDeqRoot = pEntry; + } +RUNLOG_VAR("%p", pThis->tVars.linklist.pDeqRoot); + +finalize_it: + RETiRet; +} + + +static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr) +{ + qLinkedList_t *pEntry; DEFiRet; - iRet = qqueueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr); + +RUNLOG_VAR("%p", pThis->tVars.linklist.pDeqRoot); + pEntry = pThis->tVars.linklist.pDeqRoot; + *ppUsr = pEntry->pUsr; + pThis->tVars.linklist.pDeqRoot = pEntry->pNext; + RETiRet; } -static rsRetVal qDelLinkedList(qqueue_t *pThis, obj_t **ppUsr) + +static rsRetVal qDelLinkedList(qqueue_t *pThis) { + qLinkedList_t *pEntry; DEFiRet; - iRet = qqueueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr); + + pEntry = pThis->tVars.linklist.pDelRoot; + + if(pThis->tVars.linklist.pDelRoot == pThis->tVars.linklist.pLast) { + pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = NULL; + } else { + pThis->tVars.linklist.pDelRoot = pEntry->pNext; + } + + free(pEntry); + RETiRet; } @@ -732,11 +882,29 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) /* and now the stream objects (some order as when persisted!) */ CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF, (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); - CHKiRet(obj.Deserialize(&pThis->tVars.disk.pRead, (uchar*) "strm", psQIF, + CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF, (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite)); - CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead)); + CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDel)); + + /* we now need to take care of the Deq handle. It is not persisted, so we can create + * a virgin copy based on pReadDel. // TODO duplicat code, same as blow - single function! + */ + + CHKiRet(strmConstruct(&pThis->tVars.disk.pReadDeq)); + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); + CHKiRet(strmSetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); + CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000)); + CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ)); + CHKiRet(strmSetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR)); + CHKiRet(strmConstructFinalize(pThis->tVars.disk.pReadDeq)); + + /* TODO: dirty, need stream methods --> */ + pThis->tVars.disk.pReadDeq->iCurrFNum = pThis->tVars.disk.pReadDel->iCurrFNum; + pThis->tVars.disk.pReadDeq->iCurrOffs = pThis->tVars.disk.pReadDel->iCurrOffs; + /* <-- dirty, need stream methods :TODO */ + CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDeq)); /* OK, we could successfully read the file, so we now can request that it be * deleted when we are done with the persisted information. @@ -787,17 +955,25 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR)); CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite)); - CHKiRet(strmConstruct(&pThis->tVars.disk.pRead)); - CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); - CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); - CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000)); - CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ)); - CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR)); - CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead)); - - - CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix)); - CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(strmConstruct(&pThis->tVars.disk.pReadDeq)); + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); + CHKiRet(strmSetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); + CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000)); + CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ)); + CHKiRet(strmSetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR)); + CHKiRet(strmConstructFinalize(pThis->tVars.disk.pReadDeq)); + + CHKiRet(strmConstruct(&pThis->tVars.disk.pReadDel)); + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1)); + CHKiRet(strmSetDir(pThis->tVars.disk.pReadDel, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); + CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pReadDel, 10000000)); + CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pReadDel, STREAMMODE_READ)); + CHKiRet(strmSetsType(pThis->tVars.disk.pReadDel, STREAMTYPE_FILE_CIRCULAR)); + CHKiRet(strmConstructFinalize(pThis->tVars.disk.pReadDel)); + + CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(strmSetFName(pThis->tVars.disk.pReadDeq, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(strmSetFName(pThis->tVars.disk.pReadDel, pThis->pszFilePrefix, pThis->lenFilePrefix)); } /* now we set (and overwrite in case of a persisted restart) some parameters which @@ -806,7 +982,8 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) * ability to read existing queue files. -- rgerhards, 2008-01-12 */ CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize)); - CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize)); + CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pReadDeq, pThis->iMaxFileSize)); + CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pReadDel, pThis->iMaxFileSize)); finalize_it: RETiRet; @@ -820,7 +997,8 @@ static rsRetVal qDestructDisk(qqueue_t *pThis) ASSERT(pThis != NULL); strmDestruct(&pThis->tVars.disk.pWrite); - strmDestruct(&pThis->tVars.disk.pRead); + strmDestruct(&pThis->tVars.disk.pReadDeq); + strmDestruct(&pThis->tVars.disk.pReadDel); RETiRet; } @@ -852,16 +1030,30 @@ finalize_it: RETiRet; } -static rsRetVal qDelDisk(qqueue_t *pThis, void **ppUsr) + +static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr) { DEFiRet; + CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL)); + +finalize_it: + RETiRet; +} + + +static rsRetVal qDelDisk(qqueue_t *pThis) +{ + obj_t *pDummyObj; /* we need to deserialize it... */ + DEFiRet; + int64 offsIn; int64 offsOut; - CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsIn)); - CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pRead, NULL, NULL)); - CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsOut)); + CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn)); + CHKiRet(obj.Deserialize(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, NULL, NULL)); + objDestruct(pDummyObj); + CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut)); /* This time it is a bit tricky: we free disk space only upon file deletion. So we need * to keep track of what we have read until we get an out-offset that is lower than the @@ -882,6 +1074,7 @@ finalize_it: RETiRet; } + /* -------------------- direct (no queueing) -------------------- */ static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis) { @@ -920,7 +1113,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) } -static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) +static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) { return RS_RET_OK; } @@ -1023,7 +1216,8 @@ qqueueDel(qqueue_t *pThis, void *pUsr) if(pThis->iUngottenObjs > 0) { iRet = GetUngottenObj(pThis, (obj_t**) pUsr); } else { - iRet = pThis->qDel(pThis, pUsr); + iRet = pThis->qDeq(pThis, pUsr); + // TODO: ULTRA iRet = pThis->qDel(pThis, pUsr); ATOMIC_DEC(pThis->iQueueSize); } @@ -1290,18 +1484,21 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qConstruct = qConstructFixedArray; pThis->qDestruct = qDestructFixedArray; pThis->qAdd = qAddFixedArray; + pThis->qDeq = qDeqFixedArray; pThis->qDel = qDelFixedArray; break; case QUEUETYPE_LINKEDLIST: pThis->qConstruct = qConstructLinkedList; pThis->qDestruct = qDestructLinkedList; pThis->qAdd = qAddLinkedList; - pThis->qDel = (rsRetVal (*)(qqueue_t*,void**)) qDelLinkedList; + pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; + pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; break; case QUEUETYPE_DISK: pThis->qConstruct = qConstructDisk; pThis->qDestruct = qDestructDisk; pThis->qAdd = qAddDisk; + pThis->qDeq = qDeqDisk; pThis->qDel = qDelDisk; /* special handling */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ @@ -1390,8 +1587,68 @@ finalize_it: } +/* Finally remove n elements from the queue store. + */ +static inline rsRetVal +DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) +{ + int i; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + + /* now send delete request to storage driver */ + for(i = 0 ; i < nElem ; ++i) { + pThis->qDel(pThis); + } + + ++pThis->deqIDDel; /* one more batch dequeued */ + + RETiRet; +} + + +/* remove messages from the physical queue store that are fully processed. This is + * controlled via the to-delete list. We can only delete those elements, that are + * at the current physical tail of the queue. If the batch is from another position, + * we schedule it for deletion, but actual deletion will happen at a later call + * of this function here. We always delete as much as possible, which includes + * picking up things from the to-delete list. + */ +static inline rsRetVal +DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) +{ + toDeleteLst_t *pTdl; + qDeqID deqIDDel; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + assert(pBatch != NULL); + + pTdl = tdlPeek(pThis); + if(pTdl == NULL) { + DoDeleteBatchFromQStore(pThis, pBatch->nElem); + } else if(pBatch->deqID == pThis->deqIDDel) { + deqIDDel = pThis->deqIDDel; + pTdl = tdlPeek(pThis); + while(pTdl != NULL && deqIDDel == pTdl->deqID) { + DoDeleteBatchFromQStore(pThis, pTdl->nElem); + tdlPop(pThis); + ++deqIDDel; + pTdl = tdlPeek(pThis); + } + } else { + /* can not delete, insert into to-delete list */ + CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem)); + } + +finalize_it: + RETiRet; +} + + /* Delete a batch of processed user objects from the queue, which includes - * destructing the objects themself. The pointer piRemainingQueu + * destructing the objects themself. * rgerhards, 2009-05-13 */ static inline rsRetVal @@ -1401,30 +1658,31 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) void *pUsr; DEFiRet; - /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */ - ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); +// TODO: ULTRA: lock qaueue mutex if instructed to do so - /* if the queue runs in DA mode, the DA worker already deleted the message. But - * in regular mode, we need to do it ourselfs. We differentiate between the two cases, - * because it is actually the easiest way to handle the destruct-Problem in a simple - * and pUsrp-Type agnostic way (else we would need an objAddRef() generic function). + /* if the queue runs in DA mode, the DA worker already deleted the in-memory representation + * of the message. But in regular mode, we need to do it ourselfs. We differentiate between + * the two cases, because it is actually the easiest way to handle the destruct-Problem in + * a simple and pUsrp-Type agnostic way (else we would need an objAddRef() generic function). */ if(!pThis->bRunsDA) { for(i = 0 ; i < pBatch->nElem ; ++i) { - /* TODO: pull msgs off the queue (not yet necessary) */ pUsr = pBatch->pElem[i].pUsrp; objDestruct(pUsr); } } - pBatch->nElem = 0; + + iRet = DeleteBatchFromQStore(pThis, pBatch); + + pBatch->nElem = 0; /* reset batch */ RETiRet; } -/* dequeue as many user points as are available, until we hit the configured +/* dequeue as many user pointers as are available, until we hit the configured * upper limit of pointers. * This must only be called when the queue mutex is LOOKED, otherwise serious * malfunction will happen. @@ -1463,6 +1721,7 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */ pWti->batch.nElem = nDequeued; + pWti->batch.deqID = getNextDeqID(pThis); *piRemainingQueueSize = iQueueSize; finalize_it: @@ -1957,7 +2216,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) pThis->bNeedDelQIF = 0; } /* indicate spool file needs to be deleted */ - CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1)); FINALIZE; /* nothing left to do, so be happy */ } @@ -1992,13 +2251,13 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) /* now persist the stream info */ CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF)); - CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF)); + CHKiRet(strmSerialize(pThis->tVars.disk.pReadDel, psQIF)); /* tell the input file object that it must not delete the file on close if the queue * is non-empty - but only if we are not during a simple checkpoint */ if(bIsCheckpoint != QUEUE_CHECKPOINT) { - CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0)); + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDel, 0)); } /* we have persisted the queue object. So whenever it comes to an empty queue, |