diff options
-rw-r--r-- | ChangeLog | 1 | ||||
-rw-r--r-- | queue.c | 55 | ||||
-rw-r--r-- | queue.h | 6 | ||||
-rw-r--r-- | wtp.c | 4 |
4 files changed, 57 insertions, 9 deletions
@@ -2,6 +2,7 @@ Version 3.10.3 (rgerhards), 2008-01-14 - fixed a bug with standard template definitions (not a big deal) - thanks to varmojfekoj for spotting it +- run-time instrumentation added --------------------------------------------------------------------------- Version 3.10.2 (rgerhards), 2008-01-14 - added the ability to keep stop rsyslogd without the need to drain @@ -1,3 +1,5 @@ +// TODO: DA worker must not wait eternal on shutdown when in enqueue only mode!:w +// // TODO: we need to implement peek(), without it (today!) we lose one message upon // worker cancellation! -- rgerhards, 2008-01-14 // TODO: think about mutDA - I think it's no longer needed @@ -154,7 +156,12 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis) RUNLOG_VAR("%d", pThis->bEnqOnly); if(!pThis->bEnqOnly) { if(pThis->bRunsDA) { - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ + /* if we have not yet reached the high water mark, there is no need to start a + * worker. -- rgerhards, 2008-01-26 + */ + // WRONG: if(pThis->iQueueSize >= pThis->iHighWtrMrk) { + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ + //} } else { if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { iMaxWorkers = 1; @@ -262,6 +269,7 @@ dbgprintf("Queue %p: queueStartDA pre start\n", pThis); wtpWakeupWrkr(pThis->pWtpReg); /* awake all workers, but not ourselves ;) */ pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */ + pThis->bChildIsDone = 0;/* set to 1 when child's worker detect queue is finished */ pthread_cond_signal(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n", @@ -980,11 +988,17 @@ RUNLOG_VAR("%d", pThis->iQueueSize); /* ... and now the DA queue, if it exists (should always be after the primary one) */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) { + //TODO: use right mutex! + //old: if(pThis->pqDA != NULL && pThis->pqDA->iQueueSize > 0) { +if(pThis->pqDA != NULL) { +RUNLOG_VAR("%p", pThis->pqDA->pWtpReg); +RUNLOG_VAR("%d", pThis->pqDA->pWtpReg->iCurNumWrkThrd); +} + if(pThis->pqDA != NULL && pThis->pqDA->pWtpReg->iCurNumWrkThrd > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); dbgprintf("Queue 0x%lx: checking to see if we need to cancel any worker threads of the DA queue\n", queueGetID(pThis)); - iRetLocal = wtpCancelAll(pThis->pWtpReg); + iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg); if(iRetLocal != RS_RET_OK) { dbgprintf("Queue 0x%lx: unexpected iRet state %d trying to cancel DA queue worker " "threads, continuing, but results are unpredictable\n", @@ -1078,23 +1092,28 @@ finalize_it: /* cancellation cleanup handler for queueWorker () * Updates admin structure and frees ressources. + * Params: + * arg1 - user pointer (in this case a queue_t) + * arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!]) * rgerhards, 2008-01-16 */ static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2) { + DEFiRet; + queue_t *pThis = (queue_t*) arg1; - wti_t *pWti = (wti_t*) arg2; + obj_t *pUsr = (obj_t*) arg2; ISOBJ_TYPE_assert(pThis, queue); - ISOBJ_TYPE_assert(pWti, wti); + ISOBJ_assert(pUsr); dbgprintf("Queue 0x%lx: cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n", queueGetID(pThis)); /* TODO: re-enqueue the data element! */ - return RS_RET_OK; + RETiRet; } @@ -1308,8 +1327,13 @@ queueRegOnWrkrShutdown(queue_t *pThis) { DEFiRet; + ISOBJ_TYPE_assert(pThis, queue); + if(pThis->pqParent != NULL) { +RUNLOG_VAR("%p", pThis->pqParent); +RUNLOG_VAR("%p", pThis->pqParent->pWtpDA); assert(pThis->pqParent->pWtpDA != NULL); + pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ } @@ -1317,6 +1341,24 @@ queueRegOnWrkrShutdown(queue_t *pThis) } +/* The following function is called when a regular queue worker starts up. We need this + * hook to indicate in the parent queue (if we are a child) that we are not done yet. + */ +static rsRetVal +queueRegOnWrkrStartup(queue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + + if(pThis->pqParent != NULL) { + pThis->pqParent->bChildIsDone = 0; + } + + RETiRet; +} + + /* start up the queue - it must have been constructed and parameters defined * before. */ @@ -1368,6 +1410,7 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, queueIsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, queueConsumerReg)); CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, queueConsumerCancelCleanup)); + CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, queueRegOnWrkrStartup)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, queueRegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); @@ -95,8 +95,10 @@ typedef struct queue_s { pthread_cond_t *condSignalOnEmpty;/* caller-provided condition to be signalled when queue is empty (DA mode!) */ pthread_mutex_t *mutSignalOnEmpty; /* and its associated mutex */ pthread_cond_t *condSignalOnEmpty2;/* another condition to be signalled on empty */ - int bSignalOnEmpty; /* signal caller when queue is empty via xxxSignalOnEmpty cond/mut, - 0 = do not, 1 = signal only condSignalOnEmpty, 2 = signal both condSig..*/ + //int bSignalOnEmpty; /* signal caller when queue is empty via xxxSignalOnEmpty cond/mut, + // 0 = do not, 1 = signal only condSignalOnEmpty, 2 = signal both condSig..*/ // TODO: no longer needed? + + int bChildIsDone; /* set to 1 when the child DA queue has finished processing, 0 otherwise */ int bThrdStateChanged; /* at least one thread state has changed if 1 */ /* end sync variables */ /* the following variables are always present, because they @@ -352,9 +352,11 @@ wtpCancelAll(wtp_t *pThis) /* process any pending thread requests so that we know who actually is still running */ wtpProcessThrdChanges(pThis); - /* first tell the workers our request */ +RUNLOG_VAR("%d", pThis->iCurNumWrkThrd); + /* go through all workers and cancel those that are active */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { // TODO: mutex lock! +RUNLOG_VAR("%d", pThis->pWrkr[i]->tCurrCmd); if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) { dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i); pthread_cancel(pThis->pWrkr[i]->thrdID); |