summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-20 10:25:02 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-20 10:25:02 +0200
commitef70e6174d4b373a601b73757ca19bb0f7dd6502 (patch)
tree50bc958008b9cd4c7faf838e8911acc0e4c089b5
parentb3978e7f7381c694a30a83c67c3fe2e1acc54207 (diff)
downloadrsyslog-ef70e6174d4b373a601b73757ca19bb0f7dd6502.tar.gz
rsyslog-ef70e6174d4b373a601b73757ca19bb0f7dd6502.tar.xz
rsyslog-ef70e6174d4b373a601b73757ca19bb0f7dd6502.zip
architecture change: queue now always has at least one worker thread
...if not running in direct mode. Previous versions could run without any active workers. This simplifies the code at a very small expense. See v5 compatibility note document for more in-depth discussion.
-rw-r--r--ChangeLog4
-rw-r--r--doc/v5compatibility.html11
-rw-r--r--runtime/queue.c17
-rw-r--r--runtime/wti.c14
-rw-r--r--runtime/wti.h2
-rw-r--r--runtime/wtp.c6
6 files changed, 35 insertions, 19 deletions
diff --git a/ChangeLog b/ChangeLog
index 8ddfa647..887aba00 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -5,6 +5,10 @@ Version 5.1.3 [DEVEL] (rgerhards), 2009-07-??
- bugfix: message could be truncated after TAG, often when forwarding
This was a result of an internal processing error if maximum field
sizes had been specified in the property replacer.
+- architecture change: queue now always has at least one worker thread
+ if not running in direct mode. Previous versions could run without
+ any active workers. This simplifies the code at a very small expense.
+ See v5 compatibility note document for more in-depth discussion.
---------------------------------------------------------------------------
Version 5.1.2 [DEVEL] (rgerhards), 2009-07-08
- bugfix: properties inputname, fromhost, fromhost-ip, msg were lost when
diff --git a/doc/v5compatibility.html b/doc/v5compatibility.html
index 24fcbd25..6d60062f 100644
--- a/doc/v5compatibility.html
+++ b/doc/v5compatibility.html
@@ -16,4 +16,15 @@ available. This processing was redundant and had a lot a drawbacks.
For details, please see the
<a href="v4compatibility.html">rsyslog v4 compatibility notes</a> which elaborate
on the reasons and the (few) things you may need to change.
+<h2>Queue Worker Thread Shutdown</h2>
+<p>Previous rsyslog versions had the capability to &quot;run&quot; on zero queue worker
+if no work was required. This was done to save a very limited number of resources. However,
+it came at the price of great complexity. In v5, we have decided to let a minium of one
+worker run all the time. The additional resource consumption is probably not noticable at
+all, however, this enabled us to do some important code cleanups, resulting in faster
+and more reliable code (complex code is hard to maintain and error-prone). From the
+regular user's point of view, this change should be barely noticable. I am including the
+note for expert users, who will notice it in rsyslog debug output and other analysis tools.
+So it is no error if each queue in non-direct mode now always runs at least one worker
+thread.
</body></html>
diff --git a/runtime/queue.c b/runtime/queue.c
index 9123a3f5..78859e8d 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -298,19 +298,6 @@ TurnOffDAMode(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->bRunsDA);
-
- /* if we need to pull any data that we still need from the (child) disk queue,
- * now would be the time to do so. At present, we do not need this, but I'd like to
- * keep that comment if future need arises.
- */
-
- /* we need to check if the DA queue is empty because the DA worker may simply have
- * terminated due to no new messages arriving. That does not, however, mean that the
- * DA queue is empty. If there is still data in that queue, we do nothing and leave
- * that for a later incarnation of this function (it will be called multiple times
- * during the lifetime of DA-mode, depending on how often the DA worker receives an
- * inactivity timeout. -- rgerhards, 2008-01-25
- */
if(getLogicalQueueSize(pThis->pqDA) == 0) {
pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
@@ -319,10 +306,6 @@ TurnOffDAMode(qqueue_t *pThis)
//XXX: TODO qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
iRet);
- } else {
- /* the queue has data again! */
- dbgprintf("DA queue has data during shutdown, restarting...\n");
- qqueueAdviseMaxWorkers(pThis->pqDA);
}
RETiRet;
diff --git a/runtime/wti.c b/runtime/wti.c
index b55ff69c..1d8f075f 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -86,6 +86,17 @@ wtiGetState(wti_t *pThis)
}
+/* Set this thread to "always running" state (can not be unset)
+ * rgerhards, 2009-07-20
+ */
+rsRetVal
+wtiSetAlwaysRunning(wti_t *pThis)
+{
+ ISOBJ_TYPE_assert(pThis, wti);
+ pThis->bAlwaysRunning = TRUE;
+ return RS_RET_OK;
+}
+
/* Set status (thread is running or not), actually an property of
* use for wtp, but we need to have it per thread instance (thus it
* is inside wti). -- rgerhards, 2009-07-17
@@ -202,7 +213,8 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
d_pthread_mutex_lock(pWtp->pmutUsr);
- if(pWtp->toWrkShutdown == -1) {
+RUNLOG_VAR("%d", pThis->bAlwaysRunning);
+ if(pThis->bAlwaysRunning) {
/* never shut down any started worker */
d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
} else {
diff --git a/runtime/wti.h b/runtime/wti.h
index 67d89a2f..cd408bde 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -34,6 +34,7 @@
struct wti_s {
BEGINobjInstance;
pthread_t thrdID; /* thread ID */
+ bool bAlwaysRunning; /* should this thread always run? */
bool bIsRunning; /* is this thread currently running? */
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
@@ -48,6 +49,7 @@ rsRetVal wtiDestruct(wti_t **ppThis);
rsRetVal wtiWorker(wti_t *pThis);
rsRetVal wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg);
rsRetVal wtiCancelThrd(wti_t *pThis);
+rsRetVal wtiSetAlwaysRunning(wti_t *pThis);
rsRetVal wtiSetState(wti_t *pThis, bool bNew);
bool wtiGetState(wti_t *pThis);
PROTOTYPEObjClassInit(wti);
diff --git a/runtime/wtp.c b/runtime/wtp.c
index e1ebcd4c..e8bc5120 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -381,7 +381,9 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
if(i == pThis->iNumWorkerThreads)
ABORT_FINALIZE(RS_RET_NO_MORE_THREADS);
- pThis->iCurNumWrkThrd++; /* we got one more! */
+ if(i == 0 || pThis->toWrkShutdown == -1) {
+ wtiSetAlwaysRunning(pThis->pWrkr[i]);
+ }
pWti = pThis->pWrkr[i];
wtiSetState(pWti, WRKTHRD_RUNNING);
@@ -389,6 +391,8 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
iState = pthread_create(&(pWti->thrdID), &attr, wtpWorker, (void*) pWti);
pthread_attr_destroy(&attr); /* TODO: we could globally reuse such an attribute 2009-07-08 */
+ pThis->iCurNumWrkThrd++; /* we got one more! */
+
dbgprintf("%s: started with state %d, num workers now %d\n",
wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd);