diff options
authorRainer Gerhards <>2011-05-03 12:27:49 +0200
committerRainer Gerhards <>2011-05-03 12:27:49 +0200
commit1bfaf4ac06e82c0e4008a2b851043a09aee1a2e3 (patch)
parent415b95cf453403f726f654cbc03ef3984446a870 (diff)
better handling of queue i/o errors in disk queues.
This is kind of a bugfix, but a very intrusive one, thus it goes into the devel version first. Right now, "file not found" is handled and leads to the new emergency mode, in which disk action is stopped and the queue run in direct mode. An error message is emited if this happens.
4 files changed, 69 insertions, 7 deletions
diff --git a/ChangeLog b/ChangeLog
index d4fe37a6..dde6f77a 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -8,6 +8,11 @@ Version 5.9.0 [V5-DEVEL] (rgerhards), 2011-03-??
nobody still depends on it (and, if so, they lost...).
- bugfix: pipes not opened in full priv mode when privs are to be dropped
- this begins a new devel branch for v5
+- better handling of queue i/o errors in disk queues. This is kind of a
+ bugfix, but a very intrusive one, this it goes into the devel version
+ first. Right now, "file not found" is handled and leads to the new
+ emergency mode, in which disk action is stopped and the queue run
+ in direct mode. An error message is emited if this happens.
- added support for user-level PRI provided via systemd
- added new config directive $InputTCPFlowControl to select if tcp
received messages shall be flagged as light delayable or not.
diff --git a/runtime/queue.c b/runtime/queue.c
index 50ae307c..70f0ba0c 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -83,6 +83,11 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
+static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr);
+static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
+static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
+static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis);
+static rsRetVal qDestructDisk(qqueue_t *pThis);
/* some constants for queuePersist () */
@@ -583,6 +588,47 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis)
/* -------------------- disk -------------------- */
+/* The following function is used to "save" ourself from being killed by
+ * a fatally failed disk queue. A fatal failure is, for example, if no
+ * data can be read or written. In that case, the disk support is disabled,
+ * with all on-disk structures kept as-is as much as possible. Instead, the
+ * queue is switched to direct mode, so that at least
+ * some processing can happen. Of course, this may still have lots of
+ * undesired side-effects, but is probably better than aborting the
+ * syslogd. Note that this function *must* succeed in one way or another, as
+ * we can not recover from failure here. But it may emit different return
+ * states, which can trigger different processing in the higher layers.
+ * rgerhards, 2011-05-03
+ */
+static inline rsRetVal
+queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError)
+ pThis->iQueueSize = 0;
+ pThis->nLogDeq = 0;
+ qDestructDisk(pThis); /* free disk structures */
+ pThis->qType = QUEUETYPE_DIRECT;
+ pThis->qConstruct = qConstructDirect;
+ pThis->qDestruct = qDestructDirect;
+ pThis->qAdd = qAddDirect;
+ pThis->qDel = qDelDirect;
+ pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ if(pThis->pqParent != NULL) {
+ DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n");
+ pThis->pqParent->bIsDA = 0;
+ pThis->pqParent->pqDA = NULL;
+ /* This may have undesired side effects, not sure if I really evaluated
+ * all. So you know where to look at if you come to this point during
+ * troubleshooting ;) -- rgerhards, 2011-05-03
+ */
+ }
+ errmsg.LogError(0, initiatingError, "fatal error on disk queue '%s', emergency switch to direct mode",
+ obj.GetName((obj_t*) pThis));
static rsRetVal
qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pThis)
@@ -785,10 +831,7 @@ finalize_it:
static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr)
- CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL));
+ iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL);
@@ -1683,7 +1726,18 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(DequeueForConsumer(pThis, pWti));
+ iRet = DequeueForConsumer(pThis, pWti);
+ if(iRet == RS_RET_FILE_NOT_FOUND) {
+ /* This is a fatal condition and means the queue is almost unusable */
+ d_pthread_mutex_unlock(pThis->mut);
+ DBGOPRINT((obj_t*) pThis, "got 'file not found' error %d, queue defunct\n", iRet);
+ iRet = queueSwitchToEmergencyMode(pThis, iRet);
+ // TODO: think about what to return as iRet -- keep RS_RET_FILE_NOT_FOUND?
+ d_pthread_mutex_lock(pThis->mut);
+ }
+ if (iRet != RS_RET_OK) {
+ }
/* we now have a non-idle batch of work, so we can release the queue mutex and process it */
@@ -1774,7 +1828,6 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
-//DBGPRINTF("XXXX: chkStopWrkrDA called, low watermark %d, phys Size %d\n", pThis->iLowWtrMrk, getPhysicalQueueSize(pThis));
if(pThis->bEnqOnly) {
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index d63dbe4f..d7f785f2 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -342,6 +342,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_ERR_HDFS_OPEN = -2179, /**< error during hdfsOpen (e.g. file does not exist) */
RS_RET_FILE_NOT_SPECIFIED = -2180, /**< file name not configured where this was required */
RS_RET_ERR_WRKDIR = -2181, /**< problems with the rsyslog working directory */
+ RS_RET_ERR_QUEUE_EMERGENCY = -2182, /**< some fatal error caused queue to switch to emergency mode */
/* RainerScript error messages (range 1000.. 1999) */
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */
diff --git a/runtime/wti.c b/runtime/wti.c
index 9343f5c5..ec56c2d5 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -314,7 +314,10 @@ wtiWorker(wti_t *pThis)
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
- if(localRet == RS_RET_IDLE) {
+ if(localRet == RS_RET_ERR_QUEUE_EMERGENCY) {
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
+ break; /* end of loop */
+ } else if(localRet == RS_RET_IDLE) {
if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
break; /* end of loop */