summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-15 16:17:51 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-15 16:17:51 +0000
commitbb7c2ef720618e4c7707013f732ef14ba751908c (patch)
treec7963cf65207010ceac19b7d1b335c53978ac84a /queue.c
parent8f5c0764aaafc9eab72d20761ecba6a27d321f89 (diff)
downloadrsyslog-bb7c2ef720618e4c7707013f732ef14ba751908c.tar.gz
rsyslog-bb7c2ef720618e4c7707013f732ef14ba751908c.tar.xz
rsyslog-bb7c2ef720618e4c7707013f732ef14ba751908c.zip
changed startup of disk assisted mode to allow for higher concurrency, most
importantly allow the input to continue enqueue msgs while the disk queue is initialized. This may help somewhat with UDP and other lossy sources
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c276
1 files changed, 155 insertions, 121 deletions
diff --git a/queue.c b/queue.c
index 9bc39464..a33c677a 100644
--- a/queue.c
+++ b/queue.c
@@ -63,6 +63,7 @@ queueTellWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd)
{
DEFiRet;
+ dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx);
ISOBJ_TYPE_assert(pThis, queue);
assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads);
@@ -87,7 +88,7 @@ queueJoinWrkThrd(queue_t *pThis, int iIdx)
pThis->pWrkThrds[iIdx].tCurrCmd);
pthread_join(pThis->pWrkThrds[iIdx].thrdID, NULL);
pThis->pWrkThrds[iIdx].tCurrCmd = eWRKTHRDCMD_NEVER_RAN; /* back to virgin... */
- dbgprintf("Queue 0x%lx: thread %d state %d, exited\n", queueGetID(pThis), iIdx,
+ dbgprintf("Queue 0x%lx: thread %d state %d, has exited\n", queueGetID(pThis), iIdx,
pThis->pWrkThrds[iIdx].tCurrCmd);
return iRet;
@@ -191,13 +192,15 @@ queueTurnOffDAMode(queue_t *pThis)
DEFiRet;
ISOBJ_TYPE_assert(pThis, queue);
- assert(pThis->bRunsDA == 1);
+ assert(pThis->qRunsDA != QRUNS_REGULAR);
- /* pull any data that we still need from the (child) disk queue... */
- pThis->pConsumer = pThis->pqDA->pConsumer; /* restore regular consumer */
+ /* if we need to pull any data that we still need from the (child) disk queue,
+ * now would be the time to do so. At present, we do not need this, but I'd like to
+ * keep that comment if future need arises.
+ */
queueStrtAllWrkThrds(pThis); /* restore our regular worker threads */
- pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
+ pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */
/* note: a disk queue alsways has a single worker and it alwas has the ID 1 */
queueTellWrkThrd(pThis->pqDA, 1, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* tell the DA worker to terminate... */
@@ -205,8 +208,12 @@ queueTurnOffDAMode(queue_t *pThis)
queueJoinWrkThrd(pThis->pqDA, 1); /* ... and wait for the shutdown to happen */
queueDestruct(pThis->pqDA); /* and now we are ready to destruct the DA queue */
pThis->pqDA = NULL;
- queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */
+ /* now free the remaining resources */
+ pthread_mutex_destroy(&pThis->mutDA);
+ pthread_cond_destroy(&pThis->condDA);
+
+ queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */
dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
queueGetID(pThis), iRet);
@@ -233,9 +240,6 @@ queueChkWrkThrdChanges(queue_t *pThis)
case eWRKTHRDCMD_TERMINATED:
queueJoinWrkThrd(pThis, i);
break;
- case eWRKTHRDCMD_TURN_OFF_DA_MODE:
- queueTurnOffDAMode(pThis);
- break;
/* these cases just to satisfy the compiler, we do not act an them: */
case eWRKTHRDCMD_NEVER_RAN:
case eWRKTHRDCMD_RUN:
@@ -292,14 +296,14 @@ queueDAConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr)
ISOBJ_TYPE_assert(pThis, queue);
ISOBJ_assert(pUsr);
- assert(pThis->bRunsDA);
+ assert(pThis->qRunsDA != QRUNS_REGULAR);
dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, pThis->iQueueSize);
CHKiRet(queueEnqObj(pThis->pqDA, pUsr));
if(pThis->iQueueSize == pThis->iLowWtrMrk) {
- dbgprintf("Queue 0x%lx: %d entries - passed low water mark in DA mode, sleeping\n",
- queueGetID(pThis), pThis->iQueueSize);
+ dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n",
+ queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
dbgprintf("pre mutex lock (think about CLEANUP!)\n");
pthread_mutex_lock(&pThis->mutDA);
@@ -311,8 +315,8 @@ dbgprintf("condition returned\n");
* non-DA mode
*/
if(pThis->pqDA->iQueueSize == 0) {
- dbgprintf("Queue 0x%lx: %d entries - disk assisted child queue signaled it is empty\n",
- queueGetID(pThis), pThis->iQueueSize);
+ dbgprintf("Queue 0x%lx/w%d: %d entries - disk assisted child queue signaled it is empty\n",
+ queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize);
CHKiRet(queueTurnOffDAMode(pThis)); /* this also unlocks the mutex! */
} else {
pthread_mutex_unlock(&pThis->mutDA);
@@ -320,58 +324,29 @@ dbgprintf("condition returned\n");
dbgprintf("mutex unlocked (think about CLEANUP!)\n");
pthread_setcancelstate(iCancelStateSave, NULL);
}
-dbgprintf("DAConsumer returns\n");
finalize_it:
+dbgprintf("DAConsumer returns with iRet %d\n", iRet);
return iRet;
}
-/* check if we need to start disk assisted mode
- * rgerhards, 2008-01-14
+/* Start disk-assisted queue mode. All internal settings are changed. This is supposed
+ * to be called from the DA worker, which must have been started before. The most important
+ * chore of this function is to create the DA queue object. If that function fails,
+ * the DA worker should return with an appropriate state, which in turn should lead to
+ * a re-set to non-DA mode in the Enq process. The queue mutex must be locked when this
+ * function is called, else a race on pThis->qRunsDA may happen.
+ * rgerhards, 2008-01-15
*/
static rsRetVal
-queueChkStrtDA(queue_t *pThis)
+queueStrtDA(queue_t *pThis)
{
DEFiRet;
- int iCancelStateSave;
ISOBJ_TYPE_assert(pThis, queue);
- if(pThis->bRunsDA) {
- if(pThis->iQueueSize < pThis->iHighWtrMrk)
- pThis->bWasBelowHighWtr = 1;
- else if(pThis->iQueueSize == pThis->iHighWtrMrk && pThis->bWasBelowHighWtr) {
- /* then we need to signal that we are at the high water mark again.*/
- dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n",
- queueGetID(pThis), pThis->iQueueSize);
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- pthread_mutex_lock(&pThis->mutDA);
- pthread_cond_signal(&pThis->condDA);
- pthread_mutex_unlock(&pThis->mutDA);
- pthread_setcancelstate(iCancelStateSave, NULL);
- queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */
- }
- /* we need to re-check if we run disk-assisted, because that status may have changed
- * in our high water mark processing.
- */
- if(pThis->bRunsDA)
- FINALIZE;
- }
-
-
- /* we run into this part if we are NOT currently running DA.
- * TODO: split this function, I think that would make the code easier
- * to read. -- rgerhards, 2008-10-15
- */
- /* if we do not hit the high water mark, we have nothing to do */
- if(pThis->iQueueSize != pThis->iHighWtrMrk)
- ABORT_FINALIZE(RS_RET_OK);
-
- dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n",
- queueGetID(pThis), pThis->iQueueSize);
-
- /* set up sync objects for low water mark algo */
+ /* set up sync objects */
pthread_mutex_init(&pThis->mutDA, NULL);
pthread_cond_init(&pThis->condDA, NULL);
@@ -399,18 +374,14 @@ queueChkStrtDA(queue_t *pThis)
if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
FINALIZE; /* something is wrong */
- /* if we reach this point, we have a working disk queue
- * so we now need to change our consumer to utilize it.
- */
- pThis->bRunsDA = 1; /* and that's all we need to do - the worker handles the rest ;) */
- pThis->bWasBelowHighWtr = 0;/* init to be sure */
-
- /* now we must start our DA worker thread and shutdown all others */
- CHKiRet(queueStrtWrkThrd(pThis, 0));
+ /* tell our fellow workers to shut down */
CHKiRet(queueTellWrkThrds(pThis, 1, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE));
+ pThis->qRunsDA = QRUNS_DA; /* we are now in DA mode! */
+
dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n",
queueGetID(pThis), queueGetID(pThis->pqDA));
+
finalize_it:
if(iRet != RS_RET_OK) {
if(pThis->pqDA != NULL) {
@@ -426,6 +397,60 @@ finalize_it:
}
+/* check if we need to start disk assisted mode and send some signals to
+ * keep it running if we are already in it.
+ * rgerhards, 2008-01-14
+ */
+static rsRetVal
+queueChkStrtDA(queue_t *pThis)
+{
+ DEFiRet;
+ int iCancelStateSave;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+
+ /* if we do not hit the high water mark, we have nothing to do */
+ if(pThis->iQueueSize != pThis->iHighWtrMrk)
+ ABORT_FINALIZE(RS_RET_OK);
+
+ if(pThis->qRunsDA != QRUNS_REGULAR) {
+ /* then we need to signal that we are at the high water mark again. If that happens
+ * on our way down the queue, that doesn't matter, because then nobody is waiting
+ * on the condition variable.
+ */
+ dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n",
+ queueGetID(pThis), pThis->iQueueSize);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ pthread_mutex_lock(&pThis->mutDA);
+ pthread_cond_signal(&pThis->condDA);
+ pthread_mutex_unlock(&pThis->mutDA);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */
+
+ /* we need to re-check if we run disk-assisted, because that status may have changed
+ * in our high water mark processing.
+ */
+ if(pThis->qRunsDA != QRUNS_REGULAR)
+ FINALIZE;
+ }
+
+ /* if we reach this point, we are NOT currently running in DA mode.
+ * TODO: split this function, I think that would make the code easier
+ * to read. -- rgerhards, 2008-10-15
+ */
+ dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n",
+ queueGetID(pThis), pThis->iQueueSize);
+
+ pThis->qRunsDA = QRUNS_DA_INIT; /* indicate we now run in DA mode - this is reset by the DA worker if it fails */
+
+ /* now we must start our DA worker thread - it does the rest of the initialization */
+ CHKiRet(queueStrtWrkThrd(pThis, 0));
+
+finalize_it:
+ return iRet;
+}
+
+
/* --------------- end code for disk-assisted queue modes -------------------- */
@@ -865,7 +890,7 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout)
/* awake them... */
pthread_cond_broadcast(pThis->notEmpty);
dbgprintf("queueWrkThrdTrm broadcasted notEmpty\n");
- if(pThis->bRunsDA) /* if running disk-assisted, workers may wait on that condition, too */
+ if(pThis->qRunsDA != QRUNS_REGULAR) /* if running disk-assisted, workers may wait on that condition, too */
pthread_cond_broadcast(&pThis->condDA);
/* get timeout */
@@ -905,7 +930,7 @@ queueWrkThrdCancel(queue_t *pThis)
/* awake the workers one more time, just to be sure */
pthread_cond_broadcast(pThis->notEmpty);
- if(pThis->bRunsDA) /* if running disk-assisted, workers may wait on that condition, too */
+ if(pThis->qRunsDA != QRUNS_REGULAR) /* if running disk-assisted, workers may wait on that condition, too */
pthread_cond_broadcast(&pThis->condDA);
/* first tell the workers our request */
@@ -983,18 +1008,45 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* This is a helper for queueWorker() it either calls the configured
- * consumer or the DA-consumer (if in disk-assisted mode). It is NOT
- * protected by the queue mutex.
+ * consumer or the DA-consumer (if in disk-assisted mode). It is
+ * protected by the queue mutex, but MUST release it as soon as possible.
+ * Most importantly, it must release it before the consumer is called.
* rgerhards, 2008-01-14
*/
static inline rsRetVal
-queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr)
+queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave)
{
DEFiRet;
rsRetVal iRetLocal;
int iSeverity;
+ void *pUsr;
+ int qRunsDA;
+
+
+ /* first check if we have still something to process */
+ if(pThis->iQueueSize == 0) {
+ pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ FINALIZE;
+ }
+
+ /* dequeue element (still protected from mutex) */
+ iRet = queueDel(pThis, &pUsr);
+ queueChkPersist(pThis); // when we support peek(), we must do this down after the del!
+ qRunsDA = pThis->qRunsDA; /* do a local copy so that we prevent a race after mutex release */
+ pthread_mutex_unlock(pThis->mut);
+ pthread_cond_signal(pThis->notFull);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ /* do actual processing (the lengthy part, runs in parallel)
+ * If we had a problem while dequeing, we do not call the consumer,
+ * but we otherwise ignore it. This is in the hopes that it will be
+ * self-healing. However, this is really not a good thing.
+ * rgerhards, 2008-01-03
+ */
+ if(iRet != RS_RET_OK)
+ FINALIZE;
- if(pThis->bRunsDA) {
+ if(qRunsDA == QRUNS_DA) {
queueDAConsumer(pThis, iMyThrdIndx, pUsr);
} else {
/* we are running in normal, non-disk-assisted mode */
@@ -1017,7 +1069,12 @@ queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr)
}
}
-dbgprintf("CallConsumer returns\n");
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things "
+ "may happen\n", queueGetID(pThis), iMyThrdIndx, iRet);
+ }
+dbgprintf("CallConsumer returns %d\n", iRet);
return iRet;
}
@@ -1029,18 +1086,12 @@ dbgprintf("CallConsumer returns\n");
static void *
queueWorker(void *arg)
{
- DEFiRet;
queue_t *pThis = (queue_t*) arg;
- void *pUsr;
sigset_t sigSet;
int iMyThrdIndx; /* index for this thread in queue thread table */
int iCancelStateSave;
- int bInitialEmpty = 1; /* if running as a DA child, we do NOT need to signal the parent
- * on the first occasion we are empty (because that happens on every
- * startup. This var keeps track of the state.
- */
- assert(pThis != NULL);
+ ISOBJ_TYPE_assert(pThis, queue);
sigfillset(&sigSet);
pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
@@ -1053,12 +1104,22 @@ queueWorker(void *arg)
dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx);
- /* tell the world there is one more worker */
+ /* do some one-time initialization */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
- pThis->iCurNumWrkThrd++;
+
+ pThis->iCurNumWrkThrd++; /* tell the world there is one more worker */
+
+ if(iMyThrdIndx == 0) { /* are we the DA worker? */
+ if(queueStrtDA(pThis) != RS_RET_OK) { /* then fully initialize the DA queue! */
+ /* if we could not init the DA queue, we have nothing to do, so shut down. */
+ queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);
+ }
+ }
+
pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
+ /* end one-time stuff */
/* now we have our identity, on to real processing */
while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN
@@ -1069,22 +1130,17 @@ queueWorker(void *arg)
dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
queueGetID(pThis), iMyThrdIndx);
if(pThis->bSignalOnEmpty) {
- if(bInitialEmpty == 1) {
- /* ignore */
- bInitialEmpty = 0;
- } else {
- /* we need to signal our parent queue that we are empty */
+ /* we need to signal our parent queue that we are empty */
dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx);
- pthread_mutex_lock(pThis->mutSignalOnEmpty);
- pthread_cond_signal(pThis->condSignalOnEmpty);
- pthread_mutex_unlock(pThis->mutSignalOnEmpty);
+ pthread_mutex_lock(pThis->mutSignalOnEmpty);
+ pthread_cond_signal(pThis->condSignalOnEmpty);
+ pthread_mutex_unlock(pThis->mutSignalOnEmpty);
dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx);
- /* we now need to re-check if we still shall continue to
- * run. This is important because the parent may have changed our
- * state. So we simply go back to the begin of the loop.
- */
- continue;
- }
+ /* we now need to re-check if we still shall continue to
+ * run. This is important because the parent may have changed our
+ * state. So we simply go back to the begin of the loop.
+ */
+ continue;
}
/* If we arrive here, we have the regular case, where we can safely assume that
* iQueueSize and tCmd have not changed since the while().
@@ -1093,35 +1149,13 @@ dbgprintf("Queue %p/w%d: pre condwait ->notEmpty\n", pThis, iMyThrdIndx);
pthread_cond_wait(pThis->notEmpty, pThis->mut);
dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx);
}
- if(pThis->iQueueSize > 0) {
- /* dequeue element (still protected from mutex) */
- iRet = queueDel(pThis, &pUsr);
- queueChkPersist(pThis); // when we support peek(), we must do this down after the del!
- pthread_mutex_unlock(pThis->mut);
- pthread_cond_signal(pThis->notFull);
- pthread_setcancelstate(iCancelStateSave, NULL);
- /* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
- * a cancellation point in itself. As we run most of the time without cancel enabled, I fear
- * we may never get cancelled if we do not create a cancellation point ourselfs.
- */
- pthread_testcancel();
- /* do actual processing (the lengthy part, runs in parallel)
- * If we had a problem while dequeing, we do not call the consumer,
- * but we otherwise ignore it. This is in the hopes that it will be
- * self-healing. However, this is really not a good thing.
- * rgerhards, 2008-01-03
- */
- if(iRet == RS_RET_OK) {
- queueWorkerCallConsumer(pThis, iMyThrdIndx, pUsr);
- } else {
- dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things "
- "may happen\n", queueGetID(pThis), iMyThrdIndx, iRet);
- }
- } else { /* the mutex must be unlocked in any case (important for termination) */
- pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
- }
+ queueWorkerCallConsumer(pThis, iMyThrdIndx, iCancelStateSave);
+ /* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
+ * a cancellation point in itself. As we run most of the time without cancel enabled, I fear
+ * we may never get cancelled if we do not create a cancellation point ourselfs.
+ */
+ pthread_testcancel();
/* We now yield to give the other threads a chance to obtain the mutex. If we do not
* do that, this thread may very well aquire the mutex again before another thread
* has even a chance to run. The reason is that mutex operations are free to be
@@ -1404,7 +1438,7 @@ rsRetVal queueDestruct(queue_t *pThis)
}
/* if running DA, terminate disk queue */
- if(pThis->bRunsDA)
+ if(pThis->qRunsDA != QRUNS_REGULAR)
queueDestruct(pThis->pqDA);
/* persist the queue (we always do that - queuePersits() does cleanup it the queue is empty) */
@@ -1495,7 +1529,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
int iSeverity = 8;
rsRetVal iRetLocal;
- assert(pThis != NULL);
+ ISOBJ_TYPE_assert(pThis, queue);
/* Please note that this function is not cancel-safe and consequently
* sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE