From ef70e6174d4b373a601b73757ca19bb0f7dd6502 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 20 Jul 2009 10:25:02 +0200 Subject: architecture change: queue now always has at least one worker thread ...if not running in direct mode. Previous versions could run without any active workers. This simplifies the code at a very small expense. See v5 compatibility note document for more in-depth discussion. --- ChangeLog | 4 ++++ doc/v5compatibility.html | 11 +++++++++++ runtime/queue.c | 17 ----------------- runtime/wti.c | 14 +++++++++++++- runtime/wti.h | 2 ++ runtime/wtp.c | 6 +++++- 6 files changed, 35 insertions(+), 19 deletions(-) diff --git a/ChangeLog b/ChangeLog index 8ddfa647..887aba00 100644 --- a/ChangeLog +++ b/ChangeLog @@ -5,6 +5,10 @@ Version 5.1.3 [DEVEL] (rgerhards), 2009-07-?? - bugfix: message could be truncated after TAG, often when forwarding This was a result of an internal processing error if maximum field sizes had been specified in the property replacer. +- architecture change: queue now always has at least one worker thread + if not running in direct mode. Previous versions could run without + any active workers. This simplifies the code at a very small expense. + See v5 compatibility note document for more in-depth discussion. --------------------------------------------------------------------------- Version 5.1.2 [DEVEL] (rgerhards), 2009-07-08 - bugfix: properties inputname, fromhost, fromhost-ip, msg were lost when diff --git a/doc/v5compatibility.html b/doc/v5compatibility.html index 24fcbd25..6d60062f 100644 --- a/doc/v5compatibility.html +++ b/doc/v5compatibility.html @@ -16,4 +16,15 @@ available. This processing was redundant and had a lot a drawbacks. For details, please see the rsyslog v4 compatibility notes which elaborate on the reasons and the (few) things you may need to change. +

Queue Worker Thread Shutdown

+

Previous rsyslog versions had the capability to "run" on zero queue worker +if no work was required. This was done to save a very limited number of resources. However, +it came at the price of great complexity. In v5, we have decided to let a minium of one +worker run all the time. The additional resource consumption is probably not noticable at +all, however, this enabled us to do some important code cleanups, resulting in faster +and more reliable code (complex code is hard to maintain and error-prone). From the +regular user's point of view, this change should be barely noticable. I am including the +note for expert users, who will notice it in rsyslog debug output and other analysis tools. +So it is no error if each queue in non-direct mode now always runs at least one worker +thread. diff --git a/runtime/queue.c b/runtime/queue.c index 9123a3f5..78859e8d 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -298,19 +298,6 @@ TurnOffDAMode(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->bRunsDA); - - /* if we need to pull any data that we still need from the (child) disk queue, - * now would be the time to do so. At present, we do not need this, but I'd like to - * keep that comment if future need arises. - */ - - /* we need to check if the DA queue is empty because the DA worker may simply have - * terminated due to no new messages arriving. That does not, however, mean that the - * DA queue is empty. If there is still data in that queue, we do nothing and leave - * that for a later incarnation of this function (it will be called multiple times - * during the lifetime of DA-mode, depending on how often the DA worker receives an - * inactivity timeout. -- rgerhards, 2008-01-25 - */ if(getLogicalQueueSize(pThis->pqDA) == 0) { pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, @@ -319,10 +306,6 @@ TurnOffDAMode(qqueue_t *pThis) //XXX: TODO qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n", iRet); - } else { - /* the queue has data again! */ - dbgprintf("DA queue has data during shutdown, restarting...\n"); - qqueueAdviseMaxWorkers(pThis->pqDA); } RETiRet; diff --git a/runtime/wti.c b/runtime/wti.c index b55ff69c..1d8f075f 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -86,6 +86,17 @@ wtiGetState(wti_t *pThis) } +/* Set this thread to "always running" state (can not be unset) + * rgerhards, 2009-07-20 + */ +rsRetVal +wtiSetAlwaysRunning(wti_t *pThis) +{ + ISOBJ_TYPE_assert(pThis, wti); + pThis->bAlwaysRunning = TRUE; + return RS_RET_OK; +} + /* Set status (thread is running or not), actually an property of * use for wtp, but we need to have it per thread instance (thus it * is inside wti). -- rgerhards, 2009-07-17 @@ -202,7 +213,8 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); d_pthread_mutex_lock(pWtp->pmutUsr); - if(pWtp->toWrkShutdown == -1) { +RUNLOG_VAR("%d", pThis->bAlwaysRunning); + if(pThis->bAlwaysRunning) { /* never shut down any started worker */ d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); } else { diff --git a/runtime/wti.h b/runtime/wti.h index 67d89a2f..cd408bde 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -34,6 +34,7 @@ struct wti_s { BEGINobjInstance; pthread_t thrdID; /* thread ID */ + bool bAlwaysRunning; /* should this thread always run? */ bool bIsRunning; /* is this thread currently running? */ wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ @@ -48,6 +49,7 @@ rsRetVal wtiDestruct(wti_t **ppThis); rsRetVal wtiWorker(wti_t *pThis); rsRetVal wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg); rsRetVal wtiCancelThrd(wti_t *pThis); +rsRetVal wtiSetAlwaysRunning(wti_t *pThis); rsRetVal wtiSetState(wti_t *pThis, bool bNew); bool wtiGetState(wti_t *pThis); PROTOTYPEObjClassInit(wti); diff --git a/runtime/wtp.c b/runtime/wtp.c index e1ebcd4c..e8bc5120 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -381,7 +381,9 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) if(i == pThis->iNumWorkerThreads) ABORT_FINALIZE(RS_RET_NO_MORE_THREADS); - pThis->iCurNumWrkThrd++; /* we got one more! */ + if(i == 0 || pThis->toWrkShutdown == -1) { + wtiSetAlwaysRunning(pThis->pWrkr[i]); + } pWti = pThis->pWrkr[i]; wtiSetState(pWti, WRKTHRD_RUNNING); @@ -389,6 +391,8 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); iState = pthread_create(&(pWti->thrdID), &attr, wtpWorker, (void*) pWti); pthread_attr_destroy(&attr); /* TODO: we could globally reuse such an attribute 2009-07-08 */ + pThis->iCurNumWrkThrd++; /* we got one more! */ + dbgprintf("%s: started with state %d, num workers now %d\n", wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd); -- cgit