summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-14 08:37:42 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-14 08:37:42 +0000
commitdd36718bd11c85af49546ab589fa42bf512075ce (patch)
tree64e028d492472f0a3fc7aace150bf36db2ea6da4
parent16f39256b6bdc3ce5dafc17a14b553841eef2120 (diff)
downloadrsyslog-dd36718bd11c85af49546ab589fa42bf512075ce.tar.gz
rsyslog-dd36718bd11c85af49546ab589fa42bf512075ce.tar.xz
rsyslog-dd36718bd11c85af49546ab589fa42bf512075ce.zip
worker shutdown sequence enhanced to try different ways to shut down and
terminate workers if none helps (this protects against badly written output plugins which hold the queue for too long)
-rw-r--r--obj.c1
-rw-r--r--queue.c142
-rw-r--r--queue.h2
-rw-r--r--rsyslog.h1
4 files changed, 114 insertions, 32 deletions
diff --git a/obj.c b/obj.c
index 3a9098a0..e28c2747 100644
--- a/obj.c
+++ b/obj.c
@@ -89,7 +89,6 @@ rsRetVal objInfoConstruct(objInfo_t **ppThis, objID_t objID, uchar *pszName, int
pThis->pszName = pszName;
pThis->iObjVers = iObjVers;
-fprintf(stderr, "objid %d set for %s\n", objID, pszName);
pThis->objID = objID;
pThis->objMethods[0] = pConstruct;
diff --git a/queue.c b/queue.c
index 31d2d711..07d2ccbf 100644
--- a/queue.c
+++ b/queue.c
@@ -466,6 +466,72 @@ queueDel(queue_t *pThis, void *pUsr)
}
+/* Send a shutdown command to all workers and see if they terminate.
+ * A timeout may be specified.
+ * rgerhards, 2008-01-14
+ */
+static rsRetVal
+queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int iTimeout)
+{
+ DEFiRet;
+ int i;
+ int bTimedOut;
+ struct timespec t;
+
+ /* first tell the workers our request */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
+ if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED)
+ pThis->pWrkThrds[i].tCurrCmd = tShutdownCmd;
+
+ /* awake them... */
+ pthread_cond_broadcast(pThis->notEmpty);
+
+ /* and wait for their termination */
+ clock_gettime(CLOCK_REALTIME, &t); /* set the timeout */
+ t.tv_sec += iTimeout; /* TODO: can we just add to the seconds? - check */
+
+ pthread_mutex_lock(pThis->mut);
+ bTimedOut = 0;
+ while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
+ dbgprintf("Queue 0x%lx: waiting on worker thread termination, %d still running\n",
+ (unsigned long) pThis, pThis->iCurNumWrkThrd);
+
+ if(pthread_cond_timedwait(&pThis->condThrdTrm, pThis->mut, &t) != 0) {
+ dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", (unsigned long) pThis);
+ bTimedOut = 1; /* we exit the loop on timeout */
+ }
+ }
+ pthread_mutex_unlock(pThis->mut);
+
+ if(bTimedOut)
+ iRet = RS_RET_TIMED_OUT;
+
+ return iRet;
+}
+
+
+/* Unconditionally cancel all running worker threads.
+ * rgerhards, 2008-01-14
+ */
+static rsRetVal
+queueWrkThrdCancel(queue_t *pThis)
+{
+ DEFiRet;
+ int i;
+ // TODO: we need to implement peek(), without it (today!) we lose one message upon
+ // worker cancellation! -- rgerhards, 2008-01-14
+
+ /* first tell the workers our request */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
+ if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED) {
+ dbgprintf("Queue 0x%lx: canceling worker thread %d\n", (unsigned long) pThis, i);
+ pthread_cancel(pThis->pWrkThrds[i].thrdID);
+ }
+
+ return iRet;
+}
+
+
/* Worker thread management function carried out when the main
* worker is about to terminate.
*/
@@ -473,28 +539,39 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
{
DEFiRet;
int i;
- qWrkCmd_t tShutdownCmd;
assert(pThis != NULL);
- /* select shutdown mode */
- tShutdownCmd = (pThis->bImmediateShutdown) ? eWRKTHRDCMD_SHUTDOWN_IMMEDIATE : eWRKTHRDCMD_SHUTDOWN;
-
- dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence, mode %d...\n",
- (unsigned long) pThis, (int) tShutdownCmd);
+ dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", (unsigned long) pThis);
- /* tell all workers to terminate */
- for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
- pThis->pWrkThrds[i].tCurrCmd = tShutdownCmd;
+ iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN, 2); // TODO: timeout configurable!
+ if(iRet == RS_RET_TIMED_OUT) {
+ /* OK, we now need to try force the shutdown */
+ dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", (unsigned long) pThis);
+ iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, 4); // TODO: timeout configurable!
+ }
- /* awake them... */
- pthread_cond_broadcast(pThis->notEmpty);
+ if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */
+ /* still didn't work out - so we now need to cancel the workers */
+ dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis);
+ iRet = queueWrkThrdCancel(pThis);
+ }
- /* and wait for their termination */
+ /* finally join the threads
+ * In case of a cancellation, this may actually take some time. This is also
+ * needed to clean up the thread descriptors, even with a regular termination
+ */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
pthread_join(pThis->pWrkThrds[i].thrdID, NULL);
}
+ /* as we may have cancelled a thread, clean up our internal structure. All are
+ * terminated now. For simplicity, we simply overwrite the states.
+ */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+ pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_TERMINATED;
+ }
+
dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n",
(unsigned long) pThis, pThis->iQueueSize);
@@ -505,19 +582,6 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* Each queue has one associated worker (consumer) thread. It will pull
* the message from the queue and pass it to a user-defined function.
* This function was provided on construction. It MUST be thread-safe.
- *
- * There are two classes of worker threads, all implemented via the function
- * below. The queue may start multiple workers. The first one carries out normal
- * processing BUT also manages the other workers (the first one and all other
- * ones are the two different classes). This is so that the queue can dynamically
- * start and stop worker threads. So far, this dynamic mode is not yet supported,
- * but we will need it at least for disk-assisted queue types. There, multiple
- * workers are supported as long as the queue is running in memory, but only
- * a single worker is supported if running in disk mode. To start and stop
- * workers, we need to have one thread that is capable to wait. We could start
- * up a specific management thread. However, this means additional overhead. So
- * we have decided to use worker #0, which is always present, to carry out this
- * management as an additional chore. -- rgerhards, 2008-01-10
*/
static void *
queueWorker(void *arg)
@@ -541,6 +605,11 @@ queueWorker(void *arg)
dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", (unsigned long) pThis, iMyThrdIndx);
+ /* tell the world there is one more worker */
+ pthread_mutex_lock(pThis->mut);
+ pThis->iCurNumWrkThrd++;
+ pthread_mutex_unlock(pThis->mut);
+
/* now we have our identity, on to real processing */
while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN
|| (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) {
@@ -600,9 +669,15 @@ queueWorker(void *arg)
" %d messages to process.\n", (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize);
}
- dbgprintf("Queue 0x%lx/w%d: worker thread terminates with %d entries left in queue.\n",
- (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize);
- pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; /* indicate termination */
+ /* indicate termination */
+ pthread_mutex_lock(pThis->mut);
+ pThis->iCurNumWrkThrd--;
+ pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED;
+ pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */
+ dbgprintf("Queue 0x%lx/w%d: thread terminates with %d entries left in queue, %d workers running.\n",
+ (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd);
+ pthread_mutex_unlock(pThis->mut);
+
pthread_exit(0);
}
@@ -750,10 +825,18 @@ static rsRetVal queuePersist(queue_t *pThis)
assert(pThis != NULL);
+ if(pThis->qType != QUEUETYPE_DISK) {
+ if(pThis->iQueueSize > 0)
+ ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* TODO: later... */
+ else
+ FINALIZE; /* if the queue is empty, we are happy and done... */
+ }
+
dbgprintf("Queue 0x%lx: persisting queue to disk, %d entries...\n", queueGetID(pThis), pThis->iQueueSize);
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
(char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
+
if(pThis->iQueueSize == 0) {
if(pThis->bNeedDelQIF) {
unlink((char*)pszQIFNam);
@@ -764,9 +847,6 @@ static rsRetVal queuePersist(queue_t *pThis)
FINALIZE; /* nothing left to do, so be happy */
}
- if(pThis->qType != QUEUETYPE_DISK)
- ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* TODO: later... */
-
CHKiRet(strmConstruct(&psQIF));
CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE));
diff --git a/queue.h b/queue.h
index e5b64ff8..ee38d725 100644
--- a/queue.h
+++ b/queue.h
@@ -79,6 +79,7 @@ typedef struct queue_s {
int iQueueSize; /* Current number of elements in the queue */
int iMaxQueueSize; /* how large can the queue grow? */
int iNumWorkerThreads;/* number of worker threads to use */
+ int iCurNumWrkThrd;/* current number of active worker threads */
qWrkThrd_t *pWrkThrds;/* array with control structure for the worker thread(s) associated with this queue */
int bImmediateShutdown;/* on shutdown, drain the queue --> 0 / do NOT drain the queue --> 1 */
int iUpdsSincePersist;/* nbr of queue updates since the last persist call */
@@ -94,6 +95,7 @@ typedef struct queue_s {
/* synchronization variables */
pthread_mutex_t *mut;
pthread_cond_t *notFull, *notEmpty;
+ pthread_cond_t condThrdTrm;/* signalled when threads terminate */
/* end sync variables */
/* the following variables are always present, because they
* are not only used for the "disk" queueing mode but also for
diff --git a/rsyslog.h b/rsyslog.h
index 1514cc3e..3e5c0397 100644
--- a/rsyslog.h
+++ b/rsyslog.h
@@ -109,6 +109,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_QTYPE_MISMATCH = -2038, /**< different qType when reading back a property type */
RS_RET_NO_FILE_ACCESS = -2039, /**< covers EACCES error on file open() */
RS_RET_FILE_NOT_FOUND = -2040, /**< file not found */
+ RS_RET_TIMED_OUT = -2041, /**< timeout occured (not necessarily an error) */
RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */
RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */
RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */