summaryrefslogtreecommitdiffstats
path: root/wti.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-02-26 17:49:26 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-02-26 17:49:26 +0000
commiteb4b1915d1655d801e0232f4196fbdc1af3c857f (patch)
treef0527b24b8c28a8f4acba20e5e637edb7a3532e8 /wti.c
parent3ae3033800076fca0e9f5f491da2f420b610ffc2 (diff)
downloadrsyslog-eb4b1915d1655d801e0232f4196fbdc1af3c857f.tar.gz
rsyslog-eb4b1915d1655d801e0232f4196fbdc1af3c857f.tar.xz
rsyslog-eb4b1915d1655d801e0232f4196fbdc1af3c857f.zip
worked on queue stability
Diffstat (limited to 'wti.c')
-rw-r--r--wti.c36
1 files changed, 36 insertions, 0 deletions
diff --git a/wti.c b/wti.c
index db31f4dd..402b6bda 100644
--- a/wti.c
+++ b/wti.c
@@ -143,6 +143,33 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
}
+/* Cancel the thread. If the thread is already cancelled or termination,
+ * we do not again cancel it. But it is save and legal to call wtiCancelThrd() in
+ * such situations.
+ * rgerhards, 2008-02-26
+ */
+rsRetVal
+wtiCancelThrd(wti_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, wti);
+
+ d_pthread_mutex_lock(&pThis->mut);
+
+ if(pThis->tCurrCmd >= eWRKTHRD_TERMINATING) {
+ dbgoprint((obj_t*) pThis, "canceling worker thread\n");
+ pthread_cancel(pThis->thrdID);
+ wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
+ pThis->pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
+ }
+
+ d_pthread_mutex_unlock(&pThis->mut);
+
+ RETiRet;
+}
+
+
/* Destructor */
BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(wti)
@@ -233,7 +260,13 @@ wtiProcessThrdChanges(wti_t *pThis, int bLockMutex)
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
switch(pThis->tCurrCmd) {
case eWRKTHRD_TERMINATING:
+ /* we need to at least temporarily release the mutex, because otherwise
+ * we may deadlock with the thread we intend to join (it aquires the mutex
+ * during termination processing). -- rgerhards, 2008-02-26
+ */
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
iRet = wtiJoinThrd(pThis);
+ BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
break;
/* these cases just to satisfy the compiler, we do not act an them: */
case eWRKTHRD_STOPPED:
@@ -270,6 +303,7 @@ wtiWorkerCancelCleanup(void *arg)
dbgprintf("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
/* call user supplied handler (that one e.g. requeues the element) */
+RUNLOG_VAR("%p", pThis->pUsrp);
pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
@@ -371,9 +405,11 @@ wtiWorker(wti_t *pThis)
/* indicate termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+// TODO: this deadlocks with wtiProcessThrdChanges(), where it waits on the thread to join (on US TO JOIN!!!)
d_pthread_mutex_lock(&pThis->mut);
pthread_cleanup_pop(0); /* remove cleanup handler */
+RUNLOG_VAR("%p", pWtp->pUsr);
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);