summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-15 16:49:02 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-15 16:49:02 +0000
commitb0ea982f5802b020b9e35f37cfa3a4acacd048bb (patch)
tree26468d04fc6d9327e16d40b3d7042155e3ad00d4 /queue.c
parentbb7c2ef720618e4c7707013f732ef14ba751908c (diff)
downloadrsyslog-b0ea982f5802b020b9e35f37cfa3a4acacd048bb.tar.gz
rsyslog-b0ea982f5802b020b9e35f37cfa3a4acacd048bb.tar.xz
rsyslog-b0ea982f5802b020b9e35f37cfa3a4acacd048bb.zip
improved shutdown processing - in-memory queue is now drained to disk
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c36
1 files changed, 31 insertions, 5 deletions
diff --git a/queue.c b/queue.c
index a33c677a..c6f761e0 100644
--- a/queue.c
+++ b/queue.c
@@ -301,7 +301,8 @@ queueDAConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr)
dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, pThis->iQueueSize);
CHKiRet(queueEnqObj(pThis->pqDA, pUsr));
- if(pThis->iQueueSize == pThis->iLowWtrMrk) {
+ /* we check if we reached the low water mark, but only if we are not in shutdown mode */
+ if(pThis->iQueueSize == pThis->iLowWtrMrk && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) {
dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n",
queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
@@ -363,20 +364,42 @@ queueStrtDA(queue_t *pThis)
CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
- CHKiRet(queueSettoQShutdown(pThis->pqDA, pThis->toQShutdown));
CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
+ if(pThis->toQShutdown == 0) {
+ CHKiRet(queueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
+ } else {
+ /* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND
+ * have an obviously large backlog, we can't finish it in any case. So there is no point
+ * in holding shutdown longer than necessary. -- rgerhards, 2008-01-15
+ */
+ CHKiRet(queueSettoQShutdown(pThis->pqDA, 1));
+ }
iRet = queueStart(pThis->pqDA);
/* file not found is expected, that means it is no previous QIF available */
if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
FINALIZE; /* something is wrong */
- /* tell our fellow workers to shut down */
+ /* tell our fellow workers to shut down
+ * NOTE: we do NOT join them by intension! If we did, we would hold draining
+ * the queue until some potentially long-running actions are finished. Having
+ * the ability to immediatly drain the queue was the primary intension of
+ * reserving worker thread 0 for DA queues. So if we would join the other
+ * workers, we would screw up and do against our design goal.
+ */
CHKiRet(queueTellWrkThrds(pThis, 1, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE));
+ /* as we are right now starting DA mode because we are so busy, it is
+ * extremely unlikely that any worker is sleeping on empty queue. HOWEVER,
+ * we want to be on the safe side, and so we awake anyone that is waiting
+ * on one. So even if the scheduler plays badly with us, things should be
+ * quite well. -- rgerhards, 2008-01-15
+ */
+ pthread_cond_broadcast(pThis->notEmpty);
+
pThis->qRunsDA = QRUNS_DA; /* we are now in DA mode! */
dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n",
@@ -889,7 +912,6 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout)
/* awake them... */
pthread_cond_broadcast(pThis->notEmpty);
-dbgprintf("queueWrkThrdTrm broadcasted notEmpty\n");
if(pThis->qRunsDA != QRUNS_REGULAR) /* if running disk-assisted, workers may wait on that condition, too */
pthread_cond_broadcast(&pThis->condDA);
@@ -1079,9 +1101,13 @@ dbgprintf("CallConsumer returns %d\n", iRet);
}
-/* Each queue has one associated worker (consumer) thread. It will pull
+/* Each queue has at least one associated worker (consumer) thread. It will pull
* the message from the queue and pass it to a user-defined function.
* This function was provided on construction. It MUST be thread-safe.
+ * Worker thread 0 is always reserved for disk-assisted mode (if the queue
+ * is not DA, this worker will be dormant). All other workers are for
+ * regular operations mode. Workers are started and stopped as need arises.
+ * rgerhards, 2008-01-15
*/
static void *
queueWorker(void *arg)