diff options
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 85 |
1 files changed, 52 insertions, 33 deletions
@@ -1,7 +1,6 @@ -// TODO: DA worker must not wait eternal on shutdown when in enqueue only mode!:w -// - // TODO: we need to implement peek(), without it (today!) we lose one message upon - // worker cancellation! -- rgerhards, 2008-01-14 +// TODO: DA worker must not wait eternal on shutdown when in enqueue only mode! +// TODO: we need to implement peek(), without it (today!) we lose one message upon +// worker cancellation! -- rgerhards, 2008-01-14 // TODO: think about mutDA - I think it's no longer needed // TODO: start up the correct num of workers when switching to non-DA mode // TODO: "preforked" worker threads @@ -10,18 +9,18 @@ // call consumer state. Facilitates retaining messages in queue until action could // be called! /* queue.c - * - * This file implements the queue object and its several queueing methods. - * - * File begun on 2008-01-03 by RGerhards - * - * There is some in-depth documentation available in doc/dev_queue.html - * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it - * if you are getting aquainted to the object. - * - * Copyright 2008 Rainer Gerhards and Adiscon GmbH. - * - * This file is part of rsyslog. +* +* This file implements the queue object and its several queueing methods. +* +* File begun on 2008-01-03 by RGerhards +* +* There is some in-depth documentation available in doc/dev_queue.html +* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it +* if you are getting aquainted to the object. +* +* Copyright 2008 Rainer Gerhards and Adiscon GmbH. +* +* This file is part of rsyslog. * * Rsyslog is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -88,18 +87,12 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis) ISOBJ_TYPE_assert(pThis, queue); -RUNLOG_VAR("%d", pThis->bEnqOnly); if(!pThis->bEnqOnly) { -RUNLOG_VAR("%d", pThis->bRunsDA); if(pThis->bRunsDA) { -RUNLOG_VAR("%d", pThis->bQueueStarted); -RUNLOG_VAR("%d", pThis->iQueueSize); -RUNLOG_VAR("%d", pThis->iHighWtrMrk); /* if we have not yet reached the high water mark, there is no need to start a * worker. -- rgerhards, 2008-01-26 */ if(pThis->iQueueSize >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { -RUNLOG; wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ } } else { @@ -227,7 +220,6 @@ queueStartDA(queue_t *pThis) /* set up sync objects */ pthread_mutex_init(&pThis->mutDA, NULL); - pthread_cond_init(&pThis->condDA, NULL); /* create message queue */ dbgprintf("Queue %p: queueSTrtDA pre child queue construct,\n", pThis); @@ -842,6 +834,11 @@ queueDel(queue_t *pThis, void *pUsr) * before this function is called so that DA queues will be fully persisted to * disk (if configured to do so). * rgerhards, 2008-01-24 + * Please note that this function shuts down BOTH the parent AND the child queue + * in DA case. This is necessary because their timeouts are tightly coupled. Most + * importantly, the timeouts would be applied twice (or logic be extremely + * complex) if each would have its own shutdown. The function does not self check + * this condition - the caller must make sure it is not called with a parent. */ static rsRetVal queueShutdownWorkers(queue_t *pThis) { @@ -851,6 +848,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) rsRetVal iRetLocal; ISOBJ_TYPE_assert(pThis, queue); + assert(pThis->pqParent == NULL); /* detect invalid calling sequence */ dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis)); @@ -891,6 +889,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) */ RUNLOG_VAR("%d", pThis->toQShutdown); timeoutComp(&tTimeout, pThis->toQShutdown); + dbgprintf("Queue 0x%lx: trying shutdown of regular workers\n", queueGetID(pThis)); iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { dbgprintf("Queue 0x%lx: regular shutdown timed out on primary queue (this is OK)\n", queueGetID(pThis)); @@ -905,6 +904,7 @@ RUNLOG_VAR("%d", pThis->toQShutdown); /* we use the same absolute timeout as above, so we do not use more than the configured * timeout interval! */ + dbgprintf("Queue 0x%lx: trying shutdown of DA workers\n", queueGetID(pThis)); iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { dbgprintf("Queue 0x%lx: shutdown timed out on DA queue (this is OK)\n", @@ -959,7 +959,6 @@ RUNLOG; * the queue is now empty. If regular workers are still running, and try to pull the next message, * they will automatically terminate as there no longer is any message left to process. */ - // TODO: use pWtp mutex? - guess so! BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ RUNLOG_VAR("%d", pThis->iQueueSize); //old: if(pThis->iQueueSize > 0) { @@ -967,6 +966,8 @@ RUNLOG_VAR("%d", pThis->iQueueSize); timeoutComp(&tTimeout, pThis->toActShutdown); if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgprintf("Queue 0x%lx: trying immediate shutdown of regular workers\n", queueGetID(pThis)); + // TODO: ??? cut&paste error? iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { dbgprintf("Queue 0x%lx: immediate shutdown timed out on primary queue (this is acceptable and " @@ -976,11 +977,15 @@ RUNLOG_VAR("%d", pThis->iQueueSize); "in disk save mode. Continuing, but results are unpredictable\n", queueGetID(pThis), iRetLocal); } - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + if(pThis->bIsDA) { + /* we need to re-aquire the mutex for the next check in this case! */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + } } - if(wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) { + if(pThis->bIsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) { /* and now the same for the DA queue */ END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgprintf("Queue 0x%lx: trying immediate shutdown of DA workers\n", queueGetID(pThis)); iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { dbgprintf("Queue 0x%lx: immediate shutdown timed out on DA queue (this is acceptable and " @@ -1042,6 +1047,8 @@ RUNLOG_VAR("%d", pThis->iQueueSize); } } +// TODO: think about joining all workers, so that the destructors are called +// /* ... finally ... all worker threads have terminated :-) * Well, more precisely, they *are in termination*. Some cancel cleanup handlers * may still be running. @@ -1341,8 +1348,10 @@ if(pThis->pqDA != NULL) static int queueChkStopWrkrReg(queue_t *pThis) { + BEGINfunc int bStopWrkr = pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && pThis->iQueueSize == 0); RUNLOG_VAR("%d", bStopWrkr); + ENDfunc return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && pThis->iQueueSize == 0); } @@ -1354,10 +1363,12 @@ static int queueIsIdleDA(queue_t *pThis) { /* remember: iQueueSize is the DA queue size, not the main queue! */ + BEGINfunc RUNLOG_VAR("%d", pThis->iLowWtrMrk); dbgprintf("queueIsIdleDA(%p) returns %d, qsize %d\n", pThis, pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk), pThis->iQueueSize); //// TODO: I think we need just a single function... //return (pThis->iQueueSize == 0); + ENDfunc return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk)); } /* must only be called when the queue mutex is locked, else results @@ -1386,6 +1397,10 @@ queueRegOnWrkrShutdown(queue_t *pThis) if(pThis->pqParent != NULL) { RUNLOG_VAR("%p", pThis->pqParent); RUNLOG_VAR("%p", pThis->pqParent->pWtpDA); + assert(pThis->pqParent->pWtpDA != NULL); + pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ + wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ +#if 0 if(pThis->pqParent->pWtpDA == NULL) { /* this can happen if we are not set to save on an eternal timeout. We * log a warning but otherwise do nothing @@ -1396,6 +1411,7 @@ RUNLOG_VAR("%p", pThis->pqParent->pWtpDA); pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ } +#endif } RETiRet; @@ -1633,11 +1649,14 @@ rsRetVal queueDestruct(queue_t **ppThis) pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ - /* shut down all workers (handles *all* of the persistence logic) */ - //if(!pThis->bEnqOnly) /* in enque-only mode, we have no worker pool! */ - if(!pThis->bEnqOnly && pThis->pqParent == NULL) /* in enque-only mode, we have no worker pool! */ + /* shut down all workers (handles *all* of the persistence logic) + * See function head comment of queueShutdownWorkers () on why we don't call it + * We also do not need to shutdown workers when we are in enqueue-only mode or we are a + * direct queue - because in both cases we have none... ;) + * with a child! -- rgerhards, 2008-01-28 + */ + if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL) queueShutdownWorkers(pThis); -RUNLOG; /* finally destruct our (regular) worker thread pool * Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen, @@ -1767,8 +1786,6 @@ queueEnqObj(queue_t *pThis, void *pUsr) ISOBJ_TYPE_assert(pThis, queue); -// TODO: check if queue is terminating and if so either discard message or enqeue it to the DA queue *directly* -dbgprintf("Queue %p: EnqObj() 1\n", pThis); RUNLOG_VAR("%d", pThis->bRunsDA); /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE @@ -1815,7 +1832,9 @@ finalize_it: } /* make sure at least one worker is running. */ - queueAdviseMaxWorkers(pThis); + if(pThis->qType != QUEUETYPE_DIRECT) { + queueAdviseMaxWorkers(pThis); + } RETiRet; } |