diff options
-rw-r--r-- | queue.c | 186 |
1 files changed, 72 insertions, 114 deletions
@@ -138,17 +138,13 @@ queueTurnOffDAMode(queue_t *pThis) RUNLOG_VAR("%p", pThis->pqDA); RUNLOG_VAR("%d", pThis->pqDA->iQueueSize); if(pThis->pqDA->iQueueSize == 0) { -dbgprintf("Queue 0x%lx: disk-assistance being been turned off, bEnqOnly %d, bQueInDestr %d, NumWrkd %d\n", - queueGetID(pThis), - pThis->bEnqOnly,pThis->bQueueInDestruction,pThis->iCurNumWrkThrd); - pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, * this will be quick. */ queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ - dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n", - queueGetID(pThis), iRet); + dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n", + iRet); /* now we need to check if the regular queue has some messages. This may be the case * when it is waiting that the high water mark is reached again. If so, we need to start up * a regular worker. -- rgerhards, 2008-01-26 @@ -178,9 +174,9 @@ queueChkIsDA(queue_t *pThis) RUNLOG_VAR("%s", pThis->pszFilePrefix); if(pThis->pszFilePrefix != NULL) { pThis->bIsDA = 1; - dbgprintf("Queue 0x%lx: is disk-assisted, disk will be used on demand\n", queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n"); } else { - dbgprintf("Queue 0x%lx: is NOT disk-assisted\n", queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "is NOT disk-assisted\n"); } RETiRet; @@ -209,10 +205,8 @@ queueStartDA(queue_t *pThis) FINALIZE; /* ... then we are already done! */ /* create message queue */ -dbgprintf("Queue %p: queueSTrtDA pre child queue construct,\n", pThis); CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer)); -dbgprintf("Queue %p: queueSTrtDA after child queue construct, q %p\n", pThis, pThis->pqDA); /* as the created queue is the same object class, we take the * liberty to access its properties directly. */ @@ -236,7 +230,7 @@ dbgprintf("Queue %p: queueSTrtDA after child queue construct, q %p\n", pThis, pT CHKiRet(queueSettoQShutdown(pThis->pqDA, 1)); } -dbgprintf("Queue %p: queueStartDA pre start\n", pThis); +dbgoprint((obj_t*) pThis, "queueStartDA pre start\n"); iRet = queueStart(pThis->pqDA); /* file not found is expected, that means it is no previous QIF available */ if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) @@ -254,16 +248,15 @@ dbgprintf("Queue %p: queueStartDA pre start\n", pThis); pThis->bChildIsDone = 0;/* set to 1 when child's worker detect queue is finished */ pthread_cond_signal(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ - dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n", - queueGetID(pThis), queueGetID(pThis->pqDA)); + dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n", + queueGetID(pThis->pqDA)); finalize_it: if(iRet != RS_RET_OK) { if(pThis->pqDA != NULL) { queueDestruct(&pThis->pqDA); } - dbgprintf("Queue 0x%lx: error %d creating disk queue - giving up.\n", - queueGetID(pThis), iRet); + dbgoprint((obj_t*) pThis, "error %d creating disk queue - giving up.\n", iRet); pThis->bIsDA = 0; } @@ -344,7 +337,6 @@ queueChkStrtDA(queue_t *pThis) if(pThis->iQueueSize != pThis->iHighWtrMrk) ABORT_FINALIZE(RS_RET_OK); -dbgprintf("Queue %p: chkStartDA\n", pThis); if(pThis->bRunsDA) { /* then we need to signal that we are at the high water mark again. If that happens * on our way down the queue, that doesn't matter, because then nobody is waiting @@ -355,15 +347,15 @@ dbgprintf("Queue %p: chkStartDA\n", pThis); * terminated due to the inactivity timeout, thus we need to advise the pool that * we need at least one). */ - dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n", - queueGetID(pThis), pThis->iQueueSize); + dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n", + pThis->iQueueSize); queueAdviseMaxWorkers(pThis); } else { /* this is the case when we are currently not running in DA mode. So it is time * to turn it back on. */ - dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n", - queueGetID(pThis), pThis->iQueueSize); + dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n", + pThis->iQueueSize); queueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ } @@ -560,10 +552,10 @@ queueHaveQIF(queue_t *pThis) /* check if the file exists */ if(stat((char*) pszQIFNam, &stat_buf) == -1) { if(errno == ENOENT) { - dbgprintf("Queue 0x%lx: no .qi file found\n", queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "no .qi file found\n"); ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); } else { - dbgprintf("Queue 0x%lx: error %d trying to access .qi file\n", queueGetID(pThis), errno); + dbgoprint((obj_t*) pThis, "error %d trying to access .qi file\n", errno); ABORT_FINALIZE(RS_RET_IO_ERROR); } } @@ -595,10 +587,10 @@ queueTryLoadPersistedInfo(queue_t *pThis) /* check if the file exists */ if(stat((char*) pszQIFNam, &stat_buf) == -1) { if(errno == ENOENT) { - dbgprintf("Queue 0x%lx: clean startup, no .qi file found\n", queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "clean startup, no .qi file found\n"); ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); } else { - dbgprintf("Queue 0x%lx: error %d trying to access .qi file\n", queueGetID(pThis), errno); + dbgoprint((obj_t*) pThis, "error %d trying to access .qi file\n", errno); ABORT_FINALIZE(RS_RET_IO_ERROR); } } @@ -634,8 +626,8 @@ finalize_it: strmDestruct(&psQIF); if(iRet != RS_RET_OK) { - dbgprintf("Queue 0x%lx: error %d reading .qi file - can not read persisted info (if any)\n", - queueGetID(pThis), iRet); + dbgoprint((obj_t*) pThis, "error %d reading .qi file - can not read persisted info (if any)\n", + iRet); } RETiRet; @@ -663,7 +655,6 @@ static rsRetVal qConstructDisk(queue_t *pThis) else if(iRet != RS_RET_FILE_NOT_FOUND) FINALIZE; -dbgprintf("qConstructDisk: bRestarted %d, iRet %d\n", bRestarted, iRet); if(bRestarted == 1) { ; } else { @@ -754,9 +745,9 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr) /* calling the consumer is quite different here than it is from a worker thread */ iRetLocal = pThis->pConsumer(pThis->pUsr, pUsr); - if(iRetLocal != RS_RET_OK) - dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n", - queueGetID(pThis), iRetLocal); + if(iRetLocal != RS_RET_OK) { + dbgoprint((obj_t*) pThis, "Consumer returned iRet %d\n", iRetLocal); + } --pThis->iQueueSize; /* this is kind of a hack, but its the smartest thing we can do given * the somewhat astonishing fact that this queue type does not actually * queue anything ;) @@ -786,7 +777,7 @@ queueAdd(queue_t *pThis, void *pUsr) ++pThis->iQueueSize; - dbgprintf("Queue 0x%lx: entry added, size now %d entries\n", queueGetID(pThis), pThis->iQueueSize); + dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize); finalize_it: RETiRet; @@ -809,8 +800,8 @@ queueDel(queue_t *pThis, void *pUsr) iRet = pThis->qDel(pThis, pUsr); --pThis->iQueueSize; - dbgprintf("Queue 0x%lx: entry deleted, state %d, size now %d entries\n", - queueGetID(pThis), iRet, pThis->iQueueSize); + dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n", + iRet, pThis->iQueueSize); RETiRet; } @@ -837,7 +828,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) ISOBJ_TYPE_assert(pThis, queue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ - dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n"); /* we reduce the low water mark in any case. This is not absolutely necessary, but * it is useful because we enable DA mode at several spots below and so we do not need @@ -876,26 +867,25 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) */ RUNLOG_VAR("%d", pThis->toQShutdown); timeoutComp(&tTimeout, pThis->toQShutdown); - dbgprintf("Queue 0x%lx: trying shutdown of regular workers\n", queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "trying shutdown of regular workers\n"); iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { - dbgprintf("Queue 0x%lx: regular shutdown timed out on primary queue (this is OK)\n", queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "regular shutdown timed out on primary queue (this is OK)\n"); } else { /* OK, the regular queue is now shut down. So we can now wait for the DA queue (if running DA) */ - dbgprintf("Queue 0x%lx: regular queue workers shut down.\n", queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "regular queue workers shut down.\n"); BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ if(pThis->bRunsDA) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); - dbgprintf("Queue 0x%lx: we have a DA queue (0x%lx), requesting its shutdown.\n", - queueGetID(pThis), queueGetID(pThis->pqDA)); + dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n", + queueGetID(pThis->pqDA)); /* we use the same absolute timeout as above, so we do not use more than the configured * timeout interval! */ - dbgprintf("Queue 0x%lx: trying shutdown of DA workers\n", queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "trying shutdown of DA workers\n"); iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { - dbgprintf("Queue 0x%lx: shutdown timed out on DA queue (this is OK)\n", - queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "shutdown timed out on DA queue (this is OK)\n"); } } else { END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -926,14 +916,13 @@ RUNLOG_VAR("%d", pThis->toQShutdown); } END_MTX_PROTECTED_OPERATIONS(pThis->mut); /* make sure we do not timeout before we are done */ - dbgprintf("Queue 0x%lx: bSaveOnShutdown configured, eternal timeout set\n", queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, eternal timeout set\n"); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); /* and run the primary queue's DA worker to drain the queue */ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); if(iRetLocal != RS_RET_OK) { - dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying to shut down primary queue in disk save mode, " - "continuing, but results are unpredictable\n", - queueGetID(pThis), iRetLocal); + dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, " + "continuing, but results are unpredictable\n", iRetLocal); } } else { END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -949,15 +938,14 @@ RUNLOG_VAR("%d", pThis->toQShutdown); timeoutComp(&tTimeout, pThis->toActShutdown); if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); - dbgprintf("Queue 0x%lx: trying immediate shutdown of regular workers\n", queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers\n"); iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { - dbgprintf("Queue 0x%lx: immediate shutdown timed out on primary queue (this is acceptable and " - "triggers cancellation)\n", queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and " + "triggers cancellation)\n"); } else if(iRetLocal != RS_RET_OK) { - dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the primary queue " - "in disk save mode. Continuing, but results are unpredictable\n", - queueGetID(pThis), iRetLocal); + dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue " + "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } if(pThis->bIsDA) { /* we need to re-aquire the mutex for the next check in this case! */ @@ -967,15 +955,14 @@ RUNLOG_VAR("%d", pThis->toQShutdown); if(pThis->bIsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) { /* and now the same for the DA queue */ END_MTX_PROTECTED_OPERATIONS(pThis->mut); - dbgprintf("Queue 0x%lx: trying immediate shutdown of DA workers\n", queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA workers\n"); iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { - dbgprintf("Queue 0x%lx: immediate shutdown timed out on DA queue (this is acceptable and " - "triggers cancellation)\n", queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable and " + "triggers cancellation)\n"); } else if(iRetLocal != RS_RET_OK) { - dbgprintf("Queue 0x%lx: unexpected iRet state %d after trying immediate shutdown of the DA queue " - "in disk save mode. Continuing, but results are unpredictable\n", - queueGetID(pThis), iRetLocal); + dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue " + "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } } } else { @@ -989,13 +976,11 @@ RUNLOG_VAR("%d", pThis->toQShutdown); * function is still needed (what is no problem as we do not yet destroy the queue - but I * thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25 */ - dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the primary queue\n", - queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the primary queue\n"); iRetLocal = wtpCancelAll(pThis->pWtpReg); /* returns immediately if all threads already have terminated */ if(iRetLocal != RS_RET_OK) { - dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel primary queue worker " - "threads, continuing, but results are unpredictable\n", - queueGetID(pThis), iRetLocal); + dbgoprint((obj_t*) pThis, "unexpected iRet state %d trying to cancel primary queue worker " + "threads, continuing, but results are unpredictable\n", iRetLocal); } @@ -1019,13 +1004,11 @@ RUNLOG_VAR("%d", pThis->toQShutdown); */ /* ... and now the DA queue, if it exists (should always be after the primary one) */ if(pThis->pqDA != NULL) { - dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the DA queue\n", - queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the DA queue\n"); iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg); /* returns immediately if all threads already have terminated */ if(iRetLocal != RS_RET_OK) { - dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel DA queue worker " - "threads, continuing, but results are unpredictable\n", - queueGetID(pThis), iRetLocal); + dbgoprint((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker " + "threads, continuing, but results are unpredictable\n", iRetLocal); } } @@ -1033,8 +1016,7 @@ RUNLOG_VAR("%d", pThis->toQShutdown); * Well, more precisely, they *are in termination*. Some cancel cleanup handlers * may still be running. */ - dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n", - queueGetID(pThis), pThis->iQueueSize); + dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", pThis->iQueueSize); RETiRet; } @@ -1129,8 +1111,7 @@ queueConsumerCancelCleanup(void *arg1, void *arg2) ISOBJ_TYPE_assert(pThis, queue); - dbgprintf("Queue 0x%lx: cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n", - queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n"); /* TODO: re-enqueue the data element! This will also make the compiler warning go away... */ @@ -1165,14 +1146,13 @@ RUNLOG_VAR("%p", pUsr); if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) { iRetLocal = objGetSeverity(pUsr, &iSeverity); if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { - dbgprintf("Queue 0x%lx: queue nearly full (%d entries), discarded severity %d message\n", - queueGetID(pThis), iQueueSize, iSeverity); + dbgoprint((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n", + iQueueSize, iSeverity); objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } else { - dbgprintf("Queue 0x%lx: queue nearly full (%d entries), but could not drop msg " - "(iRet: %d, severity %d)\n", queueGetID(pThis), iQueueSize, - iRetLocal, iSeverity); + dbgoprint((obj_t*) pThis, "queue nearly full (%d entries), but could not drop msg " + "(iRet: %d, severity %d)\n", iQueueSize, iRetLocal, iSeverity); } } @@ -1221,8 +1201,8 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave) finalize_it: if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { - dbgprintf("Queue 0x%lx/w?: error %d dequeueing element - ignoring, but strange things " - "may happen\n", queueGetID(pThis), iRet); + dbgoprint((obj_t*) pThis, "error %d dequeueing element - ignoring, but strange things " + "may happen\n", iRet); } RETiRet; } @@ -1244,7 +1224,7 @@ queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave) CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp)); finalize_it: -dbgprintf("Queue %p: regular consumer returns %d\n", pThis, iRet); + dbgoprint((obj_t*) pThis, "regular consumer returns %d\n", iRet); RETiRet; } @@ -1266,12 +1246,11 @@ queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave) ISOBJ_TYPE_assert(pThis, queue); ISOBJ_TYPE_assert(pWti, wti); -dbgprintf("Queue %p/w?: queueDAConsumer, queue size %d\n", pThis, pThis->iQueueSize);/* dirty iQueueSize! */ CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave)); CHKiRet(queueEnqObj(pThis->pqDA, pWti->pUsrp)); finalize_it: -dbgprintf("DAConsumer returns with iRet %d\n", iRet); + dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); RETiRet; } @@ -1367,18 +1346,6 @@ RUNLOG_VAR("%p", pThis->pqParent->pWtpDA); ASSERT(pThis->pqParent->pWtpDA != NULL); pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ -#if 0 - if(pThis->pqParent->pWtpDA == NULL) { - /* this can happen if we are not set to save on an eternal timeout. We - * log a warning but otherwise do nothing - */ - dbgprintf("Queue 0x%lx: warning: pThis->pqParent->pWtpDA is NULL (this may be OK if the parent is not set to " - " bSaveOnShutdown\n", queueGetID(pThis)); - } else { - pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ - wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ - } -#endif } RETiRet; @@ -1471,38 +1438,30 @@ RUNLOG_VAR("%d", pThis->bIsDA); iRetLocal = queueHaveQIF(pThis); RUNLOG_VAR("%d", iRetLocal); if(iRetLocal == RS_RET_OK) { - dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n", - queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n"); RUNLOG; queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ bInitialized = 1; /* we are done */ } else { /* TODO: use logerror? -- rgerhards, 2008-01-16 */ - dbgprintf("Queue 0x%lx: error %d trying to access on-disk queue files, starting without them. " - "Some data may be lost\n", queueGetID(pThis), iRetLocal); + dbgoprint((obj_t*) pThis, "error %d trying to access on-disk queue files, starting without them. " + "Some data may be lost\n", iRetLocal); } } RUNLOG_VAR("%d", bInitialized); if(!bInitialized) { - dbgprintf("Queue 0x%lx: queue starts up without (loading) any DA disk state (this is normal for the DA " - "queue itself!)\n", queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "queue starts up without (loading) any DA disk state (this is normal for the DA " + "queue itself!)\n"); } /* if the queue already contains data, we need to start the correct number of worker threads. This can be * the case when a disk queue has been loaded. If we did not start it here, it would never start. */ queueAdviseMaxWorkers(pThis); -#if 0 - if(!pThis->bEnqOnly && pThis->bRunsDA) { - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* there is always just one DA worker! */ - } -#endif - pThis->bQueueStarted = 1; finalize_it: -dbgprintf("queueStart() exit, iret %d\n", iRet); RETiRet; } @@ -1533,7 +1492,7 @@ static rsRetVal queuePersist(queue_t *pThis) FINALIZE; /* if the queue is empty, we are happy and done... */ } - dbgprintf("Queue 0x%lx: persisting queue to disk, %d entries...\n", queueGetID(pThis), pThis->iQueueSize); + dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", pThis->iQueueSize); /* Construct file name */ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix); @@ -1666,7 +1625,7 @@ RUNLOG; * if need arises (what I doubt...) -- rgerhards, 2008-01-25 */ CHKiRet_Hdlr(queuePersist(pThis)) { - dbgprintf("Queue 0x%lx: error %d persisting queue - data lost!\n", (unsigned long) pThis, iRet); + dbgoprint((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet); } /* finally, clean up some simple things... */ @@ -1779,10 +1738,10 @@ RUNLOG_VAR("%d", pThis->bRunsDA); /* wait for the queue to be ready... */ while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) { - dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); timeoutComp(&t, pThis->toEnq); if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { - dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } @@ -1798,7 +1757,7 @@ finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { d_pthread_mutex_unlock(pThis->mut); i = pthread_cond_signal(&pThis->notEmpty); - dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i); + dbgoprint((obj_t*) pThis, "EnqueueMsg signaled condition (%d)\n", i); pthread_setcancelstate(iCancelStateSave, NULL); } @@ -1842,8 +1801,7 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex) if(bEnqOnly == 1) { /* switch to enqueue-only mode */ /* this means we need to terminate all workers - that's it... */ - dbgprintf("Queue 0x%lx: switching to enqueue-only mode, terminating all worker threads\n", - queueGetID(pThis)); + dbgoprint((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n"); if(pThis->pWtpReg != NULL) wtpWakeupAllWrkr(pThis->pWtpReg); if(pThis->pWtpDA != NULL) |