diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-17 16:30:49 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-17 16:30:49 +0000 |
commit | e0df42e01467dfb70498821114b581c02184c70c (patch) | |
tree | d739e422c8f71d679c42a9137bfa92a3a3cf8a5d /queue.c | |
parent | ed0363210c34002e5cfbab553506573f5b8a13a5 (diff) | |
download | rsyslog-e0df42e01467dfb70498821114b581c02184c70c.tar.gz rsyslog-e0df42e01467dfb70498821114b581c02184c70c.tar.xz rsyslog-e0df42e01467dfb70498821114b581c02184c70c.zip |
fixed sync issue on shutdown process if need to persist pure memory queue
to disk
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 40 |
1 files changed, 32 insertions, 8 deletions
@@ -123,7 +123,6 @@ queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd) assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads); if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRD_RUN_CREATED) { - dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx); qWrkrSetState(&pThis->pWrkThrds[iIdx], tCmd); } else { dbgprintf("Queue 0x%lx: command %d NOT sent to inactive thread %d\n", queueGetID(pThis), tCmd, iIdx); @@ -156,6 +155,34 @@ qWrkrConstructFinalize(qWrkThrd_t *pThis, queue_t *pQueue, int i) } +/* Waitis until the specified worker thread + * changed to full running state (aka have started up). This function + * MUST NOT be called while the queue mutex is locked as it does + * this itself. The wait is without timeout. + * rgerhards, 2008-01-17 + */ +static inline rsRetVal +qWrkrWaitStartup(qWrkThrd_t *pThis) +{ + int iCancelStateSave; + + assert(pThis != NULL); + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + pthread_mutex_lock(pThis->pQueue->mut); + if((pThis->tCurrCmd == eWRKTHRD_RUN_CREATED) || (pThis->tCurrCmd == eWRKTHRD_RUN_CREATED)) { + dbgprintf("Queue 0x%lx: waiting on worker thread %d startup\n", queueGetID(pThis->pQueue), + pThis->iThrd); + pthread_cond_wait(&pThis->condInitDone, pThis->pQueue->mut); +dbgprintf("startup done!\n"); + } + pthread_mutex_unlock(pThis->pQueue->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + + return RS_RET_OK; +} + + /* initialize the qWrkThrd_t structure - this MUST be called right after * startup of a worker thread. -- rgerhards, 2008-01-17 */ @@ -1188,19 +1215,14 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) struct timespec t; queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */ -dbgprintf("WrkThrdTrm 0\n"); queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ /* race: must make sure all are running! */ -dbgprintf("WrkThrdTrm 1\n"); queueTimeoutComp(&t, iTimeout);/* get timeout */ -dbgprintf("WrkThrdTrm 2\n"); /* and wait for their termination */ pthread_mutex_lock(pThis->mut); bTimedOut = 0; -dbgprintf("WrkThrdTrm 3, thrds: %d\n", pThis->iCurNumWrkThrd); while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { -dbgprintf("WrkThrdTrm 4 to %d\n", bTimedOut); dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n", queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd); @@ -1236,11 +1258,12 @@ queueWrkThrdCancel(queue_t *pThis) queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ /* first tell the workers our request */ - for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) + for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) { dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i); pthread_cancel(pThis->pWrkThrds[i].thrdID); } + } return iRet; } @@ -1843,8 +1866,9 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove queueGetID(pThis), pThis->iQueueSize); pThis->iLowWtrMrk = 0; /* disable low water mark algo */ queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* start DA queue in enqueue-only mode */ + qWrkrWaitStartup(QUEUE_PTR_DA_WORKER(pThis)); /* wait until DA worker has actually started */ pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL; - queueShutdownWorkers(pThis); + queueShutdownWorkers(pThis); /* and tell it to shut down. The trick is it will run until q is drained */ } /* if running DA, terminate disk queue */ |