diff options
-rw-r--r-- | ChangeLog | 6 | ||||
-rw-r--r-- | action.c | 12 | ||||
-rw-r--r-- | action.h | 1 | ||||
-rw-r--r-- | runtime/wti.c | 32 | ||||
-rw-r--r-- | runtime/wtp.c | 12 |
5 files changed, 60 insertions, 3 deletions
@@ -1,5 +1,11 @@ --------------------------------------------------------------------------- Version 5.5.5 [DEVEL] (rgerhards), 2010-05-?? +- added new cancel-reduced action thread termination method + We now manage to cancel threads that block inside a retry loop to + terminate without the need to cancel the thread. Avoiding cancellation + helps keep the system complexity minimal and thus provides for better + stability. This also solves some issues with improper shutdown when + inside an action retry loop. --------------------------------------------------------------------------- Version 5.5.4 [DEVEL] (rgerhards), 2010-05-03 - This version offers full support for Solaris on Intel and Sparc @@ -503,7 +503,7 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) ASSERT(pThis != NULL); iRetries = 0; - while(pThis->eState == ACT_STATE_RTRY) { + while((*pThis->pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { iRet = pThis->pMod->tryResume(pThis->pModData); if((pThis->iResumeOKinRow > 999) && (pThis->iResumeOKinRow % 1000 == 0)) { bTreatOKasSusp = 1; @@ -522,6 +522,9 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) iSleepPeriod = pThis->iResumeInterval; ttNow += iSleepPeriod; /* not truly exact, but sufficiently... */ srSleep(iSleepPeriod, 0); + if(*pThis->pbShutdownImmediate) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } } } else if(iRet == RS_RET_DISABLE_ACTION) { actionDisable(pThis); @@ -532,6 +535,7 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) pThis->iNbrResRtry = 0; } +finalize_it: RETiRet; } @@ -859,6 +863,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme i = pBatch->iDoneUpTo; /* all messages below that index are processed */ iElemProcessed = 0; iCommittedUpTo = i; + pAction->pbShutdownImmediate = pbShutdownImmediate; while(iElemProcessed <= *pnElem && i < pBatch->nElem) { if(*pbShutdownImmediate) ABORT_FINALIZE(RS_RET_FORCE_TERM); @@ -920,8 +925,9 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmedi bDone = 0; do { localRet = tryDoAction(pAction, pBatch, &nElem, pbShutdownImmediate); - if(localRet == RS_RET_FORCE_TERM) - FINALIZE; + if(localRet == RS_RET_FORCE_TERM) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } if( localRet == RS_RET_OK || localRet == RS_RET_PREVIOUS_COMMITTED || localRet == RS_RET_DEFER_COMMIT) { @@ -85,6 +85,7 @@ struct action_s { SYNC_OBJ_TOOL; /* required for mutex support */ pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */ uchar *pszName; /* action name (for documentation) */ + int *pbShutdownImmediate;/* to facilitate shutdown, if var is 1, shut down immediately */ //uchar **ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */ void *ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */ size_t *lenMsgs; /* length of message in ppMsgs */ diff --git a/runtime/wti.c b/runtime/wti.c index 44a27b3e..2dfc2d3f 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -109,6 +109,29 @@ wtiSetState(wti_t *pThis, sbool bNewVal) } +/* advise all workers to start by interrupting them. That should unblock all srSleep() + * calls. + */ +rsRetVal +wtiWakeupThrd(wti_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, wti); + + + if(wtiGetState(pThis)) { + /* we first try the cooperative "cancel" interface */ + pthread_kill(pThis->thrdID, SIGTTIN); + dbgprintf("sent SIGTTIN to worker thread %u, giving it a chance to terminate\n", (unsigned) pThis->thrdID); + srSleep(0, 10000); + dbgprintf("cooperative worker termination failed, using cancellation...\n"); + } + + RETiRet; +} + + /* Cancel the thread. If the thread is not running. But it is save and legal to * call wtiCancelThrd() in such situations. This function only returns when the * thread has terminated. Else we may get race conditions all over the code... @@ -125,7 +148,16 @@ wtiCancelThrd(wti_t *pThis) ISOBJ_TYPE_assert(pThis, wti); + if(wtiGetState(pThis)) { + /* we first try the cooperative "cancel" interface */ +#if 0 + pthread_kill(pThis->thrdID, SIGTTIN); + dbgprintf("sent SIGTTIN to worker thread %u, giving it a chance to terminate\n", (unsigned) pThis->thrdID); + srSleep(0, 10000); + dbgprintf("cooperative worker termination failed, using cancellation...\n"); +#endif + dbgoprint((obj_t*) pThis, "canceling worker thread\n"); pthread_cancel(pThis->thrdID); /* now wait until the thread terminates... */ diff --git a/runtime/wtp.c b/runtime/wtp.c index 51fab191..65155efc 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -220,6 +220,7 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout { DEFiRet; int bTimedOut; + int i; ISOBJ_TYPE_assert(pThis, wtp); @@ -234,6 +235,11 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout pthread_cleanup_push(mutexCancelCleanup, &pThis->mutWtp); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { + +for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { + wtiWakeupThrd(pThis->pWrkr[i]); +} + DBGPRINTF("%s: waiting %ldms on worker thread termination, %d still running\n", wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd)); @@ -348,9 +354,15 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in pThis = pWti->pWtp; ISOBJ_TYPE_assert(pThis, wtp); + /* block all signals */ sigfillset(&sigSet); pthread_sigmask(SIG_BLOCK, &sigSet, NULL); + /* but ignore SIGTTN, which we (ab)use to signal the thread to shutdown -- rgerhards, 2009-07-20 */ + sigemptyset(&sigSet); + sigaddset(&sigSet, SIGTTIN); + pthread_sigmask(SIG_UNBLOCK, &sigSet, NULL); + # if HAVE_PRCTL && defined PR_SET_NAME /* set thread name - we ignore if the call fails, has no harsh consequences... */ pszDbgHdr = wtpGetDbgHdr(pThis); |