diff options
-rw-r--r-- | runtime/queue.c | 46 | ||||
-rw-r--r-- | runtime/wtp.c | 24 | ||||
-rw-r--r-- | runtime/wtp.h | 1 | ||||
-rwxr-xr-x | tests/queue-persist.sh | 1 |
4 files changed, 28 insertions, 44 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index a2bb4c1d..0ef0174e 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -59,7 +59,6 @@ #ifdef OS_SOLARIS # include <sched.h> -# define pthread_yield() sched_yield() #endif /* static data */ @@ -1277,33 +1276,30 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */ timeoutComp(&tTimeout, pThis->toActShutdown); - d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */ - if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { - d_pthread_mutex_unlock(pThis->mut); - dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers\n"); - iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); - if(iRetLocal == RS_RET_TIMED_OUT) { - dbgoprint((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and " - "triggers cancellation)\n"); - } else if(iRetLocal != RS_RET_OK) { - dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue " - "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); - } - /* we need to re-aquire the mutex for the next check in this case! */ - d_pthread_mutex_lock(pThis->mut); + dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers (if any)\n"); + iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + dbgoprint((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and " + "triggers cancellation)\n"); + } else if(iRetLocal != RS_RET_OK) { + dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue " + "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } - if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pqDA->pWtpReg, LOCK_MUTEX) > 0) { - /* and now the same for the DA queue */ + d_pthread_mutex_lock(pThis->mut); + if(pThis->bRunsDA) { d_pthread_mutex_unlock(pThis->mut); - dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n"); - iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); - if(iRetLocal == RS_RET_TIMED_OUT) { - dbgoprint((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable and " - "triggers cancellation)\n"); - } else if(iRetLocal != RS_RET_OK) { - dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue " - "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); + if(wtpGetCurNumWrkr(pThis->pqDA->pWtpReg, LOCK_MUTEX) > 0) { + /* and now the same for the DA queue */ + dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n"); + iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + dbgoprint((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable " + "and triggers cancellation)\n"); + } else if(iRetLocal != RS_RET_OK) { + dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA " + "queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); + } } } else { d_pthread_mutex_unlock(pThis->mut); diff --git a/runtime/wtp.c b/runtime/wtp.c index 46b5f4bb..4d4d0f0e 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -157,20 +157,6 @@ CODESTARTobjDestruct(wtp) ENDobjDestruct(wtp) -/* wake up at least one worker thread. - * rgerhards, 2008-01-20 - */ -rsRetVal -wtpWakeupWrkr(wtp_t *pThis) -{ - DEFiRet; - - /* TODO; mutex? I think not needed, as we do not need predictable exec order -- rgerhards, 2008-01-28 */ - ISOBJ_TYPE_assert(pThis, wtp); - pthread_cond_signal(pThis->pcondBusy); - RETiRet; -} - /* wake up all worker threads. * rgerhards, 2008-01-16 */ @@ -239,7 +225,9 @@ finalize_it: #pragma GCC diagnostic ignored "-Wempty-body" /* Send a shutdown command to all workers and see if they terminate. - * A timeout may be specified. + * A timeout may be specified. This function may also be called with + * the current number of workers being 0, in which case it does not + * shut down any worker. * rgerhards, 2008-01-14 */ rsRetVal @@ -383,8 +371,6 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mutWtp, bLockMutex); - pThis->iCurNumWrkThrd++; - /* find free spot in thread table. */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { if(wtiGetState(pThis->pWrkr[i]) == WRKTHRD_STOPPED) { @@ -395,6 +381,8 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) if(i == pThis->iNumWorkerThreads) ABORT_FINALIZE(RS_RET_NO_MORE_THREADS); + pThis->iCurNumWrkThrd++; /* we got one more! */ + pWti = pThis->pWrkr[i]; wtiSetState(pWti, WRKTHRD_RUNNING); pthread_attr_init(&attr); @@ -445,7 +433,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) } } else { if(nMaxWrkr > 0) { - wtpWakeupWrkr(pThis); + pthread_cond_signal(pThis->pcondBusy); } } diff --git a/runtime/wtp.h b/runtime/wtp.h index e2dd9409..88683ea2 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -82,7 +82,6 @@ rsRetVal wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr); rsRetVal wtpProcessThrdChanges(wtp_t *pThis); rsRetVal wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex); rsRetVal wtpSetState(wtp_t *pThis, wtpState_t iNewState); -rsRetVal wtpWakeupWrkr(wtp_t *pThis); rsRetVal wtpWakeupAllWrkr(wtp_t *pThis); rsRetVal wtpCancelAll(wtp_t *pThis); rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg); diff --git a/tests/queue-persist.sh b/tests/queue-persist.sh index 999655b1..e05b3da3 100755 --- a/tests/queue-persist.sh +++ b/tests/queue-persist.sh @@ -2,6 +2,7 @@ # to carry out multiple tests with different queue modes # added 2009-05-27 by Rgerhards # This file is part of the rsyslog project, released under GPLv3 +echo TEST: queue-persist.sh source $srcdir/queue-persist-drvr.sh LinkedList source $srcdir/queue-persist-drvr.sh FixedArray # the disk test should not fail, however, the config is extreme and using |