summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2010-05-17 14:24:27 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2010-05-17 14:24:27 +0200
commitaf8582e50914cfc719be1a1a80eeb81030d611c5 (patch)
tree0ece18d61b635e07c4f1f6bfeefe31a96286b2ad
parent74f8bf146546275e296a3d5af17cc4c6d0397778 (diff)
downloadrsyslog-af8582e50914cfc719be1a1a80eeb81030d611c5.tar.gz
rsyslog-af8582e50914cfc719be1a1a80eeb81030d611c5.tar.xz
rsyslog-af8582e50914cfc719be1a1a80eeb81030d611c5.zip
added new cancel-reduced action thread termination method
We now manage to have threads that block inside a retry loop terminate without the need to cancel the thread. Avoiding cancellation helps keep the system complexity minimal and thus provides for better stability. This also solves some issues with improper shutdown when inside an action retry loop.
-rw-r--r--ChangeLog6
-rw-r--r--action.c12
-rw-r--r--action.h1
-rw-r--r--runtime/wti.c32
-rw-r--r--runtime/wtp.c12
5 files changed, 60 insertions, 3 deletions
diff --git a/ChangeLog b/ChangeLog
index fe3517e4..e8ceda4c 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,11 @@
---------------------------------------------------------------------------
Version 5.5.5 [DEVEL] (rgerhards), 2010-05-??
+- added new cancel-reduced action thread termination method
+ We now manage to have threads that block inside a retry loop terminate
+ without the need to cancel the thread. Avoiding cancellation
+ helps keep the system complexity minimal and thus provides for better
+ stability. This also solves some issues with improper shutdown when
+ inside an action retry loop.
---------------------------------------------------------------------------
Version 5.5.4 [DEVEL] (rgerhards), 2010-05-03
- This version offers full support for Solaris on Intel and Sparc
diff --git a/action.c b/action.c
index 7751fe0a..c8622764 100644
--- a/action.c
+++ b/action.c
@@ -503,7 +503,7 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
ASSERT(pThis != NULL);
iRetries = 0;
- while(pThis->eState == ACT_STATE_RTRY) {
+ while((*pThis->pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) {
iRet = pThis->pMod->tryResume(pThis->pModData);
if((pThis->iResumeOKinRow > 999) && (pThis->iResumeOKinRow % 1000 == 0)) {
bTreatOKasSusp = 1;
@@ -522,6 +522,9 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
iSleepPeriod = pThis->iResumeInterval;
ttNow += iSleepPeriod; /* not truly exact, but sufficiently... */
srSleep(iSleepPeriod, 0);
+ if(*pThis->pbShutdownImmediate) {
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
+ }
}
} else if(iRet == RS_RET_DISABLE_ACTION) {
actionDisable(pThis);
@@ -532,6 +535,7 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
pThis->iNbrResRtry = 0;
}
+finalize_it:
RETiRet;
}
@@ -859,6 +863,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme
i = pBatch->iDoneUpTo; /* all messages below that index are processed */
iElemProcessed = 0;
iCommittedUpTo = i;
+ pAction->pbShutdownImmediate = pbShutdownImmediate;
while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
if(*pbShutdownImmediate)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
@@ -920,8 +925,9 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmedi
bDone = 0;
do {
localRet = tryDoAction(pAction, pBatch, &nElem, pbShutdownImmediate);
- if(localRet == RS_RET_FORCE_TERM)
- FINALIZE;
+ if(localRet == RS_RET_FORCE_TERM) {
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
+ }
if( localRet == RS_RET_OK
|| localRet == RS_RET_PREVIOUS_COMMITTED
|| localRet == RS_RET_DEFER_COMMIT) {
diff --git a/action.h b/action.h
index 13b0d92e..ab7dfec7 100644
--- a/action.h
+++ b/action.h
@@ -85,6 +85,7 @@ struct action_s {
SYNC_OBJ_TOOL; /* required for mutex support */
pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */
uchar *pszName; /* action name (for documentation) */
+ int *pbShutdownImmediate;/* to facilitate shutdown, if var is 1, shut down immediately */
//uchar **ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */
void *ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */
size_t *lenMsgs; /* length of message in ppMsgs */
diff --git a/runtime/wti.c b/runtime/wti.c
index 44a27b3e..2dfc2d3f 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -109,6 +109,29 @@ wtiSetState(wti_t *pThis, sbool bNewVal)
}
+/* Wake up the given worker thread by sending it SIGTTIN. That should unblock any
+ * srSleep() call the worker is currently blocked in.
+ */
+rsRetVal
+wtiWakeupThrd(wti_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, wti);
+
+
+ if(wtiGetState(pThis)) {
+ /* we first try the cooperative "cancel" interface */
+ pthread_kill(pThis->thrdID, SIGTTIN);
+ dbgprintf("sent SIGTTIN to worker thread %u, giving it a chance to terminate\n", (unsigned) pThis->thrdID);
+ srSleep(0, 10000);
+ dbgprintf("cooperative worker termination failed, using cancellation...\n");
+ }
+
+ RETiRet;
+}
+
+
/* Cancel the thread. The thread may not be running, but it is safe and legal to
* call wtiCancelThrd() in such situations. This function only returns when the
* thread has terminated. Else we may get race conditions all over the code...
@@ -125,7 +148,16 @@ wtiCancelThrd(wti_t *pThis)
ISOBJ_TYPE_assert(pThis, wti);
+
if(wtiGetState(pThis)) {
+ /* we first try the cooperative "cancel" interface */
+#if 0
+ pthread_kill(pThis->thrdID, SIGTTIN);
+ dbgprintf("sent SIGTTIN to worker thread %u, giving it a chance to terminate\n", (unsigned) pThis->thrdID);
+ srSleep(0, 10000);
+ dbgprintf("cooperative worker termination failed, using cancellation...\n");
+#endif
+
dbgoprint((obj_t*) pThis, "canceling worker thread\n");
pthread_cancel(pThis->thrdID);
/* now wait until the thread terminates... */
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 51fab191..65155efc 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -220,6 +220,7 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
{
DEFiRet;
int bTimedOut;
+ int i;
ISOBJ_TYPE_assert(pThis, wtp);
@@ -234,6 +235,11 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
pthread_cleanup_push(mutexCancelCleanup, &pThis->mutWtp);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
+
+for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+ wtiWakeupThrd(pThis->pWrkr[i]);
+}
+
DBGPRINTF("%s: waiting %ldms on worker thread termination, %d still running\n",
wtpGetDbgHdr(pThis), timeoutVal(ptTimeout),
ATOMIC_FETCH_32BIT(&pThis->iCurNumWrkThrd, &pThis->mutCurNumWrkThrd));
@@ -348,9 +354,15 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
pThis = pWti->pWtp;
ISOBJ_TYPE_assert(pThis, wtp);
+ /* block all signals */
sigfillset(&sigSet);
pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
+ /* but unblock SIGTTIN, which we (ab)use to signal the thread to shut down -- rgerhards, 2009-07-20 */
+ sigemptyset(&sigSet);
+ sigaddset(&sigSet, SIGTTIN);
+ pthread_sigmask(SIG_UNBLOCK, &sigSet, NULL);
+
# if HAVE_PRCTL && defined PR_SET_NAME
/* set thread name - we ignore if the call fails, has no harsh consequences... */
pszDbgHdr = wtpGetDbgHdr(pThis);