summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-10-26 12:21:07 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2009-10-26 12:21:07 +0100
commitda53802c96a59a990859706219398dce709ba1b3 (patch)
tree43242bbabf5fbb91d70caf1bc7a87cbfdf535298
parent672c1b25d603006361836649c558777a1a872053 (diff)
downloadrsyslog-da53802c96a59a990859706219398dce709ba1b3.tar.gz
rsyslog-da53802c96a59a990859706219398dce709ba1b3.tar.xz
rsyslog-da53802c96a59a990859706219398dce709ba1b3.zip
implemented solution for cancel at shutdown/unprocessed entries
We do now enqueue those objects that are left unprocessed. This enables us to delete the full batch, what is exactly what we need to do.
-rw-r--r--runtime/batch.h2
-rw-r--r--runtime/queue.c58
-rwxr-xr-xtests/daqueue-persist-drvr.sh4
3 files changed, 26 insertions, 38 deletions
diff --git a/runtime/batch.h b/runtime/batch.h
index 031718a7..2b3aa83e 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -34,7 +34,7 @@
typedef enum {
BATCH_STATE_RDY = 0, /* object ready for processing */
BATCH_STATE_BAD = 1, /* unrecoverable failure while processing, do NOT resubmit to same action */
- BATCH_STATE_SUB = 2, /* message submitted for processing, outcome yet unkonwn */
+ BATCH_STATE_SUB = 2, /* message submitted for processing, outcome yet unknown */
BATCH_STATE_COMM = 3, /* message successfully commited */
BATCH_STATE_DISC = 4, /* discarded - processed OK, but do not submit to any other action */
} batch_state_t;
diff --git a/runtime/queue.c b/runtime/queue.c
index d9dc599a..d1eefde6 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -70,6 +70,7 @@ DEFobjCurrIf(strm)
DEFobjCurrIf(errmsg)
/* forward-definitions */
+static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr);
static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
@@ -1396,10 +1397,8 @@ finalize_it:
/* Delete a batch of processed user objects from the queue, which includes
- * destructing the objects themself. It is assumed that batches
- * are processed in sequential order, that is if we find one unprocessed entry,
- * that indicates the end of the delete operation. Note that this function MUST
- * be called only for non-empty batches!
+ * destructing the objects themself. Any entries not marked as finally
+ * processed are enqueued again. The new enqueue is necessary because we have a
* rgerhards, 2009-05-13
*/
static inline rsRetVal
@@ -1407,19 +1406,34 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
{
int i;
void *pUsr;
+ int nEnqueued = 0;
+ rsRetVal localRet;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state);
- for(i = 0 ; i < (pBatch->nElem) && (pBatch->pElem[i].state != BATCH_STATE_RDY); ++i) {
+ for(i = 0 ; i < pBatch->nElem ; ++i) {
dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state);
pUsr = pBatch->pElem[i].pUsrp;
+ if( pBatch->pElem[i].state == BATCH_STATE_RDY
+ || pBatch->pElem[i].state == BATCH_STATE_SUB) {
+RUNLOG_STR("we need to requeue the entry");
+ localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY,
+ (obj_t*)MsgAddRef((msg_t*) pUsr));
+ ++nEnqueued;
+ if(localRet != RS_RET_OK) {
+ DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet);
+ }
+ }
objDestruct(pUsr);
}
-dbgprintf("we deleted %d objects\n", i);
+dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued);
+
+ if(nEnqueued > 0)
+ qqueueChkPersist(pThis, nEnqueued);
if(i > 0)
iRet = DeleteBatchFromQStore(pThis, pBatch, i);
@@ -1735,7 +1749,9 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
* rgerhards, 2009-05-28
*/
dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg);
- CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
+ CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY,
+ (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
+ pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
}
/* now we are done, but need to re-aquire the mutex */
@@ -1749,12 +1765,6 @@ finalize_it:
/* must only be called when the queue mutex is locked, else results
* are not stable!
- * If we are a child, we have done our duty when the queue is empty. In that case,
- * we can terminate.
- * Version for the DA worker thread. NOTE: the pThis->bRunsDA is different from
- * the DA queue.
- * If our queue is in destruction, we drain to the DA queue and so we shall not terminate
- * until we have done so.
*/
static rsRetVal
qqueueChkStopWrkrDA(qqueue_t *pThis)
@@ -1763,28 +1773,6 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
if(pThis->bEnqOnly) {
iRet = RS_RET_TERMINATE_WHEN_IDLE;
-#if 0
- } else {
- if(pThis->bRunsDA) {
- ASSERT(pThis->pqDA != NULL);
- if( pThis->pqDA->bEnqOnly
- && pThis->pqDA->sizeOnDiskMax > 0
- && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
- /* this queue can never grow, so we can give up... */
- iRet = RS_RET_TERMINATE_NOW;
- } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
-dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk);
- iRet = RS_RET_TERMINATE_NOW;
-RUNLOG_STR("XXX: re-start reg worker");
-if(!pThis->bShutdownImmediate)
- qqueueAdviseMaxWorkers(pThis);
-RUNLOG_STR("XXX: done re-start reg worker");
- }
- } else {
- // experimental iRet = RS_RET_TERMINATE_NOW;
- ;
- }
-#endif
}
RETiRet;
diff --git a/tests/daqueue-persist-drvr.sh b/tests/daqueue-persist-drvr.sh
index a4f4a73f..7934eb2b 100755
--- a/tests/daqueue-persist-drvr.sh
+++ b/tests/daqueue-persist-drvr.sh
@@ -8,8 +8,8 @@
echo \[daqueue-persist-drvr.sh\]: testing memory daqueue persisting to disk, mode $1
source $srcdir/diag.sh init
-export RSYSLOG_DEBUG="debug logfuncflow nostdout noprintmutexaction"
-export RSYSLOG_DEBUGLOG="log"
+#export RSYSLOG_DEBUG="debug nologfuncflow nostdout noprintmutexaction"
+#export RSYSLOG_DEBUGLOG="log"
# prepare config
echo \$MainMsgQueueType $1 > work-queuemode.conf