diff options
-rw-r--r-- | debug.c | 33 | ||||
-rw-r--r-- | obj-types.h | 13 | ||||
-rw-r--r-- | queue.c | 220 | ||||
-rw-r--r-- | queue.h | 5 | ||||
-rwxr-xr-x | srUtils.c | 6 | ||||
-rwxr-xr-x | srUtils.h | 4 | ||||
-rw-r--r-- | syslogd.c | 1 | ||||
-rw-r--r-- | wti.c | 65 | ||||
-rw-r--r-- | wti.h | 2 | ||||
-rw-r--r-- | wtp.c | 51 | ||||
-rw-r--r-- | wtp.h | 1 |
11 files changed, 269 insertions, 132 deletions
@@ -52,10 +52,12 @@ int Debug; /* debug flag - read-only after startup */ int debugging_on = 0; /* read-only, except on sig USR1 */ static int bLogFuncFlow = 0; /* shall the function entry and exit be logged to the debug log? */ static int bPrintFuncDBOnExit = 0; /* shall the function entry and exit be logged to the debug log? */ +static int bPrintMutexAction = 0; /* shall mutex calls be printed to the debug log? */ +static int bPrintTime = 1; /* print a timestamp together with debug message */ static char *pszAltDbgFileName = "/home/rger/proj/rsyslog/log"; /* if set, debug output is *also* sent to here */ static FILE *altdbg; /* and the handle for alternate debug output */ static FILE *stddbg; -static dbgFuncDB_t pCurrFunc; +//static dbgFuncDB_t pCurrFunc; /* list of all known FuncDBs. We use a special list, because it must only be single-linked. As @@ -77,6 +79,7 @@ typedef struct dbgMutLog_s { pthread_mutex_t *mut; pthread_t thrd; dbgFuncDB_t *pFuncDB; + int lockLn; /* the actual line where the mutex was locked */ short mutexOp; } dbgMutLog_t; static dbgMutLog_t *dbgMutLogListRoot = NULL; @@ -276,7 +279,7 @@ static inline void dbgFuncDBRemoveMutexLock(dbgFuncDB_t *pFuncDB, pthread_mutex_ /* constructor & add new entry to list */ -dbgMutLog_t *dbgMutLogAddEntry(pthread_mutex_t *pmut, short mutexOp, dbgFuncDB_t *pFuncDB) +dbgMutLog_t *dbgMutLogAddEntry(pthread_mutex_t *pmut, short mutexOp, dbgFuncDB_t *pFuncDB, int lockLn) { dbgMutLog_t *pLog; @@ -287,6 +290,7 @@ dbgMutLog_t *dbgMutLogAddEntry(pthread_mutex_t *pmut, short mutexOp, dbgFuncDB_t pLog->mut = pmut; pLog->thrd = pthread_self(); pLog->mutexOp = mutexOp; + pLog->lockLn = lockLn; pLog->pFuncDB = pFuncDB; DLL_Add(MutLog, pLog); @@ -327,7 +331,9 @@ static void dbgMutLogPrintOne(dbgMutLog_t *pLog) dbgGetThrdName(pszThrdName, sizeof(pszThrdName), pLog->thrd, 1); dbgprintf("mutex 0x%lx is being %s by code at %s:%d, thread %s\n", (unsigned long) pLog->mut, - strmutop, pLog->pFuncDB->file, pLog->pFuncDB->line, pszThrdName); + strmutop, pLog->pFuncDB->file, + (pLog->mutexOp == MUTOP_LOCK) ? pLog->lockLn : pLog->pFuncDB->line, + pszThrdName); } /* print the complete mutex log */ @@ -411,17 +417,18 @@ static inline void dbgMutexPreLockLog(pthread_mutex_t *pmut, dbgFuncDB_t *pFuncD pthread_mutex_lock(&mutMutLog); pHolder = dbgMutLogFindHolder(pmut); - pLog = dbgMutLogAddEntry(pmut, MUTOP_LOCKWAIT, pFuncDB); + pLog = dbgMutLogAddEntry(pmut, MUTOP_LOCKWAIT, pFuncDB, ln); if(pHolder == NULL) pszHolder = "[NONE]"; else { dbgGetThrdName(pszHolderThrdName, sizeof(pszHolderThrdName), pHolder->thrd, 1); - snprintf(pszBuf, sizeof(pszBuf)/sizeof(char), "%s:%d [%s]", pHolder->pFuncDB->file, pHolder->pFuncDB->line, pszHolderThrdName); + snprintf(pszBuf, sizeof(pszBuf)/sizeof(char), "%s:%d [%s]", pHolder->pFuncDB->file, pHolder->lockLn, pszHolderThrdName); pszHolder = pszBuf; } - dbgprintf("%s:%d:%s: mutex %p waiting on lock, held by %s\n", pFuncDB->file, ln, pFuncDB->func, (void*)pmut, pszHolder); + if(bPrintMutexAction) + dbgprintf("%s:%d:%s: mutex %p waiting on lock, held by %s\n", pFuncDB->file, ln, pFuncDB->func, (void*)pmut, pszHolder); pthread_mutex_unlock(&mutMutLog); } @@ -439,10 +446,11 @@ static inline void dbgMutexLockLog(pthread_mutex_t *pmut, dbgFuncDB_t *pFuncDB, dbgMutLogDelEntry(pLog); /* add "lock" entry */ - pLog = dbgMutLogAddEntry(pmut, MUTOP_LOCK, pFuncDB); + pLog = dbgMutLogAddEntry(pmut, MUTOP_LOCK, pFuncDB, lockLn); dbgFuncDBAddMutexLock(pFuncDB, pmut, lockLn); pthread_mutex_unlock(&mutMutLog); - dbgprintf("%s:%d:%s: mutex %p aquired\n", pFuncDB->file, lockLn, pFuncDB->func, (void*)pmut); + if(bPrintMutexAction) + dbgprintf("%s:%d:%s: mutex %p aquired\n", pFuncDB->file, lockLn, pFuncDB->func, (void*)pmut); } /* if we unlock, we just remove the lock aquired entry from the log list */ @@ -463,7 +471,8 @@ static inline void dbgMutexUnlockLog(pthread_mutex_t *pmut, dbgFuncDB_t *pFuncDB dbgMutLogDelEntry(pLog); pthread_mutex_unlock(&mutMutLog); - dbgprintf("%s:%d:%s: mutex %p UNlocked\n", pFuncDB->file, unlockLn, pFuncDB->func, (void*)pmut); + if(bPrintMutexAction) + dbgprintf("%s:%d:%s: mutex %p UNlocked\n", pFuncDB->file, unlockLn, pFuncDB->func, (void*)pmut); } @@ -685,6 +694,7 @@ dbgprintf(char *fmt, ...) static char pszThrdName[64]; /* 64 is to be on the safe side, anything over 20 is bad... */ static char pszWriteBuf[1024]; size_t lenWriteBuf; + struct timespec t; if(!(Debug && debugging_on)) return; @@ -717,6 +727,11 @@ dbgprintf(char *fmt, ...) dbgGetThrdName(pszThrdName, sizeof(pszThrdName), ptLastThrdID, 0); if(bWasNL) { + if(bPrintTime) { + clock_gettime(CLOCK_REALTIME, &t); + fprintf(stddbg, "%4.4ld.%9.9ld:", t.tv_sec % 1000, t.tv_nsec); + if(altdbg != NULL) fprintf(altdbg, "%4.4ld.%9.9ld:", t.tv_sec % 1000, t.tv_nsec); + } fprintf(stddbg, "%s: ", pszThrdName); if(altdbg != NULL) fprintf(altdbg, "%s: ", pszThrdName); } diff --git a/obj-types.h b/obj-types.h index 7eff503d..1e607d0c 100644 --- a/obj-types.h +++ b/obj-types.h @@ -100,16 +100,21 @@ typedef struct obj { /* the dummy struct that each derived class can be casted t objInfo_t *pObjInfo; \ unsigned int iObjCooCKiE; /* prevent name conflict, thus the strange name */ # define ISOBJ_assert(pObj) \ - { \ + do { \ + if(pObj == NULL) dbgPrintAllDebugInfo(); \ assert((pObj) != NULL); \ + if(((obj_t*)(pObj))->iObjCooCKiE != (unsigned) 0xBADEFEE) dbgPrintAllDebugInfo(); \ assert((unsigned) ((obj_t*)(pObj))->iObjCooCKiE == (unsigned) 0xBADEFEE); \ - } + } while(0); # define ISOBJ_TYPE_assert(pObj, objType) \ - { \ + do { \ + if(pObj == NULL) dbgPrintAllDebugInfo(); \ assert(pObj != NULL); \ + if(((obj_t*)(pObj))->iObjCooCKiE != (unsigned) 0xBADEFEE) dbgPrintAllDebugInfo(); \ assert((unsigned) pObj->iObjCooCKiE == (unsigned) 0xBADEFEE); \ + if(objGetObjID(pObj) != OBJ##objType) dbgPrintAllDebugInfo(); \ assert(objGetObjID(pObj) == OBJ##objType); \ - } + } while(0); #else /* non-debug mode, no checks but much faster */ # define BEGINobjInstance objInfo_t *pObjInfo; # define ISOBJ_TYPE_assert(pObj, objType) @@ -62,7 +62,7 @@ DEFobjStaticHelpers /* forward-definitions */ rsRetVal queueChkPersist(queue_t *pThis); -static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly); +static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex); static int queueChkStopWrkrDA(queue_t *pThis); static int queueIsIdleDA(queue_t *pThis); static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave); @@ -81,9 +81,11 @@ static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2); * do it at some later time, because we need to destruct the DA queue. That, * however, can not be done in a thread that has been signalled * This is to be called when we revert back to our own queue. + * This function must be called with the queue mutex locked (the wti + * class ensures this). * rgerhards, 2008-01-15 */ -static inline rsRetVal +static rsRetVal queueTurnOffDAMode(queue_t *pThis) { DEFiRet; @@ -91,31 +93,69 @@ queueTurnOffDAMode(queue_t *pThis) ISOBJ_TYPE_assert(pThis, queue); assert(pThis->bRunsDA); + /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need + * to wait for its startup... -- rgerhards, 2008-01-25 + */ + while(pThis->bRunsDA != 2) { + d_pthread_cond_wait(&pThis->condDAReady, pThis->mut); + } /* if we need to pull any data that we still need from the (child) disk queue, * now would be the time to do so. At present, we do not need this, but I'd like to * keep that comment if future need arises. */ - /* we start at least one worker thread. If no new messages come in, this will - * be the only one for the time being. I am not yet sure if that is acceptable. - * To solve that issue, queueWorker () would need to check if it needs to fire - * up addtl ones. I am not yet sure if that is justified. After all, if no new - * messages come into the queue, we may be well off with a single worker. - * rgerhards, 2008-01-16 + /* we need to check if the DA queue is empty because the DA worker may simply have + * terminated do to no new messages arriving. That does not, however, mean that the + * DA queue is empty. If there is still data in that queue, we do nothing and leave + * that for a later incarnation of this function (it will be called multiple times + * during the lifetime of DA-mode, depending on how often the DA worker receives an + * inactivity timeout. -- rgerhards, 2008-01-25 */ +RUNLOG_VAR("%p", pThis->pqDA); +RUNLOG_VAR("%d", pThis->pqDA->iQueueSize); + if(pThis->pqDA->iQueueSize == 0) { dbgprintf("Queue 0x%lx: disk-assistance being been turned off, bEnqOnly %d, bQueInDestr %d, NumWrkd %d\n", queueGetID(pThis), pThis->bEnqOnly,pThis->bQueueInDestruction,pThis->iCurNumWrkThrd); - // TODO: mutex? - pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ - /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, - * this will be quick. - */ - queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ + pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ + /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, + * this will be quick. + */ + queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ + dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n", + queueGetID(pThis), iRet); + } + + RETiRet; +} + + + +/* returns the number of workers that should be advised at + * this point in time. The mutex must be locked when + * ths function is called. -- rgerhards, 2008-01-25 + */ +static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis) +{ + DEFiRet; + int iMaxWorkers; - dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n", - queueGetID(pThis), iRet); + ISOBJ_TYPE_assert(pThis, queue); + +RUNLOG_VAR("%d", pThis->bEnqOnly); + if(!pThis->bEnqOnly) { + if(pThis->bRunsDA) { + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ + } else { + if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { + iMaxWorkers = 1; + } else { + iMaxWorkers = pThis->iQueueSize / pThis->iMinMsgsPerWrkr + 1; + } + wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ + } + } RETiRet; } @@ -151,7 +191,10 @@ RUNLOG_VAR("%s", pThis->pszFilePrefix); * chore of this function is to create the DA queue object. If that function fails, * the DA worker should return with an appropriate state, which in turn should lead to * a re-set to non-DA mode in the Enq process. The queue mutex must be locked when this - * function is called, else a race on pThis->bRunsDA may happen. + * function is called, else a number of races will happen. + * Please note that this function may be called *while* we in DA mode. This is due to the + * fact that the DA worker calls it and the DA worker may be suspended (and restarted) due + * to inactivity timeouts. * rgerhards, 2008-01-15 */ static rsRetVal @@ -161,6 +204,9 @@ queueStartDA(queue_t *pThis) ISOBJ_TYPE_assert(pThis, queue); + if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */ + FINALIZE; /* ... then we are already done! */ + /* set up sync objects */ pthread_mutex_init(&pThis->mutDA, NULL); pthread_cond_init(&pThis->condDA, NULL); @@ -180,7 +226,7 @@ dbgprintf("Queue %p: queueSTrtDA after child queue construct, q %p\n", pThis, pT CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq)); - CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly)); + CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0)); CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0)); if(pThis->toQShutdown == 0) { @@ -207,7 +253,8 @@ dbgprintf("Queue %p: queueStartDA pre start\n", pThis); */ wtpWakeupWrkr(pThis->pWtpReg); /* awake all workers, but not ourselves ;) */ - pThis->bRunsDA = 1; /* we are now in DA mode! */ + pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */ + pthread_cond_signal(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n", queueGetID(pThis), queueGetID(pThis->pqDA)); @@ -248,7 +295,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) * rgerhards, 2008-01-24 */ if(pThis->pWtpDA == NULL) { - lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx/DA", (unsigned long) pThis); + lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx:DA", (unsigned long) pThis); CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, queueChkStopWrkrDA)); @@ -273,8 +320,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) * that will also start one up. If we forgot that step, everything would be stalled * until the next enqueue request. */ - if(pThis->bEnqOnly == 0) - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* one worker only for disk queues! */ + queueAdviseMaxWorkers(pThis); finalize_it: END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -283,7 +329,9 @@ finalize_it: /* check if we need to start disk assisted mode and send some signals to - * keep it running if we are already in it. + * keep it running if we are already in it. It also checks if DA mode is + * partially initialized, in which case it waits for initialization to + * complete. * rgerhards, 2008-01-14 */ static inline rsRetVal @@ -310,7 +358,7 @@ dbgprintf("Queue %p: chkStartDA\n", pThis); */ dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n", queueGetID(pThis), pThis->iQueueSize); - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* run again [see comment above] ;) */ + queueAdviseMaxWorkers(pThis); } else { /* this is the case when we are currently not running in DA mode. So it is time * to turn it back on. @@ -779,7 +827,6 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) { DEFiRet; DEFVARS_mutexProtection; - int i; struct timespec tTimeout; rsRetVal iRetLocal; @@ -788,15 +835,20 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis)); // TODO: reminder, delte after testing: do we need to modify the high wtr mark? I dont' think so 2008-01-25 + /* we reduce the low water mark in any case. This is not absolutely necessary, but + * it is useful because we enable DA mode at several spots below and so we do not need + * to think about the low water mark each time. + */ + pThis->iLowWtrMrk = 0; + /* first try to shutdown the queue within the regular shutdown period */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ if(pThis->iQueueSize > 0) { if(pThis->bRunsDA) { - /* worker threads may be inactive after reaching low water - * mark. Lower the mark and react workers. + /* We may have waited on the low water mark. As it may have changed, we + * see if we reactivate the worker. */ - pThis->iLowWtrMrk = 0; - wtpAdviseMaxWorkers(pThis->pWtpReg, 1); + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); } } END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -843,17 +895,22 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) } /* when we reach this point, both queues are either empty or the regular queue shutdown timeout - * has expired. Now we need to check if we areconfigured to not loose messages. If so, we need + * has expired. Now we need to check if we are configured to not loose messages. If so, we need * to persist the queue to disk (this is only possible if the queue is DA-enabled). */ +// TODO: what about pure disk queues and bSaveOnShutdown? BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ /* optimize parameters for shutdown of DA-enabled queues */ +RUNLOG_VAR("%d", pThis->bSaveOnShutdown); +RUNLOG_VAR("%d", pThis->bIsDA); +RUNLOG_VAR("%d", pThis->iQueueSize); if(pThis->bIsDA && pThis->iQueueSize > 0 && pThis->bSaveOnShutdown) { +RUNLOG; /* switch to enqueue-only mode so that no more actions happen */ if(pThis->bRunsDA == 0) { queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ } else { - queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* switch to enqueue-only mode */ + queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */ } END_MTX_PROTECTED_OPERATIONS(pThis->mut); /* make sure we do not timeout before we are done */ @@ -870,10 +927,12 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) END_MTX_PROTECTED_OPERATIONS(pThis->mut); } +RUNLOG; /* now the primary queue is either empty, persisted to disk - or set to loose messages. So we * can now request immediate shutdown of any remaining workers. */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ +RUNLOG_VAR("%d", pThis->iQueueSize); if(pThis->iQueueSize > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); @@ -895,9 +954,10 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) * thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25 */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ +RUNLOG_VAR("%d", pThis->iQueueSize); if(pThis->iQueueSize > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); - dbgprintf("Queue 0x%lx: primary queue worker threads could not be shutdown, now canceling them\n", + dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the primary queue\n", queueGetID(pThis)); iRetLocal = wtpCancelAll(pThis->pWtpReg); if(iRetLocal != RS_RET_OK) { @@ -913,7 +973,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); - dbgprintf("Queue 0x%lx: DA worker threads could not be shutdown, now canceling them\n", + dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the DA queue\n", queueGetID(pThis)); iRetLocal = wtpCancelAll(pThis->pWtpReg); if(iRetLocal != RS_RET_OK) { @@ -948,8 +1008,6 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, DEFiRet; queue_t *pThis; -int *pBoom = NULL; -//*pBoom = 'A'; assert(ppThis != NULL); assert(pConsumer != NULL); assert(iWorkerThreads >= 0); @@ -1196,7 +1254,11 @@ static int queueIsIdleDA(queue_t *pThis) { /* remember: iQueueSize is the DA queue size, not the main queue! */ - return (pThis->iQueueSize == 0); +RUNLOG_VAR("%d", pThis->iLowWtrMrk); +dbgprintf("queueIsIdleDA(%p) returns %d, qsize %d\n", pThis, pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk), pThis->iQueueSize); + //// TODO: I think we need just a single function... + //return (pThis->iQueueSize == 0); + return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk)); } /* must only be called when the queue mutex is locked, else results * are not stable! Regular queue version @@ -1235,6 +1297,7 @@ dbgprintf("Queue %p: I am child, use mutex %p\n", pThis, pThis->pqParent->mut); } pthread_mutex_init(&pThis->mutThrdMgmt, NULL); + pthread_cond_init (&pThis->condDAReady, NULL); pthread_cond_init (&pThis->notFull, NULL); pthread_cond_init (&pThis->notEmpty, NULL); dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); @@ -1242,8 +1305,8 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); /* call type-specific constructor */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ - dbgprintf("Queue 0x%lx: type %d, enq-only %d, disk assisted %d, maxFileSz %ld starting\n", queueGetID(pThis), - pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize); + dbgprintf("Queue 0x%lx: type %d, enq-only %d, disk assisted %d, maxFileSz %ld, qsize %d starting\n", queueGetID(pThis), + pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, pThis->iQueueSize); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ @@ -1251,7 +1314,7 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); /* create worker thread pools for regular operation. The DA pool is created on an as-needed * basis, which potentially means never under most circumstances. */ - lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx/Reg", (unsigned long) pThis); + lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx:Reg", (unsigned long) pThis); CHKiRet(wtpConstruct (&pThis->pWtpReg)); CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, queueChkStopWrkrReg)); @@ -1265,20 +1328,18 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); CHKiRet(wtpConstructFinalize (pThis->pWtpReg)); /* initialize worker thread instances */ +RUNLOG_VAR("%d", pThis->bIsDA); if(pThis->bIsDA) { /* If we are disk-assisted, we need to check if there is a QIF file * which we need to load. -- rgerhards, 2008-01-15 */ iRetLocal = queueHaveQIF(pThis); +RUNLOG_VAR("%d", iRetLocal); if(iRetLocal == RS_RET_OK) { dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n", queueGetID(pThis)); - +RUNLOG; queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ - /* we need to start the DA worker thread so that messages will be processed. So - * we advise the worker pool there is at least one needed. The wtp does the rest... - */ - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); bInitialized = 1; /* we are done */ } else { // TODO: use logerror? -- rgerhards, 2008-01-16 @@ -1287,11 +1348,17 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); } } +RUNLOG_VAR("%d", bInitialized); if(!bInitialized) { - dbgprintf("Queue 0x%lx: queue starts up without (loading) any disk state\n", queueGetID(pThis)); - /* we do not fire up any worker threads here, this happens automatically when they are needed */ - // TODO: preforked workers? queueStrtAllWrkThrds(pThis); + dbgprintf("Queue 0x%lx: queue starts up without (loading) any DA disk state (this is normal for the DA " + "queue itself!)\n", queueGetID(pThis)); } + + /* if the queue already contains data, we need to start the correct number of worker threads. This can be + * the case when a disk queue has been loaded. If we did not start it here, it would never start. + */ + queueAdviseMaxWorkers(pThis); + pThis->bQueueStarted = 1; finalize_it: @@ -1411,12 +1478,21 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ /* shut down all workers (handles *all* of the persistence logic) */ - queueShutdownWorkers(pThis); + if(!pThis->bEnqOnly) /* in enque-only mode, we have no worker pool! */ + queueShutdownWorkers(pThis); +RUNLOG; - /* finally destruct our (regular) worker thread pool */ - if(pThis->qType != QUEUETYPE_DIRECT) { + /* finally destruct our (regular) worker thread pool + * Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen, + * e.g. when they are not created in enqueue-only mode. We already check the condition + * as this may otherwise be very hard to find once we optimize (and have long forgotten + * about this condition here ;) + * rgerhards, 2008-01-25 + */ + if(pThis->qType != QUEUETYPE_DIRECT && pThis->pWtpReg != NULL) { wtpDestruct(&pThis->pWtpReg); } +RUNLOG; /* Now check if we actually have a DA queue and, if so, destruct it. * Note that the wtp must be destructed first, it may be in cancel cleanup handler @@ -1424,10 +1500,16 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove * data (re-queueing case). So we need to destruct the wtp first, which will make * sure all workers have terminated. */ +RUNLOG_VAR("%p", pThis->pWtpDA); if(pThis->pWtpDA != NULL) { +RUNLOG; wtpDestruct(&pThis->pWtpDA); +RUNLOG_VAR("%p", pThis->pqDA); + } + if(pThis->pqDA != NULL) { queueDestruct(&pThis->pqDA); } +RUNLOG; /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) * This handler is most important for disk queues, it will finally persist the necessary @@ -1447,6 +1529,7 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove free(pThis->mut); } pthread_mutex_destroy(&pThis->mutThrdMgmt); + pthread_cond_destroy(&pThis->condDAReady); pthread_cond_destroy(&pThis->notFull); pthread_cond_destroy(&pThis->notEmpty); @@ -1522,7 +1605,6 @@ queueEnqObj(queue_t *pThis, void *pUsr) { DEFiRet; int iCancelStateSave; - int iMaxWorkers; int i; struct timespec t; @@ -1530,6 +1612,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) // TODO: check if queue is terminating and if so either discard message or enqeue it to the DA queue *directly* dbgprintf("Queue %p: EnqObj() 1\n", pThis); +RUNLOG_VAR("%d", pThis->bRunsDA); /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE * during its execution. If that is not done, race conditions occur if the @@ -1544,25 +1627,10 @@ dbgprintf("Queue %p: EnqObj() 1\n", pThis); /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); -dbgprintf("Queue %p: EnqObj() 10\n", pThis); /* then check if we need to add an assistance disk queue */ if(pThis->bIsDA) CHKiRet(queueChkStrtDA(pThis)); -RUNLOG_VAR("%d", pThis->bIsDA); - /* make sure at least one worker is running. */ - if(pThis->bRunsDA) { -RUNLOG; - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ - } else { - if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { - iMaxWorkers = 1; - } else { - iMaxWorkers = pThis->iQueueSize / pThis->iMinMsgsPerWrkr + 1; - } -RUNLOG; - wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); - } /* wait for the queue to be ready... */ while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) { @@ -1576,6 +1644,8 @@ RUNLOG; } /* and finally enqueue the message */ +RUNLOG_VAR("%p", pThis); +RUNLOG_VAR("%d", pThis->bRunsDA); CHKiRet(queueAdd(pThis, pUsr)); queueChkPersist(pThis); @@ -1587,6 +1657,8 @@ finalize_it: pthread_setcancelstate(iCancelStateSave, NULL); } + /* make sure at least one worker is running. */ + queueAdviseMaxWorkers(pThis); RETiRet; } @@ -1601,10 +1673,10 @@ finalize_it: * rgerhards, 2008-01-16 */ static rsRetVal -queueSetEnqOnly(queue_t *pThis, int bEnqOnly) +queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex) { DEFiRet; - int iCancelStateSave; + DEFVARS_mutexProtection; ISOBJ_TYPE_assert(pThis, queue); @@ -1612,8 +1684,7 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly) * called, so that doesn't matter... -- rgerhards, 2008-01-16 */ if(pThis->mut != NULL) { - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(pThis->mut); + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex); } if(bEnqOnly == pThis->bEnqOnly) @@ -1626,8 +1697,10 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly) /* this means we need to terminate all workers - that's it... */ dbgprintf("Queue 0x%lx: switching to enqueue-only mode, terminating all worker threads\n", queueGetID(pThis)); - wtpWakeupAllWrkr(pThis->pWtpDA); - wtpWakeupAllWrkr(pThis->pWtpReg); + if(pThis->pWtpReg != NULL) + wtpWakeupAllWrkr(pThis->pWtpReg); + if(pThis->pWtpDA != NULL) + wtpWakeupAllWrkr(pThis->pWtpDA); } else { /* switch back to regular mode */ ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */ @@ -1638,8 +1711,7 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly) finalize_it: if(pThis->mut != NULL) { - d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); + END_MTX_PROTECTED_OPERATIONS(pThis->mut); } RETiRet; } @@ -90,7 +90,8 @@ typedef struct queue_s { pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */ pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */ pthread_cond_t notFull, notEmpty; - pthread_cond_t condThrdTrm;/* signalled when threads terminate */ + pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */ + pthread_cond_t condThrdTrm;/* signalled when threads terminate */ // TODO: no longer used? pthread_cond_t *condSignalOnEmpty;/* caller-provided condition to be signalled when queue is empty (DA mode!) */ pthread_mutex_t *mutSignalOnEmpty; /* and its associated mutex */ pthread_cond_t *condSignalOnEmpty2;/* another condition to be signalled on empty */ @@ -112,7 +113,7 @@ typedef struct queue_s { int bIsDA; /* is this queue disk assisted? */ int bRunsDA; /* is this queue actually *running* disk assisted? */ pthread_mutex_t mutDA; /* mutex for low water mark algo */ - pthread_cond_t condDA; /* and its matching condition */ + pthread_cond_t condDA; /* and its matching condition */ // TODO: no longer needed! struct queue_s *pqDA; /* queue for disk-assisted modes */ struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */ int bDAEnqOnly; /* EnqOnly setting for DA queue */ @@ -354,9 +354,9 @@ timeoutVal(struct timespec *pt) iTimeout = (pt->tv_nsec - t.tv_nsec) / 1000; } } else { - iTimeout = pt->tv_sec - t.tv_nsec; - iTimeout += 1000 - (pt->tv_nsec / 1000); - iTimeout += t.tv_nsec / 1000; + iTimeout = (pt->tv_sec - t.tv_sec) * 1000; + iTimeout += 1000 - (pt->tv_nsec / 1000000); + iTimeout += t.tv_nsec / 1000000; } return iTimeout; @@ -89,14 +89,12 @@ void mutexCancelCleanup(void *arg); #define LOCK_MUTEX 1 #define DEFVARS_mutexProtection\ int iCancelStateSave; \ - int bLockedOpIsLocked + int bLockedOpIsLocked=0 #define BEGIN_MTX_PROTECTED_OPERATIONS(mut, bMustLock) \ if(bMustLock == LOCK_MUTEX) { \ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); \ d_pthread_mutex_lock(mut); \ bLockedOpIsLocked = 1; \ - } else { \ - bLockedOpIsLocked = 0; \ } #define END_MTX_PROTECTED_OPERATIONS(mut) \ if(bLockedOpIsLocked) { \ @@ -2577,6 +2577,7 @@ static void doDie(int sig) { static int iRetries = 0; /* debug aid */ dbgprintf("DoDie called.\n"); + dbgPrintAllDebugInfo(); if(iRetries++ == 4) { dbgprintf("DoDie called 5 times - unconditional exit\n"); exit(1); @@ -91,7 +91,6 @@ wtiGetState(wti_t *pThis, int bLockMutex) } - /* send a command to a specific thread * bActiveOnly specifies if the command should be sent only when the worker is * in an active state. -- rgerhards, 2008-01-20 @@ -107,7 +106,6 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex) BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); -RUNLOG_VAR("%d", bActiveOnly); /* all worker states must be followed sequentially, only termination can be set in any state */ if( (bActiveOnly && (pThis->tCurrCmd < eWRKTHRD_RUN_CREATED)) || (pThis->tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) { @@ -116,14 +114,13 @@ RUNLOG_VAR("%d", bActiveOnly); } else { dbgprintf("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd); switch(tCmd) { - case eWRKTHRD_RUN_CREATED: - break; case eWRKTHRD_TERMINATING: /* TODO: re-enable meaningful debug msg! (via function callback?) dbgprintf("%s: thread terminating with %d entries left in queue, %d workers running.\n", wtiGetDbgHdr(pThis->pQueue), pThis->pQueue->iQueueSize, pThis->pQueue->iCurNumWrkThrd); */ + pthread_cond_signal(&pThis->condExitDone); dbgprintf("%s: worker terminating\n", wtiGetDbgHdr(pThis)); break; case eWRKTHRD_RUNNING: @@ -131,6 +128,7 @@ RUNLOG_VAR("%d", bActiveOnly); break; /* these cases just to satisfy the compiler, we do (yet) not act an them: */ case eWRKTHRD_STOPPED: + case eWRKTHRD_RUN_CREATED: case eWRKTHRD_RUN_INIT: case eWRKTHRD_SHUTDOWN: case eWRKTHRD_SHUTDOWN_IMMEDIATE: @@ -145,6 +143,39 @@ RUNLOG_VAR("%d", bActiveOnly); } +#if 0 +/* check if the worker shall shutdown (1 = yes, 0 = no) + * TODO: check if we can use atomic operations to enhance performance + * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user" + * (e.g. the queue clas) + * rgerhards, 2008-01-24 + * TODO: we can optimize this via function pointers, as the code is only called during + * termination. So we can call the function via ptr in wtiWorker () and change that pointer + * to this function here upon shutdown. + */ +static inline rsRetVal +wtiChkStopWrkr(wti_t *pThis, wtp_t *pWtp, int bLockMutex, int bLockUsrMutex) +{ + DEFiRet; + DEFVARS_mutexProtection; + + ISOBJ_TYPE_assert(pThis, wti); + + BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut); + if(pThis->bShutdownRqtd) { + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + iRet = RS_RET_TERMINATE_NOW; + } else { + /* regular case */ + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + iRet = wtpChkStopWrkr(pWtp, bLockMutex, bLockUsrMutex); + } + + RETiRet; +} +#endif + + /* Destructor */ rsRetVal wtiDestruct(wti_t **ppThis) { @@ -160,17 +191,28 @@ rsRetVal wtiDestruct(wti_t **ppThis) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); /* if we reach this point, we must make sure the associated worker has terminated. It is - * the callers duty to make sure the worker has already terminated. + * the callers duty to make sure the worker already knows it shall terminate. * TODO: is it *really* the caller's duty? ...mmmhhhh.... smells bad... rgerhards, 2008-01-25 */ wtiProcessThrdChanges(pThis, LOCK_MUTEX); /* process state change one last time */ d_pthread_mutex_lock(&pThis->mut); - assert(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) <= eWRKTHRD_TERMINATING); // I knew it smelled bad... +RUNLOG_VAR("%d", pThis->tCurrCmd); + if(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) { + dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - joining it\n", + wtiGetDbgHdr(pThis), pThis); + /* let's hope the caller actually instructed it to shutdown... */ + pthread_cond_wait(&pThis->condExitDone, &pThis->mut); +RUNLOG; + wtiJoinThrd(pThis); +RUNLOG; + } +RUNLOG; d_pthread_mutex_unlock(&pThis->mut); /* actual destruction */ pthread_cond_destroy(&pThis->condInitDone); + pthread_cond_destroy(&pThis->condExitDone); pthread_mutex_destroy(&pThis->mut); if(pThis->pszDbgHdr != NULL) @@ -191,6 +233,7 @@ rsRetVal wtiDestruct(wti_t **ppThis) */ BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ pthread_cond_init(&pThis->condInitDone, NULL); + pthread_cond_init(&pThis->condExitDone, NULL); pthread_mutex_init(&pThis->mut, NULL); ENDobjConstruct(wti) @@ -228,7 +271,7 @@ wtiJoinThrd(wti_t *pThis) pthread_join(pThis->thrdID, NULL); RUNLOG; wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* back to virgin... */ -RUNLOG; +RUNLOG_VAR("%p", pThis->thrdID); pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ dbgprintf("wti: worker %s has stopped\n", wtiGetDbgHdr(pThis)); @@ -339,7 +382,9 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); + BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX); pWtp->pfOnWorkerStartup(pWtp->pUsr); + END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ @@ -354,6 +399,7 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis), if( (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) || wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) { + //|| wtiChkStopWrkr(pThis, pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) { END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); break; /* end worker thread run */ } @@ -393,14 +439,13 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis), // " %d messages to process.\n", wtiGetDbgHdr(pThis), pThis->iQueueSize); } - pWtp->pfOnWorkerShutdown(pWtp->pUsr); - /* indicate termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); -dbgprintf("%s: worker waiting for mutex\n", wtiGetDbgHdr(pThis)); d_pthread_mutex_lock(&pThis->mut); pthread_cleanup_pop(0); /* remove cleanup handler */ + pWtp->pfOnWorkerShutdown(pWtp->pUsr); + // TODO: I think we no longer need that - but check! #if 0 /* if we ever need finalize_it, here would be the place for it! */ @@ -35,7 +35,9 @@ typedef struct wti_s { obj_t *pUsrp; /* pointer to an object meaningful for current user pointer (e.g. queue pUsr data elemt) */ wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ pthread_cond_t condInitDone; /* signaled when the thread startup is done (once per thread existance) */ + pthread_cond_t condExitDone; /* signaled when the thread exit is done (once per thread existance) */ pthread_mutex_t mut; + int bShutdownRqtd; /* shutdown for this thread requested? 0 - no , 1 - yes */ uchar *pszDbgHdr; /* header string for debug messages */ } wti_t; @@ -135,8 +135,6 @@ wtpDestruct(wtp_t **ppThis) int iCancelStateSave; int i; -dbgPrintAllDebugInfo(); -RUNLOG; assert(ppThis != NULL); pThis = *ppThis; ISOBJ_TYPE_assert(pThis, wtp); @@ -179,9 +177,7 @@ wtpWakeupWrkr(wtp_t *pThis) // TODO; mutex? ISOBJ_TYPE_assert(pThis, wtp); -dbgprintf("wtpWakeupWrkr 1, cond %p\n", pThis->pcondBusy); pthread_cond_signal(pThis->pcondBusy); -dbgprintf("wtpWakeupWrkr 2\n"); RETiRet; } /* wake up all worker threads. @@ -211,10 +207,8 @@ wtpProcessThrdChanges(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); - RUNLOG; if(pThis->bThrdStateChanged == 0) FINALIZE; - RUNLOG; /* go through all threads */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { @@ -222,7 +216,6 @@ wtpProcessThrdChanges(wtp_t *pThis) } finalize_it: - RUNLOG; RETiRet; } @@ -255,6 +248,8 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) DEFiRet; DEFVARS_mutexProtection; + ISOBJ_TYPE_assert(pThis, wtp); + BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex))) @@ -281,14 +276,17 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout int bTimedOut; int iCancelStateSave; -dbgPrintAllDebugInfo(); -RUNLOG_VAR("%p", pThis); -RUNLOG_VAR("%d", tShutdownCmd); ISOBJ_TYPE_assert(pThis, wtp); wtpSetState(pThis, tShutdownCmd); wtpWakeupAllWrkr(pThis); + + /* see if we need to harvest (join) any terminated threads (even in timeout case, + * some may have terminated... + */ + wtpProcessThrdChanges(pThis); +RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); /* and wait for their termination */ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); @@ -297,6 +295,7 @@ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { +RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n", wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd); @@ -315,7 +314,6 @@ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut); */ wtpProcessThrdChanges(pThis); -dbgprintf("wtpShutdownAll exit"); RETiRet; } @@ -346,6 +344,7 @@ wtpCancelAll(wtp_t *pThis) { DEFiRet; int i; + int numCancelled = 0; // TODO: mutex?? // TODO: cancellation in wti! ISOBJ_TYPE_assert(pThis, wtp); @@ -353,19 +352,17 @@ wtpCancelAll(wtp_t *pThis) /* process any pending thread requests so that we know who actually is still running */ wtpProcessThrdChanges(pThis); -RUNLOG_VAR("%d", pThis->iNumWorkerThreads);; /* first tell the workers our request */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { // TODO: mutex lock! -RUNLOG_VAR("%p", pThis->pWrkr[i]); if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) { -RUNLOG; dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i); pthread_cancel(pThis->pWrkr[i]->thrdID); + ++numCancelled; } } -RUNLOG; + dbgprintf("%s: cancelled %d worker threads\n", wtpGetDbgHdr(pThis), numCancelled); RETiRet; } @@ -380,13 +377,9 @@ wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex) DEFiRet; DEFVARS_mutexProtection; -RUNLOG; BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); -RUNLOG; pThis->bInactivityGuard = bNewState; -RUNLOG; END_MTX_PROTECTED_OPERATIONS(&pThis->mut); -RUNLOG; RETiRet; } @@ -403,6 +396,7 @@ wtpWrkrExecCancelCleanup(void *arg) ISOBJ_TYPE_assert(pThis, wtp); pThis->iCurNumWrkThrd--; +RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); wtpSignalWrkrTermination(pThis); dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd); @@ -459,6 +453,7 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in pthread_cleanup_pop(0); pThis->iCurNumWrkThrd--; +RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); wtpSignalWrkrTermination(pThis); dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n", @@ -488,6 +483,8 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); pThis->iCurNumWrkThrd++; +dbgPrintAllDebugInfo(); +RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); /* find free spot in thread table. If we find at least one worker that is in initialization, * we do NOT start a new one. Let's give the other one a chance, first. @@ -538,7 +535,6 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) int nMissing; /* number workers missing to run */ int i; - if(pThis == NULL) dbgPrintAllDebugInfo(); ISOBJ_TYPE_assert(pThis, wtp); dbgprintf("%s: wtpAdviseMaxWorker with %d called, currNum %d, max %d\n", wtpGetDbgHdr(pThis), nMaxWrkr, pThis->iCurNumWrkThrd, pThis->iNumWorkerThreads); @@ -547,11 +543,10 @@ dbgprintf("%s: wtpAdviseMaxWorker with %d called, currNum %d, max %d\n", wtpGetD BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX); + if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */ + nMaxWrkr = pThis->iNumWorkerThreads; + nMissing = nMaxWrkr - pThis->iCurNumWrkThrd; - if(nMissing > pThis->iNumWorkerThreads) - nMissing = pThis->iNumWorkerThreads; - else if(nMissing < 0) - nMissing = 0; if(nMissing > 0) { dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); @@ -559,9 +554,11 @@ dbgprintf("%s: wtpAdviseMaxWorker with %d called, currNum %d, max %d\n", wtpGetD for(i = 0 ; i < nMissing ; ++i) { CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED)); } - } else { -dbgprintf("wtpAdviseMaxWorkers signals busy\n"); - wtpWakeupWrkr(pThis); + } else { + if(nMaxWrkr > 0) { + dbgprintf("wtpAdviseMaxWorkers signals busy\n"); + wtpWakeupWrkr(pThis); + } } @@ -108,6 +108,7 @@ PROTOTYPEpropSetMeth(wtp, toWrkShutdown, long); PROTOTYPEpropSetMeth(wtp, wtpState, wtpState_t); PROTOTYPEpropSetMeth(wtp, iMaxWorkerThreads, int); PROTOTYPEpropSetMeth(wtp, pUsr, void*); +PROTOTYPEpropSetMeth(wtp, iNumWorkerThreads, int); PROTOTYPEpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t); PROTOTYPEpropSetMethPTR(wtp, pcondBusy, pthread_cond_t); |