summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-10-26 12:21:07 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2009-10-26 12:21:07 +0100
commitda53802c96a59a990859706219398dce709ba1b3 (patch)
tree43242bbabf5fbb91d70caf1bc7a87cbfdf535298 /runtime
parent672c1b25d603006361836649c558777a1a872053 (diff)
downloadrsyslog-da53802c96a59a990859706219398dce709ba1b3.tar.gz
rsyslog-da53802c96a59a990859706219398dce709ba1b3.tar.xz
rsyslog-da53802c96a59a990859706219398dce709ba1b3.zip
implemented solution for cancel at shutdown/unprocessed entries
We do now enqueue those objects that are left unprocessed. This enables us to delete the full batch, what is exactly what we need to do.
Diffstat (limited to 'runtime')
-rw-r--r--runtime/batch.h2
-rw-r--r--runtime/queue.c58
2 files changed, 24 insertions, 36 deletions
diff --git a/runtime/batch.h b/runtime/batch.h
index 031718a7..2b3aa83e 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -34,7 +34,7 @@
typedef enum {
BATCH_STATE_RDY = 0, /* object ready for processing */
BATCH_STATE_BAD = 1, /* unrecoverable failure while processing, do NOT resubmit to same action */
- BATCH_STATE_SUB = 2, /* message submitted for processing, outcome yet unkonwn */
+ BATCH_STATE_SUB = 2, /* message submitted for processing, outcome yet unknown */
BATCH_STATE_COMM = 3, /* message successfully commited */
BATCH_STATE_DISC = 4, /* discarded - processed OK, but do not submit to any other action */
} batch_state_t;
diff --git a/runtime/queue.c b/runtime/queue.c
index d9dc599a..d1eefde6 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -70,6 +70,7 @@ DEFobjCurrIf(strm)
DEFobjCurrIf(errmsg)
/* forward-definitions */
+static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr);
static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
@@ -1396,10 +1397,8 @@ finalize_it:
/* Delete a batch of processed user objects from the queue, which includes
- * destructing the objects themself. It is assumed that batches
- * are processed in sequential order, that is if we find one unprocessed entry,
- * that indicates the end of the delete operation. Note that this function MUST
- * be called only for non-empty batches!
+ * destructing the objects themself. Any entries not marked as finally
+ * processed are enqueued again. The new enqueue is necessary because we have a
* rgerhards, 2009-05-13
*/
static inline rsRetVal
@@ -1407,19 +1406,34 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
{
int i;
void *pUsr;
+ int nEnqueued = 0;
+ rsRetVal localRet;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state);
- for(i = 0 ; i < (pBatch->nElem) && (pBatch->pElem[i].state != BATCH_STATE_RDY); ++i) {
+ for(i = 0 ; i < pBatch->nElem ; ++i) {
dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state);
pUsr = pBatch->pElem[i].pUsrp;
+ if( pBatch->pElem[i].state == BATCH_STATE_RDY
+ || pBatch->pElem[i].state == BATCH_STATE_SUB) {
+RUNLOG_STR("we need to requeue the entry");
+ localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY,
+ (obj_t*)MsgAddRef((msg_t*) pUsr));
+ ++nEnqueued;
+ if(localRet != RS_RET_OK) {
+ DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet);
+ }
+ }
objDestruct(pUsr);
}
-dbgprintf("we deleted %d objects\n", i);
+dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued);
+
+ if(nEnqueued > 0)
+ qqueueChkPersist(pThis, nEnqueued);
if(i > 0)
iRet = DeleteBatchFromQStore(pThis, pBatch, i);
@@ -1735,7 +1749,9 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
* rgerhards, 2009-05-28
*/
dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg);
- CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
+ CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY,
+ (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
+ pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
}
/* now we are done, but need to re-aquire the mutex */
@@ -1749,12 +1765,6 @@ finalize_it:
/* must only be called when the queue mutex is locked, else results
* are not stable!
- * If we are a child, we have done our duty when the queue is empty. In that case,
- * we can terminate.
- * Version for the DA worker thread. NOTE: the pThis->bRunsDA is different from
- * the DA queue.
- * If our queue is in destruction, we drain to the DA queue and so we shall not terminate
- * until we have done so.
*/
static rsRetVal
qqueueChkStopWrkrDA(qqueue_t *pThis)
@@ -1763,28 +1773,6 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
if(pThis->bEnqOnly) {
iRet = RS_RET_TERMINATE_WHEN_IDLE;
-#if 0
- } else {
- if(pThis->bRunsDA) {
- ASSERT(pThis->pqDA != NULL);
- if( pThis->pqDA->bEnqOnly
- && pThis->pqDA->sizeOnDiskMax > 0
- && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
- /* this queue can never grow, so we can give up... */
- iRet = RS_RET_TERMINATE_NOW;
- } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
-dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk);
- iRet = RS_RET_TERMINATE_NOW;
-RUNLOG_STR("XXX: re-start reg worker");
-if(!pThis->bShutdownImmediate)
- qqueueAdviseMaxWorkers(pThis);
-RUNLOG_STR("XXX: done re-start reg worker");
- }
- } else {
- // experimental iRet = RS_RET_TERMINATE_NOW;
- ;
- }
-#endif
}
RETiRet;