summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-16 09:24:38 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-16 09:24:38 +0000
commit22b9dc1af11c3fdfdf9218fb48e15aedf9a342b3 (patch)
treecd68f2098fa3277138eec015ffb74756f1df1a01 /queue.c
parentf2c27aa1e032c2b9e6339c01904334f33b4ac920 (diff)
downloadrsyslog-22b9dc1af11c3fdfdf9218fb48e15aedf9a342b3.tar.gz
rsyslog-22b9dc1af11c3fdfdf9218fb48e15aedf9a342b3.tar.xz
rsyslog-22b9dc1af11c3fdfdf9218fb48e15aedf9a342b3.zip
queue is now able to restore persisted state on startup (but still some
fine tuning to be done)
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c216
1 files changed, 174 insertions, 42 deletions
diff --git a/queue.c b/queue.c
index 02346075..88e41168 100644
--- a/queue.c
+++ b/queue.c
@@ -52,10 +52,33 @@ DEFobjStaticHelpers
/* forward-definitions */
rsRetVal queueChkPersist(queue_t *pThis);
static void *queueWorker(void *arg);
+static rsRetVal queueGetQueueSize(queue_t *pThis, int *piQueueSize);
/* methods */
+/* send a command to a specific active thread. If the thread is not
+ * active, the command is not sent.
+ */
+static inline rsRetVal
+queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads);
+
+ if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRDCMD_RUN) {
+ dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx);
+ pThis->pWrkThrds[iIdx].tCurrCmd = tCmd;
+ } else {
+ dbgprintf("Queue 0x%lx: command %d NOT sent to inactive thread %d\n", queueGetID(pThis), tCmd, iIdx);
+ }
+
+ return iRet;
+}
+
/* send a command to a specific thread
+ * TODO: check if we can run into trouble with inactive threads
*/
static inline rsRetVal
queueTellWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd)
@@ -114,10 +137,34 @@ queueStrtWrkThrd(queue_t *pThis, int i)
}
+/* send a command to all active worker threads. A start index can be
+ * given. Usually, this is 0 or 1. Thread 0 is reserved to disk-assisted
+ * mode and this start index take care of the special handling it needs to
+ * receive. -- rgerhards, 2008-01-16
+ */
+static inline rsRetVal
+queueTellActWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd)
+{
+ DEFiRet;
+ int i;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ assert(iStartIdx == 0 || iStartIdx == 1);
+
+ /* tell the workers our request */
+ for(i = iStartIdx ; i <= pThis->iNumWorkerThreads ; ++i)
+ if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRDCMD_TERMINATED)
+ queueTellActWrkThrd(pThis, i, tCmd);
+
+ return iRet;
+}
+
+
/* send a command to all worker threads. A start index can be
* given. Usually, this is 0 or 1. Thread 0 is reserved to disk-assisted
* mode and this start index take care of the special handling it needs to
* receive.
+ * TODO: check if we run into trouble with inactive worker threads
*/
static inline rsRetVal
queueTellWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd)
@@ -173,6 +220,27 @@ queueTimeoutComp(struct timespec *pt, int iTimeout)
}
+/* wake up all worker threads. Param bWithDAWrk tells if the DA worker
+ * is to be awaken, too. It needs special handling because it waits on
+ * two different conditions depending on processing state.
+ * rgerhards, 2008-01-16
+ */
+static inline rsRetVal
+queueWakeupWrkThrds(queue_t *pThis, int bWithDAWrk)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+
+ pthread_cond_broadcast(pThis->notEmpty);
+ if(bWithDAWrk && pThis->qRunsDA != QRUNS_REGULAR) {
+ /* if running disk-assisted, workers may wait on that condition, too */
+ pthread_cond_broadcast(&pThis->condDA);
+ }
+
+ return iRet;
+}
+
/* --------------- code for disk-assisted (DA) queue modes -------------------- */
@@ -201,10 +269,9 @@ queueTurnOffDAMode(queue_t *pThis)
queueStrtAllWrkThrds(pThis); /* restore our regular worker threads */
pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */
- /* note: a disk queue alsways has a single worker and it alwas has the ID 1 */
- queueTellWrkThrd(pThis->pqDA, 1, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* tell the DA worker to terminate... */
- pthread_mutex_unlock(&pThis->mutDA); /* ... permit it to run ... */
- queueJoinWrkThrd(pThis->pqDA, 1); /* ... and wait for the shutdown to happen */
+ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
+ * this will be quick.
+ */
queueDestruct(pThis->pqDA); /* and now we are ready to destruct the DA queue */
pThis->pqDA = NULL;
@@ -288,43 +355,48 @@ queueChkIsDA(queue_t *pThis)
* rgerhards, 2008-01-14
*/
static inline rsRetVal
-queueDAConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr)
+queueDAConsumer(queue_t *pThis, int iMyThrdIndx, int iQueueSize, void *pUsr)
{
DEFiRet;
int iCancelStateSave;
+ int iSizeDAQueue;
ISOBJ_TYPE_assert(pThis, queue);
ISOBJ_assert(pUsr);
assert(pThis->qRunsDA != QRUNS_REGULAR);
-dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, pThis->iQueueSize);
+dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, iQueueSize);
CHKiRet(queueEnqObj(pThis->pqDA, pUsr));
- /* we check if we reached the low water mark, but only if we are not in shutdown mode */
- if(pThis->iQueueSize == pThis->iLowWtrMrk && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) {
+ /* We check if we reached the low water mark (but only if we are not in shutdown mode)
+ * Note that the child queue now in almost all cases is non-empty, because we just enqueued
+ * a message.
+ */
+ if(iQueueSize <= pThis->iLowWtrMrk && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) {
dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n",
- queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize);
+ queueGetID(pThis), iMyThrdIndx, iQueueSize);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
dbgprintf("pre mutex lock (think about CLEANUP!)\n");
pthread_mutex_lock(&pThis->mutDA);
dbgprintf("mutex locked (think about CLEANUP!)\n");
+ /* wait for either passing the high water mark or the child disk queue drain */
pthread_cond_wait(&pThis->condDA, &pThis->mutDA);
dbgprintf("condition returned\n");
- /* we are back. either we have passed the high water mark or the child disk queue
- * is empty. We check for the later. If that is the case, we switch back to
- * non-DA mode
- */
- if(pThis->pqDA->iQueueSize == 0) {
- dbgprintf("Queue 0x%lx/w%d: %d entries - disk assisted child queue signaled it is empty\n",
- queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize);
- CHKiRet(queueTurnOffDAMode(pThis)); /* this also unlocks the mutex! */
- } else {
- pthread_mutex_unlock(&pThis->mutDA);
- }
+ pthread_mutex_unlock(&pThis->mutDA);
dbgprintf("mutex unlocked (think about CLEANUP!)\n");
pthread_setcancelstate(iCancelStateSave, NULL);
}
+ /* now check if the DA queue is empty. If so, we can turn off DA mode. Note that we must
+ * use queueGetQueueSize() in order to avoid a race on child iQueueSize. -- rgerhards, 2008-01-16
+ */
+ CHKiRet(queueGetQueueSize(pThis->pqDA, &iSizeDAQueue));
+
+dbgprintf("Queue %p/w%d: DA queue size now %d\n", pThis, iMyThrdIndx, iSizeDAQueue);
+ if(iSizeDAQueue == 0) {
+ CHKiRet(queueTurnOffDAMode(pThis)); /* this also unlocks the mutex! */
+ }
+
finalize_it:
dbgprintf("DAConsumer returns with iRet %d\n", iRet);
return iRet;
@@ -358,7 +430,8 @@ queueStrtDA(queue_t *pThis)
*/
pThis->pqDA->condSignalOnEmpty = &pThis->condDA;
pThis->pqDA->mutSignalOnEmpty = &pThis->mutDA;
- pThis->pqDA->bSignalOnEmpty = 1;
+ pThis->pqDA->condSignalOnEmpty2 = pThis->notEmpty;
+ pThis->pqDA->bSignalOnEmpty = 2;
CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
@@ -397,7 +470,7 @@ queueStrtDA(queue_t *pThis)
* on one. So even if the scheduler plays badly with us, things should be
* quite well. -- rgerhards, 2008-01-15
*/
- pthread_cond_broadcast(pThis->notEmpty);
+ queueWakeupWrkThrds(pThis, 0); /* awake all workers, but not ourselves ;) */
pThis->qRunsDA = QRUNS_DA; /* we are now in DA mode! */
@@ -934,6 +1007,28 @@ queueDel(queue_t *pThis, void *pUsr)
}
+/* Send a shutdown command to all workers and awake them. This function
+ * does NOT wait for them to terminate. Set bIncludeDAWRk to send the
+ * termination command to the DA worker, too (else this does not happen).
+ * rgerhards, 2008-01-16
+ */
+static inline rsRetVal
+queueWrkThrdReqTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int bIncludeDAWrk)
+{
+ DEFiRet;
+
+ if(bIncludeDAWrk) {
+ queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */
+ queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
+ } else {
+ queueTellActWrkThrds(pThis, 1, tShutdownCmd);/* first tell the workers our request */
+ queueWakeupWrkThrds(pThis, 0); /* awake all workers but not DA-worker */
+ }
+
+ return iRet;
+}
+
+
/* Send a shutdown command to all workers and see if they terminate.
* A timeout may be specified.
* rgerhards, 2008-01-14
@@ -945,16 +1040,9 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout)
int bTimedOut;
struct timespec t;
- /* first tell the workers our request */
- queueTellWrkThrds(pThis, 0, tShutdownCmd);
-
- /* awake them... */
- pthread_cond_broadcast(pThis->notEmpty);
- if(pThis->qRunsDA != QRUNS_REGULAR) /* if running disk-assisted, workers may wait on that condition, too */
- pthread_cond_broadcast(&pThis->condDA);
-
- /* get timeout */
- queueTimeoutComp(&t, iTimeout);
+ queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */
+ queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
+ queueTimeoutComp(&t, iTimeout);/* get timeout */
/* and wait for their termination */
pthread_mutex_lock(pThis->mut);
@@ -989,9 +1077,7 @@ queueWrkThrdCancel(queue_t *pThis)
// worker cancellation! -- rgerhards, 2008-01-14
/* awake the workers one more time, just to be sure */
- pthread_cond_broadcast(pThis->notEmpty);
- if(pThis->qRunsDA != QRUNS_REGULAR) /* if running disk-assisted, workers may wait on that condition, too */
- pthread_cond_broadcast(&pThis->condDA);
+ queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
/* first tell the workers our request */
for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i)
@@ -1079,6 +1165,7 @@ queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave)
DEFiRet;
rsRetVal iRetLocal;
int iSeverity;
+ int iQueueSize;
void *pUsr;
int qRunsDA;
@@ -1094,6 +1181,7 @@ queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave)
iRet = queueDel(pThis, &pUsr);
queueChkPersist(pThis); // when we support peek(), we must do this down after the del!
qRunsDA = pThis->qRunsDA; /* do a local copy so that we prevent a race after mutex release */
+ iQueueSize = pThis->iQueueSize; /* ... and the same for this property */
pthread_mutex_unlock(pThis->mut);
pthread_cond_signal(pThis->notFull);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -1107,25 +1195,26 @@ queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave)
FINALIZE;
if(qRunsDA == QRUNS_DA) {
- queueDAConsumer(pThis, iMyThrdIndx, pUsr);
+ queueDAConsumer(pThis, iMyThrdIndx, iQueueSize, pUsr);
} else {
/* we are running in normal, non-disk-assisted mode */
/* do a quick check if we need to drain the queue */
- if(pThis->iDiscardMrk > 0 && pThis->iQueueSize >= pThis->iDiscardMrk) {
+ if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) {
iRetLocal = objGetSeverity(pUsr, &iSeverity);
if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
dbgprintf("Queue 0x%lx/w%d: dequeue/queue nearly full (%d entries), "
"discarded severity %d message\n",
- queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize, iSeverity);
+ queueGetID(pThis), iMyThrdIndx, iQueueSize, iSeverity);
objDestruct(pUsr);
}
} else {
dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n",
queueGetID(pThis), iMyThrdIndx);
iRetLocal = pThis->pConsumer(pUsr);
- if(iRetLocal != RS_RET_OK)
+ if(iRetLocal != RS_RET_OK) {
dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n",
queueGetID(pThis), iMyThrdIndx, iRetLocal);
+ }
}
}
@@ -1193,7 +1282,7 @@ queueWorker(void *arg)
while(pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) {
dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
queueGetID(pThis), iMyThrdIndx);
- if(pThis->bSignalOnEmpty) {
+ if(pThis->bSignalOnEmpty > 0) {
/* we need to signal our parent queue that we are empty */
dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx);
pthread_mutex_lock(pThis->mutSignalOnEmpty);
@@ -1204,7 +1293,12 @@ dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx);
* run. This is important because the parent may have changed our
* state. So we simply go back to the begin of the loop.
*/
- continue;
+ //continue;
+ }
+ if(pThis->bSignalOnEmpty > 1) {
+ /* no mutex associated with this condition, it's just a try (but needed
+ * to wakeup a parent worker if e.g. the queue was restarted from disk) */
+ pthread_cond_signal(pThis->condSignalOnEmpty2);
}
/* If we arrive here, we have the regular case, where we can safely assume that
* iQueueSize and tCmd have not changed since the while().
@@ -1517,13 +1611,24 @@ rsRetVal queueDestruct(queue_t *pThis)
assert(pThis != NULL);
- /* first, terminate worker threads */
+ /* if running DA, tell the DA workers to shut down. This saves us some CPU cycles which
+ * we can use to persist the remaining in-memory data to disk quicker. -- rgerhads, 2008-01-16
+ */
+ if(pThis->qRunsDA != QRUNS_REGULAR)
+ queueWrkThrdReqTrm(pThis->pqDA, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, 0);
+
+ /* then, terminate our own worker threads */
if(pThis->pWrkThrds != NULL) {
queueShutdownWorkers(pThis);
free(pThis->pWrkThrds);
pThis->pWrkThrds = NULL;
}
+ /* of we have now data left in in-memory queues, this data will be lost if we do not
+ * persist it to a disk queue.
+ * TODO: implement code rgerhards, 2008-01-16
+ */
+
/* if running DA, terminate disk queue */
if(pThis->qRunsDA != QRUNS_REGULAR)
queueDestruct(pThis->pqDA);
@@ -1689,6 +1794,33 @@ DEFpropSetMeth(queue, iDiscardSeverity, int);
DEFpropSetMeth(queue, bIsDA, int);
+/* get the size of this queue. The important thing about this get method is that it
+ * is synchronized via the queue mutex. So it provides the information back without
+ * any chance of race. Obviously, this causes quite some overhead, so this
+ * function should only be called in situations where a race needs to be avoided.
+ * rgerhards, 2008-01-16
+ */
+static rsRetVal
+queueGetQueueSize(queue_t *pThis, int *piQueueSize)
+{
+ DEFiRet;
+ int iCancelStateSave;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ assert(piQueueSize != NULL);
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ pthread_mutex_lock(pThis->mut);
+
+ *piQueueSize = pThis->iQueueSize; /* tell the world there is one more worker */
+
+ pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+
+ return iRet;
+}
+
+
/* This function can be used as a generic way to set properties. Only the subset
* of properties required to read persisted property bags is supported. This
* functions shall only be called by the property bag reader, thus it is static.