summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--debug.c2
-rw-r--r--queue.c84
-rw-r--r--queue.h1
-rw-r--r--rsyslog.h1
-rw-r--r--wti.c57
-rw-r--r--wtp.c30
6 files changed, 47 insertions, 128 deletions
diff --git a/debug.c b/debug.c
index b1a35084..4605eec8 100644
--- a/debug.c
+++ b/debug.c
@@ -57,7 +57,7 @@ static dbgThrdInfo_t *dbgGetThrdInfo(void);
int Debug; /* debug flag - read-only after startup */
int debugging_on = 0; /* read-only, except on sig USR1 */
static int bLogFuncFlow = 0; /* shall the function entry and exit be logged to the debug log? */
-static int bPrintFuncDBOnExit = 1; /* shall the function entry and exit be logged to the debug log? */
+static int bPrintFuncDBOnExit = 0; /* shall the function entry and exit be logged to the debug log? */
static int bPrintMutexAction = 0; /* shall mutex calls be printed to the debug log? */
static int bPrintTime = 1; /* print a timestamp together with debug message */
static char *pszAltDbgFileName = NULL; /* if set, debug output is *also* sent to here */
diff --git a/queue.c b/queue.c
index 0e0a5a83..fa682765 100644
--- a/queue.c
+++ b/queue.c
@@ -1,13 +1,3 @@
-// TODO: DA worker must not wait eternal on shutdown when in enqueue only mode!
-// TODO: we need to implement peek(), without it (today!) we lose one message upon
-// worker cancellation! -- rgerhards, 2008-01-14
-// TODO: think about mutDA - I think it's no longer needed
-// TODO: start up the correct num of workers when switching to non-DA mode
-// TODO: "preforked" worker threads
-// TODO: do an if(debug) in dbgrintf - performance in release build!
-// TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in
-// call consumer state. Facilitates retaining messages in queue until action could
-// be called!
/* queue.c
*
* This file implements the queue object and its several queueing methods.
@@ -218,9 +208,6 @@ queueStartDA(queue_t *pThis)
if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */
FINALIZE; /* ... then we are already done! */
- /* set up sync objects */
- pthread_mutex_init(&pThis->mutDA, NULL);
-
/* create message queue */
dbgprintf("Queue %p: queueSTrtDA pre child queue construct,\n", pThis);
CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
@@ -309,12 +296,12 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx:DA", (unsigned long) pThis);
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, queueChkStopWrkrDA));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, queueIsIdleDA));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, queueConsumerDA));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, queueConsumerCancelCleanup));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, queueStartDA));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, queueTurnOffDAMode));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) queueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueStartDA));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueTurnOffDAMode));
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
@@ -564,7 +551,7 @@ queueHaveQIF(queue_t *pThis)
ISOBJ_TYPE_assert(pThis, queue);
if(pThis->pszFilePrefix == NULL)
- ABORT_FINALIZE(RS_RET_ERR); // TODO: change code!
+ ABORT_FINALIZE(RS_RET_NO_FILEPREFIX);
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
@@ -921,17 +908,16 @@ RUNLOG_VAR("%d", pThis->toQShutdown);
* set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate as soon as its consumer
* is done. This is especially important as we otherwise may interfere with queue order while the
* DA consumer is running. -- rgerhards, 2008-01-27
+ * Note: there was a note that we should not wait eternally on the DA worker if we run in
+ * enqueue-only note. I have reviewed the code and think there is no need for this check. Howerver,
+ * I'd like to keep this note in here should we happen to run into some related trouble.
+ * rgerhards, 2008-01-28
*/
wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */
-// TODO: what about pure disk queues and bSaveOnShutdown?
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
/* optimize parameters for shutdown of DA-enabled queues */
-//RUNLOG_VAR("%d", pThis->bSaveOnShutdown);
-//RUNLOG_VAR("%d", pThis->bIsDA);
-//RUNLOG_VAR("%d", pThis->iQueueSize);
if(pThis->bIsDA && pThis->iQueueSize > 0 && pThis->bSaveOnShutdown) {
-//RUNLOG;
/* switch to enqueue-only mode so that no more actions happen */
if(pThis->bRunsDA == 0) {
queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
@@ -953,21 +939,17 @@ RUNLOG_VAR("%d", pThis->toQShutdown);
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
-RUNLOG;
/* now the primary queue is either empty, persisted to disk - or set to loose messages. So we
* can now request immediate shutdown of any remaining workers. Note that if bSaveOnShutdown was set,
* the queue is now empty. If regular workers are still running, and try to pull the next message,
* they will automatically terminate as there no longer is any message left to process.
*/
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
-RUNLOG_VAR("%d", pThis->iQueueSize);
- //old: if(pThis->iQueueSize > 0) {
if(pThis->iQueueSize > 0) {
timeoutComp(&tTimeout, pThis->toActShutdown);
if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
dbgprintf("Queue 0x%lx: trying immediate shutdown of regular workers\n", queueGetID(pThis));
- // TODO: ??? cut&paste error? iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
dbgprintf("Queue 0x%lx: immediate shutdown timed out on primary queue (this is acceptable and "
@@ -1047,8 +1029,6 @@ RUNLOG_VAR("%d", pThis->iQueueSize);
}
}
-// TODO: think about joining all workers, so that the destructors are called
-//
/* ... finally ... all worker threads have terminated :-)
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
* may still be running.
@@ -1152,7 +1132,7 @@ queueConsumerCancelCleanup(void *arg1, void *arg2)
dbgprintf("Queue 0x%lx: cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n",
queueGetID(pThis));
- /* TODO: re-enqueue the data element! */
+ /* TODO: re-enqueue the data element! This will also make the compiler warning go away... */
RETiRet;
}
@@ -1213,7 +1193,7 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
/* dequeue element (still protected from mutex) */
iRet = queueDel(pThis, &pUsr);
- queueChkPersist(pThis); // when we support peek(), we must do this down after the del!
+ queueChkPersist(pThis);
iQueueSize = pThis->iQueueSize; /* cache this for after mutex release */
bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
pWti->pUsrp = pUsr; /* save it for the cancel cleanup handler */
@@ -1316,12 +1296,6 @@ queueChkStopWrkrDA(queue_t *pThis)
bStopWrkr = 1;
} else {
if(pThis->bRunsDA) {
-#if 0
-RUNLOG_VAR("%d", pThis->iQueueSize);
-RUNLOG_VAR("%d", pThis->iHighWtrMrk);
-if(pThis->pqDA != NULL)
- RUNLOG_VAR("%d", pThis->pqDA->bQueueStarted);
-#endif
if(pThis->iQueueSize < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
bStopWrkr = 1;
} else {
@@ -1332,7 +1306,6 @@ if(pThis->pqDA != NULL)
}
}
-//RUNLOG_VAR("%d", bStopWrkr);
ENDfunc
return bStopWrkr;
}
@@ -1364,10 +1337,7 @@ queueIsIdleDA(queue_t *pThis)
{
/* remember: iQueueSize is the DA queue size, not the main queue! */
BEGINfunc
-RUNLOG_VAR("%d", pThis->iLowWtrMrk);
-dbgprintf("queueIsIdleDA(%p) returns %d, qsize %d\n", pThis, pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk), pThis->iQueueSize);
- //// TODO: I think we need just a single function...
- //return (pThis->iQueueSize == 0);
+ /* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */
ENDfunc
return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk));
}
@@ -1483,12 +1453,12 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx:Reg", (unsigned long) pThis);
CHKiRet(wtpConstruct (&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, queueChkStopWrkrReg));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, queueIsIdleReg));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, queueConsumerReg));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, queueConsumerCancelCleanup));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, queueRegOnWrkrStartup));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, queueRegOnWrkrShutdown));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))queueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrStartup));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
@@ -1511,7 +1481,7 @@ RUNLOG;
queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
bInitialized = 1; /* we are done */
} else {
- // TODO: use logerror? -- rgerhards, 2008-01-16
+ /* TODO: use logerror? -- rgerhards, 2008-01-16 */
dbgprintf("Queue 0x%lx: error %d trying to access on-disk queue files, starting without them. "
"Some data may be lost\n", queueGetID(pThis), iRetLocal);
}
@@ -1556,9 +1526,14 @@ static rsRetVal queuePersist(queue_t *pThis)
assert(pThis != NULL);
if(pThis->qType != QUEUETYPE_DISK) {
- if(pThis->iQueueSize > 0)
- ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* TODO: later... */
- else
+ if(pThis->iQueueSize > 0) {
+ /* This error code is OK, but we will probably not implement this any time
+ * The reason is that persistence happens via DA queues. But I would like to
+ * leave the code as is, as we so have a hook in case we need one.
+ * -- rgerhards, 2008-01-28
+ */
+ ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED);
+ } else
FINALIZE; /* if the queue is empty, we are happy and done... */
}
@@ -1938,7 +1913,6 @@ finalize_it:
*/
BEGINObjClassInit(queue, 1)
OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty);
- //OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize);
ENDObjClassInit(queue)
/*
diff --git a/queue.h b/queue.h
index aadb13d7..a05c6341 100644
--- a/queue.h
+++ b/queue.h
@@ -107,7 +107,6 @@ typedef struct queue_s {
size_t iMaxFileSize; /* max size for a single queue file */
int bIsDA; /* is this queue disk assisted? */
int bRunsDA; /* is this queue actually *running* disk assisted? */
- pthread_mutex_t mutDA; /* mutex for low water mark algo */
struct queue_s *pqDA; /* queue for disk-assisted modes */
struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */
int bDAEnqOnly; /* EnqOnly setting for DA queue */
diff --git a/rsyslog.h b/rsyslog.h
index dd7b1c1e..2c57e7a4 100644
--- a/rsyslog.h
+++ b/rsyslog.h
@@ -113,6 +113,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_QSIZE_ZERO = -2042, /**< queue size is zero where this is not supported */
RS_RET_ALREADY_STARTING = -2043, /**< something (a thread?) is already starting - not necessarily an error */
RS_RET_NO_MORE_THREADS = -2044, /**< no more threads available, not necessarily an error */
+ RS_RET_NO_FILEPREFIX = -2045, /**< file prefix is not specified where one is needed */
RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */
RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */
RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */
diff --git a/wti.c b/wti.c
index 688f1267..dd0e465e 100644
--- a/wti.c
+++ b/wti.c
@@ -143,39 +143,6 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
}
-#if 0
-/* check if the worker shall shutdown (1 = yes, 0 = no)
- * TODO: check if we can use atomic operations to enhance performance
- * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user"
- * (e.g. the queue clas)
- * rgerhards, 2008-01-24
- * TODO: we can optimize this via function pointers, as the code is only called during
- * termination. So we can call the function via ptr in wtiWorker () and change that pointer
- * to this function here upon shutdown.
- */
-static inline rsRetVal
-wtiChkStopWrkr(wti_t *pThis, wtp_t *pWtp, int bLockMutex, int bLockUsrMutex)
-{
- DEFiRet;
- DEFVARS_mutexProtection;
-
- ISOBJ_TYPE_assert(pThis, wti);
-
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut);
- if(pThis->bShutdownRqtd) {
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
- iRet = RS_RET_TERMINATE_NOW;
- } else {
- /* regular case */
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
- iRet = wtpChkStopWrkr(pWtp, bLockMutex, bLockUsrMutex);
- }
-
- RETiRet;
-}
-#endif
-
-
/* Destructor */
rsRetVal wtiDestruct(wti_t **ppThis)
{
@@ -328,7 +295,7 @@ wtiWorkerCancelCleanup(void *arg)
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pWtp->mut);
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
- // TODO: sync access!
+ /* TODO: sync access? I currently think it is NOT needed -- rgerhards, 2008-01-28 */
pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
d_pthread_mutex_unlock(&pWtp->mut);
@@ -393,7 +360,6 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis),
if( (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED))
|| wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
- //|| wtiChkStopWrkr(pThis, pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
break; /* end worker thread run */
}
@@ -413,7 +379,6 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis),
d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
} else {
timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
-dbgprintf("timeout value is %ld\n", timeoutVal(&t));
if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
dbgprintf("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
bInactivityTOOccured = 1; /* indicate we had a timeout */
@@ -427,11 +392,6 @@ dbgprintf("timeout value is %ld\n", timeoutVal(&t));
/* if we reach this point, we have a non-empty queue (and are still protected by mutex) */
dbgprintf("%s: calling consumer\n", wtiGetDbgHdr(pThis));
pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
-
- /* TODO: move this above into one of the chck Term functions */
- //if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0)
- // dbgprintf("%s: worker does not yet terminate because it still has "
- // " %d messages to process.\n", wtiGetDbgHdr(pThis), pThis->iQueueSize);
}
/* indicate termination */
@@ -441,22 +401,7 @@ dbgprintf("timeout value is %ld\n", timeoutVal(&t));
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
- // TODO: I think we no longer need that - but check!
-#if 0
- /* if we ever need finalize_it, here would be the place for it! */
- if(qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN ||
- qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN_IMMEDIATE ||
- qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT ||
- qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_CREATED) {
- /* in shutdown case, we need to flag termination. All other commands
- * have a meaning to the thread harvester, so we can not overwrite them
- */
-dbgprintf("%s: setting termination state\n", wtiGetDbgHdr(pThis));
- wtiSetState(pWrkrInst, eWRKTHRD_TERMINATING, 0);
- }
-#else
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
-#endif
pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
d_pthread_mutex_unlock(&pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
diff --git a/wtp.c b/wtp.c
index a71301a5..63507279 100644
--- a/wtp.c
+++ b/wtp.c
@@ -173,11 +173,12 @@ wtpWakeupWrkr(wtp_t *pThis)
{
DEFiRet;
- // TODO; mutex?
+ /* TODO; mutex? I think not needed, as we do not need predictable exec order -- rgerhards, 2008-01-28 */
ISOBJ_TYPE_assert(pThis, wtp);
pthread_cond_signal(pThis->pcondBusy);
RETiRet;
}
+
/* wake up all worker threads.
* rgerhards, 2008-01-16
*/
@@ -187,7 +188,6 @@ wtpWakeupAllWrkr(wtp_t *pThis)
DEFiRet;
ISOBJ_TYPE_assert(pThis, wtp);
- // TODO; mutex?
pthread_cond_broadcast(pThis->pcondBusy);
RETiRet;
}
@@ -228,7 +228,7 @@ wtpSetState(wtp_t *pThis, wtpState_t iNewState)
ISOBJ_TYPE_assert(pThis, wtp);
pThis->wtpState = iNewState;
- // TODO: must wakeup workers?
+ /* TODO: must wakeup workers? seen to be not needed -- rgerhards, 2008-01-28 */
RETiRet;
}
@@ -321,14 +321,19 @@ RUNLOG_VAR("%d", pThis->iCurNumWrkThrd);
rsRetVal wtpSignalWrkrTermination(wtp_t *pThis)
{
DEFiRet;
- //TODO: mutex or not mutex, that's the question ;)DEFVARS_mutexProtection;
+ /* I leave the mutex code here out as it give as deadlocks. I think it is not really
+ * needed and we are on the safe side. I leave this comment in if practice proves us
+ * wrong. The whole thing should be removed after half a your or year if we see there
+ * actually is no issue (or revisit it from a theoretical POV).
+ * rgerhards, 2008-01-28
+ */
+ /*TODO: mutex or not mutex, that's the question ;)DEFVARS_mutexProtection;*/
ISOBJ_TYPE_assert(pThis, wtp);
- //BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
-dbgprintf("signaling thread termination, cond %p\n", &pThis->condThrdTrm);
+ /*BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);*/
pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
- //END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ /*END_MTX_PROTECTED_OPERATIONS(&pThis->mut);*/
RETiRet;
}
@@ -342,18 +347,16 @@ wtpCancelAll(wtp_t *pThis)
DEFiRet;
int i;
int numCancelled = 0;
- // TODO: mutex?? // TODO: cancellation in wti!
+ /* TODO: mutex?? TODO: cancellation in wti (but OK as is [though ugly form an isolation POV]!) */
ISOBJ_TYPE_assert(pThis, wtp);
/* process any pending thread requests so that we know who actually is still running */
wtpProcessThrdChanges(pThis);
-//RUNLOG_VAR("%d", pThis->iCurNumWrkThrd);
/* go through all workers and cancel those that are active */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- // TODO: mutex lock!
-//RUNLOG_VAR("%d", pThis->pWrkr[i]->tCurrCmd);
+ /* TODO: mutex lock!*/
if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) {
dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i);
pthread_cancel(pThis->pWrkr[i]->thrdID);
@@ -430,13 +433,11 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
pthread_cleanup_push(wtpWrkrExecCancelCleanup, pThis);
- // TODO: review code below - if still needed (setState yes!)?
/* finally change to RUNNING state. We need to check if we actually should still run,
* because someone may have requested us to shut down even before we got a chance to do
* our init. That would be a bad race... -- rgerhards, 2008-01-16
*/
- //if(qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT)
- wtiSetState(pWti, eWRKTHRD_RUNNING, 0, MUTEX_ALREADY_LOCKED); /* we are running now! */
+ wtiSetState(pWti, eWRKTHRD_RUNNING, 0, MUTEX_ALREADY_LOCKED); /* we are running now! */
do {
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
@@ -600,7 +601,6 @@ wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex)
iNumWrkr = pThis->iCurNumWrkThrd;
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
-RUNLOG_VAR("%d", iNumWrkr);
ENDfunc
return iNumWrkr;
}