From eb4b1915d1655d801e0232f4196fbdc1af3c857f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 26 Feb 2008 17:49:26 +0000 Subject: worked on queue stability --- debug.c | 4 ++-- queue.c | 19 ++++++++++++------- syslogd.c | 3 ++- wti.c | 36 ++++++++++++++++++++++++++++++++++++ wti.h | 1 + wtp.c | 9 +++------ 6 files changed, 56 insertions(+), 16 deletions(-) diff --git a/debug.c b/debug.c index c99dfa93..b3a6aa8a 100644 --- a/debug.c +++ b/debug.c @@ -57,10 +57,10 @@ static dbgThrdInfo_t *dbgGetThrdInfo(void); /* static data (some time to be replaced) */ int Debug; /* debug flag - read-only after startup */ int debugging_on = 0; /* read-only, except on sig USR1 */ -static int bLogFuncFlow = 1; /* shall the function entry and exit be logged to the debug log? */ +static int bLogFuncFlow = 0; /* shall the function entry and exit be logged to the debug log? */ static int bLogAllocFree = 0; /* shall calls to (m/c)alloc and free be logged to the debug log? */ static int bPrintFuncDBOnExit = 0; /* shall the function entry and exit be logged to the debug log? */ -static int bPrintMutexAction = 0; /* shall mutex calls be printed to the debug log? */ +static int bPrintMutexAction = 1; /* shall mutex calls be printed to the debug log? */ static int bPrintTime = 1; /* print a timestamp together with debug message */ static char *pszAltDbgFileName = NULL; /* if set, debug output is *also* sent to here */ static FILE *altdbg = NULL; /* and the handle for alternate debug output */ diff --git a/queue.c b/queue.c index 18ea416c..70ee6f1f 100644 --- a/queue.c +++ b/queue.c @@ -1152,10 +1152,8 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue " "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } - if(pThis->bIsDA) { - /* we need to re-aquire the mutex for the next check in this case! */ - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - } + /* we need to re-aquire the mutex for the next check in this case! */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ } if(pThis->bIsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) { /* and now the same for the DA queue */ @@ -1169,6 +1167,8 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue " "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } + } else { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); } } else { END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -1316,6 +1316,7 @@ queueConsumerCancelCleanup(void *arg1, void *arg2) ISOBJ_TYPE_assert(pThis, queue); +RUNLOG_VAR("%p", pUsr); if(pUsr != NULL) { /* make sure the data element is not lost */ dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n"); @@ -1560,11 +1561,15 @@ queueRegOnWrkrShutdown(queue_t *pThis) ISOBJ_TYPE_assert(pThis, queue); if(pThis->pqParent != NULL) { - ASSERT(pThis->pqParent->pWtpDA != NULL); - pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ - wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ +RUNLOG_VAR("%p", pThis->pqParent->pWtpDA); +if(pThis->pqParent->pWtpDA == NULL) + FINALIZE; + ASSERT(pThis->pqParent->pWtpDA != NULL); + pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ + wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ } +finalize_it: RETiRet; } diff --git a/syslogd.c b/syslogd.c index 037eddf8..506f90a8 100644 --- a/syslogd.c +++ b/syslogd.c @@ -436,7 +436,8 @@ int option_DisallowWarning = 1; /* complain if message from disallowed sender is /* hardcoded standard templates (used for defaults) */ static uchar template_TraditionalFormat[] = "\"%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::drop-last-lf%\n\""; -static uchar template_WallFmt[] = "\"\r\n\7Message from syslogd@%HOSTNAME% at %timegenerated% ...\r\n %syslogtag%%msg%\n\r\""; +static uchar template_WallFmt[] = "\"\r\nMessage from syslogd@%HOSTNAME% at %timegenerated% ...\r\n %syslogtag%%msg%\n\r\""; +// TODO: re-enable BEL! static uchar template_WallFmt[] = "\"\r\n\7Message from syslogd@%HOSTNAME% at %timegenerated% ...\r\n %syslogtag%%msg%\n\r\""; static uchar template_StdFwdFmt[] = "\"<%PRI%>%TIMESTAMP% %HOSTNAME% %syslogtag%%msg%\""; static uchar template_StdUsrMsgFmt[] = "\" %syslogtag%%msg%\n\r\""; static uchar template_StdDBFmt[] = "\"insert into SystemEvents (Message, Facility, FromHost, Priority, DeviceReportedTime, ReceivedAt, InfoUnitID, SysLogTag) values ('%msg%', %syslogfacility%, '%HOSTNAME%', %syslogpriority%, '%timereported:::date-mysql%', '%timegenerated:::date-mysql%', %iut%, '%syslogtag%')\",SQL"; diff --git a/wti.c b/wti.c index db31f4dd..402b6bda 100644 --- a/wti.c +++ b/wti.c @@ -143,6 +143,33 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex) } +/* Cancel the thread. If the thread is already cancelled or termination, + * we do not again cancel it. But it is save and legal to call wtiCancelThrd() in + * such situations. + * rgerhards, 2008-02-26 + */ +rsRetVal +wtiCancelThrd(wti_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, wti); + + d_pthread_mutex_lock(&pThis->mut); + + if(pThis->tCurrCmd >= eWRKTHRD_TERMINATING) { + dbgoprint((obj_t*) pThis, "canceling worker thread\n"); + pthread_cancel(pThis->thrdID); + wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); + pThis->pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ + } + + d_pthread_mutex_unlock(&pThis->mut); + + RETiRet; +} + + /* Destructor */ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(wti) @@ -233,7 +260,13 @@ wtiProcessThrdChanges(wti_t *pThis, int bLockMutex) BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); switch(pThis->tCurrCmd) { case eWRKTHRD_TERMINATING: + /* we need to at least temporarily release the mutex, because otherwise + * we may deadlock with the thread we intend to join (it aquires the mutex + * during termination processing). -- rgerhards, 2008-02-26 + */ + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); iRet = wtiJoinThrd(pThis); + BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); break; /* these cases just to satisfy the compiler, we do not act an them: */ case eWRKTHRD_STOPPED: @@ -270,6 +303,7 @@ wtiWorkerCancelCleanup(void *arg) dbgprintf("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis)); /* call user supplied handler (that one e.g. requeues the element) */ +RUNLOG_VAR("%p", pThis->pUsrp); pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); @@ -371,9 +405,11 @@ wtiWorker(wti_t *pThis) /* indicate termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); +// TODO: this deadlocks with wtiProcessThrdChanges(), where it waits on the thread to join (on US TO JOIN!!!) d_pthread_mutex_lock(&pThis->mut); pthread_cleanup_pop(0); /* remove cleanup handler */ +RUNLOG_VAR("%p", pWtp->pUsr); pWtp->pfOnWorkerShutdown(pWtp->pUsr); wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); diff --git a/wti.h b/wti.h index 8bd47d1d..0b4ec2be 100644 --- a/wti.h +++ b/wti.h @@ -53,6 +53,7 @@ rsRetVal wtiProcessThrdChanges(wti_t *pThis, int bLockMutex); rsRetVal wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg); rsRetVal wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex); rsRetVal wtiJoinThrd(wti_t *pThis); +rsRetVal wtiCancelThrd(wti_t *pThis); qWrkCmd_t wtiGetState(wti_t *pThis, int bLockMutex); PROTOTYPEObjClassInit(wti); PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*); diff --git a/wtp.c b/wtp.c index 90f1a599..b1745865 100644 --- a/wtp.c +++ b/wtp.c @@ -333,12 +333,9 @@ wtpCancelAll(wtp_t *pThis) /* go through all workers and cancel those that are active */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - /* TODO: mutex lock!*/ - if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) { - dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i); - pthread_cancel(pThis->pWrkr[i]->thrdID); - ++numCancelled; - } + dbgprintf("%s: try canceling worker thread %d\n", wtpGetDbgHdr(pThis), i); + wtiCancelThrd(pThis->pWrkr[i]); +RUNLOG; } dbgprintf("%s: cancelled %d worker threads\n", wtpGetDbgHdr(pThis), numCancelled); -- cgit