diff options
-rw-r--r-- | debug.c | 3 | ||||
-rw-r--r-- | queue.c | 256 | ||||
-rwxr-xr-x | srUtils.c | 34 | ||||
-rwxr-xr-x | srUtils.h | 3 | ||||
-rw-r--r-- | wti.c | 12 | ||||
-rw-r--r-- | wtp.c | 32 | ||||
-rw-r--r-- | wtp.h | 2 |
7 files changed, 222 insertions, 120 deletions
@@ -55,6 +55,7 @@ static int bPrintFuncDBOnExit = 0; /* shall the function entry and exit be logge static char *pszAltDbgFileName = "/home/rger/proj/rsyslog/log"; /* if set, debug output is *also* sent to here */ static FILE *altdbg; /* and the handle for alternate debug output */ static FILE *stddbg; +static dbgFuncDB_t pCurrFunc; /* list of all known FuncDBs. We use a special list, because it must only be single-linked. As @@ -653,7 +654,7 @@ sigsegvHdlr(int signum) signame = ""; } - dbgprintf("Signal %d%s occured, execution must be terminated %d.\n", signum, signame, SIGSEGV); + dbgprintf("\n\n\n\nSignal %d%s occured, execution must be terminated %d.\n\n\n\n", signum, signame, SIGSEGV); dbgCallStackPrintAll(); @@ -1,3 +1,5 @@ + // TODO: we need to implement peek(), without it (today!) we lose one message upon + // worker cancellation! -- rgerhards, 2008-01-14 // TODO: think about mutDA - I think it's no longer needed // TODO: start up the correct num of workers when switching to non-DA mode // TODO: "preforked" worker threads @@ -776,81 +778,157 @@ queueDel(queue_t *pThis, void *pUsr) static rsRetVal queueShutdownWorkers(queue_t *pThis) { DEFiRet; + DEFVARS_mutexProtection; int i; + struct timespec tTimeout; + rsRetVal iRetLocal; ISOBJ_TYPE_assert(pThis, queue); dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis)); - /* even if the timeout count is set to 0 (run endless), we still call the queueWrkThrdTrm(). This - * is necessary so that all threads get sent the termination command. With a timeout of 0, however, - * the function returns immediate with RS_RET_TIMED_OUT. We catch that state and accept it as - * good. - */ - wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, pThis->toQShutdown); - if(iRet == RS_RET_TIMED_OUT) { - if(pThis->toQShutdown == 0) { - iRet = RS_RET_OK; - } else { - /* OK, we now need to try force the shutdown */ - dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", - queueGetID(pThis)); - iRet = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, pThis->toActShutdown); + // TODO: reminder, delte after testing: do we need to modify the high wtr mark? I dont' think so 2008-01-25 + /* first try to shutdown the queue within the regular shutdown period */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + if(pThis->iQueueSize > 0) { + if(pThis->bRunsDA) { + /* worker threads may be inactive after reaching low water + * mark. Lower the mark and react workers. + */ + pThis->iLowWtrMrk = 0; + wtpAdviseMaxWorkers(pThis->pWtpReg, 1); } } + END_MTX_PROTECTED_OPERATIONS(pThis->mut); - if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */ - /* still didn't work out - so we now need to cancel the workers */ - dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis); - iRet = wtpCancelAll(pThis->pWtpReg); - } - - // TODO: do it just once but right ;) - if(pThis->pWtpDA != NULL) { - wtpShutdownAll(pThis->pWtpDA, pThis->toQShutdown, pThis->toQShutdown); - if(iRet == RS_RET_TIMED_OUT) { - if(pThis->toQShutdown == 0) { - iRet = RS_RET_OK; - } else { - /* OK, we now need to try force the shutdown */ - dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", + /* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found + * out there are no active workers - that doesn't matter: the wtp knows about that and so will + * return immediately. + * We do not yet care about the DA worker - that will be handled down later in the process. + * Note that we must not request shutdown right now - that may introduce a race: if the regular queue + * still runs DA assisted and the DA worker gets scheduled first, it will terminate itself (if the DA + * queue happens to be empty at that instant). Then the regular worker enqueues messages, what will lead + * to a restart of the worker. Of course, everything will continue to run, but in a bit sub-optimal way + * (from a performance point of view). So we don't do anything right now. The DA queue will continue to + * process messages and shutdown itself in any case if there is nothing to do. So we don't loose anything + * by not requesting shutdown now. + * rgerhards, 2008-01-25 + */ + /* first calculate absolute timeout - we need the absolute value here, because we need to coordinate + * shutdown of both the regular and DA queue on *the same* timeout. + */ + timeoutComp(&tTimeout, pThis->toQShutdown); + iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + dbgprintf("Queue 0x%lx: regular shutdown timed out on primary queue (this is OK)\n", queueGetID(pThis)); + } else { + /* OK, the regular queue is now shut down. So we can now wait for the DA queue (if running DA) */ + dbgprintf("Queue 0x%lx: regular queue workers shut down.\n", queueGetID(pThis)); + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + if(pThis->bRunsDA) { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgprintf("Queue 0x%lx: we have a DA queue (0x%lx), requesting its shutdown.\n", + queueGetID(pThis), queueGetID(pThis->pqDA)); + /* we use the same absolute timeout as above, so we do not use more than the configured + * timeout interval! + */ + iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + dbgprintf("Queue 0x%lx: shutdown timed out on DA queue (this is OK)\n", queueGetID(pThis)); - iRet = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, pThis->toActShutdown); } + } else { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); } + } - if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */ - /* still didn't work out - so we now need to cancel the workers */ - dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis); - iRet = wtpCancelAll(pThis->pWtpDA); + /* when we reach this point, both queues are either empty or the regular queue shutdown timeout + * has expired. Now we need to check if we areconfigured to not loose messages. If so, we need + * to persist the queue to disk (this is only possible if the queue is DA-enabled). + */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + /* optimize parameters for shutdown of DA-enabled queues */ + if(pThis->bIsDA && pThis->iQueueSize > 0 && pThis->bSaveOnShutdown) { + /* switch to enqueue-only mode so that no more actions happen */ + if(pThis->bRunsDA == 0) { + queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ + } else { + queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* switch to enqueue-only mode */ } + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + /* make sure we do not timeout before we are done */ + dbgprintf("Queue 0x%lx: bSaveOnShutdown configured, eternal timeout set\n", queueGetID(pThis)); + timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); + /* and run the primary's queue worker to drain the queue */ + iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout); + if(iRetLocal != RS_RET_OK) { + dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying to shut down primary queue in disk save mode, " + "continuing, but results are unpredictable\n", + queueGetID(pThis), iRetLocal); + } + } else { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); } - - - /* finally join the threads - * In case of a cancellation, this may actually take some time. This is also - * needed to clean up the thread descriptors, even with a regular termination. - * And, most importantly, this is needed if we have an indifitite termination - * time set (timeout == 0)! -- rgerhards, 2008-01-14 + + /* now the primary queue is either empty, persisted to disk - or set to loose messages. So we + * can now request immediate shutdown of any remaining workers. */ -#if 0 // totally wrong, we must implement something along these lines in wtp! -RUNLOG; - for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - if(pThis->pWtpReg->pWrkr[i]->tCurrCmd != eWRKTHRD_STOPPED) { - wtiJoinThrd(pThis->pWtpReg->pWrkr[i]); + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + if(pThis->iQueueSize > 0) { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); + iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal != RS_RET_OK) { + dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the primary queue " + "in disk save mode. Continuing, but results are unpredictable\n", + queueGetID(pThis), iRetLocal); } + } else { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); } -RUNLOG; - if(pThis->pWtpDA != NULL) { - for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - if(pThis->pWtpDA->pWrkr[i]->tCurrCmd != eWRKTHRD_STOPPED) { - wtiJoinThrd(pThis->pWtpDA->pWrkr[i]); - } + /* Now queue workers should have terminated. If not, we need to cancel them as we have applied + * all timeout setting. If any worker in any queue still executes, its consumer is possibly + * long-running and cancelling is the only way to get rid of it. Note that the + * cancellation handler will probably re-queue a user pointer, so the queue's enqueue + * function is still needed (what is no problem as we do not yet destroy the queue - but I + * thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25 + */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + if(pThis->iQueueSize > 0) { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgprintf("Queue 0x%lx: primary queue worker threads could not be shutdown, now canceling them\n", + queueGetID(pThis)); + iRetLocal = wtpCancelAll(pThis->pWtpReg); + if(iRetLocal != RS_RET_OK) { + dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel primary queue worker " + "threads, continuing, but results are unpredictable\n", + queueGetID(pThis), iRetLocal); + } + } else { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + } + + /* ... and now the DA queue, if it exists (should always be after the primary one) */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgprintf("Queue 0x%lx: DA worker threads could not be shutdown, now canceling them\n", + queueGetID(pThis)); + iRetLocal = wtpCancelAll(pThis->pWtpReg); + if(iRetLocal != RS_RET_OK) { + dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel DA queue worker " + "threads, continuing, but results are unpredictable\n", + queueGetID(pThis), iRetLocal); } + } else { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); } -#endif + /* ... finally ... all worker threads have terminated :-) + * Well, more precisely, they *are in termination*. Some cancel cleanup handlers + * may still be running. + */ dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n", queueGetID(pThis), pThis->iQueueSize); @@ -1112,20 +1190,21 @@ queueChkStopWrkrReg(queue_t *pThis) /* must only be called when the queue mutex is locked, else results - * are not stable! DA version + * are not stable! DA queue version */ static int queueIsIdleDA(queue_t *pThis) { - return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk)); + /* remember: iQueueSize is the DA queue size, not the main queue! */ + return (pThis->iQueueSize == 0); } /* must only be called when the queue mutex is locked, else results - * are not stable! Regular version + * are not stable! Regular queue version */ static int queueIsIdleReg(queue_t *pThis) { - return (pThis->iQueueSize == 0); + return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk)); } @@ -1322,7 +1401,6 @@ rsRetVal queueDestruct(queue_t **ppThis) { DEFiRet; queue_t *pThis; - DEFVARS_mutexProtection; assert(ppThis != NULL); pThis = *ppThis; @@ -1332,55 +1410,37 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ - /* we do not need to take care of any messages left in queue if we are in enqueue only mode */ - if(!pThis->bEnqOnly) { - /* in regular mode, need look at termination */ - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - /* optimize parameters for shutdown of DA-enabled queues */ - if(pThis->bIsDA && pThis->iQueueSize > 0) { // TODO: atomic iQueueSize! - dbgprintf("IsDA queue, modifying params for draining\n"); - pThis->iHighWtrMrk = 1; /* make sure we drain */ - pThis->iLowWtrMrk = 0; /* disable low water mark algo */ - if(pThis->bRunsDA == 0) { - if(pThis->iQueueSize > 0) { - queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ - } - } else { - queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */ - /* worker may have been waited on low water mark, reactivate */ - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); - } - if(pThis->bSaveOnShutdown) { - dbgprintf("bSaveOnShutdown set, eternal timeout set\n"); - pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL; - } - END_MTX_PROTECTED_OPERATIONS(pThis->mut); - } - } + /* shut down all workers (handles *all* of the persistence logic) */ + queueShutdownWorkers(pThis); - /* at this point, the queue is either empty with all workers being idle (or deact) or the queue - * is full and all workers are running. We now need to wait for everyone to become idle. - */ + /* finally destruct our (regular) worker thread pool */ if(pThis->qType != QUEUETYPE_DIRECT) { - queueShutdownWorkers(pThis); + wtpDestruct(&pThis->pWtpReg); } - /* if still running DA, terminate disk queue (note that the DA queue is NULL if it was never used) */ - if(pThis->bRunsDA && pThis->pqDA != NULL) + /* Now check if we actually have a DA queue and, if so, destruct it. + * Note that the wtp must be destructed first, it may be in cancel cleanup handler + * *right now* and actually *need* to access the queue object to persist some final + * data (re-queueing case). So we need to destruct the wtp first, which will make + * sure all workers have terminated. + */ + if(pThis->pWtpDA != NULL) { + wtpDestruct(&pThis->pWtpDA); queueDestruct(&pThis->pqDA); + } - /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) */ + /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) + * This handler is most important for disk queues, it will finally persist the necessary + * on-disk structures. In theory, other queueing modes may implement their other (non-DA) + * methods of persisting a queue between runs, but in practice all of this is done via + * disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here + * if need arises (what I doubt...) -- rgerhards, 2008-01-25 + */ CHKiRet_Hdlr(queuePersist(pThis)) { dbgprintf("Queue 0x%lx: error %d persisting queue - data lost!\n", (unsigned long) pThis, iRet); } - /* ... then free resources */ - if(pThis->qType != QUEUETYPE_DIRECT) { - wtpDestruct(&pThis->pWtpReg); - if(pThis->pWtpDA != NULL) - wtpDestruct(&pThis->pWtpDA); - } - + /* finally, clean up some simple things... */ if(pThis->pqParent == NULL) { /* if we are not a child, we allocated our own mutex, which we now need to destroy */ pthread_mutex_destroy(pThis->mut); @@ -1389,6 +1449,7 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove pthread_mutex_destroy(&pThis->mutThrdMgmt); pthread_cond_destroy(&pThis->notFull); pthread_cond_destroy(&pThis->notEmpty); + /* type-specific destructor */ iRet = pThis->qDestruct(pThis); @@ -1467,6 +1528,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) ISOBJ_TYPE_assert(pThis, queue); +// TODO: check if queue is terminating and if so either discard message or enqeue it to the DA queue *directly* dbgprintf("Queue %p: EnqObj() 1\n", pThis); /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE @@ -316,7 +316,7 @@ int getNumberDigits(long lNum) * rgerhards, 2008-01-14 */ rsRetVal -timeoutComp(struct timespec *pt, int iTimeout) +timeoutComp(struct timespec *pt, long iTimeout) { assert(pt != NULL); /* compute timeout */ @@ -331,6 +331,38 @@ timeoutComp(struct timespec *pt, int iTimeout) } +/* This function is kind of the reverse of timeoutComp() - it takes an absolute + * timeout value and computes how far this is in the future. If the value is already + * in the past, 0 is returned. The return value is in ms. + * rgerhards, 2008-01-25 + */ +long +timeoutVal(struct timespec *pt) +{ + struct timespec t; + long iTimeout; + + assert(pt != NULL); + /* compute timeout */ + clock_gettime(CLOCK_REALTIME, &t); + if(pt->tv_sec < t.tv_sec) { + iTimeout = 0; /* in the past! */ + } else if(pt->tv_sec == t.tv_sec) { + if(pt->tv_nsec < t.tv_nsec) { + iTimeout = 0; /* in the past! */ + } else { + iTimeout = (pt->tv_nsec - t.tv_nsec) / 1000; + } + } else { + iTimeout = pt->tv_sec - t.tv_nsec; + iTimeout += 1000 - (pt->tv_nsec / 1000); + iTimeout += t.tv_nsec / 1000; + } + + return iTimeout; +} + + /* cancellation cleanup handler - frees provided mutex * rgerhards, 2008-01-14 */ @@ -66,7 +66,8 @@ void skipWhiteSpace(uchar **pp); rsRetVal genFileName(uchar **ppName, uchar *pDirName, size_t lenDirName, uchar *pFName, size_t lenFName, long lNum, int lNumDigits); int getNumberDigits(long lNum); -rsRetVal timeoutComp(struct timespec *pt, int iTimeout); +rsRetVal timeoutComp(struct timespec *pt, long iTimeout); +long timeoutVal(struct timespec *pt); void mutexCancelCleanup(void *arg); /* mutex operations */ @@ -159,6 +159,16 @@ rsRetVal wtiDestruct(wti_t **ppThis) /* we can not be canceled, that would have a myriad of side-effects */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + /* if we reach this point, we must make sure the associated worker has terminated. It is + * the callers duty to make sure the worker has already terminated. + * TODO: is it *really* the caller's duty? ...mmmhhhh.... smells bad... rgerhards, 2008-01-25 + */ + wtiProcessThrdChanges(pThis, LOCK_MUTEX); /* process state change one last time */ + + d_pthread_mutex_lock(&pThis->mut); + assert(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) <= eWRKTHRD_TERMINATING); // I knew it smelled bad... + d_pthread_mutex_unlock(&pThis->mut); + /* actual destruction */ pthread_cond_destroy(&pThis->condInitDone); pthread_mutex_destroy(&pThis->mut); @@ -166,7 +176,7 @@ rsRetVal wtiDestruct(wti_t **ppThis) if(pThis->pszDbgHdr != NULL) free(pThis->pszDbgHdr); - /* and finally delete the queue objet itself */ + /* and finally delete the wti object itself */ free(pThis); *ppThis = NULL; @@ -106,7 +106,6 @@ wtpConstructFinalize(wtp_t *pThis) /* alloc and construct workers - this can only be done in finalizer as we previously do * not know the max number of workers */ -RUNLOG; if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); @@ -276,22 +275,19 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) * rgerhards, 2008-01-14 */ rsRetVal -wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, long iTimeout) +wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout) { DEFiRet; int bTimedOut; - struct timespec t; int iCancelStateSave; dbgPrintAllDebugInfo(); RUNLOG_VAR("%p", pThis); -RUNLOG_VAR("%ld", iTimeout); RUNLOG_VAR("%d", tShutdownCmd); ISOBJ_TYPE_assert(pThis, wtp); wtpSetState(pThis, tShutdownCmd); wtpWakeupAllWrkr(pThis); - timeoutComp(&t, iTimeout);/* get timeout */ /* and wait for their termination */ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut); @@ -302,9 +298,9 @@ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n", - wtpGetDbgHdr(pThis), iTimeout, pThis->iCurNumWrkThrd); + wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd); - if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mut, &t) != 0) { + if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mut, ptTimeout) != 0) { dbgprintf("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis)); bTimedOut = 1; /* we exit the loop on timeout */ } @@ -313,6 +309,11 @@ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut); if(bTimedOut) iRet = RS_RET_TIMED_OUT; + + /* see if we need to harvest (join) any terminated threads (even in timeout case, + * some may have terminated... + */ + wtpProcessThrdChanges(pThis); dbgprintf("wtpShutdownAll exit"); RETiRet; @@ -345,25 +346,25 @@ wtpCancelAll(wtp_t *pThis) { DEFiRet; int i; - // TODO: we need to implement peek(), without it (today!) we lose one message upon - // worker cancellation! -- rgerhards, 2008-01-14 + // TODO: mutex?? // TODO: cancellation in wti! ISOBJ_TYPE_assert(pThis, wtp); /* process any pending thread requests so that we know who actually is still running */ wtpProcessThrdChanges(pThis); - /* awake the workers one more time, just to be sure */ - wtpWakeupAllWrkr(pThis); - +RUNLOG_VAR("%d", pThis->iNumWorkerThreads);; /* first tell the workers our request */ - for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { // TODO: mutex lock! +RUNLOG_VAR("%p", pThis->pWrkr[i]); if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) { +RUNLOG; dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i); pthread_cancel(pThis->pWrkr[i]->thrdID); } } +RUNLOG; RETiRet; } @@ -492,9 +493,6 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) * we do NOT start a new one. Let's give the other one a chance, first. */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - // TODO: sync! -RUNLOG; -dbgprintf("%s: i %d, wti_T* %p\n", wtpGetDbgHdr(pThis), i, pThis->pWrkr[i]); if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) { break; } @@ -510,7 +508,6 @@ dbgprintf("%s: after thrd search: i %d, max %d\n", wtpGetDbgHdr(pThis), i, pThis dbgprintf("%s: started with state %d, num workers now %d\n", wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd); -RUNLOG; /* we try to give the starting worker a little boost. It won't help much as we still * hold the queue's mutex, but at least it has a chance to start on a single-CPU system. */ @@ -521,7 +518,6 @@ RUNLOG; finalize_it: END_MTX_PROTECTED_OPERATIONS(&pThis->mut); -RUNLOG; RETiRet; } @@ -92,10 +92,10 @@ rsRetVal wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex); rsRetVal wtpSetState(wtp_t *pThis, wtpState_t iNewState); rsRetVal wtpWakeupWrkr(wtp_t *pThis); rsRetVal wtpWakeupAllWrkr(wtp_t *pThis); -rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, long iTimeout); rsRetVal wtpCancelAll(wtp_t *pThis); rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg); rsRetVal wtpSignalWrkrTermination(wtp_t *pWtp); +rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout); PROTOTYPEObjClassInit(wtp); PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)); PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)); |