summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-20 15:49:50 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-20 15:49:50 +0200
commitcaf6b75951d087f2406bcdda21a98dc2ee3eb145 (patch)
tree5259073347a4c9310f1a93c1bb1b188c7a3f599c
parent1e9ee368f9c8b1d2e926091691452bce047bb847 (diff)
downloadrsyslog-caf6b75951d087f2406bcdda21a98dc2ee3eb145.tar.gz
rsyslog-caf6b75951d087f2406bcdda21a98dc2ee3eb145.tar.xz
rsyslog-caf6b75951d087f2406bcdda21a98dc2ee3eb145.zip
cleanup & better debug message handling
the new handling will hopefully spare a few cycles, as function calls (and most importantly parameter generation!) or now only done when debug messages are actually active.
-rw-r--r--runtime/queue.c131
1 files changed, 65 insertions, 66 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index e71e35cf..cb14b58d 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -67,7 +67,6 @@ DEFobjCurrIf(glbl)
DEFobjCurrIf(strm)
/* forward-definitions */
-static rsRetVal ChkStrtDA(qqueue_t *pThis);
static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
static rsRetVal SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
static rsRetVal RateLimiter(qqueue_t *pThis);
@@ -209,7 +208,7 @@ static inline void queueDrain(qqueue_t *pThis)
ASSERT(pThis != NULL);
BEGINfunc
- dbgoprint((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize);
+ DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) {
pThis->qDeq(pThis, &pUsr);
@@ -277,7 +276,7 @@ TurnOffDAMode(qqueue_t *pThis)
ASSERT(pThis->bRunsDA);
if(getLogicalQueueSize(pThis->pqDA) == 0) {
pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
- dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
+ DBGOPRINT((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
iRet);
}
@@ -300,9 +299,9 @@ qqueueChkIsDA(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->pszFilePrefix != NULL) {
pThis->bIsDA = 1;
- dbgoprint((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n");
+ DBGOPRINT((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n");
} else {
- dbgoprint((obj_t*) pThis, "is NOT disk-assisted\n");
+ DBGOPRINT((obj_t*) pThis, "is NOT disk-assisted\n");
}
RETiRet;
@@ -375,7 +374,7 @@ StartDA(qqueue_t *pThis)
//pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
- dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n",
+ DBGOPRINT((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n",
qqueueGetID(pThis->pqDA));
finalize_it:
@@ -383,7 +382,7 @@ finalize_it:
if(pThis->pqDA != NULL) {
qqueueDestruct(&pThis->pqDA);
}
- dbgoprint((obj_t*) pThis, "error %d creating disk queue - giving up.\n", iRet);
+ DBGOPRINT((obj_t*) pThis, "error %d creating disk queue - giving up.\n", iRet);
pThis->bIsDA = 0;
}
@@ -481,14 +480,14 @@ ChkStrtDA(qqueue_t *pThis)
* terminated due to the inactivity timeout, thus we need to advise the pool that
* we need at least one).
*/
- dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n",
+ DBGOPRINT((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n",
getPhysicalQueueSize(pThis));
qqueueAdviseMaxWorkers(pThis);
} else {
/* this is the case when we are currently not running in DA mode. So it is time
* to turn it back on.
*/
- dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n",
+ DBGOPRINT((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n",
getPhysicalQueueSize(pThis));
InitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
}
@@ -598,7 +597,7 @@ qUnDeqAllFixedArray(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
- dbgoprint((obj_t*) pThis, "resetting FixedArray deq index to %ld (was %ld), logical dequeue count %d\n",
+ DBGOPRINT((obj_t*) pThis, "resetting FixedArray deq index to %ld (was %ld), logical dequeue count %d\n",
pThis->tVars.farray.head, pThis->tVars.farray.deqhead, pThis->nLogDeq);
pThis->tVars.farray.deqhead = pThis->tVars.farray.head;
@@ -709,7 +708,7 @@ qUnDeqAllLinkedList(qqueue_t *pThis)
ASSERT(pThis != NULL);
- dbgoprint((obj_t*) pThis, "resetting LinkedList deq ptr to %p (was %p), logical dequeue count %d\n",
+ DBGOPRINT((obj_t*) pThis, "resetting LinkedList deq ptr to %p (was %p), logical dequeue count %d\n",
pThis->tVars.linklist.pDelRoot, pThis->tVars.linklist.pDeqRoot, pThis->nLogDeq);
pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pDelRoot;
@@ -758,10 +757,10 @@ qqueueHaveQIF(qqueue_t *pThis)
/* check if the file exists */
if(stat((char*) pszQIFNam, &stat_buf) == -1) {
if(errno == ENOENT) {
- dbgoprint((obj_t*) pThis, "no .qi file found\n");
+ DBGOPRINT((obj_t*) pThis, "no .qi file found\n");
ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
} else {
- dbgoprint((obj_t*) pThis, "error %d trying to access .qi file\n", errno);
+ DBGOPRINT((obj_t*) pThis, "error %d trying to access .qi file\n", errno);
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
}
@@ -793,10 +792,10 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
/* check if the file exists */
if(stat((char*) pszQIFNam, &stat_buf) == -1) {
if(errno == ENOENT) {
- dbgoprint((obj_t*) pThis, "clean startup, no .qi file found\n");
+ DBGOPRINT((obj_t*) pThis, "clean startup, no .qi file found\n");
ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
} else {
- dbgoprint((obj_t*) pThis, "error %d trying to access .qi file\n", errno);
+ DBGOPRINT((obj_t*) pThis, "error %d trying to access .qi file\n", errno);
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
}
@@ -839,7 +838,7 @@ finalize_it:
strm.Destruct(&psQIF);
if(iRet != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "error %d reading .qi file - can not read persisted info (if any)\n",
+ DBGOPRINT((obj_t*) pThis, "error %d reading .qi file - can not read persisted info (if any)\n",
iRet);
}
@@ -948,7 +947,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
*/
objDestruct(pUsr);
- dbgoprint((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n",
+ DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n",
nWriteCount, pThis->tVars.disk.sizeOnDisk);
finalize_it:
@@ -990,7 +989,7 @@ static rsRetVal qDelDisk(qqueue_t *pThis)
} else {
pThis->tVars.disk.sizeOnDisk -= pThis->tVars.disk.bytesRead;
pThis->tVars.disk.bytesRead = offsOut;
- dbgoprint((obj_t*) pThis, "a file has been deleted, now %lld octets disk space used\n", pThis->tVars.disk.sizeOnDisk);
+ DBGOPRINT((obj_t*) pThis, "a file has been deleted, now %lld octets disk space used\n", pThis->tVars.disk.sizeOnDisk);
/* awake possibly waiting enq process */
pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */
}
@@ -1080,7 +1079,7 @@ qqueueAdd(qqueue_t *pThis, void *pUsr)
if(pThis->qType != QUEUETYPE_DIRECT) {
ATOMIC_INC(pThis->iQueueSize);
- dbgoprint((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n",
+ DBGOPRINT((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n",
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
}
@@ -1106,7 +1105,7 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr)
iRet = pThis->qDeq(pThis, ppUsr);
ATOMIC_INC(pThis->nLogDeq);
-// dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
+// DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
// getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
@@ -1159,37 +1158,37 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
* shutdown of both the regular and DA queue on *the same* timeout.
*/
timeoutComp(&tTimeout, pThis->toQShutdown);
- dbgoprint((obj_t*) pThis, "trying shutdown of regular workers\n");
+ DBGOPRINT((obj_t*) pThis, "trying shutdown of regular workers\n");
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgoprint((obj_t*) pThis, "regular shutdown timed out on primary queue (this is OK)\n");
+ DBGOPRINT((obj_t*) pThis, "regular shutdown timed out on primary queue (this is OK)\n");
} else {
- dbgoprint((obj_t*) pThis, "regular queue workers shut down.\n");
+ DBGOPRINT((obj_t*) pThis, "regular queue workers shut down.\n");
}
/* OK, the worker for the regular queue is processed, on the the DA queue regular worker. */
if(pThis->pqDA != NULL) {
- dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n",
+ DBGOPRINT((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n",
qqueueGetID(pThis->pqDA));
/* we use the same absolute timeout as above, so we do not use more than the configured
* timeout interval!
*/
- dbgoprint((obj_t*) pThis, "trying shutdown of regular worker of DA queue\n");
+ DBGOPRINT((obj_t*) pThis, "trying shutdown of regular worker of DA queue\n");
iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgoprint((obj_t*) pThis, "shutdown timed out on DA queue worker (this is OK)\n");
+ DBGOPRINT((obj_t*) pThis, "shutdown timed out on DA queue worker (this is OK)\n");
} else {
- dbgoprint((obj_t*) pThis, "DA queue worker shut down.\n");
+ DBGOPRINT((obj_t*) pThis, "DA queue worker shut down.\n");
}
/* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting
* the queue, it is restarted at a later stage. We don't care here if a timeout happens.
*/
- dbgoprint((obj_t*) pThis, "trying shutdown of main queue DA worker pool\n");
+ DBGOPRINT((obj_t*) pThis, "trying shutdown of main queue DA worker pool\n");
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgoprint((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n");
+ DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n");
} else {
- dbgoprint((obj_t*) pThis, "main queue DA worker pool shut down.\n");
+ DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down.\n");
}
}
@@ -1227,25 +1226,25 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
/* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
timeoutComp(&tTimeout, pThis->toActShutdown);
- dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers (if any)\n");
+ DBGOPRINT((obj_t*) pThis, "trying immediate shutdown of regular workers (if any)\n");
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgoprint((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and "
+ DBGOPRINT((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and "
"triggers cancellation)\n");
} else if(iRetLocal != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue "
+ DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue "
"in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
if(pThis->pqDA != NULL) {
/* and now the same for the DA queue */
- dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n");
+ DBGOPRINT((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n");
iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgoprint((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable "
+ DBGOPRINT((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable "
"and triggers cancellation)\n");
} else if(iRetLocal != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA "
+ DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA "
"queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
}
@@ -1268,19 +1267,19 @@ cancelWorkers(qqueue_t *pThis)
* all timeout setting. If any worker in any queue still executes, its consumer is possibly
* long-running and cancelling is the only way to get rid of it.
*/
- dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the primary queue\n");
+ DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the primary queue\n");
iRetLocal = wtpCancelAll(pThis->pWtpReg); /* returns immediately if all threads already have terminated */
if(iRetLocal != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "unexpected iRet state %d trying to cancel primary queue worker "
+ DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel primary queue worker "
"threads, continuing, but results are unpredictable\n", iRetLocal);
}
/* ... and now the DA queue, if it exists (should always be after the primary one) */
if(pThis->pqDA != NULL) {
- dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the DA queue\n");
+ DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the DA queue\n");
iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg); /* returns immediately if all threads already have terminated */
if(iRetLocal != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker "
+ DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker "
"threads, continuing, but results are unpredictable\n", iRetLocal);
}
@@ -1289,7 +1288,7 @@ cancelWorkers(qqueue_t *pThis)
* big trouble when resetting the logical dequeue pointer. This operation can only be
* done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28
*/
- dbgoprint((obj_t*) pThis, "checking to see if we need to cancel the main queue's DA worker pool\n");
+ DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel the main queue's DA worker pool\n");
iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */
}
@@ -1322,7 +1321,7 @@ ShutdownWorkers(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
- dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
+ DBGOPRINT((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
/* we reduce the low water mark in any case. This is not absolutely necessary, but
* it is useful because we enable DA mode at several spots below and so we do not need
@@ -1343,7 +1342,7 @@ ShutdownWorkers(qqueue_t *pThis)
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
* may still be running. Note that the main queue's DA worker may still be running.
*/
- dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size log %d, phys %d.\n",
+ DBGOPRINT((obj_t*) pThis, "worker threads terminated, remaining queue size log %d, phys %d.\n",
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
finalize_it:
@@ -1461,12 +1460,12 @@ static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, int bRunsDA, voi
if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) {
iRetLocal = objGetSeverity(pUsr, &iSeverity);
if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
- dbgoprint((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n",
+ DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n",
iQueueSize, iSeverity);
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else {
- dbgoprint((obj_t*) pThis, "queue nearly full (%d entries), but could not drop msg "
+ DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), but could not drop msg "
"(iRet: %d, severity %d)\n", iQueueSize, iRetLocal, iSeverity);
}
}
@@ -1656,7 +1655,7 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
- dbgoprint((obj_t*) pThis, "error %d dequeueing element - ignoring, but strange things "
+ DBGOPRINT((obj_t*) pThis, "error %d dequeueing element - ignoring, but strange things "
"may happen\n", iRet);
}
@@ -1751,7 +1750,7 @@ RateLimiter(qqueue_t *pThis)
}
if(iDelay > 0) {
- dbgoprint((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay);
+ DBGOPRINT((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay);
srSleep(iDelay, 0);
}
@@ -1822,7 +1821,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
*/
//TODO: MULTIQUEUE: the following setting is no longer correct - need to think about how to do that...
if(pThis->iDeqSlowdown) {
- dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n",
+ DBGOPRINT((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n",
pThis->iDeqSlowdown);
srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000);
}
@@ -1862,7 +1861,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
}
finalize_it:
- dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
+ DBGOPRINT((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
RETiRet;
}
@@ -2018,7 +2017,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pthread_mutex_init(pThis->mut, NULL);
} else {
/* child queue, we need to use parent's mutex */
- dbgoprint((obj_t*) pThis, "I am a child\n");
+ DBGOPRINT((obj_t*) pThis, "I am a child\n");
pThis->mut = pThis->pqParent->mut;
}
@@ -2032,7 +2031,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
- dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, lqsize %d, pqsize %d, child %d, "
+ DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, lqsize %d, pqsize %d, child %d, "
"full delay %d, light delay %d, deq batch size %d starting\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis),
@@ -2069,18 +2068,18 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
*/
iRetLocal = qqueueHaveQIF(pThis);
if(iRetLocal == RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n");
+ DBGOPRINT((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n");
InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
bInitialized = 1; /* we are done */
} else {
/* TODO: use logerror? -- rgerhards, 2008-01-16 */
- dbgoprint((obj_t*) pThis, "error %d trying to access on-disk queue files, starting without them. "
+ DBGOPRINT((obj_t*) pThis, "error %d trying to access on-disk queue files, starting without them. "
"Some data may be lost\n", iRetLocal);
}
}
if(Debug && !bInitialized) {
- dbgoprint((obj_t*) pThis, "queue starts up without (loading) any DA disk state (this is normal for the DA "
+ DBGOPRINT((obj_t*) pThis, "queue starts up without (loading) any DA disk state (this is normal for the DA "
"queue itself!)\n");
}
@@ -2123,7 +2122,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
FINALIZE; /* if the queue is empty, we are happy and done... */
}
- dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", getPhysicalQueueSize(pThis));
+ DBGOPRINT((obj_t*) pThis, "persisting queue to disk, %d entries...\n", getPhysicalQueueSize(pThis));
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
@@ -2228,14 +2227,14 @@ DoSaveOnShutdown(qqueue_t *pThis)
InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */
dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
/* make sure we do not timeout before we are done */
- dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n");
+ DBGOPRINT((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n");
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
/* and run the primary queue's DA worker to drain the queue */
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
- dbgoprint((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n",
+ DBGOPRINT((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n",
iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
if(iRetLocal != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, "
+ DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, "
"continuing, but results are unpredictable\n", iRetLocal);
}
@@ -2301,7 +2300,7 @@ CODESTARTobjDestruct(qqueue)
* if need arises (what I doubt...) -- rgerhards, 2008-01-25
*/
CHKiRet_Hdlr(qqueuePersist(pThis, QUEUE_NO_CHECKPOINT)) {
- dbgoprint((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet);
+ DBGOPRINT((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet);
}
/* finally, clean up some simple things... */
@@ -2411,12 +2410,12 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
*/
if(flowCtlType == eFLOWCTL_FULL_DELAY) {
while(pThis->iQueueSize >= pThis->iFullDlyMrk) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message - blocking.\n");
+ DBGOPRINT((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message - blocking.\n");
pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); /* TODO error check? But what do then? */
}
} else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) {
if(pThis->iQueueSize >= pThis->iLightDlyMrk) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayable message - blocking a bit.\n");
+ DBGOPRINT((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayable message - blocking a bit.\n");
timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */
pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */
}
@@ -2430,10 +2429,10 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize)
|| (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0
&& pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
+ DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
timeoutComp(&t, pThis->toEnq);
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
+ DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
@@ -2483,7 +2482,7 @@ finalize_it:
/* and release the mutex */
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
- dbgoprint((obj_t*) pThis, "MultiEnqObj advised worker start\n");
+ DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n");
}
RETiRet;
@@ -2517,7 +2516,7 @@ finalize_it:
/* and release the mutex */
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
- dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
+ DBGOPRINT((obj_t*) pThis, "EnqueueMsg advised worker start\n");
}
RETiRet;
@@ -2555,7 +2554,7 @@ SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
if(bEnqOnly == 1) {
/* switch to enqueue-only mode */
/* this means we need to terminate all workers - that's it... */
- dbgoprint((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n");
+ DBGOPRINT((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n");
if(pThis->pWtpReg != NULL)
wtpWakeupAllWrkr(pThis->pWtpReg);
if(pThis->pWtpDA != NULL)