summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--obj.c1
-rw-r--r--queue.c142
-rw-r--r--queue.h2
-rw-r--r--rsyslog.h1
4 files changed, 114 insertions, 32 deletions
diff --git a/obj.c b/obj.c
index 3a9098a0..e28c2747 100644
--- a/obj.c
+++ b/obj.c
@@ -89,7 +89,6 @@ rsRetVal objInfoConstruct(objInfo_t **ppThis, objID_t objID, uchar *pszName, int
pThis->pszName = pszName;
pThis->iObjVers = iObjVers;
-fprintf(stderr, "objid %d set for %s\n", objID, pszName);
pThis->objID = objID;
pThis->objMethods[0] = pConstruct;
diff --git a/queue.c b/queue.c
index 31d2d711..07d2ccbf 100644
--- a/queue.c
+++ b/queue.c
@@ -466,6 +466,72 @@ queueDel(queue_t *pThis, void *pUsr)
}
+/* Send a shutdown command to all workers and see if they terminate.
+ * A timeout may be specified.
+ * rgerhards, 2008-01-14
+ */
+static rsRetVal
+queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int iTimeout)
+{
+ DEFiRet;
+ int i;
+ int bTimedOut;
+ struct timespec t;
+
+ /* first tell the workers our request */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
+ if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED)
+ pThis->pWrkThrds[i].tCurrCmd = tShutdownCmd;
+
+ /* awake them... */
+ pthread_cond_broadcast(pThis->notEmpty);
+
+ /* and wait for their termination */
+ clock_gettime(CLOCK_REALTIME, &t); /* set the timeout */
+ t.tv_sec += iTimeout; /* TODO: can we just add to the seconds? - check */
+
+ pthread_mutex_lock(pThis->mut);
+ bTimedOut = 0;
+ while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
+ dbgprintf("Queue 0x%lx: waiting on worker thread termination, %d still running\n",
+ (unsigned long) pThis, pThis->iCurNumWrkThrd);
+
+ if(pthread_cond_timedwait(&pThis->condThrdTrm, pThis->mut, &t) != 0) {
+ dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", (unsigned long) pThis);
+ bTimedOut = 1; /* we exit the loop on timeout */
+ }
+ }
+ pthread_mutex_unlock(pThis->mut);
+
+ if(bTimedOut)
+ iRet = RS_RET_TIMED_OUT;
+
+ return iRet;
+}
+
+
+/* Unconditionally cancel all running worker threads.
+ * rgerhards, 2008-01-14
+ */
+static rsRetVal
+queueWrkThrdCancel(queue_t *pThis)
+{
+ DEFiRet;
+ int i;
+ // TODO: we need to implement peek(), without it (today!) we lose one message upon
+ // worker cancellation! -- rgerhards, 2008-01-14
+
+ /* first tell the workers our request */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
+ if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED) {
+ dbgprintf("Queue 0x%lx: canceling worker thread %d\n", (unsigned long) pThis, i);
+ pthread_cancel(pThis->pWrkThrds[i].thrdID);
+ }
+
+ return iRet;
+}
+
+
/* Worker thread management function carried out when the main
* worker is about to terminate.
*/
@@ -473,28 +539,39 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
{
DEFiRet;
int i;
- qWrkCmd_t tShutdownCmd;
assert(pThis != NULL);
- /* select shutdown mode */
- tShutdownCmd = (pThis->bImmediateShutdown) ? eWRKTHRDCMD_SHUTDOWN_IMMEDIATE : eWRKTHRDCMD_SHUTDOWN;
-
- dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence, mode %d...\n",
- (unsigned long) pThis, (int) tShutdownCmd);
+ dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", (unsigned long) pThis);
- /* tell all workers to terminate */
- for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
- pThis->pWrkThrds[i].tCurrCmd = tShutdownCmd;
+ iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN, 2); // TODO: timeout configurable!
+ if(iRet == RS_RET_TIMED_OUT) {
+ /* OK, we now need to try force the shutdown */
+ dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", (unsigned long) pThis);
+ iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, 4); // TODO: timeout configurable!
+ }
- /* awake them... */
- pthread_cond_broadcast(pThis->notEmpty);
+ if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */
+ /* still didn't work out - so we now need to cancel the workers */
+ dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis);
+ iRet = queueWrkThrdCancel(pThis);
+ }
- /* and wait for their termination */
+ /* finally join the threads
+ * In case of a cancellation, this may actually take some time. This is also
+ * needed to clean up the thread descriptors, even with a regular termination
+ */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
pthread_join(pThis->pWrkThrds[i].thrdID, NULL);
}
+ /* as we may have cancelled a thread, clean up our internal structure. All are
+ * terminated now. For simplicity, we simply overwrite the states.
+ */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+ pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_TERMINATED;
+ }
+
dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n",
(unsigned long) pThis, pThis->iQueueSize);
@@ -505,19 +582,6 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* Each queue has one associated worker (consumer) thread. It will pull
* the message from the queue and pass it to a user-defined function.
* This function was provided on construction. It MUST be thread-safe.
- *
- * There are two classes of worker threads, all implemented via the function
- * below. The queue may start multiple workers. The first one carries out normal
- * processing BUT also manages the other workers (the first one and all other
- * ones are the two different classes). This is so that the queue can dynamically
- * start and stop worker threads. So far, this dynamic mode is not yet supported,
- * but we will need it at least for disk-assisted queue types. There, multiple
- * workers are supported as long as the queue is running in memory, but only
- * a single worker is supported if running in disk mode. To start and stop
- * workers, we need to have one thread that is capable to wait. We could start
- * up a specific management thread. However, this means additional overhead. So
- * we have decided to use worker #0, which is always present, to carry out this
- * management as an additional chore. -- rgerhards, 2008-01-10
*/
static void *
queueWorker(void *arg)
@@ -541,6 +605,11 @@ queueWorker(void *arg)
dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", (unsigned long) pThis, iMyThrdIndx);
+ /* tell the world there is one more worker */
+ pthread_mutex_lock(pThis->mut);
+ pThis->iCurNumWrkThrd++;
+ pthread_mutex_unlock(pThis->mut);
+
/* now we have our identity, on to real processing */
while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN
|| (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) {
@@ -600,9 +669,15 @@ queueWorker(void *arg)
" %d messages to process.\n", (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize);
}
- dbgprintf("Queue 0x%lx/w%d: worker thread terminates with %d entries left in queue.\n",
- (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize);
- pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; /* indicate termination */
+ /* indicate termination */
+ pthread_mutex_lock(pThis->mut);
+ pThis->iCurNumWrkThrd--;
+ pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED;
+ pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */
+ dbgprintf("Queue 0x%lx/w%d: thread terminates with %d entries left in queue, %d workers running.\n",
+ (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd);
+ pthread_mutex_unlock(pThis->mut);
+
pthread_exit(0);
}
@@ -750,10 +825,18 @@ static rsRetVal queuePersist(queue_t *pThis)
assert(pThis != NULL);
+ if(pThis->qType != QUEUETYPE_DISK) {
+ if(pThis->iQueueSize > 0)
+ ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* TODO: later... */
+ else
+ FINALIZE; /* if the queue is empty, we are happy and done... */
+ }
+
dbgprintf("Queue 0x%lx: persisting queue to disk, %d entries...\n", queueGetID(pThis), pThis->iQueueSize);
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
(char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
+
if(pThis->iQueueSize == 0) {
if(pThis->bNeedDelQIF) {
unlink((char*)pszQIFNam);
@@ -764,9 +847,6 @@ static rsRetVal queuePersist(queue_t *pThis)
FINALIZE; /* nothing left to do, so be happy */
}
- if(pThis->qType != QUEUETYPE_DISK)
- ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* TODO: later... */
-
CHKiRet(strmConstruct(&psQIF));
CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE));
diff --git a/queue.h b/queue.h
index e5b64ff8..ee38d725 100644
--- a/queue.h
+++ b/queue.h
@@ -79,6 +79,7 @@ typedef struct queue_s {
int iQueueSize; /* Current number of elements in the queue */
int iMaxQueueSize; /* how large can the queue grow? */
int iNumWorkerThreads;/* number of worker threads to use */
+ int iCurNumWrkThrd;/* current number of active worker threads */
qWrkThrd_t *pWrkThrds;/* array with control structure for the worker thread(s) associated with this queue */
int bImmediateShutdown;/* on shutdown, drain the queue --> 0 / do NOT drain the queue --> 1 */
int iUpdsSincePersist;/* nbr of queue updates since the last persist call */
@@ -94,6 +95,7 @@ typedef struct queue_s {
/* synchronization variables */
pthread_mutex_t *mut;
pthread_cond_t *notFull, *notEmpty;
+ pthread_cond_t condThrdTrm;/* signalled when threads terminate */
/* end sync variables */
/* the following variables are always present, because they
* are not only used for the "disk" queueing mode but also for
diff --git a/rsyslog.h b/rsyslog.h
index 1514cc3e..3e5c0397 100644
--- a/rsyslog.h
+++ b/rsyslog.h
@@ -109,6 +109,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_QTYPE_MISMATCH = -2038, /**< different qType when reading back a property type */
RS_RET_NO_FILE_ACCESS = -2039, /**< covers EACCES error on file open() */
RS_RET_FILE_NOT_FOUND = -2040, /**< file not found */
+ RS_RET_TIMED_OUT = -2041, /**< timeout occured (not necessarily an error) */
RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */
RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */
RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */