summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog6
-rw-r--r--action.c12
-rw-r--r--action.h1
-rw-r--r--runtime/wti.c32
-rw-r--r--runtime/wtp.c12
5 files changed, 60 insertions, 3 deletions
diff --git a/ChangeLog b/ChangeLog
index fe3517e4..e8ceda4c 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,11 @@
---------------------------------------------------------------------------
Version 5.5.5 [DEVEL] (rgerhards), 2010-05-??
+- added new cancel-reduced action thread termination method
+ We now manage to get threads that block inside a retry loop to
+ terminate without the need to cancel them. Avoiding cancellation
+ helps keep the system complexity minimal and thus provides for better
+ stability. This also solves some issues with improper shutdown when
+ inside an action retry loop.
---------------------------------------------------------------------------
Version 5.5.4 [DEVEL] (rgerhards), 2010-05-03
- This version offers full support for Solaris on Intel and Sparc
diff --git a/action.c b/action.c
index 7751fe0a..c8622764 100644
--- a/action.c
+++ b/action.c
@@ -503,7 +503,7 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
ASSERT(pThis != NULL);
iRetries = 0;
- while(pThis->eState == ACT_STATE_RTRY) {
+ while((*pThis->pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) {
iRet = pThis->pMod->tryResume(pThis->pModData);
if((pThis->iResumeOKinRow > 999) && (pThis->iResumeOKinRow % 1000 == 0)) {
bTreatOKasSusp = 1;
@@ -522,6 +522,9 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
iSleepPeriod = pThis->iResumeInterval;
ttNow += iSleepPeriod; /* not truly exact, but sufficiently... */
srSleep(iSleepPeriod, 0);
+ if(*pThis->pbShutdownImmediate) {
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
+ }
}
} else if(iRet == RS_RET_DISABLE_ACTION) {
actionDisable(pThis);
@@ -532,6 +535,7 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
pThis->iNbrResRtry = 0;
}
+finalize_it:
RETiRet;
}
@@ -859,6 +863,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme
i = pBatch->iDoneUpTo; /* all messages below that index are processed */
iElemProcessed = 0;
iCommittedUpTo = i;
+ pAction->pbShutdownImmediate = pbShutdownImmediate;
while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
if(*pbShutdownImmediate)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
@@ -920,8 +925,9 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmedi
bDone = 0;
do {
localRet = tryDoAction(pAction, pBatch, &nElem, pbShutdownImmediate);
- if(localRet == RS_RET_FORCE_TERM)
- FINALIZE;
+ if(localRet == RS_RET_FORCE_TERM) {
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
+ }
if( localRet == RS_RET_OK
|| localRet == RS_RET_PREVIOUS_COMMITTED
|| localRet == RS_RET_DEFER_COMMIT) {
diff --git a/action.h b/action.h
index 13b0d92e..ab7dfec7 100644
--- a/action.h
+++ b/action.h
@@ -85,6 +85,7 @@ struct action_s {
SYNC_OBJ_TOOL; /* required for mutex support */
pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */
uchar *pszName; /* action name (for documentation) */
+ int *pbShutdownImmediate;/* to facilitate shutdown, if var is 1, shut down immediately */
//uchar **ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */
void *ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */
size_t *lenMsgs; /* length of message in ppMsgs */
diff --git a/runtime/wti.c b/runtime/wti.c
index 44a27b3e..2dfc2d3f 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -109,6 +109,29 @@ wtiSetState(wti_t *pThis, sbool bNewVal)
}
+/* Wake up this worker by sending SIGTTIN to its thread. That should unblock a pending
+ * srSleep() call. Nothing is sent if wtiGetState() is 0; pthread_kill()'s result is unchecked.
+ */
+rsRetVal
+wtiWakeupThrd(wti_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, wti);
+
+
+ if(wtiGetState(pThis)) {
+ /* cooperative wakeup: signal the thread rather than cancelling it */
+ pthread_kill(pThis->thrdID, SIGTTIN);
+ dbgprintf("sent SIGTTIN to worker thread %u, giving it a chance to terminate\n", (unsigned) pThis->thrdID);
+ srSleep(0, 10000); /* brief pause so the signalled thread gets a chance to react */
+ dbgprintf("cooperative worker termination failed, using cancellation...\n"); /* NOTE(review): printed unconditionally, and this function itself does not cancel -- message looks misleading */
+ }
+
+ RETiRet;
+}
+
+
/* Cancel the thread. The thread may not be running, but it is safe and legal to
* call wtiCancelThrd() in such situations. This function only returns when the
* thread has terminated. Else we may get race conditions all over the code...
@@ -125,7 +148,16 @@ wtiCancelThrd(wti_t *pThis)
ISOBJ_TYPE_assert(pThis, wti);
+
if(wtiGetState(pThis)) {
+ /* we first try the cooperative "cancel" interface */
+#if 0
+ pthread_kill(pThis->thrdID, SIGTTIN);
+ dbgprintf("sent SIGTTIN to worker thread %u, giving it a chance to terminate\n", (unsigned) pThis->thrdID);
+ srSleep(0, 10000);
+ dbgprintf("cooperative worker termination failed, using cancellation...\n");
+#endif
+
dbgoprint((obj_t*) pThis, "canceling worker thread\n");
pthread_cancel(pThis->thrdID);
/* now wait until the thread terminates... */
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 51fab191..65155efc 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -220,6 +220,7 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
{
DEFiRet;
int bTimedOut;
+ int i;
ISOBJ_TYPE_assert(pThis, wtp);
@@ -234,6 +235,11 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
pthread_cleanup_push(mutexCancelCleanup, &pThis->mutWtp);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
+
+for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+ wtiWakeupThrd(pThis->pWrkr[i]);
+}
+
DBGPRINTF("%s: waiting %ldms on worker thread termination, %d still running\n",
wtpGetDbgHdr(pThis), timeoutVal(ptTimeout),
ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
@@ -348,9 +354,15 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
pThis = pWti->pWtp;
ISOBJ_TYPE_assert(pThis, wtp);
+ /* block all signals */
sigfillset(&sigSet);
pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
+ /* but unblock SIGTTIN, which we (ab)use to signal the thread to shut down -- rgerhards, 2009-07-20 */
+ sigemptyset(&sigSet);
+ sigaddset(&sigSet, SIGTTIN);
+ pthread_sigmask(SIG_UNBLOCK, &sigSet, NULL);
+
# if HAVE_PRCTL && defined PR_SET_NAME
/* set thread name - we ignore if the call fails, has no harsh consequences... */
pszDbgHdr = wtpGetDbgHdr(pThis);