From 77a338e180fd51811041363f615760a14a2dc889 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 14 Jan 2008 11:55:24 +0000 Subject: - implemented $MainMsgQueueTimeoutActionCompletion config directive - implemented $MainMsgQueueTimeoutEnqueue config directive - implemented $MainMsgQueueTimeoutShutdown config directive - some cleanup --- queue.c | 124 ++++++++++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 82 insertions(+), 42 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index 07d2ccbf..1b90151b 100644 --- a/queue.c +++ b/queue.c @@ -53,6 +53,25 @@ rsRetVal queueChkPersist(queue_t *pThis); /* methods */ +/* compute an absolute time timeout suitable for calls to pthread_cond_timedwait() + * rgerhards, 2008-01-14 + */ +static rsRetVal +queueTimeoutComp(struct timespec *pt, int iTimeout) +{ + assert(pt != NULL); + /* compute timeout */ + clock_gettime(CLOCK_REALTIME, pt); + pt->tv_nsec += (iTimeout % 1000) * 1000000; /* think INTEGER arithmetic! */ + if(pt->tv_nsec > 999999999) { /* overrun? */ + pt->tv_nsec -= 1000000000; + ++pt->tv_sec; + } + pt->tv_sec += iTimeout / 1000; + return RS_RET_OK; /* so far, this is static... */ +} + + /* first, we define type-specific handlers. The provide a generic functionality, * but for this specific type of queue. The mapping to these handlers happens during * queue construction. Later on, handlers are called by pointers present in the @@ -206,7 +225,7 @@ queueTryLoadPersistedInfo(queue_t *pThis) strm_t *psQIF = NULL; uchar pszQIFNam[MAXFNAME]; size_t lenQIFNam; - AsPropBagstruct stat stat_buf; + struct stat stat_buf; } #endif @@ -406,7 +425,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr) iRetLocal = pThis->pConsumer(pUsr); if(iRetLocal != RS_RET_OK) dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n", - (unsigned long) pThis, iRetLocal); + queueGetID(pThis), iRetLocal); --pThis->iQueueSize; /* this is kind of a hack, but its the smartest thing we can do given * the somewhat astonishing fact that this queue type does not actually * queue anything ;) @@ -436,7 +455,7 @@ queueAdd(queue_t *pThis, void *pUsr) ++pThis->iQueueSize; - dbgprintf("Queue 0x%lx: entry added, size now %d entries\n", (unsigned long) pThis, pThis->iQueueSize); + dbgprintf("Queue 0x%lx: entry added, size now %d entries\n", queueGetID(pThis), pThis->iQueueSize); finalize_it: return iRet; @@ -460,7 +479,7 @@ queueDel(queue_t *pThis, void *pUsr) --pThis->iQueueSize; dbgprintf("Queue 0x%lx: entry deleted, state %d, size now %d entries\n", - (unsigned long) pThis, iRet, pThis->iQueueSize); + queueGetID(pThis), iRet, pThis->iQueueSize); return iRet; } @@ -471,7 +490,7 @@ queueDel(queue_t *pThis, void *pUsr) * rgerhards, 2008-01-14 */ static rsRetVal -queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int iTimeout) +queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) { DEFiRet; int i; @@ -486,18 +505,18 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int iTimeout) /* awake them... */ pthread_cond_broadcast(pThis->notEmpty); - /* and wait for their termination */ - clock_gettime(CLOCK_REALTIME, &t); /* set the timeout */ - t.tv_sec += iTimeout; /* TODO: can we just add to the seconds? - check */ + /* get timeout */ + queueTimeoutComp(&t, iTimeout); + /* and wait for their termination */ pthread_mutex_lock(pThis->mut); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { - dbgprintf("Queue 0x%lx: waiting on worker thread termination, %d still running\n", - (unsigned long) pThis, pThis->iCurNumWrkThrd); + dbgprintf("Queue 0x%lx: waiting %ld ms on worker thread termination, %d still running\n", + queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd); if(pthread_cond_timedwait(&pThis->condThrdTrm, pThis->mut, &t) != 0) { - dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", (unsigned long) pThis); + dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", queueGetID(pThis)); bTimedOut = 1; /* we exit the loop on timeout */ } } @@ -521,10 +540,13 @@ queueWrkThrdCancel(queue_t *pThis) // TODO: we need to implement peek(), without it (today!) we lose one message upon // worker cancellation! -- rgerhards, 2008-01-14 + /* awake the workers one more time, just to be sure */ + pthread_cond_broadcast(pThis->notEmpty); + /* first tell the workers our request */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED) { - dbgprintf("Queue 0x%lx: canceling worker thread %d\n", (unsigned long) pThis, i); + dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i); pthread_cancel(pThis->pWrkThrds[i].thrdID); } @@ -544,11 +566,21 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", (unsigned long) pThis); - iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN, 2); // TODO: timeout configurable! + /* even if the timeout count is set to 0 (run endless), we still call the queueWrkThrdTrm(). This + * is necessary so that all threads get sent the termination command. With a timeout of 0, however, + * the function returns immediate with RS_RET_TIMED_OUT. We catch that state and accept it as + * good. + */ + iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN, pThis->toQShutdown); if(iRet == RS_RET_TIMED_OUT) { - /* OK, we now need to try force the shutdown */ - dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", (unsigned long) pThis); - iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, 4); // TODO: timeout configurable! + if(pThis->toQShutdown == 0) { + iRet = RS_RET_OK; + } else { + /* OK, we now need to try force the shutdown */ + dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", + queueGetID(pThis)); + iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, pThis->toActShutdown); + } } if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */ @@ -559,9 +591,12 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) /* finally join the threads * In case of a cancellation, this may actually take some time. This is also - * needed to clean up the thread descriptors, even with a regular termination + * needed to clean up the thread descriptors, even with a regular termination. + * And, most importantly, this is needed if we have an indifitite termination + * time set (timeout == 0)! -- rgerhards, 2008-01-14 */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { +dbgprintf("join thred %d\n", i); pthread_join(pThis->pWrkThrds[i].thrdID, NULL); } @@ -573,7 +608,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) } dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n", - (unsigned long) pThis, pThis->iQueueSize); + queueGetID(pThis), pThis->iQueueSize); return iRet; } @@ -591,6 +626,7 @@ queueWorker(void *arg) void *pUsr; sigset_t sigSet; int iMyThrdIndx; /* index for this thread in queue thread table */ + int iCancelStateSave; assert(pThis != NULL); @@ -603,28 +639,37 @@ queueWorker(void *arg) break; assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self()); - dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", (unsigned long) pThis, iMyThrdIndx); + dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx); /* tell the world there is one more worker */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); pThis->iCurNumWrkThrd++; pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); /* now we have our identity, on to real processing */ while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN || (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) { + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); while (pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) { dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", - (unsigned long) pThis, iMyThrdIndx); - pthread_cond_wait (pThis->notEmpty, pThis->mut); + queueGetID(pThis), iMyThrdIndx); + pthread_cond_wait(pThis->notEmpty, pThis->mut); } if(pThis->iQueueSize > 0) { /* dequeue element (still protected from mutex) */ iRet = queueDel(pThis, &pUsr); queueChkPersist(pThis); // when we support peek(), we must do this down after the del! pthread_mutex_unlock(pThis->mut); - pthread_cond_signal (pThis->notFull); + pthread_cond_signal(pThis->notFull); + pthread_setcancelstate(iCancelStateSave, NULL); + /* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is + * a cancellation point in itself. As we run most of the time without cancel enabled, I fear + * we may never get cancelled if we do not create a cancellation point ourselfs. + */ + pthread_testcancel(); /* do actual processing (the lengthy part, runs in parallel) * If we had a problem while dequeing, we do not call the consumer, * but we otherwise ignore it. This is in the hopes that it will be @@ -634,19 +679,20 @@ queueWorker(void *arg) if(iRet == RS_RET_OK) { rsRetVal iRetLocal; dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n", - (unsigned long) pThis, iMyThrdIndx); + queueGetID(pThis), iMyThrdIndx); iRetLocal = pThis->pConsumer(pUsr); dbgprintf("Queue 0x%lx/w%d: worker: consumer returnd %d\n", - (unsigned long) pThis, iMyThrdIndx, iRetLocal); + queueGetID(pThis), iMyThrdIndx, iRetLocal); if(iRetLocal != RS_RET_OK) dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n", (unsigned long) pThis, iMyThrdIndx, iRetLocal); } else { dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things " - "may happen\n", (unsigned long) pThis, iMyThrdIndx, iRet); + "may happen\n", queueGetID(pThis), iMyThrdIndx, iRet); } } else { /* the mutex must be unlocked in any case (important for termination) */ pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); } /* We now yield to give the other threads a chance to obtain the mutex. If we do not @@ -666,17 +712,19 @@ queueWorker(void *arg) if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN) && pThis->iQueueSize > 0) dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has " - " %d messages to process.\n", (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize); + " %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize); } /* indicate termination */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); pThis->iCurNumWrkThrd--; pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */ dbgprintf("Queue 0x%lx/w%d: thread terminates with %d entries left in queue, %d workers running.\n", - (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd); + queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd); pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); pthread_exit(0); } @@ -772,7 +820,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ /* call type-specific constructor */ CHKiRet(pThis->qConstruct(pThis)); - dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", (unsigned long) pThis, pThis->qType, + dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", queueGetID(pThis), pThis->qType, pThis->iMaxFileSize); if(pThis->qType != QUEUETYPE_DIRECT) { @@ -819,7 +867,7 @@ finalize_it: static rsRetVal queuePersist(queue_t *pThis) { DEFiRet; - strm_t *psQIF = NULL;; /* Queue Info File */ + strm_t *psQIF = NULL; /* Queue Info File */ uchar pszQIFNam[MAXFNAME]; size_t lenQIFNam; @@ -850,6 +898,7 @@ static rsRetVal queuePersist(queue_t *pThis) CHKiRet(strmConstruct(&psQIF)); CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE)); + CHKiRet(strmSetiAddtlOpenFlags(psQIF, O_TRUNC)); CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE)); CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam)); CHKiRet(strmConstructFinalize(psQIF)); @@ -1017,10 +1066,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) while(pThis->iQueueSize >= pThis->iMaxQueueSize) { dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", (unsigned long) pThis); - - clock_gettime (CLOCK_REALTIME, &t); - t.tv_sec += 2; /* TODO: configurable! */ - + queueTimeoutComp(&t, pThis->toEnq); if(pthread_cond_timedwait (pThis->notFull, pThis->mut, &t) != 0) { dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", (unsigned long) pThis); @@ -1047,15 +1093,9 @@ finalize_it: /* some simple object access methods */ DEFpropSetMeth(queue, bImmediateShutdown, int); DEFpropSetMeth(queue, iPersistUpdCnt, int); -#if 0 -rsRetVal queueSetiPersistUpdCnt(queue_t *pThis, int pVal) -{ - dbgprintf("queueSetiPersistUpdCnt(), val %d\n", pVal); - pThis->iPersistUpdCnt = pVal; -dbgprintf("queSetiPersist..(): PersUpdCnt %d, UpdsSincePers %d\n", pThis->iPersistUpdCnt, pThis->iUpdsSincePersist); - return RS_RET_OK; -} -#endif +DEFpropSetMeth(queue, toQShutdown, long); +DEFpropSetMeth(queue, toActShutdown, long); +DEFpropSetMeth(queue, toEnq, long); /* This function can be used as a generic way to set properties. Only the subset -- cgit