diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-28 11:35:33 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-28 11:35:33 +0000 |
commit | 94bfc28855393a1a688aa5fdc3339b9e2139e10a (patch) | |
tree | d253acc9e48aa6193046920c8d08401289197e96 | |
parent | ba94662f209ccf17798d706eb4dc5df19360c7e1 (diff) | |
download | rsyslog-94bfc28855393a1a688aa5fdc3339b9e2139e10a.tar.gz rsyslog-94bfc28855393a1a688aa5fdc3339b9e2139e10a.tar.xz rsyslog-94bfc28855393a1a688aa5fdc3339b9e2139e10a.zip |
cleanup to prepare for release
-rw-r--r-- | debug.c | 2 | ||||
-rw-r--r-- | queue.c | 84 | ||||
-rw-r--r-- | queue.h | 1 | ||||
-rw-r--r-- | rsyslog.h | 1 | ||||
-rw-r--r-- | wti.c | 57 | ||||
-rw-r--r-- | wtp.c | 30 |
6 files changed, 47 insertions, 128 deletions
@@ -57,7 +57,7 @@ static dbgThrdInfo_t *dbgGetThrdInfo(void); int Debug; /* debug flag - read-only after startup */ int debugging_on = 0; /* read-only, except on sig USR1 */ static int bLogFuncFlow = 0; /* shall the function entry and exit be logged to the debug log? */ -static int bPrintFuncDBOnExit = 1; /* shall the function entry and exit be logged to the debug log? */ +static int bPrintFuncDBOnExit = 0; /* shall the function entry and exit be logged to the debug log? */ static int bPrintMutexAction = 0; /* shall mutex calls be printed to the debug log? */ static int bPrintTime = 1; /* print a timestamp together with debug message */ static char *pszAltDbgFileName = NULL; /* if set, debug output is *also* sent to here */ @@ -1,13 +1,3 @@ -// TODO: DA worker must not wait eternal on shutdown when in enqueue only mode! -// TODO: we need to implement peek(), without it (today!) we lose one message upon -// worker cancellation! -- rgerhards, 2008-01-14 -// TODO: think about mutDA - I think it's no longer needed -// TODO: start up the correct num of workers when switching to non-DA mode -// TODO: "preforked" worker threads -// TODO: do an if(debug) in dbgrintf - performance in release build! -// TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in -// call consumer state. Facilitates retaining messages in queue until action could -// be called! /* queue.c * * This file implements the queue object and its several queueing methods. @@ -218,9 +208,6 @@ queueStartDA(queue_t *pThis) if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */ FINALIZE; /* ... then we are already done! */ - /* set up sync objects */ - pthread_mutex_init(&pThis->mutDA, NULL); - /* create message queue */ dbgprintf("Queue %p: queueSTrtDA pre child queue construct,\n", pThis); CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer)); @@ -309,12 +296,12 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx:DA", (unsigned long) pThis); CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); - CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, queueChkStopWrkrDA)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, queueIsIdleDA)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, queueConsumerDA)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, queueConsumerCancelCleanup)); - CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, queueStartDA)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, queueTurnOffDAMode)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA)); + CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) queueConsumerCancelCleanup)); + CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueStartDA)); + CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueTurnOffDAMode)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); @@ -564,7 +551,7 @@ queueHaveQIF(queue_t *pThis) ISOBJ_TYPE_assert(pThis, queue); if(pThis->pszFilePrefix == NULL) - ABORT_FINALIZE(RS_RET_ERR); // TODO: change code! + ABORT_FINALIZE(RS_RET_NO_FILEPREFIX); /* Construct file name */ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", @@ -921,17 +908,16 @@ RUNLOG_VAR("%d", pThis->toQShutdown); * set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate as soon as its consumer * is done. This is especially important as we otherwise may interfere with queue order while the * DA consumer is running. -- rgerhards, 2008-01-27 + * Note: there was a note that we should not wait eternally on the DA worker if we run in + * enqueue-only note. I have reviewed the code and think there is no need for this check. Howerver, + * I'd like to keep this note in here should we happen to run into some related trouble. + * rgerhards, 2008-01-28 */ wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */ -// TODO: what about pure disk queues and bSaveOnShutdown? BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ /* optimize parameters for shutdown of DA-enabled queues */ -//RUNLOG_VAR("%d", pThis->bSaveOnShutdown); -//RUNLOG_VAR("%d", pThis->bIsDA); -//RUNLOG_VAR("%d", pThis->iQueueSize); if(pThis->bIsDA && pThis->iQueueSize > 0 && pThis->bSaveOnShutdown) { -//RUNLOG; /* switch to enqueue-only mode so that no more actions happen */ if(pThis->bRunsDA == 0) { queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ @@ -953,21 +939,17 @@ RUNLOG_VAR("%d", pThis->toQShutdown); END_MTX_PROTECTED_OPERATIONS(pThis->mut); } -RUNLOG; /* now the primary queue is either empty, persisted to disk - or set to loose messages. So we * can now request immediate shutdown of any remaining workers. Note that if bSaveOnShutdown was set, * the queue is now empty. If regular workers are still running, and try to pull the next message, * they will automatically terminate as there no longer is any message left to process. */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ -RUNLOG_VAR("%d", pThis->iQueueSize); - //old: if(pThis->iQueueSize > 0) { if(pThis->iQueueSize > 0) { timeoutComp(&tTimeout, pThis->toActShutdown); if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); dbgprintf("Queue 0x%lx: trying immediate shutdown of regular workers\n", queueGetID(pThis)); - // TODO: ??? cut&paste error? iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { dbgprintf("Queue 0x%lx: immediate shutdown timed out on primary queue (this is acceptable and " @@ -1047,8 +1029,6 @@ RUNLOG_VAR("%d", pThis->iQueueSize); } } -// TODO: think about joining all workers, so that the destructors are called -// /* ... finally ... all worker threads have terminated :-) * Well, more precisely, they *are in termination*. Some cancel cleanup handlers * may still be running. @@ -1152,7 +1132,7 @@ queueConsumerCancelCleanup(void *arg1, void *arg2) dbgprintf("Queue 0x%lx: cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n", queueGetID(pThis)); - /* TODO: re-enqueue the data element! */ + /* TODO: re-enqueue the data element! This will also make the compiler warning go away... */ RETiRet; } @@ -1213,7 +1193,7 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave) /* dequeue element (still protected from mutex) */ iRet = queueDel(pThis, &pUsr); - queueChkPersist(pThis); // when we support peek(), we must do this down after the del! + queueChkPersist(pThis); iQueueSize = pThis->iQueueSize; /* cache this for after mutex release */ bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ pWti->pUsrp = pUsr; /* save it for the cancel cleanup handler */ @@ -1316,12 +1296,6 @@ queueChkStopWrkrDA(queue_t *pThis) bStopWrkr = 1; } else { if(pThis->bRunsDA) { -#if 0 -RUNLOG_VAR("%d", pThis->iQueueSize); -RUNLOG_VAR("%d", pThis->iHighWtrMrk); -if(pThis->pqDA != NULL) - RUNLOG_VAR("%d", pThis->pqDA->bQueueStarted); -#endif if(pThis->iQueueSize < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { bStopWrkr = 1; } else { @@ -1332,7 +1306,6 @@ if(pThis->pqDA != NULL) } } -//RUNLOG_VAR("%d", bStopWrkr); ENDfunc return bStopWrkr; } @@ -1364,10 +1337,7 @@ queueIsIdleDA(queue_t *pThis) { /* remember: iQueueSize is the DA queue size, not the main queue! */ BEGINfunc -RUNLOG_VAR("%d", pThis->iLowWtrMrk); -dbgprintf("queueIsIdleDA(%p) returns %d, qsize %d\n", pThis, pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk), pThis->iQueueSize); - //// TODO: I think we need just a single function... - //return (pThis->iQueueSize == 0); + /* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */ ENDfunc return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk)); } @@ -1483,12 +1453,12 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx:Reg", (unsigned long) pThis); CHKiRet(wtpConstruct (&pThis->pWtpReg)); CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); - CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, queueChkStopWrkrReg)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, queueIsIdleReg)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, queueConsumerReg)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, queueConsumerCancelCleanup)); - CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, queueRegOnWrkrStartup)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, queueRegOnWrkrShutdown)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg)); + CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))queueConsumerCancelCleanup)); + CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrStartup)); + CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); @@ -1511,7 +1481,7 @@ RUNLOG; queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ bInitialized = 1; /* we are done */ } else { - // TODO: use logerror? -- rgerhards, 2008-01-16 + /* TODO: use logerror? -- rgerhards, 2008-01-16 */ dbgprintf("Queue 0x%lx: error %d trying to access on-disk queue files, starting without them. " "Some data may be lost\n", queueGetID(pThis), iRetLocal); } @@ -1556,9 +1526,14 @@ static rsRetVal queuePersist(queue_t *pThis) assert(pThis != NULL); if(pThis->qType != QUEUETYPE_DISK) { - if(pThis->iQueueSize > 0) - ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* TODO: later... */ - else + if(pThis->iQueueSize > 0) { + /* This error code is OK, but we will probably not implement this any time + * The reason is that persistence happens via DA queues. But I would like to + * leave the code as is, as we so have a hook in case we need one. + * -- rgerhards, 2008-01-28 + */ + ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); + } else FINALIZE; /* if the queue is empty, we are happy and done... */ } @@ -1938,7 +1913,6 @@ finalize_it: */ BEGINObjClassInit(queue, 1) OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty); - //OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize); ENDObjClassInit(queue) /* @@ -107,7 +107,6 @@ typedef struct queue_s { size_t iMaxFileSize; /* max size for a single queue file */ int bIsDA; /* is this queue disk assisted? */ int bRunsDA; /* is this queue actually *running* disk assisted? */ - pthread_mutex_t mutDA; /* mutex for low water mark algo */ struct queue_s *pqDA; /* queue for disk-assisted modes */ struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */ int bDAEnqOnly; /* EnqOnly setting for DA queue */ @@ -113,6 +113,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_QSIZE_ZERO = -2042, /**< queue size is zero where this is not supported */ RS_RET_ALREADY_STARTING = -2043, /**< something (a thread?) is already starting - not necessarily an error */ RS_RET_NO_MORE_THREADS = -2044, /**< no more threads available, not necessarily an error */ + RS_RET_NO_FILEPREFIX = -2045, /**< file prefix is not specified where one is needed */ RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */ RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */ RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */ @@ -143,39 +143,6 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex) } -#if 0 -/* check if the worker shall shutdown (1 = yes, 0 = no) - * TODO: check if we can use atomic operations to enhance performance - * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user" - * (e.g. the queue clas) - * rgerhards, 2008-01-24 - * TODO: we can optimize this via function pointers, as the code is only called during - * termination. So we can call the function via ptr in wtiWorker () and change that pointer - * to this function here upon shutdown. - */ -static inline rsRetVal -wtiChkStopWrkr(wti_t *pThis, wtp_t *pWtp, int bLockMutex, int bLockUsrMutex) -{ - DEFiRet; - DEFVARS_mutexProtection; - - ISOBJ_TYPE_assert(pThis, wti); - - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut); - if(pThis->bShutdownRqtd) { - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - iRet = RS_RET_TERMINATE_NOW; - } else { - /* regular case */ - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - iRet = wtpChkStopWrkr(pWtp, bLockMutex, bLockUsrMutex); - } - - RETiRet; -} -#endif - - /* Destructor */ rsRetVal wtiDestruct(wti_t **ppThis) { @@ -328,7 +295,7 @@ wtiWorkerCancelCleanup(void *arg) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pWtp->mut); wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); - // TODO: sync access! + /* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */ pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ d_pthread_mutex_unlock(&pWtp->mut); @@ -393,7 +360,6 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis), if( (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) || wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) { - //|| wtiChkStopWrkr(pThis, pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) { END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); break; /* end worker thread run */ } @@ -413,7 +379,6 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis), d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); } else { timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ -dbgprintf("timeout value is %ld\n", timeoutVal(&t)); if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) { dbgprintf("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis)); bInactivityTOOccured = 1; /* indicate we had a timeout */ @@ -427,11 +392,6 @@ dbgprintf("timeout value is %ld\n", timeoutVal(&t)); /* if we reach this point, we have a non-empty queue (and are still protected by mutex) */ dbgprintf("%s: calling consumer\n", wtiGetDbgHdr(pThis)); pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave); - - /* TODO: move this above into one of the chck Term functions */ - //if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0) - // dbgprintf("%s: worker does not yet terminate because it still has " - // " %d messages to process.\n", wtiGetDbgHdr(pThis), pThis->iQueueSize); } /* indicate termination */ @@ -441,22 +401,7 @@ dbgprintf("timeout value is %ld\n", timeoutVal(&t)); pWtp->pfOnWorkerShutdown(pWtp->pUsr); - // TODO: I think we no longer need that - but check! -#if 0 - /* if we ever need finalize_it, here would be the place for it! */ - if(qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN || - qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN_IMMEDIATE || - qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT || - qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_CREATED) { - /* in shutdown case, we need to flag termination. All other commands - * have a meaning to the thread harvester, so we can not overwrite them - */ -dbgprintf("%s: setting termination state\n", wtiGetDbgHdr(pThis)); - wtiSetState(pWrkrInst, eWRKTHRD_TERMINATING, 0); - } -#else wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); -#endif pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ d_pthread_mutex_unlock(&pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); @@ -173,11 +173,12 @@ wtpWakeupWrkr(wtp_t *pThis) { DEFiRet; - // TODO; mutex? + /* TODO; mutex? I think not needed, as we do not need predictable exec order -- rgerhards, 2008-01-28 */ ISOBJ_TYPE_assert(pThis, wtp); pthread_cond_signal(pThis->pcondBusy); RETiRet; } + /* wake up all worker threads. * rgerhards, 2008-01-16 */ @@ -187,7 +188,6 @@ wtpWakeupAllWrkr(wtp_t *pThis) DEFiRet; ISOBJ_TYPE_assert(pThis, wtp); - // TODO; mutex? pthread_cond_broadcast(pThis->pcondBusy); RETiRet; } @@ -228,7 +228,7 @@ wtpSetState(wtp_t *pThis, wtpState_t iNewState) ISOBJ_TYPE_assert(pThis, wtp); pThis->wtpState = iNewState; - // TODO: must wakeup workers? + /* TODO: must wakeup workers? seen to be not needed -- rgerhards, 2008-01-28 */ RETiRet; } @@ -321,14 +321,19 @@ RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); rsRetVal wtpSignalWrkrTermination(wtp_t *pThis) { DEFiRet; - //TODO: mutex or not mutex, that's the question ;)DEFVARS_mutexProtection; + /* I leave the mutex code here out as it give as deadlocks. I think it is not really + * needed and we are on the safe side. I leave this comment in if practice proves us + * wrong. The whole thing should be removed after half a your or year if we see there + * actually is no issue (or revisit it from a theoretical POV). + * rgerhards, 2008-01-28 + */ + /*TODO: mutex or not mutex, that's the question ;)DEFVARS_mutexProtection;*/ ISOBJ_TYPE_assert(pThis, wtp); - //BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX); -dbgprintf("signaling thread termination, cond %p\n", &pThis->condThrdTrm); + /*BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);*/ pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ - //END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + /*END_MTX_PROTECTED_OPERATIONS(&pThis->mut);*/ RETiRet; } @@ -342,18 +347,16 @@ wtpCancelAll(wtp_t *pThis) DEFiRet; int i; int numCancelled = 0; - // TODO: mutex?? // TODO: cancellation in wti! + /* TODO: mutex?? TODO: cancellation in wti (but OK as is [though ugly form an isolation POV]!) */ ISOBJ_TYPE_assert(pThis, wtp); /* process any pending thread requests so that we know who actually is still running */ wtpProcessThrdChanges(pThis); -//RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); /* go through all workers and cancel those that are active */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - // TODO: mutex lock! -//RUNLOG_VAR("%d", pThis->pWrkr[i]->tCurrCmd); + /* TODO: mutex lock!*/ if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) { dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i); pthread_cancel(pThis->pWrkr[i]->thrdID); @@ -430,13 +433,11 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in pthread_cleanup_push(wtpWrkrExecCancelCleanup, pThis); - // TODO: review code below - if still needed (setState yes!)? /* finally change to RUNNING state. We need to check if we actually should still run, * because someone may have requested us to shut down even before we got a chance to do * our init. That would be a bad race... -- rgerhards, 2008-01-16 */ - //if(qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT) - wtiSetState(pWti, eWRKTHRD_RUNNING, 0, MUTEX_ALREADY_LOCKED); /* we are running now! */ + wtiSetState(pWti, eWRKTHRD_RUNNING, 0, MUTEX_ALREADY_LOCKED); /* we are running now! */ do { END_MTX_PROTECTED_OPERATIONS(&pThis->mut); @@ -600,7 +601,6 @@ wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex) iNumWrkr = pThis->iCurNumWrkThrd; END_MTX_PROTECTED_OPERATIONS(&pThis->mut); -RUNLOG_VAR("%d", iNumWrkr); ENDfunc return iNumWrkr; } |