summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-25 19:25:46 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-25 19:25:46 +0000
commit87f0e9b5f91407418a43a06f39831febfbd4e3ad (patch)
tree810a4191b8cfd14a4a2a19399dbe894b16b5e6ae /queue.c
parent167abdb5b3fa6900edd6bbdb1cc7d586896a268c (diff)
downloadrsyslog-87f0e9b5f91407418a43a06f39831febfbd4e3ad.tar.gz
rsyslog-87f0e9b5f91407418a43a06f39831febfbd4e3ad.tar.xz
rsyslog-87f0e9b5f91407418a43a06f39831febfbd4e3ad.zip
disk-assisted queue mode finally begins to look good ;)
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c220
1 files changed, 146 insertions, 74 deletions
diff --git a/queue.c b/queue.c
index 320b3385..421fd651 100644
--- a/queue.c
+++ b/queue.c
@@ -62,7 +62,7 @@ DEFobjStaticHelpers
/* forward-definitions */
rsRetVal queueChkPersist(queue_t *pThis);
-static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly);
+static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex);
static int queueChkStopWrkrDA(queue_t *pThis);
static int queueIsIdleDA(queue_t *pThis);
static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave);
@@ -81,9 +81,11 @@ static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2);
* do it at some later time, because we need to destruct the DA queue. That,
* however, can not be done in a thread that has been signalled
* This is to be called when we revert back to our own queue.
+ * This function must be called with the queue mutex locked (the wti
+ * class ensures this).
* rgerhards, 2008-01-15
*/
-static inline rsRetVal
+static rsRetVal
queueTurnOffDAMode(queue_t *pThis)
{
DEFiRet;
@@ -91,31 +93,69 @@ queueTurnOffDAMode(queue_t *pThis)
ISOBJ_TYPE_assert(pThis, queue);
assert(pThis->bRunsDA);
+ /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need
+ * to wait for its startup... -- rgerhards, 2008-01-25
+ */
+ while(pThis->bRunsDA != 2) {
+ d_pthread_cond_wait(&pThis->condDAReady, pThis->mut);
+ }
/* if we need to pull any data that we still need from the (child) disk queue,
* now would be the time to do so. At present, we do not need this, but I'd like to
* keep that comment if future need arises.
*/
- /* we start at least one worker thread. If no new messages come in, this will
- * be the only one for the time being. I am not yet sure if that is acceptable.
- * To solve that issue, queueWorker () would need to check if it needs to fire
- * up addtl ones. I am not yet sure if that is justified. After all, if no new
- * messages come into the queue, we may be well off with a single worker.
- * rgerhards, 2008-01-16
+ /* we need to check if the DA queue is empty because the DA worker may simply have
+ * terminated do to no new messages arriving. That does not, however, mean that the
+ * DA queue is empty. If there is still data in that queue, we do nothing and leave
+ * that for a later incarnation of this function (it will be called multiple times
+ * during the lifetime of DA-mode, depending on how often the DA worker receives an
+ * inactivity timeout. -- rgerhards, 2008-01-25
*/
+RUNLOG_VAR("%p", pThis->pqDA);
+RUNLOG_VAR("%d", pThis->pqDA->iQueueSize);
+ if(pThis->pqDA->iQueueSize == 0) {
dbgprintf("Queue 0x%lx: disk-assistance being been turned off, bEnqOnly %d, bQueInDestr %d, NumWrkd %d\n",
queueGetID(pThis),
pThis->bEnqOnly,pThis->bQueueInDestruction,pThis->iCurNumWrkThrd);
- // TODO: mutex?
- pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
- /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
- * this will be quick.
- */
- queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
+ pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
+ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
+ * this will be quick.
+ */
+ queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
+ dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
+ queueGetID(pThis), iRet);
+ }
+
+ RETiRet;
+}
+
+
+
+/* returns the number of workers that should be advised at
+ * this point in time. The mutex must be locked when
+ * ths function is called. -- rgerhards, 2008-01-25
+ */
+static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
+{
+ DEFiRet;
+ int iMaxWorkers;
- dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
- queueGetID(pThis), iRet);
+ ISOBJ_TYPE_assert(pThis, queue);
+
+RUNLOG_VAR("%d", pThis->bEnqOnly);
+ if(!pThis->bEnqOnly) {
+ if(pThis->bRunsDA) {
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
+ } else {
+ if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
+ iMaxWorkers = 1;
+ } else {
+ iMaxWorkers = pThis->iQueueSize / pThis->iMinMsgsPerWrkr + 1;
+ }
+ wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
+ }
+ }
RETiRet;
}
@@ -151,7 +191,10 @@ RUNLOG_VAR("%s", pThis->pszFilePrefix);
* chore of this function is to create the DA queue object. If that function fails,
* the DA worker should return with an appropriate state, which in turn should lead to
* a re-set to non-DA mode in the Enq process. The queue mutex must be locked when this
- * function is called, else a race on pThis->bRunsDA may happen.
+ * function is called, else a number of races will happen.
+ * Please note that this function may be called *while* we in DA mode. This is due to the
+ * fact that the DA worker calls it and the DA worker may be suspended (and restarted) due
+ * to inactivity timeouts.
* rgerhards, 2008-01-15
*/
static rsRetVal
@@ -161,6 +204,9 @@ queueStartDA(queue_t *pThis)
ISOBJ_TYPE_assert(pThis, queue);
+ if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */
+ FINALIZE; /* ... then we are already done! */
+
/* set up sync objects */
pthread_mutex_init(&pThis->mutDA, NULL);
pthread_cond_init(&pThis->condDA, NULL);
@@ -180,7 +226,7 @@ dbgprintf("Queue %p: queueSTrtDA after child queue construct, q %p\n", pThis, pT
CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
- CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly));
+ CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
if(pThis->toQShutdown == 0) {
@@ -207,7 +253,8 @@ dbgprintf("Queue %p: queueStartDA pre start\n", pThis);
*/
wtpWakeupWrkr(pThis->pWtpReg); /* awake all workers, but not ourselves ;) */
- pThis->bRunsDA = 1; /* we are now in DA mode! */
+ pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */
+ pthread_cond_signal(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n",
queueGetID(pThis), queueGetID(pThis->pqDA));
@@ -248,7 +295,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
* rgerhards, 2008-01-24
*/
if(pThis->pWtpDA == NULL) {
- lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx/DA", (unsigned long) pThis);
+ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx:DA", (unsigned long) pThis);
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, queueChkStopWrkrDA));
@@ -273,8 +320,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
* that will also start one up. If we forgot that step, everything would be stalled
* until the next enqueue request.
*/
- if(pThis->bEnqOnly == 0)
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* one worker only for disk queues! */
+ queueAdviseMaxWorkers(pThis);
finalize_it:
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -283,7 +329,9 @@ finalize_it:
/* check if we need to start disk assisted mode and send some signals to
- * keep it running if we are already in it.
+ * keep it running if we are already in it. It also checks if DA mode is
+ * partially initialized, in which case it waits for initialization to
+ * complete.
* rgerhards, 2008-01-14
*/
static inline rsRetVal
@@ -310,7 +358,7 @@ dbgprintf("Queue %p: chkStartDA\n", pThis);
*/
dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n",
queueGetID(pThis), pThis->iQueueSize);
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* run again [see comment above] ;) */
+ queueAdviseMaxWorkers(pThis);
} else {
/* this is the case when we are currently not running in DA mode. So it is time
* to turn it back on.
@@ -779,7 +827,6 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
{
DEFiRet;
DEFVARS_mutexProtection;
- int i;
struct timespec tTimeout;
rsRetVal iRetLocal;
@@ -788,15 +835,20 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis));
// TODO: reminder, delte after testing: do we need to modify the high wtr mark? I dont' think so 2008-01-25
+ /* we reduce the low water mark in any case. This is not absolutely necessary, but
+ * it is useful because we enable DA mode at several spots below and so we do not need
+ * to think about the low water mark each time.
+ */
+ pThis->iLowWtrMrk = 0;
+
/* first try to shutdown the queue within the regular shutdown period */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
if(pThis->iQueueSize > 0) {
if(pThis->bRunsDA) {
- /* worker threads may be inactive after reaching low water
- * mark. Lower the mark and react workers.
+ /* We may have waited on the low water mark. As it may have changed, we
+ * see if we reactivate the worker.
*/
- pThis->iLowWtrMrk = 0;
- wtpAdviseMaxWorkers(pThis->pWtpReg, 1);
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
}
}
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -843,17 +895,22 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
}
/* when we reach this point, both queues are either empty or the regular queue shutdown timeout
- * has expired. Now we need to check if we areconfigured to not loose messages. If so, we need
+ * has expired. Now we need to check if we are configured to not loose messages. If so, we need
* to persist the queue to disk (this is only possible if the queue is DA-enabled).
*/
+// TODO: what about pure disk queues and bSaveOnShutdown?
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
/* optimize parameters for shutdown of DA-enabled queues */
+RUNLOG_VAR("%d", pThis->bSaveOnShutdown);
+RUNLOG_VAR("%d", pThis->bIsDA);
+RUNLOG_VAR("%d", pThis->iQueueSize);
if(pThis->bIsDA && pThis->iQueueSize > 0 && pThis->bSaveOnShutdown) {
+RUNLOG;
/* switch to enqueue-only mode so that no more actions happen */
if(pThis->bRunsDA == 0) {
queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
} else {
- queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* switch to enqueue-only mode */
+ queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
}
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
/* make sure we do not timeout before we are done */
@@ -870,10 +927,12 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
+RUNLOG;
/* now the primary queue is either empty, persisted to disk - or set to loose messages. So we
* can now request immediate shutdown of any remaining workers.
*/
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+RUNLOG_VAR("%d", pThis->iQueueSize);
if(pThis->iQueueSize > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
@@ -895,9 +954,10 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
* thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25
*/
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
+RUNLOG_VAR("%d", pThis->iQueueSize);
if(pThis->iQueueSize > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- dbgprintf("Queue 0x%lx: primary queue worker threads could not be shutdown, now canceling them\n",
+ dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the primary queue\n",
queueGetID(pThis));
iRetLocal = wtpCancelAll(pThis->pWtpReg);
if(iRetLocal != RS_RET_OK) {
@@ -913,7 +973,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- dbgprintf("Queue 0x%lx: DA worker threads could not be shutdown, now canceling them\n",
+ dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the DA queue\n",
queueGetID(pThis));
iRetLocal = wtpCancelAll(pThis->pWtpReg);
if(iRetLocal != RS_RET_OK) {
@@ -948,8 +1008,6 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
DEFiRet;
queue_t *pThis;
-int *pBoom = NULL;
-//*pBoom = 'A';
assert(ppThis != NULL);
assert(pConsumer != NULL);
assert(iWorkerThreads >= 0);
@@ -1196,7 +1254,11 @@ static int
queueIsIdleDA(queue_t *pThis)
{
/* remember: iQueueSize is the DA queue size, not the main queue! */
- return (pThis->iQueueSize == 0);
+RUNLOG_VAR("%d", pThis->iLowWtrMrk);
+dbgprintf("queueIsIdleDA(%p) returns %d, qsize %d\n", pThis, pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk), pThis->iQueueSize);
+ //// TODO: I think we need just a single function...
+ //return (pThis->iQueueSize == 0);
+ return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk));
}
/* must only be called when the queue mutex is locked, else results
* are not stable! Regular queue version
@@ -1235,6 +1297,7 @@ dbgprintf("Queue %p: I am child, use mutex %p\n", pThis, pThis->pqParent->mut);
}
pthread_mutex_init(&pThis->mutThrdMgmt, NULL);
+ pthread_cond_init (&pThis->condDAReady, NULL);
pthread_cond_init (&pThis->notFull, NULL);
pthread_cond_init (&pThis->notEmpty, NULL);
dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
@@ -1242,8 +1305,8 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
- dbgprintf("Queue 0x%lx: type %d, enq-only %d, disk assisted %d, maxFileSz %ld starting\n", queueGetID(pThis),
- pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize);
+ dbgprintf("Queue 0x%lx: type %d, enq-only %d, disk assisted %d, maxFileSz %ld, qsize %d starting\n", queueGetID(pThis),
+ pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, pThis->iQueueSize);
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
@@ -1251,7 +1314,7 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
/* create worker thread pools for regular operation. The DA pool is created on an as-needed
* basis, which potentially means never under most circumstances.
*/
- lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx/Reg", (unsigned long) pThis);
+ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx:Reg", (unsigned long) pThis);
CHKiRet(wtpConstruct (&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, queueChkStopWrkrReg));
@@ -1265,20 +1328,18 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
CHKiRet(wtpConstructFinalize (pThis->pWtpReg));
/* initialize worker thread instances */
+RUNLOG_VAR("%d", pThis->bIsDA);
if(pThis->bIsDA) {
/* If we are disk-assisted, we need to check if there is a QIF file
* which we need to load. -- rgerhards, 2008-01-15
*/
iRetLocal = queueHaveQIF(pThis);
+RUNLOG_VAR("%d", iRetLocal);
if(iRetLocal == RS_RET_OK) {
dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n",
queueGetID(pThis));
-
+RUNLOG;
queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
- /* we need to start the DA worker thread so that messages will be processed. So
- * we advise the worker pool there is at least one needed. The wtp does the rest...
- */
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
bInitialized = 1; /* we are done */
} else {
// TODO: use logerror? -- rgerhards, 2008-01-16
@@ -1287,11 +1348,17 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
}
}
+RUNLOG_VAR("%d", bInitialized);
if(!bInitialized) {
- dbgprintf("Queue 0x%lx: queue starts up without (loading) any disk state\n", queueGetID(pThis));
- /* we do not fire up any worker threads here, this happens automatically when they are needed */
- // TODO: preforked workers? queueStrtAllWrkThrds(pThis);
+ dbgprintf("Queue 0x%lx: queue starts up without (loading) any DA disk state (this is normal for the DA "
+ "queue itself!)\n", queueGetID(pThis));
}
+
+ /* if the queue already contains data, we need to start the correct number of worker threads. This can be
+ * the case when a disk queue has been loaded. If we did not start it here, it would never start.
+ */
+ queueAdviseMaxWorkers(pThis);
+
pThis->bQueueStarted = 1;
finalize_it:
@@ -1411,12 +1478,21 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
/* shut down all workers (handles *all* of the persistence logic) */
- queueShutdownWorkers(pThis);
+ if(!pThis->bEnqOnly) /* in enque-only mode, we have no worker pool! */
+ queueShutdownWorkers(pThis);
+RUNLOG;
- /* finally destruct our (regular) worker thread pool */
- if(pThis->qType != QUEUETYPE_DIRECT) {
+ /* finally destruct our (regular) worker thread pool
+ * Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen,
+ * e.g. when they are not created in enqueue-only mode. We already check the condition
+ * as this may otherwise be very hard to find once we optimize (and have long forgotten
+ * about this condition here ;)
+ * rgerhards, 2008-01-25
+ */
+ if(pThis->qType != QUEUETYPE_DIRECT && pThis->pWtpReg != NULL) {
wtpDestruct(&pThis->pWtpReg);
}
+RUNLOG;
/* Now check if we actually have a DA queue and, if so, destruct it.
* Note that the wtp must be destructed first, it may be in cancel cleanup handler
@@ -1424,10 +1500,16 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
* data (re-queueing case). So we need to destruct the wtp first, which will make
* sure all workers have terminated.
*/
+RUNLOG_VAR("%p", pThis->pWtpDA);
if(pThis->pWtpDA != NULL) {
+RUNLOG;
wtpDestruct(&pThis->pWtpDA);
+RUNLOG_VAR("%p", pThis->pqDA);
+ }
+ if(pThis->pqDA != NULL) {
queueDestruct(&pThis->pqDA);
}
+RUNLOG;
/* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty)
* This handler is most important for disk queues, it will finally persist the necessary
@@ -1447,6 +1529,7 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
free(pThis->mut);
}
pthread_mutex_destroy(&pThis->mutThrdMgmt);
+ pthread_cond_destroy(&pThis->condDAReady);
pthread_cond_destroy(&pThis->notFull);
pthread_cond_destroy(&pThis->notEmpty);
@@ -1522,7 +1605,6 @@ queueEnqObj(queue_t *pThis, void *pUsr)
{
DEFiRet;
int iCancelStateSave;
- int iMaxWorkers;
int i;
struct timespec t;
@@ -1530,6 +1612,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
// TODO: check if queue is terminating and if so either discard message or enqeue it to the DA queue *directly*
dbgprintf("Queue %p: EnqObj() 1\n", pThis);
+RUNLOG_VAR("%d", pThis->bRunsDA);
/* Please note that this function is not cancel-safe and consequently
* sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
* during its execution. If that is not done, race conditions occur if the
@@ -1544,25 +1627,10 @@ dbgprintf("Queue %p: EnqObj() 1\n", pThis);
/* first check if we need to discard this message (which will cause CHKiRet() to exit) */
CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
-dbgprintf("Queue %p: EnqObj() 10\n", pThis);
/* then check if we need to add an assistance disk queue */
if(pThis->bIsDA)
CHKiRet(queueChkStrtDA(pThis));
-RUNLOG_VAR("%d", pThis->bIsDA);
- /* make sure at least one worker is running. */
- if(pThis->bRunsDA) {
-RUNLOG;
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
- } else {
- if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
- iMaxWorkers = 1;
- } else {
- iMaxWorkers = pThis->iQueueSize / pThis->iMinMsgsPerWrkr + 1;
- }
-RUNLOG;
- wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
- }
/* wait for the queue to be ready... */
while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) {
@@ -1576,6 +1644,8 @@ RUNLOG;
}
/* and finally enqueue the message */
+RUNLOG_VAR("%p", pThis);
+RUNLOG_VAR("%d", pThis->bRunsDA);
CHKiRet(queueAdd(pThis, pUsr));
queueChkPersist(pThis);
@@ -1587,6 +1657,8 @@ finalize_it:
pthread_setcancelstate(iCancelStateSave, NULL);
}
+ /* make sure at least one worker is running. */
+ queueAdviseMaxWorkers(pThis);
RETiRet;
}
@@ -1601,10 +1673,10 @@ finalize_it:
* rgerhards, 2008-01-16
*/
static rsRetVal
-queueSetEnqOnly(queue_t *pThis, int bEnqOnly)
+queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex)
{
DEFiRet;
- int iCancelStateSave;
+ DEFVARS_mutexProtection;
ISOBJ_TYPE_assert(pThis, queue);
@@ -1612,8 +1684,7 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly)
* called, so that doesn't matter... -- rgerhards, 2008-01-16
*/
if(pThis->mut != NULL) {
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(pThis->mut);
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
}
if(bEnqOnly == pThis->bEnqOnly)
@@ -1626,8 +1697,10 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly)
/* this means we need to terminate all workers - that's it... */
dbgprintf("Queue 0x%lx: switching to enqueue-only mode, terminating all worker threads\n",
queueGetID(pThis));
- wtpWakeupAllWrkr(pThis->pWtpDA);
- wtpWakeupAllWrkr(pThis->pWtpReg);
+ if(pThis->pWtpReg != NULL)
+ wtpWakeupAllWrkr(pThis->pWtpReg);
+ if(pThis->pWtpDA != NULL)
+ wtpWakeupAllWrkr(pThis->pWtpDA);
} else {
/* switch back to regular mode */
ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */
@@ -1638,8 +1711,7 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly)
finalize_it:
if(pThis->mut != NULL) {
- d_pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
RETiRet;
}