diff options
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 142 |
1 files changed, 111 insertions, 31 deletions
@@ -466,6 +466,72 @@ queueDel(queue_t *pThis, void *pUsr) } +/* Send a shutdown command to all workers and see if they terminate. + * A timeout may be specified. + * rgerhards, 2008-01-14 + */ +static rsRetVal +queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int iTimeout) +{ + DEFiRet; + int i; + int bTimedOut; + struct timespec t; + + /* first tell the workers our request */ + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) + if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED) + pThis->pWrkThrds[i].tCurrCmd = tShutdownCmd; + + /* awake them... */ + pthread_cond_broadcast(pThis->notEmpty); + + /* and wait for their termination */ + clock_gettime(CLOCK_REALTIME, &t); /* set the timeout */ + t.tv_sec += iTimeout; /* TODO: can we just add to the seconds? - check */ + + pthread_mutex_lock(pThis->mut); + bTimedOut = 0; + while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { + dbgprintf("Queue 0x%lx: waiting on worker thread termination, %d still running\n", + (unsigned long) pThis, pThis->iCurNumWrkThrd); + + if(pthread_cond_timedwait(&pThis->condThrdTrm, pThis->mut, &t) != 0) { + dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", (unsigned long) pThis); + bTimedOut = 1; /* we exit the loop on timeout */ + } + } + pthread_mutex_unlock(pThis->mut); + + if(bTimedOut) + iRet = RS_RET_TIMED_OUT; + + return iRet; +} + + +/* Unconditionally cancel all running worker threads. + * rgerhards, 2008-01-14 + */ +static rsRetVal +queueWrkThrdCancel(queue_t *pThis) +{ + DEFiRet; + int i; + // TODO: we need to implement peek(), without it (today!) we lose one message upon + // worker cancellation! -- rgerhards, 2008-01-14 + + /* first tell the workers our request */ + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) + if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED) { + dbgprintf("Queue 0x%lx: canceling worker thread %d\n", (unsigned long) pThis, i); + pthread_cancel(pThis->pWrkThrds[i].thrdID); + } + + return iRet; +} + + /* Worker thread management function carried out when the main * worker is about to terminate. */ @@ -473,28 +539,39 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) { DEFiRet; int i; - qWrkCmd_t tShutdownCmd; assert(pThis != NULL); - /* select shutdown mode */ - tShutdownCmd = (pThis->bImmediateShutdown) ? eWRKTHRDCMD_SHUTDOWN_IMMEDIATE : eWRKTHRDCMD_SHUTDOWN; - - dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence, mode %d...\n", - (unsigned long) pThis, (int) tShutdownCmd); + dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", (unsigned long) pThis); - /* tell all workers to terminate */ - for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) - pThis->pWrkThrds[i].tCurrCmd = tShutdownCmd; + iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN, 2); // TODO: timeout configurable! + if(iRet == RS_RET_TIMED_OUT) { + /* OK, we now need to try force the shutdown */ + dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", (unsigned long) pThis); + iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, 4); // TODO: timeout configurable! + } - /* awake them... */ - pthread_cond_broadcast(pThis->notEmpty); + if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */ + /* still didn't work out - so we now need to cancel the workers */ + dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis); + iRet = queueWrkThrdCancel(pThis); + } - /* and wait for their termination */ + /* finally join the threads + * In case of a cancellation, this may actually take some time. This is also + * needed to clean up the thread descriptors, even with a regular termination + */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { pthread_join(pThis->pWrkThrds[i].thrdID, NULL); } + /* as we may have cancelled a thread, clean up our internal structure. All are + * terminated now. For simplicity, we simply overwrite the states. + */ + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { + pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_TERMINATED; + } + dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n", (unsigned long) pThis, pThis->iQueueSize); @@ -505,19 +582,6 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) /* Each queue has one associated worker (consumer) thread. It will pull * the message from the queue and pass it to a user-defined function. * This function was provided on construction. It MUST be thread-safe. - * - * There are two classes of worker threads, all implemented via the function - * below. The queue may start multiple workers. The first one carries out normal - * processing BUT also manages the other workers (the first one and all other - * ones are the two different classes). This is so that the queue can dynamically - * start and stop worker threads. So far, this dynamic mode is not yet supported, - * but we will need it at least for disk-assisted queue types. There, multiple - * workers are supported as long as the queue is running in memory, but only - * a single worker is supported if running in disk mode. To start and stop - * workers, we need to have one thread that is capable to wait. We could start - * up a specific management thread. However, this means additional overhead. So - * we have decided to use worker #0, which is always present, to carry out this - * management as an additional chore. -- rgerhards, 2008-01-10 */ static void * queueWorker(void *arg) @@ -541,6 +605,11 @@ queueWorker(void *arg) dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", (unsigned long) pThis, iMyThrdIndx); + /* tell the world there is one more worker */ + pthread_mutex_lock(pThis->mut); + pThis->iCurNumWrkThrd++; + pthread_mutex_unlock(pThis->mut); + /* now we have our identity, on to real processing */ while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN || (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) { @@ -600,9 +669,15 @@ queueWorker(void *arg) " %d messages to process.\n", (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize); } - dbgprintf("Queue 0x%lx/w%d: worker thread terminates with %d entries left in queue.\n", - (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize); - pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; /* indicate termination */ + /* indicate termination */ + pthread_mutex_lock(pThis->mut); + pThis->iCurNumWrkThrd--; + pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; + pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */ + dbgprintf("Queue 0x%lx/w%d: thread terminates with %d entries left in queue, %d workers running.\n", + (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd); + pthread_mutex_unlock(pThis->mut); + pthread_exit(0); } @@ -750,10 +825,18 @@ static rsRetVal queuePersist(queue_t *pThis) assert(pThis != NULL); + if(pThis->qType != QUEUETYPE_DISK) { + if(pThis->iQueueSize > 0) + ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* TODO: later... */ + else + FINALIZE; /* if the queue is empty, we are happy and done... */ + } + dbgprintf("Queue 0x%lx: persisting queue to disk, %d entries...\n", queueGetID(pThis), pThis->iQueueSize); /* Construct file name */ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix); + if(pThis->iQueueSize == 0) { if(pThis->bNeedDelQIF) { unlink((char*)pszQIFNam); @@ -764,9 +847,6 @@ static rsRetVal queuePersist(queue_t *pThis) FINALIZE; /* nothing left to do, so be happy */ } - if(pThis->qType != QUEUETYPE_DISK) - ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* TODO: later... */ - CHKiRet(strmConstruct(&psQIF)); CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE)); |