summaryrefslogtreecommitdiffstats
path: root/wti.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-25 19:25:46 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-25 19:25:46 +0000
commit87f0e9b5f91407418a43a06f39831febfbd4e3ad (patch)
tree810a4191b8cfd14a4a2a19399dbe894b16b5e6ae /wti.c
parent167abdb5b3fa6900edd6bbdb1cc7d586896a268c (diff)
downloadrsyslog-87f0e9b5f91407418a43a06f39831febfbd4e3ad.tar.gz
rsyslog-87f0e9b5f91407418a43a06f39831febfbd4e3ad.tar.xz
rsyslog-87f0e9b5f91407418a43a06f39831febfbd4e3ad.zip
disk-assisted queue mode finally begins to look good ;)
Diffstat (limited to 'wti.c')
-rw-r--r--wti.c65
1 files changed, 55 insertions, 10 deletions
diff --git a/wti.c b/wti.c
index 2e1dd548..3f60afb2 100644
--- a/wti.c
+++ b/wti.c
@@ -91,7 +91,6 @@ wtiGetState(wti_t *pThis, int bLockMutex)
}
-
/* send a command to a specific thread
* bActiveOnly specifies if the command should be sent only when the worker is
* in an active state. -- rgerhards, 2008-01-20
@@ -107,7 +106,6 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
-RUNLOG_VAR("%d", bActiveOnly);
/* all worker states must be followed sequentially, only termination can be set in any state */
if( (bActiveOnly && (pThis->tCurrCmd < eWRKTHRD_RUN_CREATED))
|| (pThis->tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) {
@@ -116,14 +114,13 @@ RUNLOG_VAR("%d", bActiveOnly);
} else {
dbgprintf("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd);
switch(tCmd) {
- case eWRKTHRD_RUN_CREATED:
- break;
case eWRKTHRD_TERMINATING:
/* TODO: re-enable meaningful debug msg! (via function callback?)
dbgprintf("%s: thread terminating with %d entries left in queue, %d workers running.\n",
wtiGetDbgHdr(pThis->pQueue), pThis->pQueue->iQueueSize,
pThis->pQueue->iCurNumWrkThrd);
*/
+ pthread_cond_signal(&pThis->condExitDone);
dbgprintf("%s: worker terminating\n", wtiGetDbgHdr(pThis));
break;
case eWRKTHRD_RUNNING:
@@ -131,6 +128,7 @@ RUNLOG_VAR("%d", bActiveOnly);
break;
/* these cases just to satisfy the compiler, we do (yet) not act an them: */
case eWRKTHRD_STOPPED:
+ case eWRKTHRD_RUN_CREATED:
case eWRKTHRD_RUN_INIT:
case eWRKTHRD_SHUTDOWN:
case eWRKTHRD_SHUTDOWN_IMMEDIATE:
@@ -145,6 +143,39 @@ RUNLOG_VAR("%d", bActiveOnly);
}
+#if 0
+/* check if the worker shall shutdown (1 = yes, 0 = no)
+ * TODO: check if we can use atomic operations to enhance performance
+ * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user"
+ * (e.g. the queue clas)
+ * rgerhards, 2008-01-24
+ * TODO: we can optimize this via function pointers, as the code is only called during
+ * termination. So we can call the function via ptr in wtiWorker () and change that pointer
+ * to this function here upon shutdown.
+ */
+static inline rsRetVal
+wtiChkStopWrkr(wti_t *pThis, wtp_t *pWtp, int bLockMutex, int bLockUsrMutex)
+{
+ DEFiRet;
+ DEFVARS_mutexProtection;
+
+ ISOBJ_TYPE_assert(pThis, wti);
+
+ BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ if(pThis->bShutdownRqtd) {
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ iRet = RS_RET_TERMINATE_NOW;
+ } else {
+ /* regular case */
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ iRet = wtpChkStopWrkr(pWtp, bLockMutex, bLockUsrMutex);
+ }
+
+ RETiRet;
+}
+#endif
+
+
/* Destructor */
rsRetVal wtiDestruct(wti_t **ppThis)
{
@@ -160,17 +191,28 @@ rsRetVal wtiDestruct(wti_t **ppThis)
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
/* if we reach this point, we must make sure the associated worker has terminated. It is
- * the callers duty to make sure the worker has already terminated.
+ * the callers duty to make sure the worker already knows it shall terminate.
* TODO: is it *really* the caller's duty? ...mmmhhhh.... smells bad... rgerhards, 2008-01-25
*/
wtiProcessThrdChanges(pThis, LOCK_MUTEX); /* process state change one last time */
d_pthread_mutex_lock(&pThis->mut);
- assert(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) <= eWRKTHRD_TERMINATING); // I knew it smelled bad...
+RUNLOG_VAR("%d", pThis->tCurrCmd);
+ if(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) {
+ dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - joining it\n",
+ wtiGetDbgHdr(pThis), pThis);
+ /* let's hope the caller actually instructed it to shutdown... */
+ pthread_cond_wait(&pThis->condExitDone, &pThis->mut);
+RUNLOG;
+ wtiJoinThrd(pThis);
+RUNLOG;
+ }
+RUNLOG;
d_pthread_mutex_unlock(&pThis->mut);
/* actual destruction */
pthread_cond_destroy(&pThis->condInitDone);
+ pthread_cond_destroy(&pThis->condExitDone);
pthread_mutex_destroy(&pThis->mut);
if(pThis->pszDbgHdr != NULL)
@@ -191,6 +233,7 @@ rsRetVal wtiDestruct(wti_t **ppThis)
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
pthread_cond_init(&pThis->condInitDone, NULL);
+ pthread_cond_init(&pThis->condExitDone, NULL);
pthread_mutex_init(&pThis->mut, NULL);
ENDobjConstruct(wti)
@@ -228,7 +271,7 @@ wtiJoinThrd(wti_t *pThis)
pthread_join(pThis->thrdID, NULL);
RUNLOG;
wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* back to virgin... */
-RUNLOG;
+RUNLOG_VAR("%p", pThis->thrdID);
pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
dbgprintf("wti: worker %s has stopped\n", wtiGetDbgHdr(pThis));
@@ -339,7 +382,9 @@ wtiWorker(wti_t *pThis)
dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
+ BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
pWtp->pfOnWorkerStartup(pWtp->pUsr);
+ END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
/* now we have our identity, on to real processing */
while(1) { /* loop will be broken below - need to do mutex locks */
@@ -354,6 +399,7 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis),
if( (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED))
|| wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
+ //|| wtiChkStopWrkr(pThis, pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
break; /* end worker thread run */
}
@@ -393,14 +439,13 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis),
// " %d messages to process.\n", wtiGetDbgHdr(pThis), pThis->iQueueSize);
}
- pWtp->pfOnWorkerShutdown(pWtp->pUsr);
-
/* indicate termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
-dbgprintf("%s: worker waiting for mutex\n", wtiGetDbgHdr(pThis));
d_pthread_mutex_lock(&pThis->mut);
pthread_cleanup_pop(0); /* remove cleanup handler */
+ pWtp->pfOnWorkerShutdown(pWtp->pUsr);
+
// TODO: I think we no longer need that - but check!
#if 0
/* if we ever need finalize_it, here would be the place for it! */