summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c142
1 files changed, 111 insertions, 31 deletions
diff --git a/queue.c b/queue.c
index 31d2d711..07d2ccbf 100644
--- a/queue.c
+++ b/queue.c
@@ -466,6 +466,72 @@ queueDel(queue_t *pThis, void *pUsr)
}
+/* Send a shutdown command to all workers and see if they terminate.
+ * A timeout may be specified.
+ * rgerhards, 2008-01-14
+ */
+static rsRetVal
+queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int iTimeout)
+{
+ DEFiRet;
+ int i;
+ int bTimedOut;
+ struct timespec t;
+
+ /* first tell the workers our request */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
+ if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED)
+ pThis->pWrkThrds[i].tCurrCmd = tShutdownCmd;
+
+ /* awake them... */
+ pthread_cond_broadcast(pThis->notEmpty);
+
+ /* and wait for their termination */
+ clock_gettime(CLOCK_REALTIME, &t); /* set the timeout */
+ t.tv_sec += iTimeout; /* TODO: can we just add to the seconds? - check */
+
+ pthread_mutex_lock(pThis->mut);
+ bTimedOut = 0;
+ while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
+ dbgprintf("Queue 0x%lx: waiting on worker thread termination, %d still running\n",
+ (unsigned long) pThis, pThis->iCurNumWrkThrd);
+
+ if(pthread_cond_timedwait(&pThis->condThrdTrm, pThis->mut, &t) != 0) {
+ dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", (unsigned long) pThis);
+ bTimedOut = 1; /* we exit the loop on timeout */
+ }
+ }
+ pthread_mutex_unlock(pThis->mut);
+
+ if(bTimedOut)
+ iRet = RS_RET_TIMED_OUT;
+
+ return iRet;
+}
+
+
+/* Unconditionally cancel all running worker threads.
+ * rgerhards, 2008-01-14
+ */
+static rsRetVal
+queueWrkThrdCancel(queue_t *pThis)
+{
+ DEFiRet;
+ int i;
+ // TODO: we need to implement peek(), without it (today!) we lose one message upon
+ // worker cancellation! -- rgerhards, 2008-01-14
+
+ /* first tell the workers our request */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
+ if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED) {
+ dbgprintf("Queue 0x%lx: canceling worker thread %d\n", (unsigned long) pThis, i);
+ pthread_cancel(pThis->pWrkThrds[i].thrdID);
+ }
+
+ return iRet;
+}
+
+
/* Worker thread management function carried out when the main
* worker is about to terminate.
*/
@@ -473,28 +539,39 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
{
DEFiRet;
int i;
- qWrkCmd_t tShutdownCmd;
assert(pThis != NULL);
- /* select shutdown mode */
- tShutdownCmd = (pThis->bImmediateShutdown) ? eWRKTHRDCMD_SHUTDOWN_IMMEDIATE : eWRKTHRDCMD_SHUTDOWN;
-
- dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence, mode %d...\n",
- (unsigned long) pThis, (int) tShutdownCmd);
+ dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", (unsigned long) pThis);
- /* tell all workers to terminate */
- for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
- pThis->pWrkThrds[i].tCurrCmd = tShutdownCmd;
+ iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN, 2); // TODO: timeout configurable!
+ if(iRet == RS_RET_TIMED_OUT) {
+ /* OK, we now need to try force the shutdown */
+ dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", (unsigned long) pThis);
+ iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, 4); // TODO: timeout configurable!
+ }
- /* awake them... */
- pthread_cond_broadcast(pThis->notEmpty);
+ if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */
+ /* still didn't work out - so we now need to cancel the workers */
+ dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis);
+ iRet = queueWrkThrdCancel(pThis);
+ }
- /* and wait for their termination */
+ /* finally join the threads
+ * In case of a cancellation, this may actually take some time. This is also
+ * needed to clean up the thread descriptors, even with a regular termination
+ */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
pthread_join(pThis->pWrkThrds[i].thrdID, NULL);
}
+ /* as we may have cancelled a thread, clean up our internal structure. All are
+ * terminated now. For simplicity, we simply overwrite the states.
+ */
+ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+ pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_TERMINATED;
+ }
+
dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n",
(unsigned long) pThis, pThis->iQueueSize);
@@ -505,19 +582,6 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* Each queue has one associated worker (consumer) thread. It will pull
* the message from the queue and pass it to a user-defined function.
* This function was provided on construction. It MUST be thread-safe.
- *
- * There are two classes of worker threads, all implemented via the function
- * below. The queue may start multiple workers. The first one carries out normal
- * processing BUT also manages the other workers (the first one and all other
- * ones are the two different classes). This is so that the queue can dynamically
- * start and stop worker threads. So far, this dynamic mode is not yet supported,
- * but we will need it at least for disk-assisted queue types. There, multiple
- * workers are supported as long as the queue is running in memory, but only
- * a single worker is supported if running in disk mode. To start and stop
- * workers, we need to have one thread that is capable to wait. We could start
- * up a specific management thread. However, this means additional overhead. So
- * we have decided to use worker #0, which is always present, to carry out this
- * management as an additional chore. -- rgerhards, 2008-01-10
*/
static void *
queueWorker(void *arg)
@@ -541,6 +605,11 @@ queueWorker(void *arg)
dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", (unsigned long) pThis, iMyThrdIndx);
+ /* tell the world there is one more worker */
+ pthread_mutex_lock(pThis->mut);
+ pThis->iCurNumWrkThrd++;
+ pthread_mutex_unlock(pThis->mut);
+
/* now we have our identity, on to real processing */
while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN
|| (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) {
@@ -600,9 +669,15 @@ queueWorker(void *arg)
" %d messages to process.\n", (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize);
}
- dbgprintf("Queue 0x%lx/w%d: worker thread terminates with %d entries left in queue.\n",
- (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize);
- pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; /* indicate termination */
+ /* indicate termination */
+ pthread_mutex_lock(pThis->mut);
+ pThis->iCurNumWrkThrd--;
+ pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED;
+ pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */
+ dbgprintf("Queue 0x%lx/w%d: thread terminates with %d entries left in queue, %d workers running.\n",
+ (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd);
+ pthread_mutex_unlock(pThis->mut);
+
pthread_exit(0);
}
@@ -750,10 +825,18 @@ static rsRetVal queuePersist(queue_t *pThis)
assert(pThis != NULL);
+ if(pThis->qType != QUEUETYPE_DISK) {
+ if(pThis->iQueueSize > 0)
+ ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* TODO: later... */
+ else
+ FINALIZE; /* if the queue is empty, we are happy and done... */
+ }
+
dbgprintf("Queue 0x%lx: persisting queue to disk, %d entries...\n", queueGetID(pThis), pThis->iQueueSize);
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
(char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
+
if(pThis->iQueueSize == 0) {
if(pThis->bNeedDelQIF) {
unlink((char*)pszQIFNam);
@@ -764,9 +847,6 @@ static rsRetVal queuePersist(queue_t *pThis)
FINALIZE; /* nothing left to do, so be happy */
}
- if(pThis->qType != QUEUETYPE_DISK)
- ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* TODO: later... */
-
CHKiRet(strmConstruct(&psQIF));
CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE));