summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-16 16:40:11 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-16 16:40:11 +0000
commit75a8f92d5001f555606b2ddb5de30acf689e2422 (patch)
tree7db2ad786fa5589cb279c0e465b029434d0a0946 /queue.c
parent19c9b187ab29f9304adb82d9c6005c69c92b3c17 (diff)
downloadrsyslog-75a8f92d5001f555606b2ddb5de30acf689e2422.tar.gz
rsyslog-75a8f92d5001f555606b2ddb5de30acf689e2422.tar.xz
rsyslog-75a8f92d5001f555606b2ddb5de30acf689e2422.zip
implemented dynamic startup and shutdown of worker threads based on current
activity
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c324
1 files changed, 207 insertions, 117 deletions
diff --git a/queue.c b/queue.c
index 00f6e63f..acfdefb3 100644
--- a/queue.c
+++ b/queue.c
@@ -1,3 +1,4 @@
+// TODO: "preforked" worker threads
// TODO: do an if(debug) in dbgrintf - performanc ein release build!
// TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in
// call consumer state. Facilitates retaining messages in queue until action could
@@ -53,6 +54,7 @@ DEFobjStaticHelpers
rsRetVal queueChkPersist(queue_t *pThis);
static void *queueWorker(void *arg);
static rsRetVal queueGetQueueSize(queue_t *pThis, int *piQueueSize);
+static rsRetVal queueChkWrkThrdChanges(queue_t *pThis);
/* methods */
@@ -67,7 +69,7 @@ queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd)
ISOBJ_TYPE_assert(pThis, queue);
assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads);
- if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRDCMD_RUN) {
+ if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRD_RUN_INIT) {
dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx);
pThis->pWrkThrds[iIdx].tCurrCmd = tCmd;
} else {
@@ -104,12 +106,12 @@ queueJoinWrkThrd(queue_t *pThis, int iIdx)
ISOBJ_TYPE_assert(pThis, queue);
assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads);
- assert(pThis->pWrkThrds[iIdx].tCurrCmd != eWRKTHRDCMD_NEVER_RAN);
+ assert(pThis->pWrkThrds[iIdx].tCurrCmd != eWRKTHRD_STOPPED);
dbgprintf("Queue 0x%lx: thread %d state %d, waiting for exit\n", queueGetID(pThis), iIdx,
pThis->pWrkThrds[iIdx].tCurrCmd);
pthread_join(pThis->pWrkThrds[iIdx].thrdID, NULL);
- pThis->pWrkThrds[iIdx].tCurrCmd = eWRKTHRDCMD_NEVER_RAN; /* back to virgin... */
+ pThis->pWrkThrds[iIdx].tCurrCmd = eWRKTHRD_STOPPED; /* back to virgin... */
dbgprintf("Queue 0x%lx: thread %d state %d, has exited\n", queueGetID(pThis), iIdx,
pThis->pWrkThrds[iIdx].tCurrCmd);
@@ -127,9 +129,9 @@ queueStrtWrkThrd(queue_t *pThis, int i)
ISOBJ_TYPE_assert(pThis, queue);
assert(i >= 0 && i <= pThis->iNumWorkerThreads);
- assert(pThis->pWrkThrds[i].tCurrCmd < eWRKTHRDCMD_RUN);
+ assert(pThis->pWrkThrds[i].tCurrCmd < eWRKTHRD_RUN_INIT);
- queueTellWrkThrd(pThis, i, eWRKTHRDCMD_RUN);
+ queueTellWrkThrd(pThis, i, eWRKTHRD_RUN_INIT);
iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis);
dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n",
(unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState);
@@ -138,37 +140,59 @@ queueStrtWrkThrd(queue_t *pThis, int i)
}
-/* send a command to all active worker threads. A start index can be
- * given. Usually, this is 0 or 1. Thread 0 is reserved to disk-assisted
- * mode and this start index take care of the special handling it needs to
- * receive. -- rgerhards, 2008-01-16
+/* Starts a *new* worker thread. Function searches itself for a free index spot. It must only
+ * be called when we have less than max workers active. Pending wrkr thread requests MUST have
+ * been processed before calling this function. -- rgerhards, 2008-01-16
*/
static inline rsRetVal
-queueTellActWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd)
+queueStrtNewWrkThrd(queue_t *pThis)
{
DEFiRet;
int i;
+ int iStartingUp;
+ int iState;
ISOBJ_TYPE_assert(pThis, queue);
- assert(iStartIdx == 0 || iStartIdx == 1);
- /* tell the workers our request */
- for(i = iStartIdx ; i <= pThis->iNumWorkerThreads ; ++i)
- if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRDCMD_TERMINATED)
- queueTellActWrkThrd(pThis, i, tCmd);
+ /* find free spot in thread table. If we find at least one worker that is in initializiation,
+ * we do NOT start a new one. Let's give the other one a chance, first.
+ */
+ iStartingUp = -1;
+ for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i)
+ if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_STOPPED) {
+ break;
+ } else if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_RUN_INIT) {
+ iStartingUp = i;
+ break;
+ }
+dbgprintf("after thrd search: i %d, iStartingUp %d\n", i, iStartingUp);
+ if(iStartingUp > -1)
+ ABORT_FINALIZE(RS_RET_ALREADY_STARTING);
+
+ assert(i <= pThis->iNumWorkerThreads); /* now there must be a free spot, else something is really wrong! */
+
+ queueTellWrkThrd(pThis, i, eWRKTHRD_RUN_INIT);
+ iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis);
+ dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n",
+ (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState);
+ /* we try to give the starting worker a little boost. It won't help much as we still
+ * hold the queue's mutex, but at least it has a chance to start on a single-CPU system.
+ */
+ pthread_yield();
+
+finalize_it:
return iRet;
}
-/* send a command to all worker threads. A start index can be
+/* send a command to all active worker threads. A start index can be
* given. Usually, this is 0 or 1. Thread 0 is reserved to disk-assisted
* mode and this start index take care of the special handling it needs to
- * receive.
- * TODO: check if we run into trouble with inactive worker threads
+ * receive. -- rgerhards, 2008-01-16
*/
static inline rsRetVal
-queueTellWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd)
+queueTellActWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd)
{
DEFiRet;
int i;
@@ -178,25 +202,27 @@ queueTellWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd)
/* tell the workers our request */
for(i = iStartIdx ; i <= pThis->iNumWorkerThreads ; ++i)
- if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRDCMD_TERMINATED)
- queueTellWrkThrd(pThis, i, tCmd);
+ if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATED)
+ queueTellActWrkThrd(pThis, i, tCmd);
return iRet;
}
-/* start all regular worker threads
- * rgerhards, 2008-01-15
+
+/* This once was used to start all regular worker threads. Now, we have
+ * dynamic grow of the worker thread pool, based on needs. This function is
+ * still preserved, but it now does not start all but only worker 1, which
+ * is always present.
+ * rgerhards, 2008-01-16
*/
static inline rsRetVal
queueStrtAllWrkThrds(queue_t *pThis)
{
DEFiRet;
- int i;
- /* fire up the worker threads */
- for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) {
- queueStrtWrkThrd(pThis, i);
- }
+ ISOBJ_TYPE_assert(pThis, queue);
+ assert(pThis->pWrkThrds[1].tCurrCmd < eWRKTHRD_RUN_INIT);
+ //iRet = queueStrtWrkThrd(pThis, 1);
return iRet;
}
@@ -243,6 +269,41 @@ queueWakeupWrkThrds(queue_t *pThis, int bWithDAWrk)
}
+/* This function Checks if (another) worker threads needs to be started. It
+ * must be called while the caller holds a lock on the queue mutex. So it must not
+ * do anything that either reaquires the mutex or forces somebody else to aquire
+ * it (that would lead to a deadlock).
+ * rgerhards, 2008-01-16
+ */
+static inline rsRetVal
+queueChkAndStrtWrk(queue_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+
+ /* process any pending thread requests */
+ queueChkWrkThrdChanges(pThis);
+
+ /* check if we need to start up another worker (only in regular mode) */
+ if(pThis->qRunsDA == QRUNS_REGULAR) {
+ if(pThis->iCurNumWrkThrd < pThis->iNumWorkerThreads) {
+dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d\n",
+ pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd);
+ /* check if we satisfy the min nbr of messages per worker to start a new one */
+ if(pThis->iCurNumWrkThrd == 0 ||
+ pThis->iQueueSize / pThis->iCurNumWrkThrd > pThis->iMinMsgsPerWrkr) {
+ dbgprintf("Queue 0x%lx: high activity - starting additional worker thread.\n",
+ queueGetID(pThis));
+ queueStrtNewWrkThrd(pThis);
+ }
+ }
+ }
+
+ return iRet;
+}
+
+
/* --------------- code for disk-assisted (DA) queue modes -------------------- */
@@ -280,7 +341,7 @@ queueTurnOffDAMode(queue_t *pThis)
pthread_mutex_destroy(&pThis->mutDA);
pthread_cond_destroy(&pThis->condDA);
- queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */
+ queueTellWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */
dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
queueGetID(pThis), iRet);
@@ -289,6 +350,7 @@ queueTurnOffDAMode(queue_t *pThis)
/* check if we had any worker thread changes and, if so, act
* on them. At a minimum, terminated threads are harvested (joined).
+ * This function MUST NEVER block on the queue mutex!
*/
static rsRetVal
queueChkWrkThrdChanges(queue_t *pThis)
@@ -304,14 +366,15 @@ queueChkWrkThrdChanges(queue_t *pThis)
/* go through all threads (including DA thread) */
for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
switch(pThis->pWrkThrds[i].tCurrCmd) {
- case eWRKTHRDCMD_TERMINATED:
+ case eWRKTHRD_TERMINATED:
queueJoinWrkThrd(pThis, i);
break;
/* these cases just to satisfy the compiler, we do not act an them: */
- case eWRKTHRDCMD_NEVER_RAN:
- case eWRKTHRDCMD_RUN:
- case eWRKTHRDCMD_SHUTDOWN:
- case eWRKTHRDCMD_SHUTDOWN_IMMEDIATE:
+ case eWRKTHRD_STOPPED:
+ case eWRKTHRD_RUN_INIT:
+ case eWRKTHRD_RUNNING:
+ case eWRKTHRD_SHUTDOWN:
+ case eWRKTHRD_SHUTDOWN_IMMEDIATE:
/* DO NOTHING */
break;
}
@@ -373,7 +436,7 @@ dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx,
* Note that the child queue now in almost all cases is non-empty, because we just enqueued
* a message.
*/
- if(iQueueSize <= pThis->iLowWtrMrk && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) {
+ if(iQueueSize <= pThis->iLowWtrMrk && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) {
dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n",
queueGetID(pThis), iMyThrdIndx, iQueueSize);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
@@ -463,7 +526,7 @@ queueStrtDA(queue_t *pThis)
* reserving worker thread 0 for DA queues. So if we would join the other
* workers, we would screw up and do against our design goal.
*/
- CHKiRet(queueTellWrkThrds(pThis, 1, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE));
+ CHKiRet(queueTellActWrkThrds(pThis, 1, eWRKTHRD_SHUTDOWN_IMMEDIATE));
/* as we are right now starting DA mode because we are so busy, it is
* extremely unlikely that any worker is sleeping on empty queue. HOWEVER,
@@ -493,6 +556,24 @@ finalize_it:
}
+/* initiate DA mode
+ * rgerhards, 2008-01-16
+ */
+static inline rsRetVal
+queueInitDA(queue_t *pThis)
+{
+ DEFiRet;
+
+ /* indicate we now run in DA mode - this is reset by the DA worker if it fails */
+ pThis->qRunsDA = QRUNS_DA_INIT;
+
+ /* now we must start our DA worker thread - it does the rest of the initialization */
+ iRet = queueStrtWrkThrd(pThis, 0);
+
+ return iRet;
+}
+
+
/* check if we need to start disk assisted mode and send some signals to
* keep it running if we are already in it.
* rgerhards, 2008-01-14
@@ -522,25 +603,18 @@ queueChkStrtDA(queue_t *pThis)
pthread_mutex_unlock(&pThis->mutDA);
pthread_setcancelstate(iCancelStateSave, NULL);
queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */
-
- /* we need to re-check if we run disk-assisted, because that status may have changed
- * in our high water mark processing.
- */
- if(pThis->qRunsDA != QRUNS_REGULAR)
- FINALIZE;
}
- /* if we reach this point, we are NOT currently running in DA mode.
- * TODO: split this function, I think that would make the code easier
- * to read. -- rgerhards, 2008-10-15
+ /* we need to re-check if we run disk-assisted, because that status may have changed
+ * in our high water mark processing.
*/
- dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n",
- queueGetID(pThis), pThis->iQueueSize);
-
- pThis->qRunsDA = QRUNS_DA_INIT; /* indicate we now run in DA mode - this is reset by the DA worker if it fails */
+ if(pThis->qRunsDA == QRUNS_REGULAR) {
+ /* if we reach this point, we are NOT currently running in DA mode. */
+ dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n",
+ queueGetID(pThis), pThis->iQueueSize);
- /* now we must start our DA worker thread - it does the rest of the initialization */
- CHKiRet(queueStrtWrkThrd(pThis, 0));
+ queueInitDA(pThis); /* initiate DA mode */
+ }
finalize_it:
return iRet;
@@ -1082,7 +1156,7 @@ queueWrkThrdCancel(queue_t *pThis)
/* first tell the workers our request */
for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i)
- if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRDCMD_TERMINATED) {
+ if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATED) {
dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i);
pthread_cancel(pThis->pWrkThrds[i].thrdID);
}
@@ -1108,7 +1182,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
* the function returns immediate with RS_RET_TIMED_OUT. We catch that state and accept it as
* good.
*/
- iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN, pThis->toQShutdown);
+ iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN, pThis->toQShutdown);
if(iRet == RS_RET_TIMED_OUT) {
if(pThis->toQShutdown == 0) {
iRet = RS_RET_OK;
@@ -1116,7 +1190,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* OK, we now need to try force the shutdown */
dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n",
queueGetID(pThis));
- iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, pThis->toActShutdown);
+ iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN_IMMEDIATE, pThis->toActShutdown);
}
}
@@ -1133,7 +1207,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
* time set (timeout == 0)! -- rgerhards, 2008-01-14
*/
for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
- if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_NEVER_RAN) {
+ if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRD_STOPPED) {
queueJoinWrkThrd(pThis, i);
}
}
@@ -1142,8 +1216,8 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
* terminated now. For simplicity, we simply overwrite the states.
*/
for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
- if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_NEVER_RAN) {
- pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_TERMINATED;
+ if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRD_STOPPED) {
+ pThis->pWrkThrds[i].tCurrCmd = eWRKTHRD_TERMINATED;
}
}
@@ -1161,7 +1235,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
* rgerhards, 2008-01-14
*/
static inline rsRetVal
-queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave)
+queueWorkerChkAndCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave)
{
DEFiRet;
rsRetVal iRetLocal;
@@ -1172,7 +1246,10 @@ queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave)
/* first check if we have still something to process */
- if(pThis->iQueueSize == 0) {
+ if(pThis->iQueueSize == 0 ||
+ ( (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd != eWRKTHRD_RUNNING)
+ && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd != eWRKTHRD_SHUTDOWN)
+ )) {
pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
FINALIZE;
@@ -1242,6 +1319,7 @@ queueWorker(void *arg)
{
queue_t *pThis = (queue_t*) arg;
sigset_t sigSet;
+ struct timespec t;
int iMyThrdIndx; /* index for this thread in queue thread table */
int iCancelStateSave;
@@ -1267,20 +1345,32 @@ queueWorker(void *arg)
if(iMyThrdIndx == 0) { /* are we the DA worker? */
if(queueStrtDA(pThis) != RS_RET_OK) { /* then fully initialize the DA queue! */
/* if we could not init the DA queue, we have nothing to do, so shut down. */
- queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);
+ queueTellWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE);
}
}
+ /* finally change to RUNNING state. We need to check if we actually should still run,
+ * because someone may have requested us to shut down even before we got a chance to do
+ * our init. That would be a bad race... -- rgerhards, 2008-01-16
+ */
+ if(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUN_INIT)
+ pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_RUNNING; /* we are running now! */
+
pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
/* end one-time stuff */
/* now we have our identity, on to real processing */
- while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN
- || (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) {
+ while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING
+ || (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN && pThis->iQueueSize > 0)) {
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
- while(pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) {
+
+ /* process any pending thread requests */
+ queueChkWrkThrdChanges(pThis);
+
+dbgprintf("Queue %p/w%d: pre empty queue, qsize %d\n", pThis, iMyThrdIndx, pThis->iQueueSize);
+ while(pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) {
dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
queueGetID(pThis), iMyThrdIndx);
if(pThis->bSignalOnEmpty > 0) {
@@ -1304,11 +1394,26 @@ dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx);
/* If we arrive here, we have the regular case, where we can safely assume that
* iQueueSize and tCmd have not changed since the while().
*/
-dbgprintf("Queue %p/w%d: pre condwait ->notEmpty\n", pThis, iMyThrdIndx);
- pthread_cond_wait(pThis->notEmpty, pThis->mut);
+dbgprintf("Queue %p/w%d: pre condwait ->notEmpty, worker shutdown %d\n", pThis, iMyThrdIndx, pThis->toWrkShutdown);
+ if(pThis->toWrkShutdown == -1) {
+dbgprintf("worker never times out!\n");
+ /* never shut down any started worker */
+ pthread_cond_wait(pThis->notEmpty, pThis->mut);
+ } else {
+ queueTimeoutComp(&t, pThis->toWrkShutdown);/* get absolute timeout */
+ if(pthread_cond_timedwait (pThis->notEmpty, pThis->mut, &t) != 0) {
+ dbgprintf("Queue 0x%lx/w%d: inactivity timeout, worker terminating...\n",
+ queueGetID(pThis), iMyThrdIndx);
+ /* we use SHUTDOWN (and not SHUTDOWN_IMMEDIATE) so that the worker
+ * does not terminate if in the mean time a new message arrived.
+ */
+ pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_SHUTDOWN;
+ }
+ }
dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx);
}
- queueWorkerCallConsumer(pThis, iMyThrdIndx, iCancelStateSave);
+
+ queueWorkerChkAndCallConsumer(pThis, iMyThrdIndx, iCancelStateSave);
/* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
* a cancellation point in itself. As we run most of the time without cancel enabled, I fear
@@ -1328,10 +1433,10 @@ dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx);
* and would be very hard to debug. The yield() is a sure fix, its performance overhead
* should be well accepted given the above facts. -- rgerhards, 2008-01-10
*/
+ pthread_yield();
dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n",
queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd);
- pthread_yield();
- if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN) && pThis->iQueueSize > 0)
+ if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0)
dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has "
" %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize);
}
@@ -1340,12 +1445,12 @@ dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n",
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
pThis->iCurNumWrkThrd--;
- if(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN ||
- pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN_IMMEDIATE) {
+ if(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN ||
+ pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN_IMMEDIATE) {
/* in shutdown case, we need to flag termination. All other commands
* have a meaning to the thread harvester, so we can not overwrite them
*/
- pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED;
+ pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_TERMINATED;
}
pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */
@@ -1440,7 +1545,6 @@ finalize_it:
rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
{
DEFiRet;
- int i;
rsRetVal iRetLocal;
int bInitialized = 0; /* is queue already initialized? */
@@ -1452,61 +1556,42 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
dbgprintf("Queue 0x%lx: type %d, disk assisted %d, maxFileSz %ld starting\n", queueGetID(pThis), pThis->qType,
pThis->bIsDA, pThis->iMaxFileSize);
- if(pThis->qType != QUEUETYPE_DIRECT) {
- if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads + 1, sizeof(qWrkThrd_t))) == NULL)
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
-
- if(pThis->bIsDA) {
- /* If we are disk-assisted, we need to check if there is a QIF file
- * which we need to load. -- rgerhards, 2008-01-15
- */
- iRetLocal = queueHaveQIF(pThis);
- if(iRetLocal == RS_RET_OK) {
- dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n",
- queueGetID(pThis));
+ if(pThis->qType == QUEUETYPE_DIRECT)
+ FINALIZE; /* with direct queues, we are already finished... */
- /* indicate we now run in DA mode - this is reset by the DA worker if it fails */
- pThis->qRunsDA = QRUNS_DA_INIT;
-
- /* now we must start our DA worker thread - it does the rest of the initialization */
- CHKiRet(queueStrtWrkThrd(pThis, 0));
- bInitialized = 1;
- }
- }
+ if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads + 1, sizeof(qWrkThrd_t))) == NULL)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- if(!bInitialized) {
- dbgprintf("Queue 0x%lx: queue starts up without loading any disk state\n", queueGetID(pThis));
- /* worker 0 is reserved for disk-assisted mode, so do not start */
- queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_NEVER_RAN);
+ if(pThis->bIsDA) {
+ /* If we are disk-assisted, we need to check if there is a QIF file
+ * which we need to load. -- rgerhards, 2008-01-15
+ */
+ iRetLocal = queueHaveQIF(pThis);
+ if(iRetLocal == RS_RET_OK) {
+ dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n",
+ queueGetID(pThis));
- /* fire up the worker threads */
- for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) {
- queueStrtWrkThrd(pThis, i);
- }
+ queueInitDA(pThis); /* initiate DA mode */
+ bInitialized = 1; /* we are done */
+ } else {
+ // TODO: use logerror? -- rgerhards, 2008-01-16
+ dbgprintf("Queue 0x%lx: error %d trying to access on-disk queue files, starting without them. "
+ "Some data may be lost\n", queueGetID(pThis), iRetLocal);
}
}
-finalize_it:
- return iRet;
-}
-
-
-#if 0
-/* persist disk status on disk. This is necessary if we run either
- * a disk queue or in a disk assisted mode.
- */
-static rsRetVal queuePersistDskFilInfo(queue_t *pThis)
-{
- DEFiRet;
-
- assert(pThis != NULL);
+ if(!bInitialized) {
+ dbgprintf("Queue 0x%lx: queue starts up without (loading) any disk state\n", queueGetID(pThis));
+ /* worker 0 is reserved for disk-assisted mode, so do not start */
+ queueTellWrkThrd(pThis, 0, eWRKTHRD_STOPPED);
+ /* fire up the worker threads */
+ queueStrtAllWrkThrds(pThis);
+ }
finalize_it:
return iRet;
}
-#endif
-
/* persist the queue to disk. If we have something to persist, we first
@@ -1619,7 +1704,7 @@ rsRetVal queueDestruct(queue_t *pThis)
* leave it for the time being. -- rgerhards, 2008-01-16
*/
if(pThis->qRunsDA != QRUNS_REGULAR)
- queueWrkThrdReqTrm(pThis->pqDA, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, 0);
+ queueWrkThrdReqTrm(pThis->pqDA, eWRKTHRD_SHUTDOWN_IMMEDIATE, 0);
/* then, terminate our own worker threads */
if(pThis->pWrkThrds != NULL) {
@@ -1760,6 +1845,9 @@ queueEnqObj(queue_t *pThis, void *pUsr)
if(pThis->bIsDA)
CHKiRet(queueChkStrtDA(pThis));
+ /* re-process any new pending thread requests and see if we need to start workers */
+ queueChkAndStrtWrk(pThis);
+
/* and finally (try to) enqueue what is left over */
while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) {
dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", queueGetID(pThis));
@@ -1790,12 +1878,14 @@ finalize_it:
DEFpropSetMeth(queue, iPersistUpdCnt, int);
DEFpropSetMeth(queue, toQShutdown, long);
DEFpropSetMeth(queue, toActShutdown, long);
+DEFpropSetMeth(queue, toWrkShutdown, long);
DEFpropSetMeth(queue, toEnq, long);
DEFpropSetMeth(queue, iHighWtrMrk, int);
DEFpropSetMeth(queue, iLowWtrMrk, int);
DEFpropSetMeth(queue, iDiscardMrk, int);
DEFpropSetMeth(queue, iDiscardSeverity, int);
DEFpropSetMeth(queue, bIsDA, int);
+DEFpropSetMeth(queue, iMinMsgsPerWrkr, int);
/* get the size of this queue. The important thing about this get method is that it