summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-02-26 17:49:26 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-02-26 17:49:26 +0000
commiteb4b1915d1655d801e0232f4196fbdc1af3c857f (patch)
treef0527b24b8c28a8f4acba20e5e637edb7a3532e8
parent3ae3033800076fca0e9f5f491da2f420b610ffc2 (diff)
downloadrsyslog-eb4b1915d1655d801e0232f4196fbdc1af3c857f.tar.gz
rsyslog-eb4b1915d1655d801e0232f4196fbdc1af3c857f.tar.xz
rsyslog-eb4b1915d1655d801e0232f4196fbdc1af3c857f.zip
worked on queue stability
-rw-r--r--debug.c4
-rw-r--r--queue.c19
-rw-r--r--syslogd.c3
-rw-r--r--wti.c36
-rw-r--r--wti.h1
-rw-r--r--wtp.c9
6 files changed, 56 insertions, 16 deletions
diff --git a/debug.c b/debug.c
index c99dfa93..b3a6aa8a 100644
--- a/debug.c
+++ b/debug.c
@@ -57,10 +57,10 @@ static dbgThrdInfo_t *dbgGetThrdInfo(void);
/* static data (some time to be replaced) */
int Debug; /* debug flag - read-only after startup */
int debugging_on = 0; /* read-only, except on sig USR1 */
-static int bLogFuncFlow = 1; /* shall the function entry and exit be logged to the debug log? */
+static int bLogFuncFlow = 0; /* shall the function entry and exit be logged to the debug log? */
static int bLogAllocFree = 0; /* shall calls to (m/c)alloc and free be logged to the debug log? */
static int bPrintFuncDBOnExit = 0; /* shall the function entry and exit be logged to the debug log? */
-static int bPrintMutexAction = 0; /* shall mutex calls be printed to the debug log? */
+static int bPrintMutexAction = 1; /* shall mutex calls be printed to the debug log? */
static int bPrintTime = 1; /* print a timestamp together with debug message */
static char *pszAltDbgFileName = NULL; /* if set, debug output is *also* sent to here */
static FILE *altdbg = NULL; /* and the handle for alternate debug output */
diff --git a/queue.c b/queue.c
index 18ea416c..70ee6f1f 100644
--- a/queue.c
+++ b/queue.c
@@ -1152,10 +1152,8 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue "
"in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
- if(pThis->bIsDA) {
- /* we need to re-aquire the mutex for the next check in this case! */
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- }
+ /* we need to re-aquire the mutex for the next check in this case! */
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
}
if(pThis->bIsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
/* and now the same for the DA queue */
@@ -1169,6 +1167,8 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue "
"in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
+ } else {
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
} else {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -1316,6 +1316,7 @@ queueConsumerCancelCleanup(void *arg1, void *arg2)
ISOBJ_TYPE_assert(pThis, queue);
+RUNLOG_VAR("%p", pUsr);
if(pUsr != NULL) {
/* make sure the data element is not lost */
dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
@@ -1560,11 +1561,15 @@ queueRegOnWrkrShutdown(queue_t *pThis)
ISOBJ_TYPE_assert(pThis, queue);
if(pThis->pqParent != NULL) {
- ASSERT(pThis->pqParent->pWtpDA != NULL);
- pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
- wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
+RUNLOG_VAR("%p", pThis->pqParent->pWtpDA);
+if(pThis->pqParent->pWtpDA == NULL)
+ FINALIZE;
+ ASSERT(pThis->pqParent->pWtpDA != NULL);
+ pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
+ wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
}
+finalize_it:
RETiRet;
}
diff --git a/syslogd.c b/syslogd.c
index 037eddf8..506f90a8 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -436,7 +436,8 @@ int option_DisallowWarning = 1; /* complain if message from disallowed sender is
/* hardcoded standard templates (used for defaults) */
static uchar template_TraditionalFormat[] = "\"%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::drop-last-lf%\n\"";
-static uchar template_WallFmt[] = "\"\r\n\7Message from syslogd@%HOSTNAME% at %timegenerated% ...\r\n %syslogtag%%msg%\n\r\"";
+static uchar template_WallFmt[] = "\"\r\nMessage from syslogd@%HOSTNAME% at %timegenerated% ...\r\n %syslogtag%%msg%\n\r\"";
+// TODO: re-enable BEL! static uchar template_WallFmt[] = "\"\r\n\7Message from syslogd@%HOSTNAME% at %timegenerated% ...\r\n %syslogtag%%msg%\n\r\"";
static uchar template_StdFwdFmt[] = "\"<%PRI%>%TIMESTAMP% %HOSTNAME% %syslogtag%%msg%\"";
static uchar template_StdUsrMsgFmt[] = "\" %syslogtag%%msg%\n\r\"";
static uchar template_StdDBFmt[] = "\"insert into SystemEvents (Message, Facility, FromHost, Priority, DeviceReportedTime, ReceivedAt, InfoUnitID, SysLogTag) values ('%msg%', %syslogfacility%, '%HOSTNAME%', %syslogpriority%, '%timereported:::date-mysql%', '%timegenerated:::date-mysql%', %iut%, '%syslogtag%')\",SQL";
diff --git a/wti.c b/wti.c
index db31f4dd..402b6bda 100644
--- a/wti.c
+++ b/wti.c
@@ -143,6 +143,33 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
}
+/* Cancel the worker thread that belongs to pThis.
+ *
+ * The worker's own mutex (pThis->mut) is held for the complete
+ * check-and-cancel sequence. If pThis->tCurrCmd compares
+ * >= eWRKTHRD_TERMINATING, pthread_cancel() is issued for pThis->thrdID,
+ * the worker state is set to eWRKTHRD_TERMINATING and
+ * pThis->pWtp->bThrdStateChanged is set to 1. In every other case nothing
+ * is done, so it is safe and legal to call wtiCancelThrd() regardless of
+ * the state the worker is currently in.
+ *
+ * No statement in this function assigns a result code after DEFiRet, and
+ * the result of pthread_cancel() is not inspected.
+ *
+ * NOTE(review): the stated intent is that a thread which is already
+ * cancelled or terminating is not cancelled again. The test uses ">=",
+ * so whether a worker that already is in eWRKTHRD_TERMINATING state is
+ * skipped depends on the ordering of the qWrkCmd_t values, which is not
+ * visible here -- please confirm.
+ * rgerhards, 2008-02-26
+ */
+rsRetVal
+wtiCancelThrd(wti_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, wti);
+
+ d_pthread_mutex_lock(&pThis->mut);
+
+ if(pThis->tCurrCmd >= eWRKTHRD_TERMINATING) {
+ dbgoprint((obj_t*) pThis, "canceling worker thread\n");
+ pthread_cancel(pThis->thrdID);
+ wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
+ pThis->pWtp->bThrdStateChanged = 1; /* indicate change, so harvester will be called */
+ }
+
+ d_pthread_mutex_unlock(&pThis->mut);
+
+ RETiRet;
+}
+
+
/* Destructor */
BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(wti)
@@ -233,7 +260,13 @@ wtiProcessThrdChanges(wti_t *pThis, int bLockMutex)
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
switch(pThis->tCurrCmd) {
case eWRKTHRD_TERMINATING:
+ /* we need to at least temporarily release the mutex, because otherwise
+ * we may deadlock with the thread we intend to join (it acquires the mutex
+ * during termination processing). -- rgerhards, 2008-02-26
+ */
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
iRet = wtiJoinThrd(pThis);
+ BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
break;
/* these cases just to satisfy the compiler, we do not act an them: */
case eWRKTHRD_STOPPED:
@@ -270,6 +303,7 @@ wtiWorkerCancelCleanup(void *arg)
dbgprintf("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
/* call user supplied handler (that one e.g. requeues the element) */
+RUNLOG_VAR("%p", pThis->pUsrp);
pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
@@ -371,9 +405,11 @@ wtiWorker(wti_t *pThis)
/* indicate termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+// TODO: this deadlocks with wtiProcessThrdChanges(), where it waits on the thread to join (on US TO JOIN!!!)
d_pthread_mutex_lock(&pThis->mut);
pthread_cleanup_pop(0); /* remove cleanup handler */
+RUNLOG_VAR("%p", pWtp->pUsr);
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
diff --git a/wti.h b/wti.h
index 8bd47d1d..0b4ec2be 100644
--- a/wti.h
+++ b/wti.h
@@ -53,6 +53,7 @@ rsRetVal wtiProcessThrdChanges(wti_t *pThis, int bLockMutex);
rsRetVal wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg);
rsRetVal wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex);
rsRetVal wtiJoinThrd(wti_t *pThis);
+rsRetVal wtiCancelThrd(wti_t *pThis);
qWrkCmd_t wtiGetState(wti_t *pThis, int bLockMutex);
PROTOTYPEObjClassInit(wti);
PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
diff --git a/wtp.c b/wtp.c
index 90f1a599..b1745865 100644
--- a/wtp.c
+++ b/wtp.c
@@ -333,12 +333,9 @@ wtpCancelAll(wtp_t *pThis)
/* go through all workers and cancel those that are active */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- /* TODO: mutex lock!*/
- if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) {
- dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i);
- pthread_cancel(pThis->pWrkr[i]->thrdID);
- ++numCancelled;
- }
+ dbgprintf("%s: try canceling worker thread %d\n", wtpGetDbgHdr(pThis), i);
+ wtiCancelThrd(pThis->pWrkr[i]);
+RUNLOG;
}
dbgprintf("%s: cancelled %d worker threads\n", wtpGetDbgHdr(pThis), numCancelled);