diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-18 23:24:51 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-18 23:24:51 +0000 |
commit | fabcb72a0994cd832cc1a5019123cfec35ef0b82 (patch) | |
tree | df0c126dd9fb19e6c9cd79698094f60fbfd3e2a6 /queue.c | |
parent | 2bd1e283527bae01d61b85682a7e8ecc778997a8 (diff) | |
download | rsyslog-fabcb72a0994cd832cc1a5019123cfec35ef0b82.tar.gz rsyslog-fabcb72a0994cd832cc1a5019123cfec35ef0b82.tar.xz rsyslog-fabcb72a0994cd832cc1a5019123cfec35ef0b82.zip |
saving state
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 174 |
1 files changed, 102 insertions, 72 deletions
@@ -1,3 +1,4 @@ +// TODO: think about mutDA - I think it's no longer needed // TODO: start up the correct num of workers when switching to non-DA mode // TODO: "preforked" worker threads // TODO: do an if(debug) in dbgrintf - performance in release build! @@ -64,6 +65,17 @@ static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly); /* methods */ + +/* cancellation cleanup handler - frees provided mutex + * rgerhards, 2008-01-14 + */ +static void queueMutexCleanup(void *arg) +{ + assert(arg != NULL); + pthread_mutex_unlock((pthread_mutex_t*) arg); +} + + /* get the current worker state. For simplicity and speed, we have * NOT used our regular calling interface this time. I hope that won't * bite in the long term... -- rgerhards, 2008-01-17 @@ -95,12 +107,14 @@ dbgprintf("Queue 0x%lx: trying to send command %d to thread %d\n", queueGetID(p switch(tCmd) { case eWRKTHRD_TERMINATING: pthread_cond_destroy(&pThis->condInitDone); + pthread_mutex_destroy(&pThis->mut); dbgprintf("Queue 0x%lx/w%d: thread terminating with %d entries left in queue, %d workers running.\n", queueGetID(pThis->pQueue), pThis->iThrd, pThis->pQueue->iQueueSize, pThis->pQueue->iCurNumWrkThrd); break; case eWRKTHRD_RUN_CREATED: pthread_cond_init(&pThis->condInitDone, NULL); + pthread_mutex_init(&pThis->mut, NULL); break; case eWRKTHRD_RUN_INIT: break; @@ -184,7 +198,7 @@ qWrkrWaitStartup(qWrkThrd_t *pThis) dbgprintf("Queue 0x%lx: waiting on worker thread %d startup\n", queueGetID(pThis->pQueue), pThis->iThrd); pthread_cond_wait(&pThis->condInitDone, pThis->pQueue->mut); -dbgprintf("startup done!\n"); +dbgprintf("worker startup done!\n"); } pthread_mutex_unlock(pThis->pQueue->mut); pthread_setcancelstate(iCancelStateSave, NULL); @@ -287,6 +301,30 @@ queueStrtWrkThrd(queue_t *pThis, int i) } +/* start the DA worker thread (if not already running) + */ +static inline rsRetVal +queueStrtDAWrkr(queue_t *pThis) +{ + DEFiRet; + int iCancelStateSave; + + ISOBJ_TYPE_assert(pThis, queue); + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); +dbgprintf("Queue %p: DAWrkr thread mutex lock\n", pThis); + pthread_mutex_lock(&pThis->pWrkThrds[0].mut); + pthread_cleanup_push(queueMutexCleanup, &pThis->pWrkThrds[0].mut); + pthread_setcancelstate(iCancelStateSave, NULL); + if(pThis->pWrkThrds[0].tCurrCmd == eWRKTHRD_STOPPED) { + iRet = queueStrtWrkThrd(pThis, 0); + } + pthread_cleanup_pop(1); +dbgprintf("Queue %p: DAWrkr thread mutex unlock\n", pThis); + + return iRet; +} + /* Starts a *new* worker thread. Function searches itself for a free index spot. It must only * be called when we have less than max workers active. Pending wrkr thread requests MUST have * been processed before calling this function. -- rgerhards, 2008-01-16 @@ -399,7 +437,7 @@ queueWakeupWrkThrds(queue_t *pThis, int bWithDAWrk) } -/* This function Checks if (another) worker threads needs to be started. It +/* This function checks if (another) worker threads needs to be started. It * must be called while the caller holds a lock on the queue mutex. So it must not * do anything that either reaquires the mutex or forces somebody else to aquire * it (that would lead to a deadlock). @@ -436,7 +474,7 @@ dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d, qR dbgprintf("Queue %p: DA worker is no longer running, restarting, qsize %d, workers %d, qRunsDA: %d\n", pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA); /* DA worker has timed out and needs to be restarted */ - iRet = queueStrtWrkThrd(pThis, 0); + iRet = queueStrtDAWrkr(pThis); } } @@ -476,7 +514,10 @@ queueTurnOffDAMode(queue_t *pThis) * messages come into the queue, we may be well off with a single worker. * rgerhards, 2008-01-16 */ - if(pThis->bEnqOnly == 0 && pThis->bQueueInDestruction == 0) +dbgprintf("Queue 0x%lx: disk-assistance being been turned off, bEnqOnly %d, bQueInDestr %d, NumWrkd %d\n", + queueGetID(pThis), + pThis->bEnqOnly,pThis->bQueueInDestruction,pThis->iCurNumWrkThrd); + if(pThis->bEnqOnly == 0 && pThis->bQueueInDestruction == 0 && pThis->iCurNumWrkThrd < 2) queueStrtNewWrkThrd(pThis); pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */ @@ -587,6 +628,7 @@ queueStrtDA(queue_t *pThis) pThis->pqDA->mutSignalOnEmpty = &pThis->mutDA; pThis->pqDA->condSignalOnEmpty2 = pThis->notEmpty; pThis->pqDA->bSignalOnEmpty = 2; + pThis->pqDA->pqParent = pThis; CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); @@ -665,7 +707,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly) * In enqueue-only mode, we do not start any workers. */ if(pThis->bEnqOnly == 0) - iRet = queueStrtWrkThrd(pThis, 0); + iRet = queueStrtDAWrkr(pThis); return iRet; } @@ -1302,16 +1344,6 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) return iRet; } - -/* cancellation cleanup handler - frees provided mutex - * rgerhards, 2008-01-14 - */ -static void queueMutexCleanup(void *arg) -{ - assert(arg != NULL); - pthread_mutex_unlock((pthread_mutex_t*) arg); -} - /* This is a special consumer to feed the disk-queue in disk-assited mode. * When active, our own queue more or less acts as a memory buffer to the disk. * So this consumer just needs to drain the memory queue and submit entries @@ -1489,8 +1521,9 @@ queueWorkerRemainActive(queue_t *pThis, qWrkThrd_t *pWrkrInst) || ((qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && (pThis->iQueueSize > 0))); dbgprintf("Queue %p/w%d: chk 1 pre empty queue, qsize %d, cont run: %d, cmd %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize, b, qWrkrGetState(pWrkrInst)); if(b && pWrkrInst->iThrd == 0 && pThis->qRunsDA == QRUNS_DA) { - queueGetQueueSize(pThis->pqDA, &iSizeDAQueue); - b = pThis->iQueueSize >= pThis->iHighWtrMrk || iSizeDAQueue != 0; +// queueGetQueueSize(pThis->pqDA, &iSizeDAQueue); +// b = pThis->iQueueSize >= pThis->iHighWtrMrk || iSizeDAQueue != 0; + b = pThis->iQueueSize >= pThis->iHighWtrMrk || pThis->pqDA->iQueueSize != 0; } dbgprintf("Queue %p/w%d: pre empty queue, qsize %d, cont run: %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize, b); @@ -1578,13 +1611,23 @@ dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n", if(pThis->iQueueSize == 0) { dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", queueGetID(pThis), iMyThrdIndx); - /* TODO: check if the parent DA worker is running and, if not, initiate it */ + /* check if the parent DA worker is running and, if not, initiate it. Thanks + * to queueStrtDAWrkr (), we do not actually need to check (that routines does + * that for us, but we need to aquire the parent queue's mutex to call it. + */ + if(pThis->pqParent != NULL) { + dbgprintf("Queue %p: pre start parent %p worker\n", pThis, pThis->pqParent); + queueStrtDAWrkr(pThis->pqParent); + } + if(pThis->bSignalOnEmpty > 0) { /* we need to signal our parent queue that we are empty */ dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx); - pthread_mutex_lock(pThis->mutSignalOnEmpty); + //pthread_mutex_lock(pThis->mutSignalOnEmpty); // TODO: this was commented out + pthread_mutex_lock(pThis->pqParent->mut); // TODO: this was commented out pthread_cond_signal(pThis->condSignalOnEmpty); - pthread_mutex_unlock(pThis->mutSignalOnEmpty); + //pthread_mutex_unlock(pThis->mutSignalOnEmpty); // TODO: this was commented out + pthread_mutex_unlock(pThis->pqParent->mut); // TODO: this was commented out dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx); } if(pThis->bSignalOnEmpty > 1) { @@ -1596,7 +1639,8 @@ dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n", * iQueueSize and tCmd have not changed since the while(). */ dbgprintf("Queue %p/w%d: pre condwait ->notEmpty, worker shutdown %d\n", pThis, iMyThrdIndx, pThis->toWrkShutdown); - if(pThis->toWrkShutdown == -1) { + /* DA worker and first worker never have an inactivity timeout */ + if(pWrkrInst->iThrd < 2 || pThis->toWrkShutdown == -1) { dbgprintf("worker never times out!\n"); /* never shut down any started worker */ pthread_cond_wait(pThis->notEmpty, pThis->mut); @@ -1804,11 +1848,13 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ if(!bInitialized) { dbgprintf("Queue 0x%lx: queue starts up without (loading) any disk state\n", queueGetID(pThis)); /* fire up the worker threads */ + queueStrtNewWrkThrd(pThis); // TODO: preforked workers! queueStrtAllWrkThrds(pThis); } pThis->bQueueStarted = 1; finalize_it: +dbgprintf("queueStart() exit, iret %d\n", iRet); return iRet; } @@ -1923,68 +1969,52 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ - /* optimize parameters for shutdown of DA-enabled queues */ - if(pThis->bIsDA) { -dbgprintf("IsDA queue, modifying params for draining\n"); - pThis->iHighWtrMrk = 1; /* make sure we drain */ - pThis->iLowWtrMrk = 0; /* disable low water mark algo */ - if(pThis->qRunsDA == QRUNS_REGULAR) { - if(pThis->iQueueSize > 0) { - queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* initiate DA mode */ + /* we do not need to take care of any messages left in queue if we are in enqueue only mode */ + if(!pThis->bEnqOnly) { + /* in regular mode, need look at termination */ + /* optimize parameters for shutdown of DA-enabled queues */ + if(pThis->bIsDA && pThis->iQueueSize > 0) { // TODO: atomic iQueueSize! + dbgprintf("IsDA queue, modifying params for draining\n"); + pThis->iHighWtrMrk = 1; /* make sure we drain */ + pThis->iLowWtrMrk = 0; /* disable low water mark algo */ + if(pThis->qRunsDA == QRUNS_REGULAR) { + if(pThis->iQueueSize > 0) { + queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* initiate DA mode */ + } + } else { + queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */ } - } else { - queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */ - } - if(pThis->bSaveOnShutdown) { -dbgprintf("bSaveOnShutdown set, eternal timeout set\n"); - pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL; + if(pThis->bSaveOnShutdown) { + dbgprintf("bSaveOnShutdown set, eternal timeout set\n"); + pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL; + } + /* now we need to activate workers (read doc/dev_queue.html) */ } - /* now we need to activate workers (read doc/dev_queue.html) */ - } - // TODO: we may need to startup a regular worker if not in DA mode! - /* wait until all pending workers are started up */ - qWrkrWaitAllWrkrStartup(pThis); + /* wait until all pending workers are started up */ + qWrkrWaitAllWrkrStartup(pThis); - /* terminate our own worker threads */ - if(pThis->pWrkThrds != NULL) { - queueShutdownWorkers(pThis); - } + // We need to startup a worker if we are in non-DA mode and the queue is not empty and not in enque-only mode */ + dbgprintf("Queue %p: queueDestruct probing if any regular workers need to be started, CurWrkr %d, qsize %d, qRunsDA %d\n", + pThis, pThis->iCurNumWrkThrd, pThis->iQueueSize, pThis->qRunsDA); + pthread_mutex_lock(pThis->mut); + dbgprintf("queueDestruct mutex locked\n"); + if(pThis->iCurNumWrkThrd == 0 && pThis->iQueueSize > 0 && !pThis->bEnqOnly) { + dbgprintf("Queue %p: queueDestruct must start regular workers!\n", pThis); + queueStrtNewWrkThrd(pThis); + } + pthread_mutex_unlock(pThis->mut); + dbgprintf("queueDestruct mutex unlocked\n"); -#if 0 - /* if running DA, switch the DA queue to enqueue-only mode. That saves us some CPU cycles as - * its workers do no longer need to run. It also prevents longer-running actions to spring into - * existence while we are draining the main (memory) queue. -- rgerhads, 2008-01-16 - */ - if(pThis->qRunsDA != QRUNS_REGULAR) { - queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */ - if(pThis->bSaveOnShutdown) - pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL; + /* wait again in case a new worker was started */ + qWrkrWaitAllWrkrStartup(pThis); } - /* then, terminate our own worker threads */ + /* terminate our own worker threads */ if(pThis->pWrkThrds != NULL) { queueShutdownWorkers(pThis); } - /* If we currently run in DA mode, the in-memory queue is already persisted to disk. - * If we are not in DA mode, we may have data left in in-memory queues, this data will - * be lost if we do not persist it to a disk queue. So, if configured to do so, we will - * now start DA mode just to drain our queue. -- rgerhards, 2008-01-16 - * TODO: move to persist function! - */ - if(pThis->iQueueSize > 0 && pThis->bSaveOnShutdown && pThis->bIsDA) { - dbgprintf("Queue 0x%lx: in-memory queue contains %d entries after worker shutdown - using DA to save to disk\n", - queueGetID(pThis), pThis->iQueueSize); - pThis->iLowWtrMrk = 0; /* disable low water mark algo */ - pThis->iHighWtrMrk = 1; /* make sure we drain */ - queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* start DA queue in enqueue-only mode */ - qWrkrWaitStartup(QUEUE_PTR_DA_WORKER(pThis)); /* wait until DA worker has actually started */ - pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL; - queueShutdownWorkers(pThis); /* and tell it to shut down. The trick is it will run until q is drained */ - } -#endif - /* if still running DA, terminate disk queue */ if(pThis->qRunsDA != QRUNS_REGULAR) queueDestruct(&pThis->pqDA); |