summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-20 15:06:32 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-20 15:06:32 +0200
commitff6963d6f6d85c6c10e80b17da8432bb983f3e38 (patch)
tree8da1df1e854c8aa53ea9bcbd15f2539f0c185d79
parent541696f307e9facc80f2aa153147a9456d41d3f9 (diff)
downloadrsyslog-ff6963d6f6d85c6c10e80b17da8432bb983f3e38.tar.gz
rsyslog-ff6963d6f6d85c6c10e80b17da8432bb983f3e38.tar.xz
rsyslog-ff6963d6f6d85c6c10e80b17da8432bb983f3e38.zip
simplified startup of queue DA mode
-rw-r--r--runtime/queue.c73
-rw-r--r--runtime/queue.h1
-rw-r--r--runtime/wti.c2
3 files changed, 9 insertions, 67 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 50bfaca9..bb988c86 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -258,30 +258,6 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
}
-/* wait until we have a fully initialized DA queue. Sometimes, we need to
- * sync with it, as we expect it for some function. Note that in extreme
- * cases, the DA queue may already have started up AND terminated when we
- * call this function. As such,it may validly be that DA is already shut down.
- * So we just check if we are in init phase and then wait for full startup.
- * If in non-DA mode, we silently return.
- * IMPORTANT: the QUEUE MUTEX MUST BE LOOKED WHEN this funnction is called!
- * rgerhards, 2008-02-27
- */
-static rsRetVal
-qqueueWaitDAModeInitialized(qqueue_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- while(pThis->bRunsDA == 1) {
- d_pthread_cond_wait(&pThis->condDAReady, pThis->mut);
- }
-
- RETiRet;
-}
-
-
/* Destruct DA queue. This is the last part of DA-to-normal-mode
* transistion. This is called asynchronously and some time quite a
* while after the actual transistion. The key point is that we need to
@@ -301,10 +277,6 @@ TurnOffDAMode(qqueue_t *pThis)
ASSERT(pThis->bRunsDA);
if(getLogicalQueueSize(pThis->pqDA) == 0) {
pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
- /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
- * this will be quick.
- */
-//XXX: TODO qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
iRet);
}
@@ -356,9 +328,6 @@ StartDA(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
- if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */
- FINALIZE; /* ... then we are already done! */
-
/* create message queue */
CHKiRet(qqueueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
@@ -404,9 +373,7 @@ StartDA(qqueue_t *pThis)
if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
FINALIZE; /* something is wrong */
- pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */
- pThis->bChildIsDone = 0;/* set to 1 when child's worker detect queue is finished */
- pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
+ //pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n",
qqueueGetID(pThis->pqDA));
@@ -455,7 +422,6 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA));
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA));
CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) StartDA));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) TurnOffDAMode));
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
@@ -467,9 +433,15 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
/* if we reach this point, we have a "good" DA worker pool */
/* indicate we now run in DA mode - this is reset by the DA worker if it fails */
- pThis->bRunsDA = 1;
pThis->bDAEnqOnly = bEnqOnly;
+ /* now construct the actual queue (if it does not already exist) */
+ if(pThis->pqDA == NULL) {
+ CHKiRet(StartDA(pThis));
+ }
+
+ pThis->bRunsDA = 1;
+
/* now we must now adivse the wtp that we need one worker. If none is yet active,
* that will also start one up. If we forgot that step, everything would be stalled
* until the next enqueue request.
@@ -1168,11 +1140,6 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
}
}
-
- /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
- if(pThis->bRunsDA) {
- qqueueWaitDAModeInitialized(pThis);
- }
d_pthread_mutex_unlock(pThis->mut);
/* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found
@@ -2024,7 +1991,6 @@ RegOnWrkrShutdown(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->pqParent != NULL) {
- pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
if(pThis->pqParent->pWtpDA != NULL) { /* see comment in function header from 2008-02-27 */
wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
}
@@ -2034,24 +2000,6 @@ RegOnWrkrShutdown(qqueue_t *pThis)
}
-/* The following function is called when a regular queue worker starts up. We need this
- * hook to indicate in the parent queue (if we are a child) that we are not done yet.
- */
-static rsRetVal
-RegOnWrkrStartup(qqueue_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- if(pThis->pqParent != NULL) {
- pThis->pqParent->bChildIsDone = 0;
- }
-
- RETiRet;
-}
-
-
/* start up the queue - it must have been constructed and parameters defined
* before.
*/
@@ -2114,7 +2062,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg));
CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
@@ -2287,11 +2234,9 @@ DoSaveOnShutdown(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */
-dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
- if(pThis->bRunsDA != 2) {
+ if(!pThis->bRunsDA) {
InitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
- qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! */
}
d_pthread_mutex_unlock(pThis->mut);
/* make sure we do not timeout before we are done */
diff --git a/runtime/queue.h b/runtime/queue.h
index 7b10e5dd..73c62b52 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -121,7 +121,6 @@ typedef struct queue_s {
pthread_cond_t belowFullDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */
pthread_cond_t belowLightDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */
pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */
- int bChildIsDone; /* set to 1 when the child DA queue has finished processing, 0 otherwise */
int bThrdStateChanged; /* at least one thread state has changed if 1 */
/* end sync variables */
/* the following variables are always present, because they
diff --git a/runtime/wti.c b/runtime/wti.c
index c295ccc9..9d0560dd 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -250,9 +250,7 @@ wtiWorker(wti_t *pThis)
dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
- d_pthread_mutex_lock(pWtp->pmutUsr);
pWtp->pfOnWorkerStartup(pWtp->pUsr);
- d_pthread_mutex_unlock(pWtp->pmutUsr);
/* now we have our identity, on to real processing */
while(1) { /* loop will be broken below - need to do mutex locks */