summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-18 23:24:51 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-18 23:24:51 +0000
commitfabcb72a0994cd832cc1a5019123cfec35ef0b82 (patch)
treedf0c126dd9fb19e6c9cd79698094f60fbfd3e2a6 /queue.c
parent2bd1e283527bae01d61b85682a7e8ecc778997a8 (diff)
downloadrsyslog-fabcb72a0994cd832cc1a5019123cfec35ef0b82.tar.gz
rsyslog-fabcb72a0994cd832cc1a5019123cfec35ef0b82.tar.xz
rsyslog-fabcb72a0994cd832cc1a5019123cfec35ef0b82.zip
saving state
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c174
1 files changed, 102 insertions, 72 deletions
diff --git a/queue.c b/queue.c
index 6ac69c11..c670c8ce 100644
--- a/queue.c
+++ b/queue.c
@@ -1,3 +1,4 @@
+// TODO: think about mutDA - I think it's no longer needed
// TODO: start up the correct num of workers when switching to non-DA mode
// TODO: "preforked" worker threads
// TODO: do an if(debug) in dbgrintf - performance in release build!
@@ -64,6 +65,17 @@ static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly);
/* methods */
+
+/* cancellation cleanup handler - frees provided mutex
+ * rgerhards, 2008-01-14
+ */
+static void queueMutexCleanup(void *arg)
+{
+ assert(arg != NULL);
+ pthread_mutex_unlock((pthread_mutex_t*) arg);
+}
+
+
/* get the current worker state. For simplicity and speed, we have
* NOT used our regular calling interface this time. I hope that won't
* bite in the long term... -- rgerhards, 2008-01-17
@@ -95,12 +107,14 @@ dbgprintf("Queue 0x%lx: trying to send command %d to thread %d\n", queueGetID(p
switch(tCmd) {
case eWRKTHRD_TERMINATING:
pthread_cond_destroy(&pThis->condInitDone);
+ pthread_mutex_destroy(&pThis->mut);
dbgprintf("Queue 0x%lx/w%d: thread terminating with %d entries left in queue, %d workers running.\n",
queueGetID(pThis->pQueue), pThis->iThrd, pThis->pQueue->iQueueSize,
pThis->pQueue->iCurNumWrkThrd);
break;
case eWRKTHRD_RUN_CREATED:
pthread_cond_init(&pThis->condInitDone, NULL);
+ pthread_mutex_init(&pThis->mut, NULL);
break;
case eWRKTHRD_RUN_INIT:
break;
@@ -184,7 +198,7 @@ qWrkrWaitStartup(qWrkThrd_t *pThis)
dbgprintf("Queue 0x%lx: waiting on worker thread %d startup\n", queueGetID(pThis->pQueue),
pThis->iThrd);
pthread_cond_wait(&pThis->condInitDone, pThis->pQueue->mut);
-dbgprintf("startup done!\n");
+dbgprintf("worker startup done!\n");
}
pthread_mutex_unlock(pThis->pQueue->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -287,6 +301,30 @@ queueStrtWrkThrd(queue_t *pThis, int i)
}
+/* start the DA worker thread (if not already running)
+ */
+static inline rsRetVal
+queueStrtDAWrkr(queue_t *pThis)
+{
+ DEFiRet;
+ int iCancelStateSave;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+dbgprintf("Queue %p: DAWrkr thread mutex lock\n", pThis);
+ pthread_mutex_lock(&pThis->pWrkThrds[0].mut);
+ pthread_cleanup_push(queueMutexCleanup, &pThis->pWrkThrds[0].mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ if(pThis->pWrkThrds[0].tCurrCmd == eWRKTHRD_STOPPED) {
+ iRet = queueStrtWrkThrd(pThis, 0);
+ }
+ pthread_cleanup_pop(1);
+dbgprintf("Queue %p: DAWrkr thread mutex unlock\n", pThis);
+
+ return iRet;
+}
+
/* Starts a *new* worker thread. Function searches itself for a free index spot. It must only
* be called when we have less than max workers active. Pending wrkr thread requests MUST have
* been processed before calling this function. -- rgerhards, 2008-01-16
@@ -399,7 +437,7 @@ queueWakeupWrkThrds(queue_t *pThis, int bWithDAWrk)
}
-/* This function Checks if (another) worker threads needs to be started. It
+/* This function checks if (another) worker threads needs to be started. It
* must be called while the caller holds a lock on the queue mutex. So it must not
* do anything that either reaquires the mutex or forces somebody else to aquire
* it (that would lead to a deadlock).
@@ -436,7 +474,7 @@ dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d, qR
dbgprintf("Queue %p: DA worker is no longer running, restarting, qsize %d, workers %d, qRunsDA: %d\n",
pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA);
/* DA worker has timed out and needs to be restarted */
- iRet = queueStrtWrkThrd(pThis, 0);
+ iRet = queueStrtDAWrkr(pThis);
}
}
@@ -476,7 +514,10 @@ queueTurnOffDAMode(queue_t *pThis)
* messages come into the queue, we may be well off with a single worker.
* rgerhards, 2008-01-16
*/
- if(pThis->bEnqOnly == 0 && pThis->bQueueInDestruction == 0)
+dbgprintf("Queue 0x%lx: disk-assistance being been turned off, bEnqOnly %d, bQueInDestr %d, NumWrkd %d\n",
+ queueGetID(pThis),
+ pThis->bEnqOnly,pThis->bQueueInDestruction,pThis->iCurNumWrkThrd);
+ if(pThis->bEnqOnly == 0 && pThis->bQueueInDestruction == 0 && pThis->iCurNumWrkThrd < 2)
queueStrtNewWrkThrd(pThis);
pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */
@@ -587,6 +628,7 @@ queueStrtDA(queue_t *pThis)
pThis->pqDA->mutSignalOnEmpty = &pThis->mutDA;
pThis->pqDA->condSignalOnEmpty2 = pThis->notEmpty;
pThis->pqDA->bSignalOnEmpty = 2;
+ pThis->pqDA->pqParent = pThis;
CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
@@ -665,7 +707,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly)
* In enqueue-only mode, we do not start any workers.
*/
if(pThis->bEnqOnly == 0)
- iRet = queueStrtWrkThrd(pThis, 0);
+ iRet = queueStrtDAWrkr(pThis);
return iRet;
}
@@ -1302,16 +1344,6 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
return iRet;
}
-
-/* cancellation cleanup handler - frees provided mutex
- * rgerhards, 2008-01-14
- */
-static void queueMutexCleanup(void *arg)
-{
- assert(arg != NULL);
- pthread_mutex_unlock((pthread_mutex_t*) arg);
-}
-
/* This is a special consumer to feed the disk-queue in disk-assited mode.
* When active, our own queue more or less acts as a memory buffer to the disk.
* So this consumer just needs to drain the memory queue and submit entries
@@ -1489,8 +1521,9 @@ queueWorkerRemainActive(queue_t *pThis, qWrkThrd_t *pWrkrInst)
|| ((qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && (pThis->iQueueSize > 0)));
dbgprintf("Queue %p/w%d: chk 1 pre empty queue, qsize %d, cont run: %d, cmd %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize, b, qWrkrGetState(pWrkrInst));
if(b && pWrkrInst->iThrd == 0 && pThis->qRunsDA == QRUNS_DA) {
- queueGetQueueSize(pThis->pqDA, &iSizeDAQueue);
- b = pThis->iQueueSize >= pThis->iHighWtrMrk || iSizeDAQueue != 0;
+// queueGetQueueSize(pThis->pqDA, &iSizeDAQueue);
+// b = pThis->iQueueSize >= pThis->iHighWtrMrk || iSizeDAQueue != 0;
+ b = pThis->iQueueSize >= pThis->iHighWtrMrk || pThis->pqDA->iQueueSize != 0;
}
dbgprintf("Queue %p/w%d: pre empty queue, qsize %d, cont run: %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize, b);
@@ -1578,13 +1611,23 @@ dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n",
if(pThis->iQueueSize == 0) {
dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
queueGetID(pThis), iMyThrdIndx);
- /* TODO: check if the parent DA worker is running and, if not, initiate it */
+ /* check if the parent DA worker is running and, if not, initiate it. Thanks
+ * to queueStrtDAWrkr (), we do not actually need to check (that routines does
+ * that for us, but we need to aquire the parent queue's mutex to call it.
+ */
+ if(pThis->pqParent != NULL) {
+ dbgprintf("Queue %p: pre start parent %p worker\n", pThis, pThis->pqParent);
+ queueStrtDAWrkr(pThis->pqParent);
+ }
+
if(pThis->bSignalOnEmpty > 0) {
/* we need to signal our parent queue that we are empty */
dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx);
- pthread_mutex_lock(pThis->mutSignalOnEmpty);
+ //pthread_mutex_lock(pThis->mutSignalOnEmpty); // TODO: this was commented out
+ pthread_mutex_lock(pThis->pqParent->mut); // TODO: this was commented out
pthread_cond_signal(pThis->condSignalOnEmpty);
- pthread_mutex_unlock(pThis->mutSignalOnEmpty);
+ //pthread_mutex_unlock(pThis->mutSignalOnEmpty); // TODO: this was commented out
+ pthread_mutex_unlock(pThis->pqParent->mut); // TODO: this was commented out
dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx);
}
if(pThis->bSignalOnEmpty > 1) {
@@ -1596,7 +1639,8 @@ dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n",
* iQueueSize and tCmd have not changed since the while().
*/
dbgprintf("Queue %p/w%d: pre condwait ->notEmpty, worker shutdown %d\n", pThis, iMyThrdIndx, pThis->toWrkShutdown);
- if(pThis->toWrkShutdown == -1) {
+ /* DA worker and first worker never have an inactivity timeout */
+ if(pWrkrInst->iThrd < 2 || pThis->toWrkShutdown == -1) {
dbgprintf("worker never times out!\n");
/* never shut down any started worker */
pthread_cond_wait(pThis->notEmpty, pThis->mut);
@@ -1804,11 +1848,13 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
if(!bInitialized) {
dbgprintf("Queue 0x%lx: queue starts up without (loading) any disk state\n", queueGetID(pThis));
/* fire up the worker threads */
+ queueStrtNewWrkThrd(pThis);
// TODO: preforked workers! queueStrtAllWrkThrds(pThis);
}
pThis->bQueueStarted = 1;
finalize_it:
+dbgprintf("queueStart() exit, iret %d\n", iRet);
return iRet;
}
@@ -1923,68 +1969,52 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
- /* optimize parameters for shutdown of DA-enabled queues */
- if(pThis->bIsDA) {
-dbgprintf("IsDA queue, modifying params for draining\n");
- pThis->iHighWtrMrk = 1; /* make sure we drain */
- pThis->iLowWtrMrk = 0; /* disable low water mark algo */
- if(pThis->qRunsDA == QRUNS_REGULAR) {
- if(pThis->iQueueSize > 0) {
- queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* initiate DA mode */
+ /* we do not need to take care of any messages left in queue if we are in enqueue only mode */
+ if(!pThis->bEnqOnly) {
+ /* in regular mode, need look at termination */
+ /* optimize parameters for shutdown of DA-enabled queues */
+ if(pThis->bIsDA && pThis->iQueueSize > 0) { // TODO: atomic iQueueSize!
+ dbgprintf("IsDA queue, modifying params for draining\n");
+ pThis->iHighWtrMrk = 1; /* make sure we drain */
+ pThis->iLowWtrMrk = 0; /* disable low water mark algo */
+ if(pThis->qRunsDA == QRUNS_REGULAR) {
+ if(pThis->iQueueSize > 0) {
+ queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* initiate DA mode */
+ }
+ } else {
+ queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */
}
- } else {
- queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */
- }
- if(pThis->bSaveOnShutdown) {
-dbgprintf("bSaveOnShutdown set, eternal timeout set\n");
- pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL;
+ if(pThis->bSaveOnShutdown) {
+ dbgprintf("bSaveOnShutdown set, eternal timeout set\n");
+ pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL;
+ }
+ /* now we need to activate workers (read doc/dev_queue.html) */
}
- /* now we need to activate workers (read doc/dev_queue.html) */
- }
- // TODO: we may need to startup a regular worker if not in DA mode!
- /* wait until all pending workers are started up */
- qWrkrWaitAllWrkrStartup(pThis);
+ /* wait until all pending workers are started up */
+ qWrkrWaitAllWrkrStartup(pThis);
- /* terminate our own worker threads */
- if(pThis->pWrkThrds != NULL) {
- queueShutdownWorkers(pThis);
- }
+ // We need to startup a worker if we are in non-DA mode and the queue is not empty and not in enque-only mode */
+ dbgprintf("Queue %p: queueDestruct probing if any regular workers need to be started, CurWrkr %d, qsize %d, qRunsDA %d\n",
+ pThis, pThis->iCurNumWrkThrd, pThis->iQueueSize, pThis->qRunsDA);
+ pthread_mutex_lock(pThis->mut);
+ dbgprintf("queueDestruct mutex locked\n");
+ if(pThis->iCurNumWrkThrd == 0 && pThis->iQueueSize > 0 && !pThis->bEnqOnly) {
+ dbgprintf("Queue %p: queueDestruct must start regular workers!\n", pThis);
+ queueStrtNewWrkThrd(pThis);
+ }
+ pthread_mutex_unlock(pThis->mut);
+ dbgprintf("queueDestruct mutex unlocked\n");
-#if 0
- /* if running DA, switch the DA queue to enqueue-only mode. That saves us some CPU cycles as
- * its workers do no longer need to run. It also prevents longer-running actions to spring into
- * existence while we are draining the main (memory) queue. -- rgerhads, 2008-01-16
- */
- if(pThis->qRunsDA != QRUNS_REGULAR) {
- queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */
- if(pThis->bSaveOnShutdown)
- pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL;
+ /* wait again in case a new worker was started */
+ qWrkrWaitAllWrkrStartup(pThis);
}
- /* then, terminate our own worker threads */
+ /* terminate our own worker threads */
if(pThis->pWrkThrds != NULL) {
queueShutdownWorkers(pThis);
}
- /* If we currently run in DA mode, the in-memory queue is already persisted to disk.
- * If we are not in DA mode, we may have data left in in-memory queues, this data will
- * be lost if we do not persist it to a disk queue. So, if configured to do so, we will
- * now start DA mode just to drain our queue. -- rgerhards, 2008-01-16
- * TODO: move to persist function!
- */
- if(pThis->iQueueSize > 0 && pThis->bSaveOnShutdown && pThis->bIsDA) {
- dbgprintf("Queue 0x%lx: in-memory queue contains %d entries after worker shutdown - using DA to save to disk\n",
- queueGetID(pThis), pThis->iQueueSize);
- pThis->iLowWtrMrk = 0; /* disable low water mark algo */
- pThis->iHighWtrMrk = 1; /* make sure we drain */
- queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* start DA queue in enqueue-only mode */
- qWrkrWaitStartup(QUEUE_PTR_DA_WORKER(pThis)); /* wait until DA worker has actually started */
- pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL;
- queueShutdownWorkers(pThis); /* and tell it to shut down. The trick is it will run until q is drained */
- }
-#endif
-
/* if still running DA, terminate disk queue */
if(pThis->qRunsDA != QRUNS_REGULAR)
queueDestruct(&pThis->pqDA);