summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-15 11:04:46 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-15 11:04:46 +0000
commit8f5c0764aaafc9eab72d20761ecba6a27d321f89 (patch)
tree5ce4356474a84b37598813398350b7b588ed6894 /queue.c
parent75b645f16b930f142b777b00b529fb726ef10243 (diff)
downloadrsyslog-8f5c0764aaafc9eab72d20761ecba6a27d321f89.tar.gz
rsyslog-8f5c0764aaafc9eab72d20761ecba6a27d321f89.tar.xz
rsyslog-8f5c0764aaafc9eab72d20761ecba6a27d321f89.zip
disk assisted queue works quite well, except for startup from disk queue
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c232
1 files changed, 204 insertions, 28 deletions
diff --git a/queue.c b/queue.c
index 10a560d2..9bc39464 100644
--- a/queue.c
+++ b/queue.c
@@ -1,3 +1,4 @@
+// TODO: do an if(debug) in dbgprintf - performance in release build!
// TODO: remove bIsDA?
// TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in
// call consumer state. Facilitates retaining messages in queue until action could
@@ -71,6 +72,28 @@ queueTellWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd)
}
+/* Join the worker thread at index iIdx and reset its slot to eWRKTHRDCMD_NEVER_RAN.
+ */
+static inline rsRetVal
+queueJoinWrkThrd(queue_t *pThis, int iIdx)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads);
+ assert(pThis->pWrkThrds[iIdx].tCurrCmd != eWRKTHRDCMD_NEVER_RAN);
+
+ dbgprintf("Queue 0x%lx: thread %d state %d, waiting for exit\n", queueGetID(pThis), iIdx,
+ pThis->pWrkThrds[iIdx].tCurrCmd);
+ pthread_join(pThis->pWrkThrds[iIdx].thrdID, NULL);
+ pThis->pWrkThrds[iIdx].tCurrCmd = eWRKTHRDCMD_NEVER_RAN; /* back to virgin... */
+ dbgprintf("Queue 0x%lx: thread %d state %d, exited\n", queueGetID(pThis), iIdx,
+ pThis->pWrkThrds[iIdx].tCurrCmd);
+
+ return iRet;
+}
+
+
/* Starts a worker thread (on a specific index [i]!)
*/
static inline rsRetVal
@@ -113,6 +136,23 @@ queueTellWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd)
return iRet;
}
+/* Start all regular worker threads, i.e. indices 1..iNumWorkerThreads (index 0 is not started here).
+ * rgerhards, 2008-01-15
+ */
+static inline rsRetVal
+queueStrtAllWrkThrds(queue_t *pThis)
+{
+ DEFiRet;
+ int i;
+
+ /* fire up the worker threads; note that the result of queueStrtWrkThrd() is not checked */
+ for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) {
+ queueStrtWrkThrd(pThis, i);
+ }
+
+ return iRet;
+}
+
/* compute an absolute time timeout suitable for calls to pthread_cond_timedwait()
* rgerhards, 2008-01-14
@@ -137,6 +177,80 @@ queueTimeoutComp(struct timespec *pt, int iTimeout)
/* --------------- code for disk-assisted (DA) queue modes -------------------- */
+/* Destruct DA queue. This is the last part of DA-to-normal-mode
+ * transition. This is called asynchronously, possibly quite a
+ * while after the actual transition. The key point is that we need to
+ * do it at some later time, because we need to destruct the DA queue. That,
+ * however, can not be done in a thread that has been signalled.
+ * This is to be called when we revert back to our own queue.
+ * rgerhards, 2008-01-15
+ */
+static inline rsRetVal
+queueTurnOffDAMode(queue_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ assert(pThis->bRunsDA == 1);
+
+ /* pull any data that we still need from the (child) disk queue... */
+ pThis->pConsumer = pThis->pqDA->pConsumer; /* restore regular consumer */
+
+ queueStrtAllWrkThrds(pThis); /* restore our regular worker threads */
+ pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
+
+ /* note: a disk queue always has a single worker and it always has the ID 1 */
+ queueTellWrkThrd(pThis->pqDA, 1, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* tell the DA worker to terminate... */
+ pthread_mutex_unlock(&pThis->mutDA); /* ... permit it to run (NOTE(review): assumes the caller holds mutDA - confirm for all call sites) ... */
+ queueJoinWrkThrd(pThis->pqDA, 1); /* ... and wait for the shutdown to happen */
+ queueDestruct(pThis->pqDA); /* and now we are ready to destruct the DA queue */
+ pThis->pqDA = NULL;
+ queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */
+
+ dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
+ queueGetID(pThis), iRet);
+
+ return iRet;
+}
+
+/* Check if we had any worker thread changes and, if so, act
+ * on them. At a minimum, terminated threads are harvested (joined).
+ */
+static rsRetVal
+queueChkWrkThrdChanges(queue_t *pThis)
+{
+ DEFiRet;
+ int i;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+
+ if(pThis->bThrdStateChanged == 0)
+ FINALIZE;
+
+ /* go through all threads (including DA thread); NOTE(review): bThrdStateChanged is not reset in this function - confirm it is cleared elsewhere */
+ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
+ switch(pThis->pWrkThrds[i].tCurrCmd) {
+ case eWRKTHRDCMD_TERMINATED:
+ queueJoinWrkThrd(pThis, i);
+ break;
+ case eWRKTHRDCMD_TURN_OFF_DA_MODE:
+ queueTurnOffDAMode(pThis);
+ break;
+ /* these cases just to satisfy the compiler, we do not act on them: */
+ case eWRKTHRDCMD_NEVER_RAN:
+ case eWRKTHRDCMD_RUN:
+ case eWRKTHRDCMD_SHUTDOWN:
+ case eWRKTHRDCMD_SHUTDOWN_IMMEDIATE:
+ /* DO NOTHING */
+ break;
+ }
+ }
+
+finalize_it:
+ return iRet;
+}
+
+
/* check if we run in disk-assisted mode and record that
* setting for easy (and quick!) access in the future. This
* function must only be called from constructors and only
@@ -161,7 +275,6 @@ queueChkIsDA(queue_t *pThis)
}
-
/* This is a special consumer to feed the disk-queue in disk-assited mode.
* When active, our own queue more or less acts as a memory buffer to the disk.
* So this consumer just needs to drain the memory queue and submit entries
@@ -175,6 +288,7 @@ static inline rsRetVal
queueDAConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr)
{
DEFiRet;
+ int iCancelStateSave;
ISOBJ_TYPE_assert(pThis, queue);
ISOBJ_assert(pUsr);
@@ -186,13 +300,27 @@ dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx,
if(pThis->iQueueSize == pThis->iLowWtrMrk) {
dbgprintf("Queue 0x%lx: %d entries - passed low water mark in DA mode, sleeping\n",
queueGetID(pThis), pThis->iQueueSize);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+dbgprintf("pre mutex lock (think about CLEANUP!)\n");
pthread_mutex_lock(&pThis->mutDA);
dbgprintf("mutex locked (think about CLEANUP!)\n");
pthread_cond_wait(&pThis->condDA, &pThis->mutDA);
dbgprintf("condition returned\n");
- pthread_mutex_unlock(&pThis->mutDA);
+ /* we are back. either we have passed the high water mark or the child disk queue
+ * is empty. We check for the latter. If that is the case, we switch back to
+ * non-DA mode
+ */
+ if(pThis->pqDA->iQueueSize == 0) {
+ dbgprintf("Queue 0x%lx: %d entries - disk assisted child queue signaled it is empty\n",
+ queueGetID(pThis), pThis->iQueueSize);
+ CHKiRet(queueTurnOffDAMode(pThis)); /* this also unlocks the mutex! */
+ } else {
+ pthread_mutex_unlock(&pThis->mutDA);
+ }
dbgprintf("mutex unlocked (think about CLEANUP!)\n");
+ pthread_setcancelstate(iCancelStateSave, NULL);
}
+dbgprintf("DAConsumer returns\n");
finalize_it:
return iRet;
@@ -206,28 +334,40 @@ static rsRetVal
queueChkStrtDA(queue_t *pThis)
{
DEFiRet;
+ int iCancelStateSave;
ISOBJ_TYPE_assert(pThis, queue);
+ if(pThis->bRunsDA) {
+ if(pThis->iQueueSize < pThis->iHighWtrMrk)
+ pThis->bWasBelowHighWtr = 1;
+ else if(pThis->iQueueSize == pThis->iHighWtrMrk && pThis->bWasBelowHighWtr) {
+ /* then we need to signal that we are at the high water mark again.*/
+ dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n",
+ queueGetID(pThis), pThis->iQueueSize);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ pthread_mutex_lock(&pThis->mutDA);
+ pthread_cond_signal(&pThis->condDA);
+ pthread_mutex_unlock(&pThis->mutDA);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */
+ }
+ /* we need to re-check if we run disk-assisted, because that status may have changed
+ * in our high water mark processing.
+ */
+ if(pThis->bRunsDA)
+ FINALIZE;
+ }
+
+ /* we run into this part if we are NOT currently running DA.
+ * TODO: split this function, I think that would make the code easier
+ * to read. -- rgerhards, 2008-01-15
+ */
/* if we do not hit the high water mark, we have nothing to do */
if(pThis->iQueueSize != pThis->iHighWtrMrk)
ABORT_FINALIZE(RS_RET_OK);
- if(pThis->bRunsDA) {
- /* then we need to signal that we are at the high water mark again. The DA
- * consumer shall check if it needs to restart. Note that we may pass the
- * high water mark while we drain the queue.
- * TODO: problem here? (condition signalled on drain...)
- */
- dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n",
- queueGetID(pThis), pThis->iQueueSize);
- pthread_mutex_lock(&pThis->mutDA);
- pthread_cond_signal(&pThis->condDA);
- pthread_mutex_unlock(&pThis->mutDA);
- FINALIZE;
- }
-
dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n",
queueGetID(pThis), pThis->iQueueSize);
@@ -263,6 +403,7 @@ queueChkStrtDA(queue_t *pThis)
* so we now need to change our consumer to utilize it.
*/
pThis->bRunsDA = 1; /* and that's all we need to do - the worker handles the rest ;) */
+ pThis->bWasBelowHighWtr = 0;/* init to be sure */
/* now we must start our DA worker thread and shutdown all others */
CHKiRet(queueStrtWrkThrd(pThis, 0));
@@ -723,6 +864,9 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout)
/* awake them... */
pthread_cond_broadcast(pThis->notEmpty);
+dbgprintf("queueWrkThrdTrm broadcasted notEmpty\n");
+ if(pThis->bRunsDA) /* if running disk-assisted, workers may wait on that condition, too */
+ pthread_cond_broadcast(&pThis->condDA);
/* get timeout */
queueTimeoutComp(&t, iTimeout);
@@ -761,6 +905,8 @@ queueWrkThrdCancel(queue_t *pThis)
/* awake the workers one more time, just to be sure */
pthread_cond_broadcast(pThis->notEmpty);
+ if(pThis->bRunsDA) /* if running disk-assisted, workers may wait on that condition, too */
+ pthread_cond_broadcast(&pThis->condDA);
/* first tell the workers our request */
for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i)
@@ -816,8 +962,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
*/
for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_NEVER_RAN) {
- dbgprintf("Queue 0x%lx: joining worker thread %d\n", queueGetID(pThis), i);
- pthread_join(pThis->pWrkThrds[i].thrdID, NULL);
+ queueJoinWrkThrd(pThis, i);
}
}
@@ -872,6 +1017,7 @@ queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr)
}
}
+dbgprintf("CallConsumer returns\n");
return iRet;
}
@@ -889,6 +1035,10 @@ queueWorker(void *arg)
sigset_t sigSet;
int iMyThrdIndx; /* index for this thread in queue thread table */
int iCancelStateSave;
+ int bInitialEmpty = 1; /* if running as a DA child, we do NOT need to signal the parent
+ * on the first occasion we are empty (because that happens on every
+ * startup). This var keeps track of the state.
+ */
assert(pThis != NULL);
@@ -915,17 +1065,33 @@ queueWorker(void *arg)
|| (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) {
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
- while (pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) {
+ while(pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) {
dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
queueGetID(pThis), iMyThrdIndx);
if(pThis->bSignalOnEmpty) {
- /* we need to signal our parent queue that we are empty */
+ if(bInitialEmpty == 1) {
+ /* ignore */
+ bInitialEmpty = 0;
+ } else {
+ /* we need to signal our parent queue that we are empty */
dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx);
- pthread_mutex_lock(pThis->mutSignalOnEmpty);
- pthread_cond_signal(pThis->condSignalOnEmpty);
- pthread_mutex_unlock(pThis->mutSignalOnEmpty);
+ pthread_mutex_lock(pThis->mutSignalOnEmpty);
+ pthread_cond_signal(pThis->condSignalOnEmpty);
+ pthread_mutex_unlock(pThis->mutSignalOnEmpty);
+dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx);
+ /* we now need to re-check if we still shall continue to
+ * run. This is important because the parent may have changed our
+ * state. So we simply go back to the beginning of the loop.
+ */
+ continue;
+ }
}
+ /* If we arrive here, we have the regular case, where we can safely assume that
+ * iQueueSize and tCmd have not changed since the while().
+ */
+dbgprintf("Queue %p/w%d: pre condwait ->notEmpty\n", pThis, iMyThrdIndx);
pthread_cond_wait(pThis->notEmpty, pThis->mut);
+dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx);
}
if(pThis->iQueueSize > 0) {
/* dequeue element (still protected from mutex) */
@@ -969,8 +1135,9 @@ dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx);
* and would be very hard to debug. The yield() is a sure fix, its performance overhead
* should be well accepted given the above facts. -- rgerhards, 2008-01-10
*/
+dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n",
+ queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd);
pthread_yield();
-
if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN) && pThis->iQueueSize > 0)
dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has "
" %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize);
@@ -980,7 +1147,14 @@ dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
pThis->iCurNumWrkThrd--;
- pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED;
+ if(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN ||
+ pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN_IMMEDIATE) {
+ /* in shutdown case, we need to flag termination. All other commands
+ * have a meaning to the thread harvester, so we can not overwrite them
+ */
+ pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED;
+ }
+ pThis->bThrdStateChanged = 1; /* indicate change, so harvester will be called */
pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */
dbgprintf("Queue 0x%lx/w%d: thread terminates with %d entries left in queue, %d workers running.\n",
queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd);
@@ -1334,6 +1508,9 @@ queueEnqObj(queue_t *pThis, void *pUsr)
pthread_mutex_lock(pThis->mut);
}
+ /* process any pending thread requests */
+ queueChkWrkThrdChanges(pThis);
+
/* first check if we can discard anything */
if(pThis->iDiscardMrk > 0 && pThis->iQueueSize >= pThis->iDiscardMrk) {
iRetLocal = objGetSeverity(pUsr, &iSeverity);
@@ -1352,13 +1529,12 @@ queueEnqObj(queue_t *pThis, void *pUsr)
/* then check if we need to add an assistance disk queue */
if(pThis->bIsDA)
CHKiRet(queueChkStrtDA(pThis));
-
+
/* and finally (try to) enqueue what is left over */
while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) {
dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", queueGetID(pThis));
queueTimeoutComp(&t, pThis->toEnq);
- if(pthread_cond_timedwait (pThis->notFull,
- pThis->mut, &t) != 0) {
+ if(pthread_cond_timedwait (pThis->notFull, pThis->mut, &t) != 0) {
dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", queueGetID(pThis));
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);