summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--debug.c33
-rw-r--r--obj-types.h13
-rw-r--r--queue.c220
-rw-r--r--queue.h5
-rwxr-xr-xsrUtils.c6
-rwxr-xr-xsrUtils.h4
-rw-r--r--syslogd.c1
-rw-r--r--wti.c65
-rw-r--r--wti.h2
-rw-r--r--wtp.c51
-rw-r--r--wtp.h1
11 files changed, 269 insertions, 132 deletions
diff --git a/debug.c b/debug.c
index a3aa2b35..c7f5e408 100644
--- a/debug.c
+++ b/debug.c
@@ -52,10 +52,12 @@ int Debug; /* debug flag - read-only after startup */
int debugging_on = 0; /* read-only, except on sig USR1 */
static int bLogFuncFlow = 0; /* shall the function entry and exit be logged to the debug log? */
static int bPrintFuncDBOnExit = 0; /* shall the function entry and exit be logged to the debug log? */
+static int bPrintMutexAction = 0; /* shall mutex calls be printed to the debug log? */
+static int bPrintTime = 1; /* print a timestamp together with debug message */
static char *pszAltDbgFileName = "/home/rger/proj/rsyslog/log"; /* if set, debug output is *also* sent to here */
static FILE *altdbg; /* and the handle for alternate debug output */
static FILE *stddbg;
-static dbgFuncDB_t pCurrFunc;
+//static dbgFuncDB_t pCurrFunc;
/* list of all known FuncDBs. We use a special list, because it must only be single-linked. As
@@ -77,6 +79,7 @@ typedef struct dbgMutLog_s {
pthread_mutex_t *mut;
pthread_t thrd;
dbgFuncDB_t *pFuncDB;
+ int lockLn; /* the actual line where the mutex was locked */
short mutexOp;
} dbgMutLog_t;
static dbgMutLog_t *dbgMutLogListRoot = NULL;
@@ -276,7 +279,7 @@ static inline void dbgFuncDBRemoveMutexLock(dbgFuncDB_t *pFuncDB, pthread_mutex_
/* constructor & add new entry to list
*/
-dbgMutLog_t *dbgMutLogAddEntry(pthread_mutex_t *pmut, short mutexOp, dbgFuncDB_t *pFuncDB)
+dbgMutLog_t *dbgMutLogAddEntry(pthread_mutex_t *pmut, short mutexOp, dbgFuncDB_t *pFuncDB, int lockLn)
{
dbgMutLog_t *pLog;
@@ -287,6 +290,7 @@ dbgMutLog_t *dbgMutLogAddEntry(pthread_mutex_t *pmut, short mutexOp, dbgFuncDB_t
pLog->mut = pmut;
pLog->thrd = pthread_self();
pLog->mutexOp = mutexOp;
+ pLog->lockLn = lockLn;
pLog->pFuncDB = pFuncDB;
DLL_Add(MutLog, pLog);
@@ -327,7 +331,9 @@ static void dbgMutLogPrintOne(dbgMutLog_t *pLog)
dbgGetThrdName(pszThrdName, sizeof(pszThrdName), pLog->thrd, 1);
dbgprintf("mutex 0x%lx is being %s by code at %s:%d, thread %s\n", (unsigned long) pLog->mut,
- strmutop, pLog->pFuncDB->file, pLog->pFuncDB->line, pszThrdName);
+ strmutop, pLog->pFuncDB->file,
+ (pLog->mutexOp == MUTOP_LOCK) ? pLog->lockLn : pLog->pFuncDB->line,
+ pszThrdName);
}
/* print the complete mutex log */
@@ -411,17 +417,18 @@ static inline void dbgMutexPreLockLog(pthread_mutex_t *pmut, dbgFuncDB_t *pFuncD
pthread_mutex_lock(&mutMutLog);
pHolder = dbgMutLogFindHolder(pmut);
- pLog = dbgMutLogAddEntry(pmut, MUTOP_LOCKWAIT, pFuncDB);
+ pLog = dbgMutLogAddEntry(pmut, MUTOP_LOCKWAIT, pFuncDB, ln);
if(pHolder == NULL)
pszHolder = "[NONE]";
else {
dbgGetThrdName(pszHolderThrdName, sizeof(pszHolderThrdName), pHolder->thrd, 1);
- snprintf(pszBuf, sizeof(pszBuf)/sizeof(char), "%s:%d [%s]", pHolder->pFuncDB->file, pHolder->pFuncDB->line, pszHolderThrdName);
+ snprintf(pszBuf, sizeof(pszBuf)/sizeof(char), "%s:%d [%s]", pHolder->pFuncDB->file, pHolder->lockLn, pszHolderThrdName);
pszHolder = pszBuf;
}
- dbgprintf("%s:%d:%s: mutex %p waiting on lock, held by %s\n", pFuncDB->file, ln, pFuncDB->func, (void*)pmut, pszHolder);
+ if(bPrintMutexAction)
+ dbgprintf("%s:%d:%s: mutex %p waiting on lock, held by %s\n", pFuncDB->file, ln, pFuncDB->func, (void*)pmut, pszHolder);
pthread_mutex_unlock(&mutMutLog);
}
@@ -439,10 +446,11 @@ static inline void dbgMutexLockLog(pthread_mutex_t *pmut, dbgFuncDB_t *pFuncDB,
dbgMutLogDelEntry(pLog);
/* add "lock" entry */
- pLog = dbgMutLogAddEntry(pmut, MUTOP_LOCK, pFuncDB);
+ pLog = dbgMutLogAddEntry(pmut, MUTOP_LOCK, pFuncDB, lockLn);
dbgFuncDBAddMutexLock(pFuncDB, pmut, lockLn);
pthread_mutex_unlock(&mutMutLog);
- dbgprintf("%s:%d:%s: mutex %p aquired\n", pFuncDB->file, lockLn, pFuncDB->func, (void*)pmut);
+ if(bPrintMutexAction)
+ dbgprintf("%s:%d:%s: mutex %p aquired\n", pFuncDB->file, lockLn, pFuncDB->func, (void*)pmut);
}
/* if we unlock, we just remove the lock aquired entry from the log list */
@@ -463,7 +471,8 @@ static inline void dbgMutexUnlockLog(pthread_mutex_t *pmut, dbgFuncDB_t *pFuncDB
dbgMutLogDelEntry(pLog);
pthread_mutex_unlock(&mutMutLog);
- dbgprintf("%s:%d:%s: mutex %p UNlocked\n", pFuncDB->file, unlockLn, pFuncDB->func, (void*)pmut);
+ if(bPrintMutexAction)
+ dbgprintf("%s:%d:%s: mutex %p UNlocked\n", pFuncDB->file, unlockLn, pFuncDB->func, (void*)pmut);
}
@@ -685,6 +694,7 @@ dbgprintf(char *fmt, ...)
static char pszThrdName[64]; /* 64 is to be on the safe side, anything over 20 is bad... */
static char pszWriteBuf[1024];
size_t lenWriteBuf;
+ struct timespec t;
if(!(Debug && debugging_on))
return;
@@ -717,6 +727,11 @@ dbgprintf(char *fmt, ...)
dbgGetThrdName(pszThrdName, sizeof(pszThrdName), ptLastThrdID, 0);
if(bWasNL) {
+ if(bPrintTime) {
+ clock_gettime(CLOCK_REALTIME, &t);
+ fprintf(stddbg, "%4.4ld.%9.9ld:", t.tv_sec % 1000, t.tv_nsec);
+ if(altdbg != NULL) fprintf(altdbg, "%4.4ld.%9.9ld:", t.tv_sec % 1000, t.tv_nsec);
+ }
fprintf(stddbg, "%s: ", pszThrdName);
if(altdbg != NULL) fprintf(altdbg, "%s: ", pszThrdName);
}
diff --git a/obj-types.h b/obj-types.h
index 7eff503d..1e607d0c 100644
--- a/obj-types.h
+++ b/obj-types.h
@@ -100,16 +100,21 @@ typedef struct obj { /* the dummy struct that each derived class can be casted t
objInfo_t *pObjInfo; \
unsigned int iObjCooCKiE; /* prevent name conflict, thus the strange name */
# define ISOBJ_assert(pObj) \
- { \
+ do { \
+ if(pObj == NULL) dbgPrintAllDebugInfo(); \
assert((pObj) != NULL); \
+ if(((obj_t*)(pObj))->iObjCooCKiE != (unsigned) 0xBADEFEE) dbgPrintAllDebugInfo(); \
assert((unsigned) ((obj_t*)(pObj))->iObjCooCKiE == (unsigned) 0xBADEFEE); \
- }
+ } while(0);
# define ISOBJ_TYPE_assert(pObj, objType) \
- { \
+ do { \
+ if(pObj == NULL) dbgPrintAllDebugInfo(); \
assert(pObj != NULL); \
+ if(((obj_t*)(pObj))->iObjCooCKiE != (unsigned) 0xBADEFEE) dbgPrintAllDebugInfo(); \
assert((unsigned) pObj->iObjCooCKiE == (unsigned) 0xBADEFEE); \
+ if(objGetObjID(pObj) != OBJ##objType) dbgPrintAllDebugInfo(); \
assert(objGetObjID(pObj) == OBJ##objType); \
- }
+ } while(0);
#else /* non-debug mode, no checks but much faster */
# define BEGINobjInstance objInfo_t *pObjInfo;
# define ISOBJ_TYPE_assert(pObj, objType)
diff --git a/queue.c b/queue.c
index 320b3385..421fd651 100644
--- a/queue.c
+++ b/queue.c
@@ -62,7 +62,7 @@ DEFobjStaticHelpers
/* forward-definitions */
rsRetVal queueChkPersist(queue_t *pThis);
-static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly);
+static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex);
static int queueChkStopWrkrDA(queue_t *pThis);
static int queueIsIdleDA(queue_t *pThis);
static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave);
@@ -81,9 +81,11 @@ static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2);
* do it at some later time, because we need to destruct the DA queue. That,
* however, can not be done in a thread that has been signalled
* This is to be called when we revert back to our own queue.
+ * This function must be called with the queue mutex locked (the wti
+ * class ensures this).
* rgerhards, 2008-01-15
*/
-static inline rsRetVal
+static rsRetVal
queueTurnOffDAMode(queue_t *pThis)
{
DEFiRet;
@@ -91,31 +93,69 @@ queueTurnOffDAMode(queue_t *pThis)
ISOBJ_TYPE_assert(pThis, queue);
assert(pThis->bRunsDA);
+ /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need
+ * to wait for its startup... -- rgerhards, 2008-01-25
+ */
+ while(pThis->bRunsDA != 2) {
+ d_pthread_cond_wait(&pThis->condDAReady, pThis->mut);
+ }
/* if we need to pull any data that we still need from the (child) disk queue,
* now would be the time to do so. At present, we do not need this, but I'd like to
* keep that comment if future need arises.
*/
- /* we start at least one worker thread. If no new messages come in, this will
- * be the only one for the time being. I am not yet sure if that is acceptable.
- * To solve that issue, queueWorker () would need to check if it needs to fire
- * up addtl ones. I am not yet sure if that is justified. After all, if no new
- * messages come into the queue, we may be well off with a single worker.
- * rgerhards, 2008-01-16
+ /* we need to check if the DA queue is empty because the DA worker may simply have
+ * terminated do to no new messages arriving. That does not, however, mean that the
+ * DA queue is empty. If there is still data in that queue, we do nothing and leave
+ * that for a later incarnation of this function (it will be called multiple times
+ * during the lifetime of DA-mode, depending on how often the DA worker receives an
+ * inactivity timeout. -- rgerhards, 2008-01-25
*/
+RUNLOG_VAR("%p", pThis->pqDA);
+RUNLOG_VAR("%d", pThis->pqDA->iQueueSize);
+ if(pThis->pqDA->iQueueSize == 0) {
dbgprintf("Queue 0x%lx: disk-assistance being been turned off, bEnqOnly %d, bQueInDestr %d, NumWrkd %d\n",
queueGetID(pThis),
pThis->bEnqOnly,pThis->bQueueInDestruction,pThis->iCurNumWrkThrd);
- // TODO: mutex?
- pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
- /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
- * this will be quick.
- */
- queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
+ pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
+ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
+ * this will be quick.
+ */
+ queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
+ dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
+ queueGetID(pThis), iRet);
+ }
+
+ RETiRet;
+}
+
+
+
+/* returns the number of workers that should be advised at
+ * this point in time. The mutex must be locked when
+ * ths function is called. -- rgerhards, 2008-01-25
+ */
+static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
+{
+ DEFiRet;
+ int iMaxWorkers;
- dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
- queueGetID(pThis), iRet);
+ ISOBJ_TYPE_assert(pThis, queue);
+
+RUNLOG_VAR("%d", pThis->bEnqOnly);
+ if(!pThis->bEnqOnly) {
+ if(pThis->bRunsDA) {
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
+ } else {
+ if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
+ iMaxWorkers = 1;
+ } else {
+ iMaxWorkers = pThis->iQueueSize / pThis->iMinMsgsPerWrkr + 1;
+ }
+ wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
+ }
+ }
RETiRet;
}
@@ -151,7 +191,10 @@ RUNLOG_VAR("%s", pThis->pszFilePrefix);
* chore of this function is to create the DA queue object. If that function fails,
* the DA worker should return with an appropriate state, which in turn should lead to
* a re-set to non-DA mode in the Enq process. The queue mutex must be locked when this
- * function is called, else a race on pThis->bRunsDA may happen.
+ * function is called, else a number of races will happen.
+ * Please note that this function may be called *while* we in DA mode. This is due to the
+ * fact that the DA worker calls it and the DA worker may be suspended (and restarted) due
+ * to inactivity timeouts.
* rgerhards, 2008-01-15
*/
static rsRetVal
@@ -161,6 +204,9 @@ queueStartDA(queue_t *pThis)
ISOBJ_TYPE_assert(pThis, queue);
+ if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */
+ FINALIZE; /* ... then we are already done! */
+
/* set up sync objects */
pthread_mutex_init(&pThis->mutDA, NULL);
pthread_cond_init(&pThis->condDA, NULL);
@@ -180,7 +226,7 @@ dbgprintf("Queue %p: queueSTrtDA after child queue construct, q %p\n", pThis, pT
CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
- CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly));
+ CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
if(pThis->toQShutdown == 0) {
@@ -207,7 +253,8 @@ dbgprintf("Queue %p: queueStartDA pre start\n", pThis);
*/
wtpWakeupWrkr(pThis->pWtpReg); /* awake all workers, but not ourselves ;) */
- pThis->bRunsDA = 1; /* we are now in DA mode! */
+ pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */
+ pthread_cond_signal(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n",
queueGetID(pThis), queueGetID(pThis->pqDA));
@@ -248,7 +295,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
* rgerhards, 2008-01-24
*/
if(pThis->pWtpDA == NULL) {
- lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx/DA", (unsigned long) pThis);
+ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx:DA", (unsigned long) pThis);
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, queueChkStopWrkrDA));
@@ -273,8 +320,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
* that will also start one up. If we forgot that step, everything would be stalled
* until the next enqueue request.
*/
- if(pThis->bEnqOnly == 0)
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* one worker only for disk queues! */
+ queueAdviseMaxWorkers(pThis);
finalize_it:
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -283,7 +329,9 @@ finalize_it:
/* check if we need to start disk assisted mode and send some signals to
- * keep it running if we are already in it.
+ * keep it running if we are already in it. It also checks if DA mode is
+ * partially initialized, in which case it waits for initialization to
+ * complete.
* rgerhards, 2008-01-14
*/
static inline rsRetVal
@@ -310,7 +358,7 @@ dbgprintf("Queue %p: chkStartDA\n", pThis);
*/
dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n",
queueGetID(pThis), pThis->iQueueSize);
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* run again [see comment above] ;) */
+ queueAdviseMaxWorkers(pThis);
} else {
/* this is the case when we are currently not running in DA mode. So it is time
* to turn it back on.
@@ -779,7 +827,6 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
{
DEFiRet;
DEFVARS_mutexProtection;
- int i;
struct timespec tTimeout;
rsRetVal iRetLocal;
@@ -788,15 +835,20 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis));
// TODO: reminder, delte after testing: do we need to modify the high wtr mark? I dont' think so 2008-01-25
+ /* we reduce the low water mark in any case. This is not absolutely necessary, but
+ * it is useful because we enable DA mode at several spots below and so we do not need
+ * to think about the low water mark each time.
+ */
+ pThis->iLowWtrMrk = 0;
+
/* first try to shutdown the queue within the regular shutdown period */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
if(pThis->iQueueSize > 0) {
if(pThis->bRunsDA) {
- /* worker threads may be inactive after reaching low water
- * mark. Lower the mark and react workers.
+ /* We may have waited on the low water mark. As it may have changed, we
+ * see if we reactivate the worker.
*/
- pThis->iLowWtrMrk = 0;
- wtpAdviseMaxWorkers(pThis->pWtpReg, 1);
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
}
}
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -843,17 +895,22 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
}
/* when we reach this point, both queues are either empty or the regular queue shutdown timeout
- * has expired. Now we need to check if we areconfigured to not loose messages. If so, we need
+ * has expired. Now we need to check if we are configured to not loose messages. If so, we need
* to persist the queue to disk (this is only possible if the queue is DA-enabled).
*/
+// TODO: what about pure disk queues and bSaveOnShutdown?
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
/* optimize parameters for shutdown of DA-enabled queues */
+RUNLOG_VAR("%d", pThis->bSaveOnShutdown);
+RUNLOG_VAR("%d", pThis->bIsDA);
+RUNLOG_VAR("%d", pThis->iQueueSize);
if(pThis->bIsDA && pThis->iQueueSize > 0 && pThis->bSaveOnShutdown) {
+RUNLOG;
/* switch to enqueue-only mode so that no more actions happen */
if(pThis->bRunsDA == 0) {
queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
} else {
- queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* switch to enqueue-only mode */
+ queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
}
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
/* make sure we do not timeout before we are done */
@@ -870,10 +927,12 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
+RUNLOG;
/* now the primary queue is either empty, persisted to disk - or set to loose messages. So we
* can now request immediate shutdown of any remaining workers.
*/
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+RUNLOG_VAR("%d", pThis->iQueueSize);
if(pThis->iQueueSize > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
@@ -895,9 +954,10 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
* thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25
*/
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+RUNLOG_VAR("%d", pThis->iQueueSize);
if(pThis->iQueueSize > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- dbgprintf("Queue 0x%lx: primary queue worker threads could not be shutdown, now canceling them\n",
+ dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the primary queue\n",
queueGetID(pThis));
iRetLocal = wtpCancelAll(pThis->pWtpReg);
if(iRetLocal != RS_RET_OK) {
@@ -913,7 +973,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- dbgprintf("Queue 0x%lx: DA worker threads could not be shutdown, now canceling them\n",
+ dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the DA queue\n",
queueGetID(pThis));
iRetLocal = wtpCancelAll(pThis->pWtpReg);
if(iRetLocal != RS_RET_OK) {
@@ -948,8 +1008,6 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
DEFiRet;
queue_t *pThis;
-int *pBoom = NULL;
-//*pBoom = 'A';
assert(ppThis != NULL);
assert(pConsumer != NULL);
assert(iWorkerThreads >= 0);
@@ -1196,7 +1254,11 @@ static int
queueIsIdleDA(queue_t *pThis)
{
/* remember: iQueueSize is the DA queue size, not the main queue! */
- return (pThis->iQueueSize == 0);
+RUNLOG_VAR("%d", pThis->iLowWtrMrk);
+dbgprintf("queueIsIdleDA(%p) returns %d, qsize %d\n", pThis, pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk), pThis->iQueueSize);
+ //// TODO: I think we need just a single function...
+ //return (pThis->iQueueSize == 0);
+ return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk));
}
/* must only be called when the queue mutex is locked, else results
* are not stable! Regular queue version
@@ -1235,6 +1297,7 @@ dbgprintf("Queue %p: I am child, use mutex %p\n", pThis, pThis->pqParent->mut);
}
pthread_mutex_init(&pThis->mutThrdMgmt, NULL);
+ pthread_cond_init (&pThis->condDAReady, NULL);
pthread_cond_init (&pThis->notFull, NULL);
pthread_cond_init (&pThis->notEmpty, NULL);
dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
@@ -1242,8 +1305,8 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
- dbgprintf("Queue 0x%lx: type %d, enq-only %d, disk assisted %d, maxFileSz %ld starting\n", queueGetID(pThis),
- pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize);
+ dbgprintf("Queue 0x%lx: type %d, enq-only %d, disk assisted %d, maxFileSz %ld, qsize %d starting\n", queueGetID(pThis),
+ pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, pThis->iQueueSize);
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
@@ -1251,7 +1314,7 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
/* create worker thread pools for regular operation. The DA pool is created on an as-needed
* basis, which potentially means never under most circumstances.
*/
- lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx/Reg", (unsigned long) pThis);
+ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx:Reg", (unsigned long) pThis);
CHKiRet(wtpConstruct (&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, queueChkStopWrkrReg));
@@ -1265,20 +1328,18 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
CHKiRet(wtpConstructFinalize (pThis->pWtpReg));
/* initialize worker thread instances */
+RUNLOG_VAR("%d", pThis->bIsDA);
if(pThis->bIsDA) {
/* If we are disk-assisted, we need to check if there is a QIF file
* which we need to load. -- rgerhards, 2008-01-15
*/
iRetLocal = queueHaveQIF(pThis);
+RUNLOG_VAR("%d", iRetLocal);
if(iRetLocal == RS_RET_OK) {
dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n",
queueGetID(pThis));
-
+RUNLOG;
queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
- /* we need to start the DA worker thread so that messages will be processed. So
- * we advise the worker pool there is at least one needed. The wtp does the rest...
- */
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
bInitialized = 1; /* we are done */
} else {
// TODO: use logerror? -- rgerhards, 2008-01-16
@@ -1287,11 +1348,17 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
}
}
+RUNLOG_VAR("%d", bInitialized);
if(!bInitialized) {
- dbgprintf("Queue 0x%lx: queue starts up without (loading) any disk state\n", queueGetID(pThis));
- /* we do not fire up any worker threads here, this happens automatically when they are needed */
- // TODO: preforked workers? queueStrtAllWrkThrds(pThis);
+ dbgprintf("Queue 0x%lx: queue starts up without (loading) any DA disk state (this is normal for the DA "
+ "queue itself!)\n", queueGetID(pThis));
}
+
+ /* if the queue already contains data, we need to start the correct number of worker threads. This can be
+ * the case when a disk queue has been loaded. If we did not start it here, it would never start.
+ */
+ queueAdviseMaxWorkers(pThis);
+
pThis->bQueueStarted = 1;
finalize_it:
@@ -1411,12 +1478,21 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
/* shut down all workers (handles *all* of the persistence logic) */
- queueShutdownWorkers(pThis);
+ if(!pThis->bEnqOnly) /* in enque-only mode, we have no worker pool! */
+ queueShutdownWorkers(pThis);
+RUNLOG;
- /* finally destruct our (regular) worker thread pool */
- if(pThis->qType != QUEUETYPE_DIRECT) {
+ /* finally destruct our (regular) worker thread pool
+ * Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen,
+ * e.g. when they are not created in enqueue-only mode. We already check the condition
+ * as this may otherwise be very hard to find once we optimize (and have long forgotten
+ * about this condition here ;)
+ * rgerhards, 2008-01-25
+ */
+ if(pThis->qType != QUEUETYPE_DIRECT && pThis->pWtpReg != NULL) {
wtpDestruct(&pThis->pWtpReg);
}
+RUNLOG;
/* Now check if we actually have a DA queue and, if so, destruct it.
* Note that the wtp must be destructed first, it may be in cancel cleanup handler
@@ -1424,10 +1500,16 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
* data (re-queueing case). So we need to destruct the wtp first, which will make
* sure all workers have terminated.
*/
+RUNLOG_VAR("%p", pThis->pWtpDA);
if(pThis->pWtpDA != NULL) {
+RUNLOG;
wtpDestruct(&pThis->pWtpDA);
+RUNLOG_VAR("%p", pThis->pqDA);
+ }
+ if(pThis->pqDA != NULL) {
queueDestruct(&pThis->pqDA);
}
+RUNLOG;
/* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty)
* This handler is most important for disk queues, it will finally persist the necessary
@@ -1447,6 +1529,7 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
free(pThis->mut);
}
pthread_mutex_destroy(&pThis->mutThrdMgmt);
+ pthread_cond_destroy(&pThis->condDAReady);
pthread_cond_destroy(&pThis->notFull);
pthread_cond_destroy(&pThis->notEmpty);
@@ -1522,7 +1605,6 @@ queueEnqObj(queue_t *pThis, void *pUsr)
{
DEFiRet;
int iCancelStateSave;
- int iMaxWorkers;
int i;
struct timespec t;
@@ -1530,6 +1612,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
// TODO: check if queue is terminating and if so either discard message or enqeue it to the DA queue *directly*
dbgprintf("Queue %p: EnqObj() 1\n", pThis);
+RUNLOG_VAR("%d", pThis->bRunsDA);
/* Please note that this function is not cancel-safe and consequently
* sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
* during its execution. If that is not done, race conditions occur if the
@@ -1544,25 +1627,10 @@ dbgprintf("Queue %p: EnqObj() 1\n", pThis);
/* first check if we need to discard this message (which will cause CHKiRet() to exit) */
CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
-dbgprintf("Queue %p: EnqObj() 10\n", pThis);
/* then check if we need to add an assistance disk queue */
if(pThis->bIsDA)
CHKiRet(queueChkStrtDA(pThis));
-RUNLOG_VAR("%d", pThis->bIsDA);
- /* make sure at least one worker is running. */
- if(pThis->bRunsDA) {
-RUNLOG;
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
- } else {
- if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
- iMaxWorkers = 1;
- } else {
- iMaxWorkers = pThis->iQueueSize / pThis->iMinMsgsPerWrkr + 1;
- }
-RUNLOG;
- wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
- }
/* wait for the queue to be ready... */
while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) {
@@ -1576,6 +1644,8 @@ RUNLOG;
}
/* and finally enqueue the message */
+RUNLOG_VAR("%p", pThis);
+RUNLOG_VAR("%d", pThis->bRunsDA);
CHKiRet(queueAdd(pThis, pUsr));
queueChkPersist(pThis);
@@ -1587,6 +1657,8 @@ finalize_it:
pthread_setcancelstate(iCancelStateSave, NULL);
}
+ /* make sure at least one worker is running. */
+ queueAdviseMaxWorkers(pThis);
RETiRet;
}
@@ -1601,10 +1673,10 @@ finalize_it:
* rgerhards, 2008-01-16
*/
static rsRetVal
-queueSetEnqOnly(queue_t *pThis, int bEnqOnly)
+queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex)
{
DEFiRet;
- int iCancelStateSave;
+ DEFVARS_mutexProtection;
ISOBJ_TYPE_assert(pThis, queue);
@@ -1612,8 +1684,7 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly)
* called, so that doesn't matter... -- rgerhards, 2008-01-16
*/
if(pThis->mut != NULL) {
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(pThis->mut);
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
}
if(bEnqOnly == pThis->bEnqOnly)
@@ -1626,8 +1697,10 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly)
/* this means we need to terminate all workers - that's it... */
dbgprintf("Queue 0x%lx: switching to enqueue-only mode, terminating all worker threads\n",
queueGetID(pThis));
- wtpWakeupAllWrkr(pThis->pWtpDA);
- wtpWakeupAllWrkr(pThis->pWtpReg);
+ if(pThis->pWtpReg != NULL)
+ wtpWakeupAllWrkr(pThis->pWtpReg);
+ if(pThis->pWtpDA != NULL)
+ wtpWakeupAllWrkr(pThis->pWtpDA);
} else {
/* switch back to regular mode */
ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */
@@ -1638,8 +1711,7 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly)
finalize_it:
if(pThis->mut != NULL) {
- d_pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
RETiRet;
}
diff --git a/queue.h b/queue.h
index ee2637da..70f5b925 100644
--- a/queue.h
+++ b/queue.h
@@ -90,7 +90,8 @@ typedef struct queue_s {
pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */
pthread_cond_t notFull, notEmpty;
- pthread_cond_t condThrdTrm;/* signalled when threads terminate */
+ pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */
+ pthread_cond_t condThrdTrm;/* signalled when threads terminate */ // TODO: no longer used?
pthread_cond_t *condSignalOnEmpty;/* caller-provided condition to be signalled when queue is empty (DA mode!) */
pthread_mutex_t *mutSignalOnEmpty; /* and its associated mutex */
pthread_cond_t *condSignalOnEmpty2;/* another condition to be signalled on empty */
@@ -112,7 +113,7 @@ typedef struct queue_s {
int bIsDA; /* is this queue disk assisted? */
int bRunsDA; /* is this queue actually *running* disk assisted? */
pthread_mutex_t mutDA; /* mutex for low water mark algo */
- pthread_cond_t condDA; /* and its matching condition */
+ pthread_cond_t condDA; /* and its matching condition */ // TODO: no longer needed!
struct queue_s *pqDA; /* queue for disk-assisted modes */
struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */
int bDAEnqOnly; /* EnqOnly setting for DA queue */
diff --git a/srUtils.c b/srUtils.c
index 4c64ce89..efcd7e6d 100755
--- a/srUtils.c
+++ b/srUtils.c
@@ -354,9 +354,9 @@ timeoutVal(struct timespec *pt)
iTimeout = (pt->tv_nsec - t.tv_nsec) / 1000;
}
} else {
- iTimeout = pt->tv_sec - t.tv_nsec;
- iTimeout += 1000 - (pt->tv_nsec / 1000);
- iTimeout += t.tv_nsec / 1000;
+ iTimeout = (pt->tv_sec - t.tv_sec) * 1000;
+ iTimeout += 1000 - (pt->tv_nsec / 1000000);
+ iTimeout += t.tv_nsec / 1000000;
}
return iTimeout;
diff --git a/srUtils.h b/srUtils.h
index d0d34f37..73109c30 100755
--- a/srUtils.h
+++ b/srUtils.h
@@ -89,14 +89,12 @@ void mutexCancelCleanup(void *arg);
#define LOCK_MUTEX 1
#define DEFVARS_mutexProtection\
int iCancelStateSave; \
- int bLockedOpIsLocked
+ int bLockedOpIsLocked=0
#define BEGIN_MTX_PROTECTED_OPERATIONS(mut, bMustLock) \
if(bMustLock == LOCK_MUTEX) { \
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); \
d_pthread_mutex_lock(mut); \
bLockedOpIsLocked = 1; \
- } else { \
- bLockedOpIsLocked = 0; \
}
#define END_MTX_PROTECTED_OPERATIONS(mut) \
if(bLockedOpIsLocked) { \
diff --git a/syslogd.c b/syslogd.c
index 09a246f9..f2d13985 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -2577,6 +2577,7 @@ static void doDie(int sig)
{
static int iRetries = 0; /* debug aid */
dbgprintf("DoDie called.\n");
+ dbgPrintAllDebugInfo();
if(iRetries++ == 4) {
dbgprintf("DoDie called 5 times - unconditional exit\n");
exit(1);
diff --git a/wti.c b/wti.c
index 2e1dd548..3f60afb2 100644
--- a/wti.c
+++ b/wti.c
@@ -91,7 +91,6 @@ wtiGetState(wti_t *pThis, int bLockMutex)
}
-
/* send a command to a specific thread
* bActiveOnly specifies if the command should be sent only when the worker is
* in an active state. -- rgerhards, 2008-01-20
@@ -107,7 +106,6 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
-RUNLOG_VAR("%d", bActiveOnly);
/* all worker states must be followed sequentially, only termination can be set in any state */
if( (bActiveOnly && (pThis->tCurrCmd < eWRKTHRD_RUN_CREATED))
|| (pThis->tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) {
@@ -116,14 +114,13 @@ RUNLOG_VAR("%d", bActiveOnly);
} else {
dbgprintf("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd);
switch(tCmd) {
- case eWRKTHRD_RUN_CREATED:
- break;
case eWRKTHRD_TERMINATING:
/* TODO: re-enable meaningful debug msg! (via function callback?)
dbgprintf("%s: thread terminating with %d entries left in queue, %d workers running.\n",
wtiGetDbgHdr(pThis->pQueue), pThis->pQueue->iQueueSize,
pThis->pQueue->iCurNumWrkThrd);
*/
+ pthread_cond_signal(&pThis->condExitDone);
dbgprintf("%s: worker terminating\n", wtiGetDbgHdr(pThis));
break;
case eWRKTHRD_RUNNING:
@@ -131,6 +128,7 @@ RUNLOG_VAR("%d", bActiveOnly);
break;
/* these cases just to satisfy the compiler, we do (yet) not act an them: */
case eWRKTHRD_STOPPED:
+ case eWRKTHRD_RUN_CREATED:
case eWRKTHRD_RUN_INIT:
case eWRKTHRD_SHUTDOWN:
case eWRKTHRD_SHUTDOWN_IMMEDIATE:
@@ -145,6 +143,39 @@ RUNLOG_VAR("%d", bActiveOnly);
}
+#if 0
+/* check if the worker shall shutdown (1 = yes, 0 = no)
+ * TODO: check if we can use atomic operations to enhance performance
+ * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user"
+ * (e.g. the queue clas)
+ * rgerhards, 2008-01-24
+ * TODO: we can optimize this via function pointers, as the code is only called during
+ * termination. So we can call the function via ptr in wtiWorker () and change that pointer
+ * to this function here upon shutdown.
+ */
+static inline rsRetVal
+wtiChkStopWrkr(wti_t *pThis, wtp_t *pWtp, int bLockMutex, int bLockUsrMutex)
+{
+ DEFiRet;
+ DEFVARS_mutexProtection;
+
+ ISOBJ_TYPE_assert(pThis, wti);
+
+ BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ if(pThis->bShutdownRqtd) {
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ iRet = RS_RET_TERMINATE_NOW;
+ } else {
+ /* regular case */
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ iRet = wtpChkStopWrkr(pWtp, bLockMutex, bLockUsrMutex);
+ }
+
+ RETiRet;
+}
+#endif
+
+
/* Destructor */
rsRetVal wtiDestruct(wti_t **ppThis)
{
@@ -160,17 +191,28 @@ rsRetVal wtiDestruct(wti_t **ppThis)
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
/* if we reach this point, we must make sure the associated worker has terminated. It is
- * the callers duty to make sure the worker has already terminated.
+ * the callers duty to make sure the worker already knows it shall terminate.
* TODO: is it *really* the caller's duty? ...mmmhhhh.... smells bad... rgerhards, 2008-01-25
*/
wtiProcessThrdChanges(pThis, LOCK_MUTEX); /* process state change one last time */
d_pthread_mutex_lock(&pThis->mut);
- assert(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) <= eWRKTHRD_TERMINATING); // I knew it smelled bad...
+RUNLOG_VAR("%d", pThis->tCurrCmd);
+ if(wtiGetState(pThis, MUTEX_ALREADY_LOCKED) != eWRKTHRD_STOPPED) {
+ dbgprintf("%s: WARNING: worker %p shall be destructed but is still running (might be OK) - joining it\n",
+ wtiGetDbgHdr(pThis), pThis);
+ /* let's hope the caller actually instructed it to shutdown... */
+ pthread_cond_wait(&pThis->condExitDone, &pThis->mut);
+RUNLOG;
+ wtiJoinThrd(pThis);
+RUNLOG;
+ }
+RUNLOG;
d_pthread_mutex_unlock(&pThis->mut);
/* actual destruction */
pthread_cond_destroy(&pThis->condInitDone);
+ pthread_cond_destroy(&pThis->condExitDone);
pthread_mutex_destroy(&pThis->mut);
if(pThis->pszDbgHdr != NULL)
@@ -191,6 +233,7 @@ rsRetVal wtiDestruct(wti_t **ppThis)
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
pthread_cond_init(&pThis->condInitDone, NULL);
+ pthread_cond_init(&pThis->condExitDone, NULL);
pthread_mutex_init(&pThis->mut, NULL);
ENDobjConstruct(wti)
@@ -228,7 +271,7 @@ wtiJoinThrd(wti_t *pThis)
pthread_join(pThis->thrdID, NULL);
RUNLOG;
wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* back to virgin... */
-RUNLOG;
+RUNLOG_VAR("%p", pThis->thrdID);
pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
dbgprintf("wti: worker %s has stopped\n", wtiGetDbgHdr(pThis));
@@ -339,7 +382,9 @@ wtiWorker(wti_t *pThis)
dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
+ BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
pWtp->pfOnWorkerStartup(pWtp->pUsr);
+ END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
/* now we have our identity, on to real processing */
while(1) { /* loop will be broken below - need to do mutex locks */
@@ -354,6 +399,7 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis),
if( (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED))
|| wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
+ //|| wtiChkStopWrkr(pThis, pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
break; /* end worker thread run */
}
@@ -393,14 +439,13 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis),
// " %d messages to process.\n", wtiGetDbgHdr(pThis), pThis->iQueueSize);
}
- pWtp->pfOnWorkerShutdown(pWtp->pUsr);
-
/* indicate termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
-dbgprintf("%s: worker waiting for mutex\n", wtiGetDbgHdr(pThis));
d_pthread_mutex_lock(&pThis->mut);
pthread_cleanup_pop(0); /* remove cleanup handler */
+ pWtp->pfOnWorkerShutdown(pWtp->pUsr);
+
// TODO: I think we no longer need that - but check!
#if 0
/* if we ever need finalize_it, here would be the place for it! */
diff --git a/wti.h b/wti.h
index db782d2b..8bd47d1d 100644
--- a/wti.h
+++ b/wti.h
@@ -35,7 +35,9 @@ typedef struct wti_s {
obj_t *pUsrp; /* pointer to an object meaningful for current user pointer (e.g. queue pUsr data elemt) */
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
pthread_cond_t condInitDone; /* signaled when the thread startup is done (once per thread existance) */
+ pthread_cond_t condExitDone; /* signaled when the thread exit is done (once per thread existance) */
pthread_mutex_t mut;
+ int bShutdownRqtd; /* shutdown for this thread requested? 0 - no , 1 - yes */
uchar *pszDbgHdr; /* header string for debug messages */
} wti_t;
diff --git a/wtp.c b/wtp.c
index 4133e7b4..1bda60e8 100644
--- a/wtp.c
+++ b/wtp.c
@@ -135,8 +135,6 @@ wtpDestruct(wtp_t **ppThis)
int iCancelStateSave;
int i;
-dbgPrintAllDebugInfo();
-RUNLOG;
assert(ppThis != NULL);
pThis = *ppThis;
ISOBJ_TYPE_assert(pThis, wtp);
@@ -179,9 +177,7 @@ wtpWakeupWrkr(wtp_t *pThis)
// TODO; mutex?
ISOBJ_TYPE_assert(pThis, wtp);
-dbgprintf("wtpWakeupWrkr 1, cond %p\n", pThis->pcondBusy);
pthread_cond_signal(pThis->pcondBusy);
-dbgprintf("wtpWakeupWrkr 2\n");
RETiRet;
}
/* wake up all worker threads.
@@ -211,10 +207,8 @@ wtpProcessThrdChanges(wtp_t *pThis)
ISOBJ_TYPE_assert(pThis, wtp);
- RUNLOG;
if(pThis->bThrdStateChanged == 0)
FINALIZE;
- RUNLOG;
/* go through all threads */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
@@ -222,7 +216,6 @@ wtpProcessThrdChanges(wtp_t *pThis)
}
finalize_it:
- RUNLOG;
RETiRet;
}
@@ -255,6 +248,8 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
DEFiRet;
DEFVARS_mutexProtection;
+ ISOBJ_TYPE_assert(pThis, wtp);
+
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE)
|| ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex)))
@@ -281,14 +276,17 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
int bTimedOut;
int iCancelStateSave;
-dbgPrintAllDebugInfo();
-RUNLOG_VAR("%p", pThis);
-RUNLOG_VAR("%d", tShutdownCmd);
ISOBJ_TYPE_assert(pThis, wtp);
wtpSetState(pThis, tShutdownCmd);
wtpWakeupAllWrkr(pThis);
+
+ /* see if we need to harvest (join) any terminated threads (even in timeout case,
+ * some may have terminated...
+ */
+ wtpProcessThrdChanges(pThis);
+RUNLOG_VAR("%d", pThis->iCurNumWrkThrd);
/* and wait for their termination */
dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
@@ -297,6 +295,7 @@ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
+RUNLOG_VAR("%d", pThis->iCurNumWrkThrd);
dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n",
wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd);
@@ -315,7 +314,6 @@ dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut);
*/
wtpProcessThrdChanges(pThis);
-dbgprintf("wtpShutdownAll exit");
RETiRet;
}
@@ -346,6 +344,7 @@ wtpCancelAll(wtp_t *pThis)
{
DEFiRet;
int i;
+ int numCancelled = 0;
// TODO: mutex?? // TODO: cancellation in wti!
ISOBJ_TYPE_assert(pThis, wtp);
@@ -353,19 +352,17 @@ wtpCancelAll(wtp_t *pThis)
/* process any pending thread requests so that we know who actually is still running */
wtpProcessThrdChanges(pThis);
-RUNLOG_VAR("%d", pThis->iNumWorkerThreads);;
/* first tell the workers our request */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
// TODO: mutex lock!
-RUNLOG_VAR("%p", pThis->pWrkr[i]);
if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) {
-RUNLOG;
dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i);
pthread_cancel(pThis->pWrkr[i]->thrdID);
+ ++numCancelled;
}
}
-RUNLOG;
+ dbgprintf("%s: cancelled %d worker threads\n", wtpGetDbgHdr(pThis), numCancelled);
RETiRet;
}
@@ -380,13 +377,9 @@ wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex)
DEFiRet;
DEFVARS_mutexProtection;
-RUNLOG;
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
-RUNLOG;
pThis->bInactivityGuard = bNewState;
-RUNLOG;
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
-RUNLOG;
RETiRet;
}
@@ -403,6 +396,7 @@ wtpWrkrExecCancelCleanup(void *arg)
ISOBJ_TYPE_assert(pThis, wtp);
pThis->iCurNumWrkThrd--;
+RUNLOG_VAR("%d", pThis->iCurNumWrkThrd);
wtpSignalWrkrTermination(pThis);
dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd);
@@ -459,6 +453,7 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
pthread_cleanup_pop(0);
pThis->iCurNumWrkThrd--;
+RUNLOG_VAR("%d", pThis->iCurNumWrkThrd);
wtpSignalWrkrTermination(pThis);
dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n",
@@ -488,6 +483,8 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
pThis->iCurNumWrkThrd++;
+dbgPrintAllDebugInfo();
+RUNLOG_VAR("%d", pThis->iCurNumWrkThrd);
/* find free spot in thread table. If we find at least one worker that is in initialization,
* we do NOT start a new one. Let's give the other one a chance, first.
@@ -538,7 +535,6 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
int nMissing; /* number workers missing to run */
int i;
- if(pThis == NULL) dbgPrintAllDebugInfo();
ISOBJ_TYPE_assert(pThis, wtp);
dbgprintf("%s: wtpAdviseMaxWorker with %d called, currNum %d, max %d\n", wtpGetDbgHdr(pThis), nMaxWrkr, pThis->iCurNumWrkThrd, pThis->iNumWorkerThreads);
@@ -547,11 +543,10 @@ dbgprintf("%s: wtpAdviseMaxWorker with %d called, currNum %d, max %d\n", wtpGetD
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
+ if(nMaxWrkr > pThis->iNumWorkerThreads) /* limit to configured maximum */
+ nMaxWrkr = pThis->iNumWorkerThreads;
+
nMissing = nMaxWrkr - pThis->iCurNumWrkThrd;
- if(nMissing > pThis->iNumWorkerThreads)
- nMissing = pThis->iNumWorkerThreads;
- else if(nMissing < 0)
- nMissing = 0;
if(nMissing > 0) {
dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing);
@@ -559,9 +554,11 @@ dbgprintf("%s: wtpAdviseMaxWorker with %d called, currNum %d, max %d\n", wtpGetD
for(i = 0 ; i < nMissing ; ++i) {
CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED));
}
- } else {
-dbgprintf("wtpAdviseMaxWorkers signals busy\n");
- wtpWakeupWrkr(pThis);
+ } else {
+ if(nMaxWrkr > 0) {
+ dbgprintf("wtpAdviseMaxWorkers signals busy\n");
+ wtpWakeupWrkr(pThis);
+ }
}
diff --git a/wtp.h b/wtp.h
index 37cbd7e9..7df3166b 100644
--- a/wtp.h
+++ b/wtp.h
@@ -108,6 +108,7 @@ PROTOTYPEpropSetMeth(wtp, toWrkShutdown, long);
PROTOTYPEpropSetMeth(wtp, wtpState, wtpState_t);
PROTOTYPEpropSetMeth(wtp, iMaxWorkerThreads, int);
PROTOTYPEpropSetMeth(wtp, pUsr, void*);
+PROTOTYPEpropSetMeth(wtp, iNumWorkerThreads, int);
PROTOTYPEpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t);
PROTOTYPEpropSetMethPTR(wtp, pcondBusy, pthread_cond_t);