summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-14 08:37:42 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-14 08:37:42 +0000
commitdd36718bd11c85af49546ab589fa42bf512075ce (patch)
tree64e028d492472f0a3fc7aace150bf36db2ea6da4 /queue.c
parent16f39256b6bdc3ce5dafc17a14b553841eef2120 (diff)
downloadrsyslog-dd36718bd11c85af49546ab589fa42bf512075ce.tar.gz
rsyslog-dd36718bd11c85af49546ab589fa42bf512075ce.tar.xz
rsyslog-dd36718bd11c85af49546ab589fa42bf512075ce.zip
worker shutdown sequence enhanced to try different ways to shut down and
terminate workers if none helps (this protects against badly written output plugins which hold the queue for too long)
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c142
1 files changed, 111 insertions, 31 deletions
diff --git a/queue.c b/queue.c
index 31d2d711..07d2ccbf 100644
--- a/queue.c
+++ b/queue.c
@@ -466,6 +466,72 @@ queueDel(queue_t *pThis, void *pUsr)
}
+/* Send a shutdown command to all workers and see if they terminate.
+ * A timeout may be specified.
+ * rgerhards, 2008-01-14
+ */
+static rsRetVal
+queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int iTimeout)
+{
+ DEFiRet;
+ int i;
+ int bTimedOut;
+ struct timespec t;
+
+ /* first tell the workers our request */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
+ if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED)
+ pThis->pWrkThrds[i].tCurrCmd = tShutdownCmd;
+
+ /* awake them... */
+ pthread_cond_broadcast(pThis->notEmpty);
+
+ /* and wait for their termination */
+ clock_gettime(CLOCK_REALTIME, &t); /* set the timeout */
+ t.tv_sec += iTimeout; /* TODO: can we just add to the seconds? - check */
+
+ pthread_mutex_lock(pThis->mut);
+ bTimedOut = 0;
+ while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
+ dbgprintf("Queue 0x%lx: waiting on worker thread termination, %d still running\n",
+ (unsigned long) pThis, pThis->iCurNumWrkThrd);
+
+ if(pthread_cond_timedwait(&pThis->condThrdTrm, pThis->mut, &t) != 0) {
+ dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", (unsigned long) pThis);
+ bTimedOut = 1; /* we exit the loop on timeout */
+ }
+ }
+ pthread_mutex_unlock(pThis->mut);
+
+ if(bTimedOut)
+ iRet = RS_RET_TIMED_OUT;
+
+ return iRet;
+}
+
+
+/* Unconditionally cancel all running worker threads.
+ * rgerhards, 2008-01-14
+ */
+static rsRetVal
+queueWrkThrdCancel(queue_t *pThis)
+{
+ DEFiRet;
+ int i;
+ // TODO: we need to implement peek(), without it (today!) we lose one message upon
+ // worker cancellation! -- rgerhards, 2008-01-14
+
+ /* first tell the workers our request */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
+ if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED) {
+ dbgprintf("Queue 0x%lx: canceling worker thread %d\n", (unsigned long) pThis, i);
+ pthread_cancel(pThis->pWrkThrds[i].thrdID);
+ }
+
+ return iRet;
+}
+
+
/* Worker thread management function carried out when the main
* worker is about to terminate.
*/
@@ -473,28 +539,39 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
{
DEFiRet;
int i;
- qWrkCmd_t tShutdownCmd;
assert(pThis != NULL);
- /* select shutdown mode */
- tShutdownCmd = (pThis->bImmediateShutdown) ? eWRKTHRDCMD_SHUTDOWN_IMMEDIATE : eWRKTHRDCMD_SHUTDOWN;
-
- dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence, mode %d...\n",
- (unsigned long) pThis, (int) tShutdownCmd);
+ dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", (unsigned long) pThis);
- /* tell all workers to terminate */
- for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
- pThis->pWrkThrds[i].tCurrCmd = tShutdownCmd;
+ iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN, 2); // TODO: timeout configurable!
+ if(iRet == RS_RET_TIMED_OUT) {
+ /* OK, we now need to try force the shutdown */
+ dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", (unsigned long) pThis);
+ iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, 4); // TODO: timeout configurable!
+ }
- /* awake them... */
- pthread_cond_broadcast(pThis->notEmpty);
+ if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */
+ /* still didn't work out - so we now need to cancel the workers */
+ dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis);
+ iRet = queueWrkThrdCancel(pThis);
+ }
- /* and wait for their termination */
+ /* finally join the threads
+ * In case of a cancellation, this may actually take some time. This is also
+ * needed to clean up the thread descriptors, even with a regular termination
+ */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
pthread_join(pThis->pWrkThrds[i].thrdID, NULL);
}
+ /* as we may have cancelled a thread, clean up our internal structure. All are
+ * terminated now. For simplicity, we simply overwrite the states.
+ */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+ pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_TERMINATED;
+ }
+
dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n",
(unsigned long) pThis, pThis->iQueueSize);
@@ -505,19 +582,6 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* Each queue has one associated worker (consumer) thread. It will pull
* the message from the queue and pass it to a user-defined function.
* This function was provided on construction. It MUST be thread-safe.
- *
- * There are two classes of worker threads, all implemented via the function
- * below. The queue may start multiple workers. The first one carries out normal
- * processing BUT also manages the other workers (the first one and all other
- * ones are the two different classes). This is so that the queue can dynamically
- * start and stop worker threads. So far, this dynamic mode is not yet supported,
- * but we will need it at least for disk-assisted queue types. There, multiple
- * workers are supported as long as the queue is running in memory, but only
- * a single worker is supported if running in disk mode. To start and stop
- * workers, we need to have one thread that is capable to wait. We could start
- * up a specific management thread. However, this means additional overhead. So
- * we have decided to use worker #0, which is always present, to carry out this
- * management as an additional chore. -- rgerhards, 2008-01-10
*/
static void *
queueWorker(void *arg)
@@ -541,6 +605,11 @@ queueWorker(void *arg)
dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", (unsigned long) pThis, iMyThrdIndx);
+ /* tell the world there is one more worker */
+ pthread_mutex_lock(pThis->mut);
+ pThis->iCurNumWrkThrd++;
+ pthread_mutex_unlock(pThis->mut);
+
/* now we have our identity, on to real processing */
while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN
|| (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) {
@@ -600,9 +669,15 @@ queueWorker(void *arg)
" %d messages to process.\n", (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize);
}
- dbgprintf("Queue 0x%lx/w%d: worker thread terminates with %d entries left in queue.\n",
- (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize);
- pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; /* indicate termination */
+ /* indicate termination */
+ pthread_mutex_lock(pThis->mut);
+ pThis->iCurNumWrkThrd--;
+ pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED;
+ pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */
+ dbgprintf("Queue 0x%lx/w%d: thread terminates with %d entries left in queue, %d workers running.\n",
+ (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd);
+ pthread_mutex_unlock(pThis->mut);
+
pthread_exit(0);
}
@@ -750,10 +825,18 @@ static rsRetVal queuePersist(queue_t *pThis)
assert(pThis != NULL);
+ if(pThis->qType != QUEUETYPE_DISK) {
+ if(pThis->iQueueSize > 0)
+ ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* TODO: later... */
+ else
+ FINALIZE; /* if the queue is empty, we are happy and done... */
+ }
+
dbgprintf("Queue 0x%lx: persisting queue to disk, %d entries...\n", queueGetID(pThis), pThis->iQueueSize);
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
(char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
+
if(pThis->iQueueSize == 0) {
if(pThis->bNeedDelQIF) {
unlink((char*)pszQIFNam);
@@ -764,9 +847,6 @@ static rsRetVal queuePersist(queue_t *pThis)
FINALIZE; /* nothing left to do, so be happy */
}
- if(pThis->qType != QUEUETYPE_DISK)
- ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* TODO: later... */
-
CHKiRet(strmConstruct(&psQIF));
CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE));