summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-17 16:30:49 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-17 16:30:49 +0000
commite0df42e01467dfb70498821114b581c02184c70c (patch)
treed739e422c8f71d679c42a9137bfa92a3a3cf8a5d /queue.c
parented0363210c34002e5cfbab553506573f5b8a13a5 (diff)
downloadrsyslog-e0df42e01467dfb70498821114b581c02184c70c.tar.gz
rsyslog-e0df42e01467dfb70498821114b581c02184c70c.tar.xz
rsyslog-e0df42e01467dfb70498821114b581c02184c70c.zip
fixed sync issue on shutdown process if need to persist pure memory queue
to disk
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c40
1 files changed, 32 insertions, 8 deletions
diff --git a/queue.c b/queue.c
index 2b241d82..83d65a4f 100644
--- a/queue.c
+++ b/queue.c
@@ -123,7 +123,6 @@ queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd)
assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads);
if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRD_RUN_CREATED) {
- dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx);
qWrkrSetState(&pThis->pWrkThrds[iIdx], tCmd);
} else {
dbgprintf("Queue 0x%lx: command %d NOT sent to inactive thread %d\n", queueGetID(pThis), tCmd, iIdx);
@@ -156,6 +155,34 @@ qWrkrConstructFinalize(qWrkThrd_t *pThis, queue_t *pQueue, int i)
}
+/* Waitis until the specified worker thread
+ * changed to full running state (aka have started up). This function
+ * MUST NOT be called while the queue mutex is locked as it does
+ * this itself. The wait is without timeout.
+ * rgerhards, 2008-01-17
+ */
+static inline rsRetVal
+qWrkrWaitStartup(qWrkThrd_t *pThis)
+{
+ int iCancelStateSave;
+
+ assert(pThis != NULL);
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ pthread_mutex_lock(pThis->pQueue->mut);
+ if((pThis->tCurrCmd == eWRKTHRD_RUN_CREATED) || (pThis->tCurrCmd == eWRKTHRD_RUN_CREATED)) {
+ dbgprintf("Queue 0x%lx: waiting on worker thread %d startup\n", queueGetID(pThis->pQueue),
+ pThis->iThrd);
+ pthread_cond_wait(&pThis->condInitDone, pThis->pQueue->mut);
+dbgprintf("startup done!\n");
+ }
+ pthread_mutex_unlock(pThis->pQueue->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+
+ return RS_RET_OK;
+}
+
+
/* initialize the qWrkThrd_t structure - this MUST be called right after
* startup of a worker thread. -- rgerhards, 2008-01-17
*/
@@ -1188,19 +1215,14 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout)
struct timespec t;
queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */
-dbgprintf("WrkThrdTrm 0\n");
queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
/* race: must make sure all are running! */
-dbgprintf("WrkThrdTrm 1\n");
queueTimeoutComp(&t, iTimeout);/* get timeout */
-dbgprintf("WrkThrdTrm 2\n");
/* and wait for their termination */
pthread_mutex_lock(pThis->mut);
bTimedOut = 0;
-dbgprintf("WrkThrdTrm 3, thrds: %d\n", pThis->iCurNumWrkThrd);
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
-dbgprintf("WrkThrdTrm 4 to %d\n", bTimedOut);
dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n",
queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd);
@@ -1236,11 +1258,12 @@ queueWrkThrdCancel(queue_t *pThis)
queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
/* first tell the workers our request */
- for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i)
+ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) {
dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i);
pthread_cancel(pThis->pWrkThrds[i].thrdID);
}
+ }
return iRet;
}
@@ -1843,8 +1866,9 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
queueGetID(pThis), pThis->iQueueSize);
pThis->iLowWtrMrk = 0; /* disable low water mark algo */
queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* start DA queue in enqueue-only mode */
+ qWrkrWaitStartup(QUEUE_PTR_DA_WORKER(pThis)); /* wait until DA worker has actually started */
pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL;
- queueShutdownWorkers(pThis);
+ queueShutdownWorkers(pThis); /* and tell it to shut down. The trick is it will run until q is drained */
}
/* if running DA, terminate disk queue */