summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-10-22 14:57:34 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-10-22 14:57:34 +0200
commit33e216daf7f89542cc6c91f1e97da6fdb71eecf8 (patch)
treef4b74d404f66234121cdd1f7fe1bba029975ce61 /runtime
parente04e1b50025f5fa9c26abd946190dce8f797d08f (diff)
downloadrsyslog-33e216daf7f89542cc6c91f1e97da6fdb71eecf8.tar.gz
rsyslog-33e216daf7f89542cc6c91f1e97da6fdb71eecf8.tar.xz
rsyslog-33e216daf7f89542cc6c91f1e97da6fdb71eecf8.zip
Begun to work on partial batch deletes...
... but this brings a lot of problems with it. The issue is that we still have a sequential store and we do not know how we could delete the one entry right in the middle of processing. I keep this branch if we intend to move on with it - but for now I look into a different solution...
Diffstat (limited to 'runtime')
-rw-r--r--runtime/queue.c30
1 files changed, 21 insertions, 9 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 4bbcc2b8..d9dc599a 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1362,7 +1362,7 @@ dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQue
* picking up things from the to-delete list.
*/
static inline rsRetVal
-DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
+DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted)
{
toDeleteLst_t *pTdl;
qDeqID deqIDDel;
@@ -1370,10 +1370,11 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
+ assert(nDeleted > 0);
pTdl = tdlPeek(pThis); /* get current head element */
if(pTdl == NULL) { /* to-delete list empty */
- DoDeleteBatchFromQStore(pThis, pBatch->nElemDeq);
+ DoDeleteBatchFromQStore(pThis, nDeleted);
} else if(pBatch->deqID == pThis->deqIDDel) {
deqIDDel = pThis->deqIDDel;
pTdl = tdlPeek(pThis);
@@ -1386,7 +1387,7 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
} else {
/* can not delete, insert into to-delete list */
dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID);
- CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElemDeq));
+ CHKiRet(tdlAdd(pThis, pBatch->deqID, nDeleted));
}
finalize_it:
@@ -1395,7 +1396,10 @@ finalize_it:
/* Delete a batch of processed user objects from the queue, which includes
- * destructing the objects themself.
+ * destructing the objects themself. It is assumed that batches
+ * are processed in sequential order, that is if we find one unprocessed entry,
+ * that indicates the end of the delete operation. Note that this function MUST
+ * be called only for non-empty batches!
* rgerhards, 2009-05-13
*/
static inline rsRetVal
@@ -1408,13 +1412,17 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
- for(i = 0 ; i < pBatch->nElem ; ++i) {
+dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state);
+ for(i = 0 ; i < (pBatch->nElem) && (pBatch->pElem[i].state != BATCH_STATE_RDY); ++i) {
dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state);
pUsr = pBatch->pElem[i].pUsrp;
objDestruct(pUsr);
}
- iRet = DeleteBatchFromQStore(pThis, pBatch);
+dbgprintf("we deleted %d objects\n", i);
+
+ if(i > 0)
+ iRet = DeleteBatchFromQStore(pThis, pBatch, i);
pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */
@@ -1423,7 +1431,11 @@ dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch
/* dequeue as many user pointers as are available, until we hit the configured
- * upper limit of pointers.
+ * upper limit of pointers. Note that this function also deletes all processed
+ * objects from the previous batch. However, it is perfectly valid that the
+ * previous batch contained NO objects at all. For example, this happens
+ * immediately after system startup or when a queue was exhausted and the queue
+ * worker needed to wait for new data.
* This must only be called when the queue mutex is LOOKED, otherwise serious
* malfunction will happen.
*/
@@ -1716,8 +1728,8 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
d_pthread_mutex_unlock(pThis->mut);
/* iterate over returned results and enqueue them in DA queue */
- //for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
- for(i = 0 ; i < pWti->batch.nElem ; i++) {
+ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
+ //for(i = 0 ; i < pWti->batch.nElem ; i++) {
/* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
* the message. So far, we simply assume we always have msg_t, what currently is always the case.
* rgerhards, 2009-05-28