diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-15 11:04:46 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-15 11:04:46 +0000 |
commit | 8f5c0764aaafc9eab72d20761ecba6a27d321f89 (patch) | |
tree | 5ce4356474a84b37598813398350b7b588ed6894 | |
parent | 75b645f16b930f142b777b00b529fb726ef10243 (diff) | |
download | rsyslog-8f5c0764aaafc9eab72d20761ecba6a27d321f89.tar.gz rsyslog-8f5c0764aaafc9eab72d20761ecba6a27d321f89.tar.xz rsyslog-8f5c0764aaafc9eab72d20761ecba6a27d321f89.zip |
disk assisted queue works quite well, except for startup from disk queue
-rw-r--r-- | queue.c | 232 | ||||
-rw-r--r-- | queue.h | 5 | ||||
-rw-r--r-- | stream.c | 14 |
3 files changed, 215 insertions, 36 deletions
@@ -1,3 +1,4 @@ +// TODO: do an if(debug) in dbgrintf - performanc ein release build! // TODO: remove bIsDA? // TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in // call consumer state. Facilitates retaining messages in queue until action could @@ -71,6 +72,28 @@ queueTellWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd) } +/* join a specific worker thread + */ +static inline rsRetVal +queueJoinWrkThrd(queue_t *pThis, int iIdx) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads); + assert(pThis->pWrkThrds[iIdx].tCurrCmd != eWRKTHRDCMD_NEVER_RAN); + + dbgprintf("Queue 0x%lx: thread %d state %d, waiting for exit\n", queueGetID(pThis), iIdx, + pThis->pWrkThrds[iIdx].tCurrCmd); + pthread_join(pThis->pWrkThrds[iIdx].thrdID, NULL); + pThis->pWrkThrds[iIdx].tCurrCmd = eWRKTHRDCMD_NEVER_RAN; /* back to virgin... */ + dbgprintf("Queue 0x%lx: thread %d state %d, exited\n", queueGetID(pThis), iIdx, + pThis->pWrkThrds[iIdx].tCurrCmd); + + return iRet; +} + + /* Starts a worker thread (on a specific index [i]!) */ static inline rsRetVal @@ -113,6 +136,23 @@ queueTellWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd) return iRet; } +/* start all regular worker threads + * rgerhards, 2008-01-15 + */ +static inline rsRetVal +queueStrtAllWrkThrds(queue_t *pThis) +{ + DEFiRet; + int i; + + /* fire up the worker threads */ + for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) { + queueStrtWrkThrd(pThis, i); + } + + return iRet; +} + /* compute an absolute time timeout suitable for calls to pthread_cond_timedwait() * rgerhards, 2008-01-14 @@ -137,6 +177,80 @@ queueTimeoutComp(struct timespec *pt, int iTimeout) /* --------------- code for disk-assisted (DA) queue modes -------------------- */ +/* Destruct DA queue. This is the last part of DA-to-normal-mode + * transistion. This is called asynchronously and some time quite a + * while after the actual transistion. The key point is that we need to + * do it at some later time, because we need to destruct the DA queue. That, + * however, can not be done in a thread that has been signalled + * This is to be called when we revert back to our own queue. + * rgerhards, 2008-01-15 + */ +static inline rsRetVal +queueTurnOffDAMode(queue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + assert(pThis->bRunsDA == 1); + + /* pull any data that we still need from the (child) disk queue... */ + pThis->pConsumer = pThis->pqDA->pConsumer; /* restore regular consumer */ + + queueStrtAllWrkThrds(pThis); /* restore our regular worker threads */ + pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ + + /* note: a disk queue alsways has a single worker and it alwas has the ID 1 */ + queueTellWrkThrd(pThis->pqDA, 1, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* tell the DA worker to terminate... */ + pthread_mutex_unlock(&pThis->mutDA); /* ... permit it to run ... */ + queueJoinWrkThrd(pThis->pqDA, 1); /* ... and wait for the shutdown to happen */ + queueDestruct(pThis->pqDA); /* and now we are ready to destruct the DA queue */ + pThis->pqDA = NULL; + queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */ + + dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n", + queueGetID(pThis), iRet); + + return iRet; +} + +/* check if we had any worker thread changes and, if so, act + * on them. At a minimum, terminated threads are harvested (joined). + */ +static rsRetVal +queueChkWrkThrdChanges(queue_t *pThis) +{ + DEFiRet; + int i; + + ISOBJ_TYPE_assert(pThis, queue); + + if(pThis->bThrdStateChanged == 0) + FINALIZE; + + /* go through all threads (including DA thread) */ + for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { + switch(pThis->pWrkThrds[i].tCurrCmd) { + case eWRKTHRDCMD_TERMINATED: + queueJoinWrkThrd(pThis, i); + break; + case eWRKTHRDCMD_TURN_OFF_DA_MODE: + queueTurnOffDAMode(pThis); + break; + /* these cases just to satisfy the compiler, we do not act an them: */ + case eWRKTHRDCMD_NEVER_RAN: + case eWRKTHRDCMD_RUN: + case eWRKTHRDCMD_SHUTDOWN: + case eWRKTHRDCMD_SHUTDOWN_IMMEDIATE: + /* DO NOTHING */ + break; + } + } + +finalize_it: + return iRet; +} + + /* check if we run in disk-assisted mode and record that * setting for easy (and quick!) access in the future. This * function must only be called from constructors and only @@ -161,7 +275,6 @@ queueChkIsDA(queue_t *pThis) } - /* This is a special consumer to feed the disk-queue in disk-assited mode. * When active, our own queue more or less acts as a memory buffer to the disk. * So this consumer just needs to drain the memory queue and submit entries @@ -175,6 +288,7 @@ static inline rsRetVal queueDAConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr) { DEFiRet; + int iCancelStateSave; ISOBJ_TYPE_assert(pThis, queue); ISOBJ_assert(pUsr); @@ -186,13 +300,27 @@ dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, if(pThis->iQueueSize == pThis->iLowWtrMrk) { dbgprintf("Queue 0x%lx: %d entries - passed low water mark in DA mode, sleeping\n", queueGetID(pThis), pThis->iQueueSize); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); +dbgprintf("pre mutex lock (think about CLEANUP!)\n"); pthread_mutex_lock(&pThis->mutDA); dbgprintf("mutex locked (think about CLEANUP!)\n"); pthread_cond_wait(&pThis->condDA, &pThis->mutDA); dbgprintf("condition returned\n"); - pthread_mutex_unlock(&pThis->mutDA); + /* we are back. either we have passed the high water mark or the child disk queue + * is empty. We check for the later. If that is the case, we switch back to + * non-DA mode + */ + if(pThis->pqDA->iQueueSize == 0) { + dbgprintf("Queue 0x%lx: %d entries - disk assisted child queue signaled it is empty\n", + queueGetID(pThis), pThis->iQueueSize); + CHKiRet(queueTurnOffDAMode(pThis)); /* this also unlocks the mutex! */ + } else { + pthread_mutex_unlock(&pThis->mutDA); + } dbgprintf("mutex unlocked (think about CLEANUP!)\n"); + pthread_setcancelstate(iCancelStateSave, NULL); } +dbgprintf("DAConsumer returns\n"); finalize_it: return iRet; @@ -206,28 +334,40 @@ static rsRetVal queueChkStrtDA(queue_t *pThis) { DEFiRet; + int iCancelStateSave; ISOBJ_TYPE_assert(pThis, queue); + if(pThis->bRunsDA) { + if(pThis->iQueueSize < pThis->iHighWtrMrk) + pThis->bWasBelowHighWtr = 1; + else if(pThis->iQueueSize == pThis->iHighWtrMrk && pThis->bWasBelowHighWtr) { + /* then we need to signal that we are at the high water mark again.*/ + dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n", + queueGetID(pThis), pThis->iQueueSize); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + pthread_mutex_lock(&pThis->mutDA); + pthread_cond_signal(&pThis->condDA); + pthread_mutex_unlock(&pThis->mutDA); + pthread_setcancelstate(iCancelStateSave, NULL); + queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */ + } + /* we need to re-check if we run disk-assisted, because that status may have changed + * in our high water mark processing. + */ + if(pThis->bRunsDA) + FINALIZE; + } + + /* we run into this part if we are NOT currently running DA. + * TODO: split this function, I think that would make the code easier + * to read. -- rgerhards, 2008-10-15 + */ /* if we do not hit the high water mark, we have nothing to do */ if(pThis->iQueueSize != pThis->iHighWtrMrk) ABORT_FINALIZE(RS_RET_OK); - if(pThis->bRunsDA) { - /* then we need to signal that we are at the high water mark again. The DA - * consumer shall check if it needs to restart. Note that we may pass the - * high water mark while we drain the queue. - * TODO: problem here? (condition signalled on drain...) - */ - dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n", - queueGetID(pThis), pThis->iQueueSize); - pthread_mutex_lock(&pThis->mutDA); - pthread_cond_signal(&pThis->condDA); - pthread_mutex_unlock(&pThis->mutDA); - FINALIZE; - } - dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n", queueGetID(pThis), pThis->iQueueSize); @@ -263,6 +403,7 @@ queueChkStrtDA(queue_t *pThis) * so we now need to change our consumer to utilize it. */ pThis->bRunsDA = 1; /* and that's all we need to do - the worker handles the rest ;) */ + pThis->bWasBelowHighWtr = 0;/* init to be sure */ /* now we must start our DA worker thread and shutdown all others */ CHKiRet(queueStrtWrkThrd(pThis, 0)); @@ -723,6 +864,9 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) /* awake them... */ pthread_cond_broadcast(pThis->notEmpty); +dbgprintf("queueWrkThrdTrm broadcasted notEmpty\n"); + if(pThis->bRunsDA) /* if running disk-assisted, workers may wait on that condition, too */ + pthread_cond_broadcast(&pThis->condDA); /* get timeout */ queueTimeoutComp(&t, iTimeout); @@ -761,6 +905,8 @@ queueWrkThrdCancel(queue_t *pThis) /* awake the workers one more time, just to be sure */ pthread_cond_broadcast(pThis->notEmpty); + if(pThis->bRunsDA) /* if running disk-assisted, workers may wait on that condition, too */ + pthread_cond_broadcast(&pThis->condDA); /* first tell the workers our request */ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) @@ -816,8 +962,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) */ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_NEVER_RAN) { - dbgprintf("Queue 0x%lx: joining worker thread %d\n", queueGetID(pThis), i); - pthread_join(pThis->pWrkThrds[i].thrdID, NULL); + queueJoinWrkThrd(pThis, i); } } @@ -872,6 +1017,7 @@ queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr) } } +dbgprintf("CallConsumer returns\n"); return iRet; } @@ -889,6 +1035,10 @@ queueWorker(void *arg) sigset_t sigSet; int iMyThrdIndx; /* index for this thread in queue thread table */ int iCancelStateSave; + int bInitialEmpty = 1; /* if running as a DA child, we do NOT need to signal the parent + * on the first occasion we are empty (because that happens on every + * startup. This var keeps track of the state. + */ assert(pThis != NULL); @@ -915,17 +1065,33 @@ queueWorker(void *arg) || (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) { pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); - while (pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) { + while(pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) { dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", queueGetID(pThis), iMyThrdIndx); if(pThis->bSignalOnEmpty) { - /* we need to signal our parent queue that we are empty */ + if(bInitialEmpty == 1) { + /* ignore */ + bInitialEmpty = 0; + } else { + /* we need to signal our parent queue that we are empty */ dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx); - pthread_mutex_lock(pThis->mutSignalOnEmpty); - pthread_cond_signal(pThis->condSignalOnEmpty); - pthread_mutex_unlock(pThis->mutSignalOnEmpty); + pthread_mutex_lock(pThis->mutSignalOnEmpty); + pthread_cond_signal(pThis->condSignalOnEmpty); + pthread_mutex_unlock(pThis->mutSignalOnEmpty); +dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx); + /* we now need to re-check if we still shall continue to + * run. This is important because the parent may have changed our + * state. So we simply go back to the begin of the loop. + */ + continue; + } } + /* If we arrive here, we have the regular case, where we can safely assume that + * iQueueSize and tCmd have not changed since the while(). + */ +dbgprintf("Queue %p/w%d: pre condwait ->notEmpty\n", pThis, iMyThrdIndx); pthread_cond_wait(pThis->notEmpty, pThis->mut); +dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx); } if(pThis->iQueueSize > 0) { /* dequeue element (still protected from mutex) */ @@ -969,8 +1135,9 @@ dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx); * and would be very hard to debug. The yield() is a sure fix, its performance overhead * should be well accepted given the above facts. -- rgerhards, 2008-01-10 */ +dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n", + queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd); pthread_yield(); - if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN) && pThis->iQueueSize > 0) dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has " " %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize); @@ -980,7 +1147,14 @@ dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); pThis->iCurNumWrkThrd--; - pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; + if(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN || + pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN_IMMEDIATE) { + /* in shutdown case, we need to flag termination. All other commands + * have a meaning to the thread harvester, so we can not overwrite them + */ + pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; + } + pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */ dbgprintf("Queue 0x%lx/w%d: thread terminates with %d entries left in queue, %d workers running.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd); @@ -1334,6 +1508,9 @@ queueEnqObj(queue_t *pThis, void *pUsr) pthread_mutex_lock(pThis->mut); } + /* process any pending thread requests */ + queueChkWrkThrdChanges(pThis); + /* first check if we can discard anything */ if(pThis->iDiscardMrk > 0 && pThis->iQueueSize >= pThis->iDiscardMrk) { iRetLocal = objGetSeverity(pUsr, &iSeverity); @@ -1352,13 +1529,12 @@ queueEnqObj(queue_t *pThis, void *pUsr) /* then check if we need to add an assistance disk queue */ if(pThis->bIsDA) CHKiRet(queueChkStrtDA(pThis)); - + /* and finally (try to) enqueue what is left over */ while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) { dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", queueGetID(pThis)); queueTimeoutComp(&t, pThis->toEnq); - if(pthread_cond_timedwait (pThis->notFull, - pThis->mut, &t) != 0) { + if(pthread_cond_timedwait (pThis->notFull, pThis->mut, &t) != 0) { dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", queueGetID(pThis)); objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); @@ -66,7 +66,8 @@ typedef enum { /* ALL active states MUST be numerically higher than eWRKTHRDCMD_TERMINATED and NONE must be lower! */ eWRKTHRDCMD_RUN = 2, eWRKTHRDCMD_SHUTDOWN = 3, - eWRKTHRDCMD_SHUTDOWN_IMMEDIATE = 4 + eWRKTHRDCMD_SHUTDOWN_IMMEDIATE = 4, + eWRKTHRDCMD_TURN_OFF_DA_MODE = 5 } qWrkCmd_t; /* commands for queue worker threads */ typedef struct qWrkThrd_s { @@ -86,6 +87,7 @@ typedef struct queue_s { int iUpdsSincePersist;/* nbr of queue updates since the last persist call */ int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */ int iHighWtrMrk; /* high water mark for disk-assisted memory queues */ + int bWasBelowHighWtr;/* when running in DA mode: queue was below high wtr mrk at least once */ int iLowWtrMrk; /* low water mark for disk-assisted memory queues */ int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */ int iDiscardSeverity;/* messages of this severity above are discarded on too-full queue */ @@ -107,6 +109,7 @@ typedef struct queue_s { pthread_cond_t *condSignalOnEmpty;/* caller-provided condition to be signalled when queue is empty (DA mode!) */ pthread_mutex_t *mutSignalOnEmpty; /* and its associated mutex */ int bSignalOnEmpty; /* signal caller when queue is empty via xxxSignalOnEmpty cond/mut */ + int bThrdStateChanged; /* at least one thread state has changed if 1 */ /* end sync variables */ /* the following variables are always present, because they * are not only used for the "disk" queueing mode but also for @@ -121,7 +121,7 @@ static rsRetVal strmCloseFile(strm_t *pThis) assert(pThis != NULL); assert(pThis->fd != -1); - dbgprintf("Stream 0x%lx: closing file %d\n", (unsigned long) pThis, pThis->fd); + dbgprintf("Stream 0x%lx: file %d closing\n", (unsigned long) pThis, pThis->fd); if(pThis->tOperationsMode == STREAMMODE_WRITE) strmFlush(pThis); @@ -201,15 +201,15 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC) /* first check if we need to (re)open the file (we may have switched to a new one!) */ CHKiRet(strmOpenFile(pThis)); iLenRead = read(pThis->fd, pThis->pIOBuf, pThis->sIOBufSize); - dbgprintf("Stream 0x%lx: read %ld bytes from file %d\n", (unsigned long) pThis, - iLenRead, pThis->fd); + dbgprintf("Stream 0x%lx: file %d read %ld bytes\n", (unsigned long) pThis, + pThis->fd, iLenRead); if(iLenRead == 0) { if(pThis->iMaxFiles == 0) ABORT_FINALIZE(RS_RET_EOF); else { /* we have multiple files and need to switch to the next one */ /* TODO: think about emulating EOF in this case (not yet needed) */ - dbgprintf("Stream 0x%lx: EOF on file %d\n", (unsigned long) pThis, pThis->fd); + dbgprintf("Stream 0x%lx: file %d EOF\n", (unsigned long) pThis, pThis->fd); CHKiRet(strmNextFile(pThis)); } } else if(iLenRead < 0) @@ -388,7 +388,7 @@ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) CHKiRet(strmOpenFile(pThis)); iWritten = write(pThis->fd, pBuf, lenBuf); - dbgprintf("Stream 0x%lx: write wrote %d bytes to file %d, errno: %d\n", (unsigned long) pThis, + dbgprintf("Stream 0x%lx: file %d write wrote %d bytes, errno: %d\n", (unsigned long) pThis, iWritten, pThis->fd, errno); /* TODO: handle error case -- rgerhards, 2008-01-07 */ @@ -422,7 +422,7 @@ rsRetVal strmFlush(strm_t *pThis) DEFiRet; assert(pThis != NULL); - dbgprintf("Stream 0x%lx: flush file %d, buflen %ld\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr); + dbgprintf("Stream 0x%lx: file %d flush, buflen %ld\n", (unsigned long) pThis, pThis->fd, pThis->iBufPtr); if(pThis->tOperationsMode == STREAMMODE_WRITE && pThis->iBufPtr > 0) { iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); @@ -447,7 +447,7 @@ static rsRetVal strmSeek(strm_t *pThis, off_t offs) else strmFlush(pThis); int i; - dbgprintf("Stream 0x%lx: seek file %d, pos %ld\n", (unsigned long) pThis, pThis->fd, offs); + dbgprintf("Stream 0x%lx: file %d seek, pos %ld\n", (unsigned long) pThis, pThis->fd, offs); i = lseek(pThis->fd, offs, SEEK_SET); // TODO: check error! dbgprintf("seek(%d, %ld): %d\n", pThis->fd, offs, i); pThis->iCurrOffs = offs; /* we are now at *this* offset */ |