summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--queue.c186
1 files changed, 72 insertions, 114 deletions
diff --git a/queue.c b/queue.c
index c617de75..d3698d38 100644
--- a/queue.c
+++ b/queue.c
@@ -138,17 +138,13 @@ queueTurnOffDAMode(queue_t *pThis)
RUNLOG_VAR("%p", pThis->pqDA);
RUNLOG_VAR("%d", pThis->pqDA->iQueueSize);
if(pThis->pqDA->iQueueSize == 0) {
-dbgprintf("Queue 0x%lx: disk-assistance being been turned off, bEnqOnly %d, bQueInDestr %d, NumWrkd %d\n",
- queueGetID(pThis),
- pThis->bEnqOnly,pThis->bQueueInDestruction,pThis->iCurNumWrkThrd);
-
pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
* this will be quick.
*/
queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
- dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
- queueGetID(pThis), iRet);
+ dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
+ iRet);
/* now we need to check if the regular queue has some messages. This may be the case
* when it is waiting that the high water mark is reached again. If so, we need to start up
* a regular worker. -- rgerhards, 2008-01-26
@@ -178,9 +174,9 @@ queueChkIsDA(queue_t *pThis)
RUNLOG_VAR("%s", pThis->pszFilePrefix);
if(pThis->pszFilePrefix != NULL) {
pThis->bIsDA = 1;
- dbgprintf("Queue 0x%lx: is disk-assisted, disk will be used on demand\n", queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n");
} else {
- dbgprintf("Queue 0x%lx: is NOT disk-assisted\n", queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "is NOT disk-assisted\n");
}
RETiRet;
@@ -209,10 +205,8 @@ queueStartDA(queue_t *pThis)
FINALIZE; /* ... then we are already done! */
/* create message queue */
-dbgprintf("Queue %p: queueSTrtDA pre child queue construct,\n", pThis);
CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
-dbgprintf("Queue %p: queueSTrtDA after child queue construct, q %p\n", pThis, pThis->pqDA);
/* as the created queue is the same object class, we take the
* liberty to access its properties directly.
*/
@@ -236,7 +230,7 @@ dbgprintf("Queue %p: queueSTrtDA after child queue construct, q %p\n", pThis, pT
CHKiRet(queueSettoQShutdown(pThis->pqDA, 1));
}
-dbgprintf("Queue %p: queueStartDA pre start\n", pThis);
+dbgoprint((obj_t*) pThis, "queueStartDA pre start\n");
iRet = queueStart(pThis->pqDA);
/* file not found is expected, that means it is no previous QIF available */
if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
@@ -254,16 +248,15 @@ dbgprintf("Queue %p: queueStartDA pre start\n", pThis);
pThis->bChildIsDone = 0;/* set to 1 when child's worker detect queue is finished */
pthread_cond_signal(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
- dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n",
- queueGetID(pThis), queueGetID(pThis->pqDA));
+ dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n",
+ queueGetID(pThis->pqDA));
finalize_it:
if(iRet != RS_RET_OK) {
if(pThis->pqDA != NULL) {
queueDestruct(&pThis->pqDA);
}
- dbgprintf("Queue 0x%lx: error %d creating disk queue - giving up.\n",
- queueGetID(pThis), iRet);
+ dbgoprint((obj_t*) pThis, "error %d creating disk queue - giving up.\n", iRet);
pThis->bIsDA = 0;
}
@@ -344,7 +337,6 @@ queueChkStrtDA(queue_t *pThis)
if(pThis->iQueueSize != pThis->iHighWtrMrk)
ABORT_FINALIZE(RS_RET_OK);
-dbgprintf("Queue %p: chkStartDA\n", pThis);
if(pThis->bRunsDA) {
/* then we need to signal that we are at the high water mark again. If that happens
* on our way down the queue, that doesn't matter, because then nobody is waiting
@@ -355,15 +347,15 @@ dbgprintf("Queue %p: chkStartDA\n", pThis);
* terminated due to the inactivity timeout, thus we need to advise the pool that
* we need at least one).
*/
- dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n",
- queueGetID(pThis), pThis->iQueueSize);
+ dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n",
+ pThis->iQueueSize);
queueAdviseMaxWorkers(pThis);
} else {
/* this is the case when we are currently not running in DA mode. So it is time
* to turn it back on.
*/
- dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n",
- queueGetID(pThis), pThis->iQueueSize);
+ dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n",
+ pThis->iQueueSize);
queueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
}
@@ -560,10 +552,10 @@ queueHaveQIF(queue_t *pThis)
/* check if the file exists */
if(stat((char*) pszQIFNam, &stat_buf) == -1) {
if(errno == ENOENT) {
- dbgprintf("Queue 0x%lx: no .qi file found\n", queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "no .qi file found\n");
ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
} else {
- dbgprintf("Queue 0x%lx: error %d trying to access .qi file\n", queueGetID(pThis), errno);
+ dbgoprint((obj_t*) pThis, "error %d trying to access .qi file\n", errno);
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
}
@@ -595,10 +587,10 @@ queueTryLoadPersistedInfo(queue_t *pThis)
/* check if the file exists */
if(stat((char*) pszQIFNam, &stat_buf) == -1) {
if(errno == ENOENT) {
- dbgprintf("Queue 0x%lx: clean startup, no .qi file found\n", queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "clean startup, no .qi file found\n");
ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
} else {
- dbgprintf("Queue 0x%lx: error %d trying to access .qi file\n", queueGetID(pThis), errno);
+ dbgoprint((obj_t*) pThis, "error %d trying to access .qi file\n", errno);
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
}
@@ -634,8 +626,8 @@ finalize_it:
strmDestruct(&psQIF);
if(iRet != RS_RET_OK) {
- dbgprintf("Queue 0x%lx: error %d reading .qi file - can not read persisted info (if any)\n",
- queueGetID(pThis), iRet);
+ dbgoprint((obj_t*) pThis, "error %d reading .qi file - can not read persisted info (if any)\n",
+ iRet);
}
RETiRet;
@@ -663,7 +655,6 @@ static rsRetVal qConstructDisk(queue_t *pThis)
else if(iRet != RS_RET_FILE_NOT_FOUND)
FINALIZE;
-dbgprintf("qConstructDisk: bRestarted %d, iRet %d\n", bRestarted, iRet);
if(bRestarted == 1) {
;
} else {
@@ -754,9 +745,9 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
/* calling the consumer is quite different here than it is from a worker thread */
iRetLocal = pThis->pConsumer(pThis->pUsr, pUsr);
- if(iRetLocal != RS_RET_OK)
- dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n",
- queueGetID(pThis), iRetLocal);
+ if(iRetLocal != RS_RET_OK) {
+ dbgoprint((obj_t*) pThis, "Consumer returned iRet %d\n", iRetLocal);
+ }
--pThis->iQueueSize; /* this is kind of a hack, but its the smartest thing we can do given
* the somewhat astonishing fact that this queue type does not actually
* queue anything ;)
@@ -786,7 +777,7 @@ queueAdd(queue_t *pThis, void *pUsr)
++pThis->iQueueSize;
- dbgprintf("Queue 0x%lx: entry added, size now %d entries\n", queueGetID(pThis), pThis->iQueueSize);
+ dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize);
finalize_it:
RETiRet;
@@ -809,8 +800,8 @@ queueDel(queue_t *pThis, void *pUsr)
iRet = pThis->qDel(pThis, pUsr);
--pThis->iQueueSize;
- dbgprintf("Queue 0x%lx: entry deleted, state %d, size now %d entries\n",
- queueGetID(pThis), iRet, pThis->iQueueSize);
+ dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
+ iRet, pThis->iQueueSize);
RETiRet;
}
@@ -837,7 +828,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
ISOBJ_TYPE_assert(pThis, queue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
- dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
/* we reduce the low water mark in any case. This is not absolutely necessary, but
* it is useful because we enable DA mode at several spots below and so we do not need
@@ -876,26 +867,25 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
*/
RUNLOG_VAR("%d", pThis->toQShutdown);
timeoutComp(&tTimeout, pThis->toQShutdown);
- dbgprintf("Queue 0x%lx: trying shutdown of regular workers\n", queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "trying shutdown of regular workers\n");
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgprintf("Queue 0x%lx: regular shutdown timed out on primary queue (this is OK)\n", queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "regular shutdown timed out on primary queue (this is OK)\n");
} else {
/* OK, the regular queue is now shut down. So we can now wait for the DA queue (if running DA) */
- dbgprintf("Queue 0x%lx: regular queue workers shut down.\n", queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "regular queue workers shut down.\n");
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
if(pThis->bRunsDA) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- dbgprintf("Queue 0x%lx: we have a DA queue (0x%lx), requesting its shutdown.\n",
- queueGetID(pThis), queueGetID(pThis->pqDA));
+ dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n",
+ queueGetID(pThis->pqDA));
/* we use the same absolute timeout as above, so we do not use more than the configured
* timeout interval!
*/
- dbgprintf("Queue 0x%lx: trying shutdown of DA workers\n", queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "trying shutdown of DA workers\n");
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgprintf("Queue 0x%lx: shutdown timed out on DA queue (this is OK)\n",
- queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "shutdown timed out on DA queue (this is OK)\n");
}
} else {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -926,14 +916,13 @@ RUNLOG_VAR("%d", pThis->toQShutdown);
}
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
/* make sure we do not timeout before we are done */
- dbgprintf("Queue 0x%lx: bSaveOnShutdown configured, eternal timeout set\n", queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, eternal timeout set\n");
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
/* and run the primary queue's DA worker to drain the queue */
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
if(iRetLocal != RS_RET_OK) {
- dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying to shut down primary queue in disk save mode, "
- "continuing, but results are unpredictable\n",
- queueGetID(pThis), iRetLocal);
+ dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, "
+ "continuing, but results are unpredictable\n", iRetLocal);
}
} else {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -949,15 +938,14 @@ RUNLOG_VAR("%d", pThis->toQShutdown);
timeoutComp(&tTimeout, pThis->toActShutdown);
if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- dbgprintf("Queue 0x%lx: trying immediate shutdown of regular workers\n", queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers\n");
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgprintf("Queue 0x%lx: immediate shutdown timed out on primary queue (this is acceptable and "
- "triggers cancellation)\n", queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and "
+ "triggers cancellation)\n");
} else if(iRetLocal != RS_RET_OK) {
- dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the primary queue "
- "in disk save mode. Continuing, but results are unpredictable\n",
- queueGetID(pThis), iRetLocal);
+ dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue "
+ "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
if(pThis->bIsDA) {
/* we need to re-aquire the mutex for the next check in this case! */
@@ -967,15 +955,14 @@ RUNLOG_VAR("%d", pThis->toQShutdown);
if(pThis->bIsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
/* and now the same for the DA queue */
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- dbgprintf("Queue 0x%lx: trying immediate shutdown of DA workers\n", queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA workers\n");
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
- dbgprintf("Queue 0x%lx: immediate shutdown timed out on DA queue (this is acceptable and "
- "triggers cancellation)\n", queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable and "
+ "triggers cancellation)\n");
} else if(iRetLocal != RS_RET_OK) {
- dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the DA queue "
- "in disk save mode. Continuing, but results are unpredictable\n",
- queueGetID(pThis), iRetLocal);
+ dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue "
+ "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
}
} else {
@@ -989,13 +976,11 @@ RUNLOG_VAR("%d", pThis->toQShutdown);
* function is still needed (what is no problem as we do not yet destroy the queue - but I
* thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25
*/
- dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the primary queue\n",
- queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the primary queue\n");
iRetLocal = wtpCancelAll(pThis->pWtpReg); /* returns immediately if all threads already have terminated */
if(iRetLocal != RS_RET_OK) {
- dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel primary queue worker "
- "threads, continuing, but results are unpredictable\n",
- queueGetID(pThis), iRetLocal);
+ dbgoprint((obj_t*) pThis, "unexpected iRet state %d trying to cancel primary queue worker "
+ "threads, continuing, but results are unpredictable\n", iRetLocal);
}
@@ -1019,13 +1004,11 @@ RUNLOG_VAR("%d", pThis->toQShutdown);
*/
/* ... and now the DA queue, if it exists (should always be after the primary one) */
if(pThis->pqDA != NULL) {
- dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the DA queue\n",
- queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the DA queue\n");
iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg); /* returns immediately if all threads already have terminated */
if(iRetLocal != RS_RET_OK) {
- dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel DA queue worker "
- "threads, continuing, but results are unpredictable\n",
- queueGetID(pThis), iRetLocal);
+ dbgoprint((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker "
+ "threads, continuing, but results are unpredictable\n", iRetLocal);
}
}
@@ -1033,8 +1016,7 @@ RUNLOG_VAR("%d", pThis->toQShutdown);
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
* may still be running.
*/
- dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n",
- queueGetID(pThis), pThis->iQueueSize);
+ dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", pThis->iQueueSize);
RETiRet;
}
@@ -1129,8 +1111,7 @@ queueConsumerCancelCleanup(void *arg1, void *arg2)
ISOBJ_TYPE_assert(pThis, queue);
- dbgprintf("Queue 0x%lx: cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n",
- queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n");
/* TODO: re-enqueue the data element! This will also make the compiler warning go away... */
@@ -1165,14 +1146,13 @@ RUNLOG_VAR("%p", pUsr);
if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) {
iRetLocal = objGetSeverity(pUsr, &iSeverity);
if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
- dbgprintf("Queue 0x%lx: queue nearly full (%d entries), discarded severity %d message\n",
- queueGetID(pThis), iQueueSize, iSeverity);
+ dbgoprint((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n",
+ iQueueSize, iSeverity);
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else {
- dbgprintf("Queue 0x%lx: queue nearly full (%d entries), but could not drop msg "
- "(iRet: %d, severity %d)\n", queueGetID(pThis), iQueueSize,
- iRetLocal, iSeverity);
+ dbgoprint((obj_t*) pThis, "queue nearly full (%d entries), but could not drop msg "
+ "(iRet: %d, severity %d)\n", iQueueSize, iRetLocal, iSeverity);
}
}
@@ -1221,8 +1201,8 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
finalize_it:
if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
- dbgprintf("Queue 0x%lx/w?: error %d dequeueing element - ignoring, but strange things "
- "may happen\n", queueGetID(pThis), iRet);
+ dbgoprint((obj_t*) pThis, "error %d dequeueing element - ignoring, but strange things "
+ "may happen\n", iRet);
}
RETiRet;
}
@@ -1244,7 +1224,7 @@ queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp));
finalize_it:
-dbgprintf("Queue %p: regular consumer returns %d\n", pThis, iRet);
+ dbgoprint((obj_t*) pThis, "regular consumer returns %d\n", iRet);
RETiRet;
}
@@ -1266,12 +1246,11 @@ queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pThis, queue);
ISOBJ_TYPE_assert(pWti, wti);
-dbgprintf("Queue %p/w?: queueDAConsumer, queue size %d\n", pThis, pThis->iQueueSize);/* dirty iQueueSize! */
CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
CHKiRet(queueEnqObj(pThis->pqDA, pWti->pUsrp));
finalize_it:
-dbgprintf("DAConsumer returns with iRet %d\n", iRet);
+ dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
RETiRet;
}
@@ -1367,18 +1346,6 @@ RUNLOG_VAR("%p", pThis->pqParent->pWtpDA);
ASSERT(pThis->pqParent->pWtpDA != NULL);
pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
-#if 0
- if(pThis->pqParent->pWtpDA == NULL) {
- /* this can happen if we are not set to save on an eternal timeout. We
- * log a warning but otherwise do nothing
- */
- dbgprintf("Queue 0x%lx: warning: pThis->pqParent->pWtpDA is NULL (this may be OK if the parent is not set to "
- " bSaveOnShutdown\n", queueGetID(pThis));
- } else {
- pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
- wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
- }
-#endif
}
RETiRet;
@@ -1471,38 +1438,30 @@ RUNLOG_VAR("%d", pThis->bIsDA);
iRetLocal = queueHaveQIF(pThis);
RUNLOG_VAR("%d", iRetLocal);
if(iRetLocal == RS_RET_OK) {
- dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n",
- queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n");
RUNLOG;
queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
bInitialized = 1; /* we are done */
} else {
/* TODO: use logerror? -- rgerhards, 2008-01-16 */
- dbgprintf("Queue 0x%lx: error %d trying to access on-disk queue files, starting without them. "
- "Some data may be lost\n", queueGetID(pThis), iRetLocal);
+ dbgoprint((obj_t*) pThis, "error %d trying to access on-disk queue files, starting without them. "
+ "Some data may be lost\n", iRetLocal);
}
}
RUNLOG_VAR("%d", bInitialized);
if(!bInitialized) {
- dbgprintf("Queue 0x%lx: queue starts up without (loading) any DA disk state (this is normal for the DA "
- "queue itself!)\n", queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "queue starts up without (loading) any DA disk state (this is normal for the DA "
+ "queue itself!)\n");
}
/* if the queue already contains data, we need to start the correct number of worker threads. This can be
* the case when a disk queue has been loaded. If we did not start it here, it would never start.
*/
queueAdviseMaxWorkers(pThis);
-#if 0
- if(!pThis->bEnqOnly && pThis->bRunsDA) {
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* there is always just one DA worker! */
- }
-#endif
-
pThis->bQueueStarted = 1;
finalize_it:
-dbgprintf("queueStart() exit, iret %d\n", iRet);
RETiRet;
}
@@ -1533,7 +1492,7 @@ static rsRetVal queuePersist(queue_t *pThis)
FINALIZE; /* if the queue is empty, we are happy and done... */
}
- dbgprintf("Queue 0x%lx: persisting queue to disk, %d entries...\n", queueGetID(pThis), pThis->iQueueSize);
+ dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", pThis->iQueueSize);
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
(char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
@@ -1666,7 +1625,7 @@ RUNLOG;
* if need arises (what I doubt...) -- rgerhards, 2008-01-25
*/
CHKiRet_Hdlr(queuePersist(pThis)) {
- dbgprintf("Queue 0x%lx: error %d persisting queue - data lost!\n", (unsigned long) pThis, iRet);
+ dbgoprint((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet);
}
/* finally, clean up some simple things... */
@@ -1779,10 +1738,10 @@ RUNLOG_VAR("%d", pThis->bRunsDA);
/* wait for the queue to be ready... */
while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) {
- dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
timeoutComp(&t, pThis->toEnq);
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
- dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
@@ -1798,7 +1757,7 @@ finalize_it:
if(pThis->qType != QUEUETYPE_DIRECT) {
d_pthread_mutex_unlock(pThis->mut);
i = pthread_cond_signal(&pThis->notEmpty);
- dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i);
+ dbgoprint((obj_t*) pThis, "EnqueueMsg signaled condition (%d)\n", i);
pthread_setcancelstate(iCancelStateSave, NULL);
}
@@ -1842,8 +1801,7 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex)
if(bEnqOnly == 1) {
/* switch to enqueue-only mode */
/* this means we need to terminate all workers - that's it... */
- dbgprintf("Queue 0x%lx: switching to enqueue-only mode, terminating all worker threads\n",
- queueGetID(pThis));
+ dbgoprint((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n");
if(pThis->pWtpReg != NULL)
wtpWakeupAllWrkr(pThis->pWtpReg);
if(pThis->pWtpDA != NULL)