summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-30 19:07:23 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-30 19:07:23 +0000
commit0e3b40fd8a6106fbfa83e7cab1a5af515f698111 (patch)
treeede9dc66448f03d4ed84da7243229ff3f8eaceec /queue.c
parent05538a2bad4f9a2c1be7a50099e30ab22249a2ff (diff)
downloadrsyslog-0e3b40fd8a6106fbfa83e7cab1a5af515f698111.tar.gz
rsyslog-0e3b40fd8a6106fbfa83e7cab1a5af515f698111.tar.xz
rsyslog-0e3b40fd8a6106fbfa83e7cab1a5af515f698111.zip
- implemented limiting disk space allocated to queues
- addded $MainMsgQueueMaxDiskSpace config directive - addded $ActionQueueMaxDiskSpace config directive
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c75
1 files changed, 70 insertions, 5 deletions
diff --git a/queue.c b/queue.c
index d829b9bd..6546efea 100644
--- a/queue.c
+++ b/queue.c
@@ -172,6 +172,14 @@ queueTurnOffDAMode(queue_t *pThis)
}
}
+ /* TODO: we have a *really biiiiig* memory leak here: if the queue could not be persisted, all of
+ * its data elements are still in memory. That doesn't really matter if we are terminated, but on
+ * HUP this memory leaks. We MUST add a loop of destructor calls here. However, this takes time
+ * (possibly a lot), so it is probably best to have a config variable for that.
+ * Something for 3.11.1!
+ * rgerhards, 2008-01-30
+ */
+
RETiRet;
}
@@ -235,6 +243,7 @@ queueStartDA(queue_t *pThis)
pThis->pqDA->pqParent = pThis;
CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr));
+ CHKiRet(queueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax));
CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
@@ -700,9 +709,7 @@ queueTryLoadPersistedInfo(queue_t *pThis)
iUngottenObjs = pThis->iUngottenObjs;
pThis->iUngottenObjs = 0; /* will be incremented when we add objects! */
-RUNLOG_VAR("%d", iUngottenObjs);
while(iUngottenObjs > 0) {
-RUNLOG_VAR("%d", iUngottenObjs);
/* fill the queue from disk */
CHKiRet(objDeserialize((void*) &pUsr, OBJmsg, psQIF, NULL, NULL));
queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
@@ -811,11 +818,28 @@ static rsRetVal qDestructDisk(queue_t *pThis)
static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
{
DEFiRet;
+ size_t offsIn;
+ size_t offsOut;
ASSERT(pThis != NULL);
+ CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsIn));
CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite));
CHKiRet(strmFlush(pThis->tVars.disk.pWrite));
+ CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsOut));
+
+ if(offsIn < offsOut) {
+ offsIn = offsOut - offsIn;
+ } else {
+ /* we had a file switch, so the second offset is the actual number of bytes
+ * written. So...
+ */
+ offsIn = offsOut;
+ }
+
+ pThis->tVars.disk.sizeOnDisk += offsIn;
+
+ dbgoprint((obj_t*) pThis, "write wrote %ld octets to disk, queue disk size now %ld octets\n", offsIn, pThis->tVars.disk.sizeOnDisk);
finalize_it:
RETiRet;
@@ -823,7 +847,32 @@ finalize_it:
static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
{
- return objDeserialize(ppUsr, OBJmsg, pThis->tVars.disk.pRead, NULL, NULL);
+ DEFiRet;
+
+ size_t offsIn;
+ size_t offsOut;
+
+ CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsIn));
+ CHKiRet(objDeserialize(ppUsr, OBJmsg, pThis->tVars.disk.pRead, NULL, NULL));
+ CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsOut));
+
+ /* This time it is a bit tricky: we free disk space only upon file deletion. So we need
+ * to keep track of what we have read until we get an out-offset that is lower than the
+ * in-offset (which indicates file change). Then, we can subtract the whole thing from
+ * the on-disk size. -- rgerhards, 2008-01-30
+ */
+ if(offsIn < offsOut) {
+ pThis->tVars.disk.bytesRead += offsOut - offsIn;
+ } else {
+ pThis->tVars.disk.sizeOnDisk -= pThis->tVars.disk.bytesRead;
+ pThis->tVars.disk.bytesRead = offsOut;
+ dbgoprint((obj_t*) pThis, "a file has been deleted, now %ld octets disk space used\n", pThis->tVars.disk.sizeOnDisk);
+ /* awake possibly waiting enq process */
+ pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */
+ }
+
+finalize_it:
+ RETiRet;
}
/* -------------------- direct (no queueing) -------------------- */
@@ -1442,7 +1491,13 @@ queueChkStopWrkrDA(queue_t *pThis)
bStopWrkr = 1;
} else {
if(pThis->bRunsDA) {
- if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
+ ASSERT(pThis->pqDA != NULL);
+ if( pThis->pqDA->bEnqOnly
+ && pThis->pqDA->sizeOnDiskMax > 0
+ && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
+ /* this queue can never grow, so we can give up... */
+ bStopWrkr = 1;
+ } else if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
bStopWrkr = 1;
} else {
bStopWrkr = 0;
@@ -1691,6 +1746,8 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
CHKiRet(objBeginSerializePropBag(psQIF, (obj_t*) pThis));
objSerializeSCALAR(psQIF, iQueueSize, INT);
objSerializeSCALAR(psQIF, iUngottenObjs, INT);
+ objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, LONG);
+ objSerializeSCALAR(psQIF, tVars.disk.bytesRead, LONG);
CHKiRet(objEndSerialize(psQIF));
/* now we must persist all objects on the ungotten queue - they can not go to
@@ -1898,7 +1955,10 @@ RUNLOG_VAR("%d", pThis->bRunsDA);
/* wait for the queue to be ready... */
- while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) {
+ //while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) {
+ while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize)
+ || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0
+ && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
timeoutComp(&t, pThis->toEnq);
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
@@ -1996,6 +2056,7 @@ DEFpropSetMeth(queue, iMinMsgsPerWrkr, int);
DEFpropSetMeth(queue, bSaveOnShutdown, int);
DEFpropSetMeth(queue, pUsr, void*);
DEFpropSetMeth(queue, iDeqSlowdown, int);
+DEFpropSetMeth(queue, sizeOnDiskMax, long);
/* This function can be used as a generic way to set properties. Only the subset
@@ -2015,6 +2076,10 @@ static rsRetVal queueSetProperty(queue_t *pThis, property_t *pProp)
pThis->iQueueSize = pProp->val.vInt;
} else if(isProp("iUngottenObjs")) {
pThis->iUngottenObjs = pProp->val.vInt;
+ } else if(isProp("tVars.disk.sizeOnDisk")) {
+ pThis->tVars.disk.sizeOnDisk = pProp->val.vLong;
+ } else if(isProp("tVars.disk.bytesRead")) {
+ pThis->tVars.disk.bytesRead = pProp->val.vLong;
} else if(isProp("qType")) {
if(pThis->qType != pProp->val.vLong)
ABORT_FINALIZE(RS_RET_QTYPE_MISMATCH);