diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2009-10-22 14:57:34 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2009-10-22 14:57:34 +0200 |
commit | 33e216daf7f89542cc6c91f1e97da6fdb71eecf8 (patch) | |
tree | f4b74d404f66234121cdd1f7fe1bba029975ce61 | |
parent | e04e1b50025f5fa9c26abd946190dce8f797d08f (diff) | |
download | rsyslog-33e216daf7f89542cc6c91f1e97da6fdb71eecf8.tar.gz rsyslog-33e216daf7f89542cc6c91f1e97da6fdb71eecf8.tar.xz rsyslog-33e216daf7f89542cc6c91f1e97da6fdb71eecf8.zip |
Begun to work on partial batch deletes...
... but this brings a lot of problems with it. The issue is that
we still have a sequential store and we do not know how we could
delete the one entry right in the middle of processing. I keep this
branch if we intend to move on with it - but for now I look into a
different solution...
-rw-r--r-- | action.c | 4 | ||||
-rw-r--r-- | runtime/queue.c | 30 | ||||
-rwxr-xr-x | tests/daqueue-persist-drvr.sh | 5 | ||||
-rwxr-xr-x | tests/diag.sh | 1 |
4 files changed, 28 insertions, 12 deletions
@@ -936,8 +936,8 @@ processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) CHKiRet(localRet); /* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */ - //for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { - for(i = 0 ; i < pBatch->nElem ; i++) { + for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { + //for(i = 0 ; i < pBatch->nElem ; i++) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; } iRet = finishBatch(pAction); diff --git a/runtime/queue.c b/runtime/queue.c index 4bbcc2b8..d9dc599a 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1362,7 +1362,7 @@ dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQue * picking up things from the to-delete list. */ static inline rsRetVal -DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) +DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted) { toDeleteLst_t *pTdl; qDeqID deqIDDel; @@ -1370,10 +1370,11 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); + assert(nDeleted > 0); pTdl = tdlPeek(pThis); /* get current head element */ if(pTdl == NULL) { /* to-delete list empty */ - DoDeleteBatchFromQStore(pThis, pBatch->nElemDeq); + DoDeleteBatchFromQStore(pThis, nDeleted); } else if(pBatch->deqID == pThis->deqIDDel) { deqIDDel = pThis->deqIDDel; pTdl = tdlPeek(pThis); @@ -1386,7 +1387,7 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) } else { /* can not delete, insert into to-delete list */ dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID); - CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElemDeq)); + CHKiRet(tdlAdd(pThis, pBatch->deqID, nDeleted)); } finalize_it: @@ -1395,7 +1396,10 @@ finalize_it: /* Delete a batch of processed user objects from the queue, which includes - * destructing the objects themself. + * destructing the objects themself. It is assumed that batches + * are processed in sequential order, that is if we find one unprocessed entry, + * that indicates the end of the delete operation. Note that this function MUST + * be called only for non-empty batches! * rgerhards, 2009-05-13 */ static inline rsRetVal @@ -1408,13 +1412,17 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); - for(i = 0 ; i < pBatch->nElem ; ++i) { +dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state); + for(i = 0 ; i < (pBatch->nElem) && (pBatch->pElem[i].state != BATCH_STATE_RDY); ++i) { dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state); pUsr = pBatch->pElem[i].pUsrp; objDestruct(pUsr); } - iRet = DeleteBatchFromQStore(pThis, pBatch); +dbgprintf("we deleted %d objects\n", i); + + if(i > 0) + iRet = DeleteBatchFromQStore(pThis, pBatch, i); pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ @@ -1423,7 +1431,11 @@ dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch /* dequeue as many user pointers as are available, until we hit the configured - * upper limit of pointers. + * upper limit of pointers. Note that this function also deletes all processed + * objects from the previous batch. However, it is perfectly valid that the + * previous batch contained NO objects at all. For example, this happens + * immediately after system startup or when a queue was exhausted and the queue + * worker needed to wait for new data. * This must only be called when the queue mutex is LOOKED, otherwise serious * malfunction will happen. */ @@ -1716,8 +1728,8 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) d_pthread_mutex_unlock(pThis->mut); /* iterate over returned results and enqueue them in DA queue */ - //for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { - for(i = 0 ; i < pWti->batch.nElem ; i++) { + for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { + //for(i = 0 ; i < pWti->batch.nElem ; i++) { /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 diff --git a/tests/daqueue-persist-drvr.sh b/tests/daqueue-persist-drvr.sh index 30a7c635..a4f4a73f 100755 --- a/tests/daqueue-persist-drvr.sh +++ b/tests/daqueue-persist-drvr.sh @@ -8,6 +8,9 @@ echo \[daqueue-persist-drvr.sh\]: testing memory daqueue persisting to disk, mode $1 source $srcdir/diag.sh init +export RSYSLOG_DEBUG="debug logfuncflow nostdout noprintmutexaction" +export RSYSLOG_DEBUGLOG="log" + # prepare config echo \$MainMsgQueueType $1 > work-queuemode.conf echo "*.* :omtesting:sleep 0 1000" > work-delay.conf @@ -21,7 +24,7 @@ source $srcdir/diag.sh check-mainq-spool echo "Enter phase 2, rsyslogd restart" -#exit +exit # restart engine and have rest processed #remove delay diff --git a/tests/diag.sh b/tests/diag.sh index ed2e66b2..d1bcbe44 100755 --- a/tests/diag.sh +++ b/tests/diag.sh @@ -23,6 +23,7 @@ case $1 in 'exit') rm -f rsyslogd.started work-*.conf diag-common.conf rm -f work rsyslog.out.log rsyslog.out.log.save # common work files rm -rf test-spool + echo ------------------------------------------------------------------------------- ;; 'startup') # start rsyslogd with default params. $2 is the config file name to use # returns only after successful startup |