summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-16 17:27:16 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-16 17:27:16 +0000
commitc701cbaaeb9202887a97a6b2de60852713d1785e (patch)
tree518286ec521b9cb5f97bf81da22b183882897900 /queue.c
parent75a8f92d5001f555606b2ddb5de30acf689e2422 (diff)
downloadrsyslog-c701cbaaeb9202887a97a6b2de60852713d1785e.tar.gz
rsyslog-c701cbaaeb9202887a97a6b2de60852713d1785e.tar.xz
rsyslog-c701cbaaeb9202887a97a6b2de60852713d1785e.zip
some cleanup and fixes
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c45
1 files changed, 19 insertions, 26 deletions
diff --git a/queue.c b/queue.c
index acfdefb3..4291c509 100644
--- a/queue.c
+++ b/queue.c
@@ -112,6 +112,7 @@ queueJoinWrkThrd(queue_t *pThis, int iIdx)
pThis->pWrkThrds[iIdx].tCurrCmd);
pthread_join(pThis->pWrkThrds[iIdx].thrdID, NULL);
pThis->pWrkThrds[iIdx].tCurrCmd = eWRKTHRD_STOPPED; /* back to virgin... */
+ pThis->pWrkThrds[iIdx].thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
dbgprintf("Queue 0x%lx: thread %d state %d, has exited\n", queueGetID(pThis), iIdx,
pThis->pWrkThrds[iIdx].tCurrCmd);
@@ -133,7 +134,7 @@ queueStrtWrkThrd(queue_t *pThis, int i)
queueTellWrkThrd(pThis, i, eWRKTHRD_RUN_INIT);
iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis);
- dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n",
+ dbgprintf("Queue 0x%lx: starting Worker thread %x, index %d with state %d.\n",
(unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState);
return iRet;
@@ -158,15 +159,17 @@ queueStrtNewWrkThrd(queue_t *pThis)
* we do NOT start a new one. Let's give the other one a chance, first.
*/
iStartingUp = -1;
- for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i)
+ for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) {
+dbgprintf("Queue %p: search thrd tbl slot: i %d, CuccCmd %d\n", pThis, i, pThis->pWrkThrds[i].tCurrCmd);
if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_STOPPED) {
break;
} else if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_RUN_INIT) {
iStartingUp = i;
break;
}
+ }
-dbgprintf("after thrd search: i %d, iStartingUp %d\n", i, iStartingUp);
+dbgprintf("Queue %p: after thrd search: i %d, iStartingUp %d\n", pThis, i, iStartingUp);
if(iStartingUp > -1)
ABORT_FINALIZE(RS_RET_ALREADY_STARTING);
@@ -209,25 +212,6 @@ queueTellActWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd)
}
-/* This once was used to start all regular worker threads. Now, we have
- * dynamic grow of the worker thread pool, based on needs. This function is
- * still preserved, but it now does not start all but only worker 1, which
- * is always present.
- * rgerhards, 2008-01-16
- */
-static inline rsRetVal
-queueStrtAllWrkThrds(queue_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, queue);
- assert(pThis->pWrkThrds[1].tCurrCmd < eWRKTHRD_RUN_INIT);
- //iRet = queueStrtWrkThrd(pThis, 1);
-
- return iRet;
-}
-
-
/* compute an absolute time timeout suitable for calls to pthread_cond_timedwait()
* rgerhards, 2008-01-14
*/
@@ -288,8 +272,8 @@ queueChkAndStrtWrk(queue_t *pThis)
/* check if we need to start up another worker (only in regular mode) */
if(pThis->qRunsDA == QRUNS_REGULAR) {
if(pThis->iCurNumWrkThrd < pThis->iNumWorkerThreads) {
-dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d\n",
- pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd);
+dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d, qRunsDA: %d\n",
+ pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA);
/* check if we satisfy the min nbr of messages per worker to start a new one */
if(pThis->iCurNumWrkThrd == 0 ||
pThis->iQueueSize / pThis->iCurNumWrkThrd > pThis->iMinMsgsPerWrkr) {
@@ -328,7 +312,14 @@ queueTurnOffDAMode(queue_t *pThis)
* keep that comment if future need arises.
*/
- queueStrtAllWrkThrds(pThis); /* restore our regular worker threads */
+ /* we start at least one worker thread. If no new messages come in, this will
+ * be the only one for the time being. I am not yet sure if that is acceptable.
+ * To solve that issue, queueWorker () would need to check if it needs to fire
+ * up addtl ones. I am not yet sure if that is justified. After all, if no new
+ * messages come into the queue, we may be well off with a single worker.
+ * rgerhards, 2008-01-16
+ */
+ queueStrtNewWrkThrd(pThis);
pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */
/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
@@ -1332,6 +1323,8 @@ queueWorker(void *arg)
for(iMyThrdIndx = 0 ; iMyThrdIndx <= pThis->iNumWorkerThreads ; ++iMyThrdIndx)
if(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self())
break;
+dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pThis,
+ (unsigned) pThis->pWrkThrds[iMyThrdIndx].thrdID, iMyThrdIndx, (unsigned) pthread_self());
assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self());
dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx);
@@ -1586,7 +1579,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
queueTellWrkThrd(pThis, 0, eWRKTHRD_STOPPED);
/* fire up the worker threads */
- queueStrtAllWrkThrds(pThis);
+ // TODO: preforked workers! queueStrtAllWrkThrds(pThis);
}
finalize_it: