summaryrefslogtreecommitdiffstats
path: root/wtp.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-25 19:25:46 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-25 19:25:46 +0000
commit87f0e9b5f91407418a43a06f39831febfbd4e3ad (patch)
tree810a4191b8cfd14a4a2a19399dbe894b16b5e6ae /wtp.c
parent167abdb5b3fa6900edd6bbdb1cc7d586896a268c (diff)
downloadrsyslog-87f0e9b5f91407418a43a06f39831febfbd4e3ad.tar.gz
rsyslog-87f0e9b5f91407418a43a06f39831febfbd4e3ad.tar.xz
rsyslog-87f0e9b5f91407418a43a06f39831febfbd4e3ad.zip
disk-assisted queue mode finally begins to look good ;)
Diffstat (limited to 'wtp.c')
-rw-r--r--wtp.c51
1 files changed, 24 insertions, 27 deletions
diff --git a/wtp.c b/wtp.c
index 4133e7b4..1bda60e8 100644
--- a/wtp.c
+++ b/wtp.c
@@ -135,8 +135,6 @@ wtpDestruct(wtp_t **ppThis)
int iCancelStateSave;
int i;
-dbgPrintAllDebugInfo();
-RUNLOG;
assert(ppThis != NULL);
pThis = *ppThis;
ISOBJ_TYPE_assert(pThis, wtp);
@@ -179,9 +177,7 @@ wtpWakeupWrkr(wtp_t *pThis)
// TODO; mutex?
ISOBJ_TYPE_assert(pThis, wtp);
-dbgprintf("wtpWakeupWrkr 1, cond %p\n", pThis->pcondBusy);
pthread_cond_signal(pThis->pcondBusy);
-dbgprintf("wtpWakeupWrkr 2\n");
RETiRet;
}
/* wake up all worker threads.
@@ -211,10 +207,8 @@ wtpProcessThrdChanges(wtp_t *pThis)
ISOBJ_TYPE_assert(pThis, wtp);
- RUNLOG;
if(pThis->bThrdStateChanged == 0)
FINALIZE;
- RUNLOG;
/* go through all threads */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
@@ -222,7 +216,6 @@ wtpProcessThrdChanges(wtp_t *pThis)
}
finalize_it:
- RUNLOG;
RETiRet;
}
@@ -255,6 +248,8 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
DEFiRet;
DEFVARS_mutexProtection;
+ ISOBJ_TYPE_assert(pThis, wtp);
+
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE)
|| ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex)))
@@ -281,14 +276,17 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
int bTimedOut;
int iCancelStateSave;
-dbgPrintAllDebugInfo();
-RUNLOG_VAR("%p", pThis);
-RUNLOG_VAR("%d", tShutdownCmd);
ISOBJ_TYPE_assert(pThis, wtp);
wtpSetState(pThis, tShutdownCmd);
wtpWakeupAllWrkr(pThis);
+
+ /* see if we need to harvest (join) any terminated threads (even in timeout case,
+ * some may have terminated...
+ */
+ wtpProcessThrdChanges(pThis);
+RUNLOG_VAR("%d", pThis->iCurNumWrkThrd);
/* and wait for their termination */
dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
@@ -297,6 +295,7 @@ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
+RUNLOG_VAR("%d", pThis->iCurNumWrkThrd);
dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n",
wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd);
@@ -315,7 +314,6 @@ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut);
*/
wtpProcessThrdChanges(pThis);
-dbgprintf("wtpShutdownAll exit");
RETiRet;
}
@@ -346,6 +344,7 @@ wtpCancelAll(wtp_t *pThis)
{
DEFiRet;
int i;
+ int numCancelled = 0;
// TODO: mutex?? // TODO: cancellation in wti!
ISOBJ_TYPE_assert(pThis, wtp);
@@ -353,19 +352,17 @@ wtpCancelAll(wtp_t *pThis)
/* process any pending thread requests so that we know who actually is still running */
wtpProcessThrdChanges(pThis);
-RUNLOG_VAR("%d", pThis->iNumWorkerThreads);;
/* first tell the workers our request */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
// TODO: mutex lock!
-RUNLOG_VAR("%p", pThis->pWrkr[i]);
if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) {
-RUNLOG;
dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i);
pthread_cancel(pThis->pWrkr[i]->thrdID);
+ ++numCancelled;
}
}
-RUNLOG;
+ dbgprintf("%s: cancelled %d worker threads\n", wtpGetDbgHdr(pThis), numCancelled);
RETiRet;
}
@@ -380,13 +377,9 @@ wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex)
DEFiRet;
DEFVARS_mutexProtection;
-RUNLOG;
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
-RUNLOG;
pThis->bInactivityGuard = bNewState;
-RUNLOG;
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
-RUNLOG;
RETiRet;
}
@@ -403,6 +396,7 @@ wtpWrkrExecCancelCleanup(void *arg)
ISOBJ_TYPE_assert(pThis, wtp);
pThis->iCurNumWrkThrd--;
+RUNLOG_VAR("%d", pThis->iCurNumWrkThrd);
wtpSignalWrkrTermination(pThis);
dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd);
@@ -459,6 +453,7 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
pthread_cleanup_pop(0);
pThis->iCurNumWrkThrd--;
+RUNLOG_VAR("%d", pThis->iCurNumWrkThrd);
wtpSignalWrkrTermination(pThis);
dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n",
@@ -488,6 +483,8 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
pThis->iCurNumWrkThrd++;
+dbgPrintAllDebugInfo();
+RUNLOG_VAR("%d", pThis->iCurNumWrkThrd);
/* find free spot in thread table. If we find at least one worker that is in initialization,
* we do NOT start a new one. Let's give the other one a chance, first.
@@ -538,7 +535,6 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
int nMissing; /* number workers missing to run */
int i;
- if(pThis == NULL) dbgPrintAllDebugInfo();
ISOBJ_TYPE_assert(pThis, wtp);
dbgprintf("%s: wtpAdviseMaxWorker with %d called, currNum %d, max %d\n", wtpGetDbgHdr(pThis), nMaxWrkr, pThis->iCurNumWrkThrd, pThis->iNumWorkerThreads);
@@ -547,11 +543,10 @@ dbgprintf("%s: wtpAdviseMaxWorker with %d called, currNum %d, max %d\n", wtpGetD
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
+ if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */
+ nMaxWrkr = pThis->iNumWorkerThreads;
+
nMissing = nMaxWrkr - pThis->iCurNumWrkThrd;
- if(nMissing > pThis->iNumWorkerThreads)
- nMissing = pThis->iNumWorkerThreads;
- else if(nMissing < 0)
- nMissing = 0;
if(nMissing > 0) {
dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing);
@@ -559,9 +554,11 @@ dbgprintf("%s: wtpAdviseMaxWorker with %d called, currNum %d, max %d\n", wtpGetD
for(i = 0 ; i < nMissing ; ++i) {
CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED));
}
- } else {
-dbgprintf("wtpAdviseMaxWorkers signals busy\n");
- wtpWakeupWrkr(pThis);
+ } else {
+ if(nMaxWrkr > 0) {
+ dbgprintf("wtpAdviseMaxWorkers signals busy\n");
+ wtpWakeupWrkr(pThis);
+ }
}