From caf6b75951d087f2406bcdda21a98dc2ee3eb145 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 20 Jul 2009 15:49:50 +0200 Subject: cleanup & better debug message handling the new handling will hopefully spare a few cycles, as function calls (and most importantly parameter generation!) or now only done when debug messages are actually active. --- runtime/queue.c | 131 ++++++++++++++++++++++++++++---------------------------- 1 file changed, 65 insertions(+), 66 deletions(-) diff --git a/runtime/queue.c b/runtime/queue.c index e71e35cf..cb14b58d 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -67,7 +67,6 @@ DEFobjCurrIf(glbl) DEFobjCurrIf(strm) /* forward-definitions */ -static rsRetVal ChkStrtDA(qqueue_t *pThis); static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); static rsRetVal SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); static rsRetVal RateLimiter(qqueue_t *pThis); @@ -209,7 +208,7 @@ static inline void queueDrain(qqueue_t *pThis) ASSERT(pThis != NULL); BEGINfunc - dbgoprint((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize); + DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) { pThis->qDeq(pThis, &pUsr); @@ -277,7 +276,7 @@ TurnOffDAMode(qqueue_t *pThis) ASSERT(pThis->bRunsDA); if(getLogicalQueueSize(pThis->pqDA) == 0) { pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ - dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n", + DBGOPRINT((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n", iRet); } @@ -300,9 +299,9 @@ qqueueChkIsDA(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->pszFilePrefix != NULL) { pThis->bIsDA = 1; - dbgoprint((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n"); + DBGOPRINT((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n"); } else { - dbgoprint((obj_t*) pThis, "is NOT disk-assisted\n"); + DBGOPRINT((obj_t*) pThis, "is NOT disk-assisted\n"); } RETiRet; @@ -375,7 +374,7 @@ StartDA(qqueue_t *pThis) //pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ - dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n", + DBGOPRINT((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n", qqueueGetID(pThis->pqDA)); finalize_it: @@ -383,7 +382,7 @@ finalize_it: if(pThis->pqDA != NULL) { qqueueDestruct(&pThis->pqDA); } - dbgoprint((obj_t*) pThis, "error %d creating disk queue - giving up.\n", iRet); + DBGOPRINT((obj_t*) pThis, "error %d creating disk queue - giving up.\n", iRet); pThis->bIsDA = 0; } @@ -481,14 +480,14 @@ ChkStrtDA(qqueue_t *pThis) * terminated due to the inactivity timeout, thus we need to advise the pool that * we need at least one). */ - dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n", + DBGOPRINT((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n", getPhysicalQueueSize(pThis)); qqueueAdviseMaxWorkers(pThis); } else { /* this is the case when we are currently not running in DA mode. So it is time * to turn it back on. */ - dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n", + DBGOPRINT((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n", getPhysicalQueueSize(pThis)); InitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ } @@ -598,7 +597,7 @@ qUnDeqAllFixedArray(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); - dbgoprint((obj_t*) pThis, "resetting FixedArray deq index to %ld (was %ld), logical dequeue count %d\n", + DBGOPRINT((obj_t*) pThis, "resetting FixedArray deq index to %ld (was %ld), logical dequeue count %d\n", pThis->tVars.farray.head, pThis->tVars.farray.deqhead, pThis->nLogDeq); pThis->tVars.farray.deqhead = pThis->tVars.farray.head; @@ -709,7 +708,7 @@ qUnDeqAllLinkedList(qqueue_t *pThis) ASSERT(pThis != NULL); - dbgoprint((obj_t*) pThis, "resetting LinkedList deq ptr to %p (was %p), logical dequeue count %d\n", + DBGOPRINT((obj_t*) pThis, "resetting LinkedList deq ptr to %p (was %p), logical dequeue count %d\n", pThis->tVars.linklist.pDelRoot, pThis->tVars.linklist.pDeqRoot, pThis->nLogDeq); pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pDelRoot; @@ -758,10 +757,10 @@ qqueueHaveQIF(qqueue_t *pThis) /* check if the file exists */ if(stat((char*) pszQIFNam, &stat_buf) == -1) { if(errno == ENOENT) { - dbgoprint((obj_t*) pThis, "no .qi file found\n"); + DBGOPRINT((obj_t*) pThis, "no .qi file found\n"); ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); } else { - dbgoprint((obj_t*) pThis, "error %d trying to access .qi file\n", errno); + DBGOPRINT((obj_t*) pThis, "error %d trying to access .qi file\n", errno); ABORT_FINALIZE(RS_RET_IO_ERROR); } } @@ -793,10 +792,10 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) /* check if the file exists */ if(stat((char*) pszQIFNam, &stat_buf) == -1) { if(errno == ENOENT) { - dbgoprint((obj_t*) pThis, "clean startup, no .qi file found\n"); + DBGOPRINT((obj_t*) pThis, "clean startup, no .qi file found\n"); ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); } else { - dbgoprint((obj_t*) pThis, "error %d trying to access .qi file\n", errno); + DBGOPRINT((obj_t*) pThis, "error %d trying to access .qi file\n", errno); ABORT_FINALIZE(RS_RET_IO_ERROR); } } @@ -839,7 +838,7 @@ finalize_it: strm.Destruct(&psQIF); if(iRet != RS_RET_OK) { - dbgoprint((obj_t*) pThis, "error %d reading .qi file - can not read persisted info (if any)\n", + DBGOPRINT((obj_t*) pThis, "error %d reading .qi file - can not read persisted info (if any)\n", iRet); } @@ -948,7 +947,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr) */ objDestruct(pUsr); - dbgoprint((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n", + DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n", nWriteCount, pThis->tVars.disk.sizeOnDisk); finalize_it: @@ -990,7 +989,7 @@ static rsRetVal qDelDisk(qqueue_t *pThis) } else { pThis->tVars.disk.sizeOnDisk -= pThis->tVars.disk.bytesRead; pThis->tVars.disk.bytesRead = offsOut; - dbgoprint((obj_t*) pThis, "a file has been deleted, now %lld octets disk space used\n", pThis->tVars.disk.sizeOnDisk); + DBGOPRINT((obj_t*) pThis, "a file has been deleted, now %lld octets disk space used\n", pThis->tVars.disk.sizeOnDisk); /* awake possibly waiting enq process */ pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */ } @@ -1080,7 +1079,7 @@ qqueueAdd(qqueue_t *pThis, void *pUsr) if(pThis->qType != QUEUETYPE_DIRECT) { ATOMIC_INC(pThis->iQueueSize); - dbgoprint((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n", + DBGOPRINT((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); } @@ -1106,7 +1105,7 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr) iRet = pThis->qDeq(pThis, ppUsr); ATOMIC_INC(pThis->nLogDeq); -// dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", +// DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", // getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); RETiRet; @@ -1159,37 +1158,37 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) * shutdown of both the regular and DA queue on *the same* timeout. */ timeoutComp(&tTimeout, pThis->toQShutdown); - dbgoprint((obj_t*) pThis, "trying shutdown of regular workers\n"); + DBGOPRINT((obj_t*) pThis, "trying shutdown of regular workers\n"); iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { - dbgoprint((obj_t*) pThis, "regular shutdown timed out on primary queue (this is OK)\n"); + DBGOPRINT((obj_t*) pThis, "regular shutdown timed out on primary queue (this is OK)\n"); } else { - dbgoprint((obj_t*) pThis, "regular queue workers shut down.\n"); + DBGOPRINT((obj_t*) pThis, "regular queue workers shut down.\n"); } /* OK, the worker for the regular queue is processed, on the the DA queue regular worker. */ if(pThis->pqDA != NULL) { - dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n", + DBGOPRINT((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n", qqueueGetID(pThis->pqDA)); /* we use the same absolute timeout as above, so we do not use more than the configured * timeout interval! */ - dbgoprint((obj_t*) pThis, "trying shutdown of regular worker of DA queue\n"); + DBGOPRINT((obj_t*) pThis, "trying shutdown of regular worker of DA queue\n"); iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { - dbgoprint((obj_t*) pThis, "shutdown timed out on DA queue worker (this is OK)\n"); + DBGOPRINT((obj_t*) pThis, "shutdown timed out on DA queue worker (this is OK)\n"); } else { - dbgoprint((obj_t*) pThis, "DA queue worker shut down.\n"); + DBGOPRINT((obj_t*) pThis, "DA queue worker shut down.\n"); } /* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting * the queue, it is restarted at a later stage. We don't care here if a timeout happens. */ - dbgoprint((obj_t*) pThis, "trying shutdown of main queue DA worker pool\n"); + DBGOPRINT((obj_t*) pThis, "trying shutdown of main queue DA worker pool\n"); iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { - dbgoprint((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n"); + DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n"); } else { - dbgoprint((obj_t*) pThis, "main queue DA worker pool shut down.\n"); + DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down.\n"); } } @@ -1227,25 +1226,25 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */ timeoutComp(&tTimeout, pThis->toActShutdown); - dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers (if any)\n"); + DBGOPRINT((obj_t*) pThis, "trying immediate shutdown of regular workers (if any)\n"); iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { - dbgoprint((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and " + DBGOPRINT((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and " "triggers cancellation)\n"); } else if(iRetLocal != RS_RET_OK) { - dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue " + DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue " "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } if(pThis->pqDA != NULL) { /* and now the same for the DA queue */ - dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n"); + DBGOPRINT((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n"); iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { - dbgoprint((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable " + DBGOPRINT((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable " "and triggers cancellation)\n"); } else if(iRetLocal != RS_RET_OK) { - dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA " + DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA " "queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } } @@ -1268,19 +1267,19 @@ cancelWorkers(qqueue_t *pThis) * all timeout setting. If any worker in any queue still executes, its consumer is possibly * long-running and cancelling is the only way to get rid of it. */ - dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the primary queue\n"); + DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the primary queue\n"); iRetLocal = wtpCancelAll(pThis->pWtpReg); /* returns immediately if all threads already have terminated */ if(iRetLocal != RS_RET_OK) { - dbgoprint((obj_t*) pThis, "unexpected iRet state %d trying to cancel primary queue worker " + DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel primary queue worker " "threads, continuing, but results are unpredictable\n", iRetLocal); } /* ... and now the DA queue, if it exists (should always be after the primary one) */ if(pThis->pqDA != NULL) { - dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the DA queue\n"); + DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the DA queue\n"); iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg); /* returns immediately if all threads already have terminated */ if(iRetLocal != RS_RET_OK) { - dbgoprint((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker " + DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker " "threads, continuing, but results are unpredictable\n", iRetLocal); } @@ -1289,7 +1288,7 @@ cancelWorkers(qqueue_t *pThis) * big trouble when resetting the logical dequeue pointer. This operation can only be * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28 */ - dbgoprint((obj_t*) pThis, "checking to see if we need to cancel the main queue's DA worker pool\n"); + DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel the main queue's DA worker pool\n"); iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */ } @@ -1322,7 +1321,7 @@ ShutdownWorkers(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ - dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n"); + DBGOPRINT((obj_t*) pThis, "initiating worker thread shutdown sequence\n"); /* we reduce the low water mark in any case. This is not absolutely necessary, but * it is useful because we enable DA mode at several spots below and so we do not need @@ -1343,7 +1342,7 @@ ShutdownWorkers(qqueue_t *pThis) * Well, more precisely, they *are in termination*. Some cancel cleanup handlers * may still be running. Note that the main queue's DA worker may still be running. */ - dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size log %d, phys %d.\n", + DBGOPRINT((obj_t*) pThis, "worker threads terminated, remaining queue size log %d, phys %d.\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); finalize_it: @@ -1461,12 +1460,12 @@ static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, int bRunsDA, voi if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) { iRetLocal = objGetSeverity(pUsr, &iSeverity); if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { - dbgoprint((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n", + DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n", iQueueSize, iSeverity); objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } else { - dbgoprint((obj_t*) pThis, "queue nearly full (%d entries), but could not drop msg " + DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), but could not drop msg " "(iRet: %d, severity %d)\n", iQueueSize, iRetLocal, iSeverity); } } @@ -1656,7 +1655,7 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti) /* WE ARE NO LONGER PROTECTED BY THE MUTEX */ if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { - dbgoprint((obj_t*) pThis, "error %d dequeueing element - ignoring, but strange things " + DBGOPRINT((obj_t*) pThis, "error %d dequeueing element - ignoring, but strange things " "may happen\n", iRet); } @@ -1751,7 +1750,7 @@ RateLimiter(qqueue_t *pThis) } if(iDelay > 0) { - dbgoprint((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay); + DBGOPRINT((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay); srSleep(iDelay, 0); } @@ -1822,7 +1821,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) */ //TODO: MULTIQUEUE: the following setting is no longer correct - need to think about how to do that... if(pThis->iDeqSlowdown) { - dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n", + DBGOPRINT((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n", pThis->iDeqSlowdown); srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000); } @@ -1862,7 +1861,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) } finalize_it: - dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); + DBGOPRINT((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); RETiRet; } @@ -2018,7 +2017,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pthread_mutex_init(pThis->mut, NULL); } else { /* child queue, we need to use parent's mutex */ - dbgoprint((obj_t*) pThis, "I am a child\n"); + DBGOPRINT((obj_t*) pThis, "I am a child\n"); pThis->mut = pThis->pqParent->mut; } @@ -2032,7 +2031,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ /* call type-specific constructor */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ - dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, lqsize %d, pqsize %d, child %d, " + DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, lqsize %d, pqsize %d, child %d, " "full delay %d, light delay %d, deq batch size %d starting\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), @@ -2069,18 +2068,18 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ */ iRetLocal = qqueueHaveQIF(pThis); if(iRetLocal == RS_RET_OK) { - dbgoprint((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n"); + DBGOPRINT((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n"); InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ bInitialized = 1; /* we are done */ } else { /* TODO: use logerror? -- rgerhards, 2008-01-16 */ - dbgoprint((obj_t*) pThis, "error %d trying to access on-disk queue files, starting without them. " + DBGOPRINT((obj_t*) pThis, "error %d trying to access on-disk queue files, starting without them. " "Some data may be lost\n", iRetLocal); } } if(Debug && !bInitialized) { - dbgoprint((obj_t*) pThis, "queue starts up without (loading) any DA disk state (this is normal for the DA " + DBGOPRINT((obj_t*) pThis, "queue starts up without (loading) any DA disk state (this is normal for the DA " "queue itself!)\n"); } @@ -2123,7 +2122,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) FINALIZE; /* if the queue is empty, we are happy and done... */ } - dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", getPhysicalQueueSize(pThis)); + DBGOPRINT((obj_t*) pThis, "persisting queue to disk, %d entries...\n", getPhysicalQueueSize(pThis)); /* Construct file name */ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", @@ -2228,14 +2227,14 @@ DoSaveOnShutdown(qqueue_t *pThis) InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); /* make sure we do not timeout before we are done */ - dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n"); + DBGOPRINT((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n"); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); /* and run the primary queue's DA worker to drain the queue */ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); - dbgoprint((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n", + DBGOPRINT((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n", iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); if(iRetLocal != RS_RET_OK) { - dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, " + DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, " "continuing, but results are unpredictable\n", iRetLocal); } @@ -2301,7 +2300,7 @@ CODESTARTobjDestruct(qqueue) * if need arises (what I doubt...) -- rgerhards, 2008-01-25 */ CHKiRet_Hdlr(qqueuePersist(pThis, QUEUE_NO_CHECKPOINT)) { - dbgoprint((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet); + DBGOPRINT((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet); } /* finally, clean up some simple things... */ @@ -2411,12 +2410,12 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) */ if(flowCtlType == eFLOWCTL_FULL_DELAY) { while(pThis->iQueueSize >= pThis->iFullDlyMrk) { - dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message - blocking.\n"); + DBGOPRINT((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message - blocking.\n"); pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); /* TODO error check? But what do then? */ } } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) { if(pThis->iQueueSize >= pThis->iLightDlyMrk) { - dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayable message - blocking a bit.\n"); + DBGOPRINT((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayable message - blocking a bit.\n"); timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */ pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */ } @@ -2430,10 +2429,10 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0 && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { - dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); + DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); timeoutComp(&t, pThis->toEnq); if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { - dbgoprint((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); + DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } @@ -2483,7 +2482,7 @@ finalize_it: /* and release the mutex */ d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); - dbgoprint((obj_t*) pThis, "MultiEnqObj advised worker start\n"); + DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n"); } RETiRet; @@ -2517,7 +2516,7 @@ finalize_it: /* and release the mutex */ d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); - dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n"); + DBGOPRINT((obj_t*) pThis, "EnqueueMsg advised worker start\n"); } RETiRet; @@ -2555,7 +2554,7 @@ SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex) if(bEnqOnly == 1) { /* switch to enqueue-only mode */ /* this means we need to terminate all workers - that's it... */ - dbgoprint((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n"); + DBGOPRINT((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n"); if(pThis->pWtpReg != NULL) wtpWakeupAllWrkr(pThis->pWtpReg); if(pThis->pWtpDA != NULL) -- cgit