diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-30 19:07:23 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-30 19:07:23 +0000 |
commit | 0e3b40fd8a6106fbfa83e7cab1a5af515f698111 (patch) | |
tree | ede9dc66448f03d4ed84da7243229ff3f8eaceec /queue.c | |
parent | 05538a2bad4f9a2c1be7a50099e30ab22249a2ff (diff) | |
download | rsyslog-0e3b40fd8a6106fbfa83e7cab1a5af515f698111.tar.gz rsyslog-0e3b40fd8a6106fbfa83e7cab1a5af515f698111.tar.xz rsyslog-0e3b40fd8a6106fbfa83e7cab1a5af515f698111.zip |
- implemented limiting disk space allocated to queues
- addded $MainMsgQueueMaxDiskSpace config directive
- addded $ActionQueueMaxDiskSpace config directive
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 75 |
1 files changed, 70 insertions, 5 deletions
@@ -172,6 +172,14 @@ queueTurnOffDAMode(queue_t *pThis) } } + /* TODO: we have a *really biiiiig* memory leak here: if the queue could not be persisted, all of + * its data elements are still in memory. That doesn't really matter if we are terminated, but on + * HUP this memory leaks. We MUST add a loop of destructor calls here. However, this takes time + * (possibly a lot), so it is probably best to have a config variable for that. + * Something for 3.11.1! + * rgerhards, 2008-01-30 + */ + RETiRet; } @@ -235,6 +243,7 @@ queueStartDA(queue_t *pThis) pThis->pqDA->pqParent = pThis; CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr)); + CHKiRet(queueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax)); CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown)); CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); @@ -700,9 +709,7 @@ queueTryLoadPersistedInfo(queue_t *pThis) iUngottenObjs = pThis->iUngottenObjs; pThis->iUngottenObjs = 0; /* will be incremented when we add objects! */ -RUNLOG_VAR("%d", iUngottenObjs); while(iUngottenObjs > 0) { -RUNLOG_VAR("%d", iUngottenObjs); /* fill the queue from disk */ CHKiRet(objDeserialize((void*) &pUsr, OBJmsg, psQIF, NULL, NULL)); queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); @@ -811,11 +818,28 @@ static rsRetVal qDestructDisk(queue_t *pThis) static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) { DEFiRet; + size_t offsIn; + size_t offsOut; ASSERT(pThis != NULL); + CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsIn)); CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite)); CHKiRet(strmFlush(pThis->tVars.disk.pWrite)); + CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsOut)); + + if(offsIn < offsOut) { + offsIn = offsOut - offsIn; + } else { + /* we had a file switch, so the second offset is the actual number of bytes + * written. So... + */ + offsIn = offsOut; + } + + pThis->tVars.disk.sizeOnDisk += offsIn; + + dbgoprint((obj_t*) pThis, "write wrote %ld octets to disk, queue disk size now %ld octets\n", offsIn, pThis->tVars.disk.sizeOnDisk); finalize_it: RETiRet; @@ -823,7 +847,32 @@ finalize_it: static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) { - return objDeserialize(ppUsr, OBJmsg, pThis->tVars.disk.pRead, NULL, NULL); + DEFiRet; + + size_t offsIn; + size_t offsOut; + + CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsIn)); + CHKiRet(objDeserialize(ppUsr, OBJmsg, pThis->tVars.disk.pRead, NULL, NULL)); + CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsOut)); + + /* This time it is a bit tricky: we free disk space only upon file deletion. So we need + * to keep track of what we have read until we get an out-offset that is lower than the + * in-offset (which indicates file change). Then, we can subtract the whole thing from + * the on-disk size. -- rgerhards, 2008-01-30 + */ + if(offsIn < offsOut) { + pThis->tVars.disk.bytesRead += offsOut - offsIn; + } else { + pThis->tVars.disk.sizeOnDisk -= pThis->tVars.disk.bytesRead; + pThis->tVars.disk.bytesRead = offsOut; + dbgoprint((obj_t*) pThis, "a file has been deleted, now %ld octets disk space used\n", pThis->tVars.disk.sizeOnDisk); + /* awake possibly waiting enq process */ + pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */ + } + +finalize_it: + RETiRet; } /* -------------------- direct (no queueing) -------------------- */ @@ -1442,7 +1491,13 @@ queueChkStopWrkrDA(queue_t *pThis) bStopWrkr = 1; } else { if(pThis->bRunsDA) { - if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { + ASSERT(pThis->pqDA != NULL); + if( pThis->pqDA->bEnqOnly + && pThis->pqDA->sizeOnDiskMax > 0 + && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { + /* this queue can never grow, so we can give up... */ + bStopWrkr = 1; + } else if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { bStopWrkr = 1; } else { bStopWrkr = 0; @@ -1691,6 +1746,8 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint) CHKiRet(objBeginSerializePropBag(psQIF, (obj_t*) pThis)); objSerializeSCALAR(psQIF, iQueueSize, INT); objSerializeSCALAR(psQIF, iUngottenObjs, INT); + objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, LONG); + objSerializeSCALAR(psQIF, tVars.disk.bytesRead, LONG); CHKiRet(objEndSerialize(psQIF)); /* now we must persist all objects on the ungotten queue - they can not go to @@ -1898,7 +1955,10 @@ RUNLOG_VAR("%d", pThis->bRunsDA); /* wait for the queue to be ready... */ - while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) { + //while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) { + while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) + || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0 + && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); timeoutComp(&t, pThis->toEnq); if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { @@ -1996,6 +2056,7 @@ DEFpropSetMeth(queue, iMinMsgsPerWrkr, int); DEFpropSetMeth(queue, bSaveOnShutdown, int); DEFpropSetMeth(queue, pUsr, void*); DEFpropSetMeth(queue, iDeqSlowdown, int); +DEFpropSetMeth(queue, sizeOnDiskMax, long); /* This function can be used as a generic way to set properties. Only the subset @@ -2015,6 +2076,10 @@ static rsRetVal queueSetProperty(queue_t *pThis, property_t *pProp) pThis->iQueueSize = pProp->val.vInt; } else if(isProp("iUngottenObjs")) { pThis->iUngottenObjs = pProp->val.vInt; + } else if(isProp("tVars.disk.sizeOnDisk")) { + pThis->tVars.disk.sizeOnDisk = pProp->val.vLong; + } else if(isProp("tVars.disk.bytesRead")) { + pThis->tVars.disk.bytesRead = pProp->val.vLong; } else if(isProp("qType")) { if(pThis->qType != pProp->val.vLong) ABORT_FINALIZE(RS_RET_QTYPE_MISMATCH); |