diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-16 17:55:16 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-16 17:55:16 +0000 |
commit | 6b8b242250123d6c3105b48cde831ef749c88647 (patch) | |
tree | 6eb6e244f4ff73d7d3194fde119433bb6f6ac11d | |
parent | c701cbaaeb9202887a97a6b2de60852713d1785e (diff) | |
download | rsyslog-6b8b242250123d6c3105b48cde831ef749c88647.tar.gz rsyslog-6b8b242250123d6c3105b48cde831ef749c88647.tar.xz rsyslog-6b8b242250123d6c3105b48cde831ef749c88647.zip |
some more cleanup and flagged places where we need to implement
DA-input-only mode
-rw-r--r-- | queue.c | 48 |
1 files changed, 13 insertions, 35 deletions
@@ -1,3 +1,4 @@ + // DA-input only // TODO: "preforked" worker threads // TODO: do an if(debug) in dbgrintf - performanc ein release build! // TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in @@ -332,7 +333,7 @@ queueTurnOffDAMode(queue_t *pThis) pthread_mutex_destroy(&pThis->mutDA); pthread_cond_destroy(&pThis->condDA); - queueTellWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */ + queueTellActWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */ dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n", queueGetID(pThis), iRet); @@ -559,6 +560,7 @@ queueInitDA(queue_t *pThis) pThis->qRunsDA = QRUNS_DA_INIT; /* now we must start our DA worker thread - it does the rest of the initialization */ + // DA-input only mode! iRet = queueStrtWrkThrd(pThis, 0); return iRet; @@ -764,22 +766,6 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) /* -------------------- disk -------------------- */ -/* This method checks if there is any persistent information on the - * queue. - */ -#if 0 -static rsRetVal -queueTryLoadPersistedInfo(queue_t *pThis) -{ - DEFiRet; - strm_t *psQIF = NULL; - uchar pszQIFNam[MAXFNAME]; - size_t lenQIFNam; - struct stat stat_buf; -} -#endif - - static rsRetVal queueLoadPersStrmInfoFixup(strm_t *pStrm, queue_t *pThis) { @@ -850,7 +836,6 @@ queueTryLoadPersistedInfo(queue_t *pThis) (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix); /* check if the file exists */ -dbgprintf("stat '%s'\n", pszQIFNam); if(stat((char*) pszQIFNam, &stat_buf) == -1) { if(errno == ENOENT) { dbgprintf("Queue 0x%lx: clean startup, no .qi file found\n", queueGetID(pThis)); @@ -892,7 +877,7 @@ finalize_it: strmDestruct(psQIF); if(iRet != RS_RET_OK) { - dbgprintf("Queue 0x%lx: error %d reading .qi file - can not start queue\n", + dbgprintf("Queue 0x%lx: error %d reading .qi file - can not read persisted info (if any)\n", queueGetID(pThis), iRet); } @@ -1010,7 +995,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr) assert(pThis != NULL); - /* TODO: calling the consumer should go into its own function! -- rgerhards, 2008-01-05*/ + /* calling the consumer is quite different here than it is from a worker thread */ iRetLocal = pThis->pConsumer(pUsr); if(iRetLocal != RS_RET_OK) dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n", @@ -1083,6 +1068,7 @@ queueWrkThrdReqTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int bIncludeDAWrk) { DEFiRet; + // DA-input only if(bIncludeDAWrk) { queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */ queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ @@ -1203,15 +1189,6 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) } } - /* as we may have cancelled a thread, clean up our internal structure. All are - * terminated now. For simplicity, we simply overwrite the states. - */ - for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRD_STOPPED) { - pThis->pWrkThrds[i].tCurrCmd = eWRKTHRD_TERMINATED; - } - } - dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n", queueGetID(pThis), pThis->iQueueSize); @@ -1319,6 +1296,10 @@ queueWorker(void *arg) sigfillset(&sigSet); pthread_sigmask(SIG_BLOCK, &sigSet, NULL); + /* do some one-time initialization */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + pthread_mutex_lock(pThis->mut); + /* first find myself in the queue's thread table */ for(iMyThrdIndx = 0 ; iMyThrdIndx <= pThis->iNumWorkerThreads ; ++iMyThrdIndx) if(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self()) @@ -1329,16 +1310,12 @@ dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pThis, dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx); - /* do some one-time initialization */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - pthread_mutex_lock(pThis->mut); - pThis->iCurNumWrkThrd++; /* tell the world there is one more worker */ if(iMyThrdIndx == 0) { /* are we the DA worker? */ if(queueStrtDA(pThis) != RS_RET_OK) { /* then fully initialize the DA queue! */ /* if we could not init the DA queue, we have nothing to do, so shut down. */ - queueTellWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE); + queueTellActWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE); } } @@ -1698,6 +1675,7 @@ rsRetVal queueDestruct(queue_t *pThis) */ if(pThis->qRunsDA != QRUNS_REGULAR) queueWrkThrdReqTrm(pThis->pqDA, eWRKTHRD_SHUTDOWN_IMMEDIATE, 0); + // DA-input only /* then, terminate our own worker threads */ if(pThis->pWrkThrds != NULL) { @@ -1855,7 +1833,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) queueChkPersist(pThis); finalize_it: - /* now activate the worker thread */ + /* now awake sleeping worker threads */ if(pThis->pWrkThrds != NULL) { pthread_mutex_unlock(pThis->mut); i = pthread_cond_signal(pThis->notEmpty); |