summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-10-22 14:57:34 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-10-22 14:57:34 +0200
commit33e216daf7f89542cc6c91f1e97da6fdb71eecf8 (patch)
treef4b74d404f66234121cdd1f7fe1bba029975ce61
parente04e1b50025f5fa9c26abd946190dce8f797d08f (diff)
downloadrsyslog-33e216daf7f89542cc6c91f1e97da6fdb71eecf8.tar.gz
rsyslog-33e216daf7f89542cc6c91f1e97da6fdb71eecf8.tar.xz
rsyslog-33e216daf7f89542cc6c91f1e97da6fdb71eecf8.zip
Begun to work on partial batch deletes...
... but this brings a lot of problems with it. The issue is that we still have a sequential store and we do not know how we could delete the one entry right in the middle of processing. I keep this branch if we intend to move on with it - but for now I look into a different solution...
-rw-r--r--action.c4
-rw-r--r--runtime/queue.c30
-rwxr-xr-xtests/daqueue-persist-drvr.sh5
-rwxr-xr-xtests/diag.sh1
4 files changed, 28 insertions, 12 deletions
diff --git a/action.c b/action.c
index 58658ac1..3439f123 100644
--- a/action.c
+++ b/action.c
@@ -936,8 +936,8 @@ processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
CHKiRet(localRet);
/* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */
- //for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
- for(i = 0 ; i < pBatch->nElem ; i++) {
+ for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
+ //for(i = 0 ; i < pBatch->nElem ; i++) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
}
iRet = finishBatch(pAction);
diff --git a/runtime/queue.c b/runtime/queue.c
index 4bbcc2b8..d9dc599a 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1362,7 +1362,7 @@ dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQue
* picking up things from the to-delete list.
*/
static inline rsRetVal
-DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
+DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted)
{
toDeleteLst_t *pTdl;
qDeqID deqIDDel;
@@ -1370,10 +1370,11 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
+ assert(nDeleted > 0);
pTdl = tdlPeek(pThis); /* get current head element */
if(pTdl == NULL) { /* to-delete list empty */
- DoDeleteBatchFromQStore(pThis, pBatch->nElemDeq);
+ DoDeleteBatchFromQStore(pThis, nDeleted);
} else if(pBatch->deqID == pThis->deqIDDel) {
deqIDDel = pThis->deqIDDel;
pTdl = tdlPeek(pThis);
@@ -1386,7 +1387,7 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
} else {
/* can not delete, insert into to-delete list */
dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID);
- CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElemDeq));
+ CHKiRet(tdlAdd(pThis, pBatch->deqID, nDeleted));
}
finalize_it:
@@ -1395,7 +1396,10 @@ finalize_it:
/* Delete a batch of processed user objects from the queue, which includes
- * destructing the objects themself.
+ * destructing the objects themself. It is assumed that batches
+ * are processed in sequential order, that is if we find one unprocessed entry,
+ * that indicates the end of the delete operation. Note that this function MUST
+ * be called only for non-empty batches!
* rgerhards, 2009-05-13
*/
static inline rsRetVal
@@ -1408,13 +1412,17 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
- for(i = 0 ; i < pBatch->nElem ; ++i) {
+dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state);
+ for(i = 0 ; i < (pBatch->nElem) && (pBatch->pElem[i].state != BATCH_STATE_RDY); ++i) {
dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state);
pUsr = pBatch->pElem[i].pUsrp;
objDestruct(pUsr);
}
- iRet = DeleteBatchFromQStore(pThis, pBatch);
+dbgprintf("we deleted %d objects\n", i);
+
+ if(i > 0)
+ iRet = DeleteBatchFromQStore(pThis, pBatch, i);
pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */
@@ -1423,7 +1431,11 @@ dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch
/* dequeue as many user pointers as are available, until we hit the configured
- * upper limit of pointers.
+ * upper limit of pointers. Note that this function also deletes all processed
+ * objects from the previous batch. However, it is perfectly valid that the
+ * previous batch contained NO objects at all. For example, this happens
+ * immediately after system startup or when a queue was exhausted and the queue
+ * worker needed to wait for new data.
* This must only be called when the queue mutex is LOOKED, otherwise serious
* malfunction will happen.
*/
@@ -1716,8 +1728,8 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
d_pthread_mutex_unlock(pThis->mut);
/* iterate over returned results and enqueue them in DA queue */
- //for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
- for(i = 0 ; i < pWti->batch.nElem ; i++) {
+ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
+ //for(i = 0 ; i < pWti->batch.nElem ; i++) {
/* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
* the message. So far, we simply assume we always have msg_t, what currently is always the case.
* rgerhards, 2009-05-28
diff --git a/tests/daqueue-persist-drvr.sh b/tests/daqueue-persist-drvr.sh
index 30a7c635..a4f4a73f 100755
--- a/tests/daqueue-persist-drvr.sh
+++ b/tests/daqueue-persist-drvr.sh
@@ -8,6 +8,9 @@
echo \[daqueue-persist-drvr.sh\]: testing memory daqueue persisting to disk, mode $1
source $srcdir/diag.sh init
+export RSYSLOG_DEBUG="debug logfuncflow nostdout noprintmutexaction"
+export RSYSLOG_DEBUGLOG="log"
+
# prepare config
echo \$MainMsgQueueType $1 > work-queuemode.conf
echo "*.* :omtesting:sleep 0 1000" > work-delay.conf
@@ -21,7 +24,7 @@ source $srcdir/diag.sh check-mainq-spool
echo "Enter phase 2, rsyslogd restart"
-#exit
+exit
# restart engine and have rest processed
#remove delay
diff --git a/tests/diag.sh b/tests/diag.sh
index ed2e66b2..d1bcbe44 100755
--- a/tests/diag.sh
+++ b/tests/diag.sh
@@ -23,6 +23,7 @@ case $1 in
'exit') rm -f rsyslogd.started work-*.conf diag-common.conf
rm -f work rsyslog.out.log rsyslog.out.log.save # common work files
rm -rf test-spool
+ echo -------------------------------------------------------------------------------
;;
'startup') # start rsyslogd with default params. $2 is the config file name to use
# returns only after successful startup