From af8582e50914cfc719be1a1a80eeb81030d611c5 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 17 May 2010 14:24:27 +0200 Subject: added new cancel-reduced action thread termination method We now manage to cancel threads that block inside a retry loop to terminate without the need to cancel the thread. Avoiding cancellation helps keep the system complexity minimal and thus provides for better stability. This also solves some issues with improper shutdown when inside an action retry loop. --- ChangeLog | 6 ++++++ action.c | 12 +++++++++--- action.h | 1 + runtime/wti.c | 32 ++++++++++++++++++++++++++++++++ runtime/wtp.c | 12 ++++++++++++ 5 files changed, 60 insertions(+), 3 deletions(-) diff --git a/ChangeLog b/ChangeLog index fe3517e4..e8ceda4c 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,11 @@ --------------------------------------------------------------------------- Version 5.5.5 [DEVEL] (rgerhards), 2010-05-?? +- added new cancel-reduced action thread termination method + We now manage to cancel threads that block inside a retry loop to + terminate without the need to cancel the thread. Avoiding cancellation + helps keep the system complexity minimal and thus provides for better + stability. This also solves some issues with improper shutdown when + inside an action retry loop. --------------------------------------------------------------------------- Version 5.5.4 [DEVEL] (rgerhards), 2010-05-03 - This version offers full support for Solaris on Intel and Sparc diff --git a/action.c b/action.c index 7751fe0a..c8622764 100644 --- a/action.c +++ b/action.c @@ -503,7 +503,7 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) ASSERT(pThis != NULL); iRetries = 0; - while(pThis->eState == ACT_STATE_RTRY) { + while((*pThis->pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { iRet = pThis->pMod->tryResume(pThis->pModData); if((pThis->iResumeOKinRow > 999) && (pThis->iResumeOKinRow % 1000 == 0)) { bTreatOKasSusp = 1; @@ -522,6 +522,9 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) iSleepPeriod = pThis->iResumeInterval; ttNow += iSleepPeriod; /* not truly exact, but sufficiently... */ srSleep(iSleepPeriod, 0); + if(*pThis->pbShutdownImmediate) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } } } else if(iRet == RS_RET_DISABLE_ACTION) { actionDisable(pThis); @@ -532,6 +535,7 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) pThis->iNbrResRtry = 0; } +finalize_it: RETiRet; } @@ -859,6 +863,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme i = pBatch->iDoneUpTo; /* all messages below that index are processed */ iElemProcessed = 0; iCommittedUpTo = i; + pAction->pbShutdownImmediate = pbShutdownImmediate; while(iElemProcessed <= *pnElem && i < pBatch->nElem) { if(*pbShutdownImmediate) ABORT_FINALIZE(RS_RET_FORCE_TERM); @@ -920,8 +925,9 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmedi bDone = 0; do { localRet = tryDoAction(pAction, pBatch, &nElem, pbShutdownImmediate); - if(localRet == RS_RET_FORCE_TERM) - FINALIZE; + if(localRet == RS_RET_FORCE_TERM) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } if( localRet == RS_RET_OK || localRet == RS_RET_PREVIOUS_COMMITTED || localRet == RS_RET_DEFER_COMMIT) { diff --git a/action.h b/action.h index 13b0d92e..ab7dfec7 100644 --- a/action.h +++ b/action.h @@ -85,6 +85,7 @@ struct action_s { SYNC_OBJ_TOOL; /* required for mutex support */ pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */ uchar *pszName; /* action name (for documentation) */ + int *pbShutdownImmediate;/* to facilitate shutdown, if var is 1, shut down immediately */ //uchar **ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */ void *ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */ size_t *lenMsgs; /* length of message in ppMsgs */ diff --git a/runtime/wti.c b/runtime/wti.c index 44a27b3e..2dfc2d3f 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -109,6 +109,29 @@ wtiSetState(wti_t *pThis, sbool bNewVal) } +/* advise all workers to start by interrupting them. That should unblock all srSleep() + * calls. + */ +rsRetVal +wtiWakeupThrd(wti_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, wti); + + + if(wtiGetState(pThis)) { + /* we first try the cooperative "cancel" interface */ + pthread_kill(pThis->thrdID, SIGTTIN); + dbgprintf("sent SIGTTIN to worker thread %u, giving it a chance to terminate\n", (unsigned) pThis->thrdID); + srSleep(0, 10000); + dbgprintf("cooperative worker termination failed, using cancellation...\n"); + } + + RETiRet; +} + + /* Cancel the thread. If the thread is not running. But it is save and legal to * call wtiCancelThrd() in such situations. This function only returns when the * thread has terminated. Else we may get race conditions all over the code... @@ -125,7 +148,16 @@ wtiCancelThrd(wti_t *pThis) ISOBJ_TYPE_assert(pThis, wti); + if(wtiGetState(pThis)) { + /* we first try the cooperative "cancel" interface */ +#if 0 + pthread_kill(pThis->thrdID, SIGTTIN); + dbgprintf("sent SIGTTIN to worker thread %u, giving it a chance to terminate\n", (unsigned) pThis->thrdID); + srSleep(0, 10000); + dbgprintf("cooperative worker termination failed, using cancellation...\n"); +#endif + dbgoprint((obj_t*) pThis, "canceling worker thread\n"); pthread_cancel(pThis->thrdID); /* now wait until the thread terminates... */ diff --git a/runtime/wtp.c b/runtime/wtp.c index 51fab191..65155efc 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -220,6 +220,7 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout { DEFiRet; int bTimedOut; + int i; ISOBJ_TYPE_assert(pThis, wtp); @@ -234,6 +235,11 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout pthread_cleanup_push(mutexCancelCleanup, &pThis->mutWtp); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { + +for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { + wtiWakeupThrd(pThis->pWrkr[i]); +} + DBGPRINTF("%s: waiting %ldms on worker thread termination, %d still running\n", wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd)); @@ -348,9 +354,15 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in pThis = pWti->pWtp; ISOBJ_TYPE_assert(pThis, wtp); + /* block all signals */ sigfillset(&sigSet); pthread_sigmask(SIG_BLOCK, &sigSet, NULL); + /* but ignore SIGTTN, which we (ab)use to signal the thread to shutdown -- rgerhards, 2009-07-20 */ + sigemptyset(&sigSet); + sigaddset(&sigSet, SIGTTIN); + pthread_sigmask(SIG_UNBLOCK, &sigSet, NULL); + # if HAVE_PRCTL && defined PR_SET_NAME /* set thread name - we ignore if the call fails, has no harsh consequences... */ pszDbgHdr = wtpGetDbgHdr(pThis); -- cgit