summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-16 10:15:42 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-16 10:15:42 +0000
commit19c9b187ab29f9304adb82d9c6005c69c92b3c17 (patch)
tree4a9b1c35963e15bcbf9426fb7037880cc6747f62 /queue.c
parent22b9dc1af11c3fdfdf9218fb48e15aedf9a342b3 (diff)
downloadrsyslog-19c9b187ab29f9304adb82d9c6005c69c92b3c17.tar.gz
rsyslog-19c9b187ab29f9304adb82d9c6005c69c92b3c17.tar.xz
rsyslog-19c9b187ab29f9304adb82d9c6005c69c92b3c17.zip
cleaned up queue disk startup
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c60
1 files changed, 32 insertions, 28 deletions
diff --git a/queue.c b/queue.c
index 88e41168..00f6e63f 100644
--- a/queue.c
+++ b/queue.c
@@ -127,6 +127,7 @@ queueStrtWrkThrd(queue_t *pThis, int i)
ISOBJ_TYPE_assert(pThis, queue);
assert(i >= 0 && i <= pThis->iNumWorkerThreads);
+ assert(pThis->pWrkThrds[i].tCurrCmd < eWRKTHRDCMD_RUN);
queueTellWrkThrd(pThis, i, eWRKTHRDCMD_RUN);
iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis);
@@ -1441,50 +1442,50 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
DEFiRet;
int i;
rsRetVal iRetLocal;
+ int bInitialized = 0; /* is queue already initialized? */
assert(pThis != NULL);
/* call type-specific constructor */
- CHKiRet(pThis->qConstruct(pThis));
+ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
- dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", queueGetID(pThis), pThis->qType,
- pThis->iMaxFileSize);
+ dbgprintf("Queue 0x%lx: type %d, disk assisted %d, maxFileSz %ld starting\n", queueGetID(pThis), pThis->qType,
+ pThis->bIsDA, pThis->iMaxFileSize);
if(pThis->qType != QUEUETYPE_DIRECT) {
if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads + 1, sizeof(qWrkThrd_t))) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- /* worker 0 is reserved for disk-assisted mode */
- queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_NEVER_RAN);
+ if(pThis->bIsDA) {
+ /* If we are disk-assisted, we need to check if there is a QIF file
+ * which we need to load. -- rgerhards, 2008-01-15
+ */
+ iRetLocal = queueHaveQIF(pThis);
+ if(iRetLocal == RS_RET_OK) {
+ dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n",
+ queueGetID(pThis));
- /* fire up the worker threads */
- for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) {
- queueStrtWrkThrd(pThis, i);
- }
- }
+ /* indicate we now run in DA mode - this is reset by the DA worker if it fails */
+ pThis->qRunsDA = QRUNS_DA_INIT;
- if(pThis->bIsDA) {
- /* If we are disk-assisted, we need to check if there is a QIF file
- * which we need to load. -- rgerhards, 2008-01-15
- */
- iRetLocal = queueHaveQIF(pThis);
-dbgprintf("HaveQIF %d\n", iRet);
- if(iRetLocal == RS_RET_OK) {
-dbgprintf("need to restore disk queue\n");
- // code below to function!
- /* if we reach this point, we are NOT currently running in DA mode.
- * TODO: split this function, I think that would make the code easier
- * to read. -- rgerhards, 2008-10-15
- */
- dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n",
- queueGetID(pThis));
+ /* now we must start our DA worker thread - it does the rest of the initialization */
+ CHKiRet(queueStrtWrkThrd(pThis, 0));
+ bInitialized = 1;
+ }
+ }
- pThis->qRunsDA = QRUNS_DA_INIT; /* indicate we now run in DA mode - this is reset by the DA worker if it fails */
+ if(!bInitialized) {
+ dbgprintf("Queue 0x%lx: queue starts up without loading any disk state\n", queueGetID(pThis));
+ /* worker 0 is reserved for disk-assisted mode, so do not start */
+ queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_NEVER_RAN);
- /* now we must start our DA worker thread - it does the rest of the initialization */
- CHKiRet(queueStrtWrkThrd(pThis, 0));
+ /* fire up the worker threads */
+ for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) {
+ queueStrtWrkThrd(pThis, i);
+ }
}
}
+
finalize_it:
return iRet;
}
@@ -1613,6 +1614,9 @@ rsRetVal queueDestruct(queue_t *pThis)
/* if running DA, tell the DA workers to shut down. This saves us some CPU cycles which
* we can use to persist the remaining in-memory data to disk quicker. -- rgerhads, 2008-01-16
+ * TODO: we actually need to change the queue to an "input-only" mode, that also prevents
+ * startup of the thread again further down in the process. None of that really hurts, so we
+ * leave it for the time being. -- rgerhards, 2008-01-16
*/
if(pThis->qRunsDA != QRUNS_REGULAR)
queueWrkThrdReqTrm(pThis->pqDA, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, 0);