diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-30 19:07:23 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-30 19:07:23 +0000 |
commit | 0e3b40fd8a6106fbfa83e7cab1a5af515f698111 (patch) | |
tree | ede9dc66448f03d4ed84da7243229ff3f8eaceec | |
parent | 05538a2bad4f9a2c1be7a50099e30ab22249a2ff (diff) | |
download | rsyslog-0e3b40fd8a6106fbfa83e7cab1a5af515f698111.tar.gz rsyslog-0e3b40fd8a6106fbfa83e7cab1a5af515f698111.tar.xz rsyslog-0e3b40fd8a6106fbfa83e7cab1a5af515f698111.zip |
- implemented limiting disk space allocated to queues
- addded $MainMsgQueueMaxDiskSpace config directive
- addded $ActionQueueMaxDiskSpace config directive
-rw-r--r-- | action.c | 7 | ||||
-rw-r--r-- | queue.c | 75 | ||||
-rw-r--r-- | queue.h | 4 | ||||
-rw-r--r-- | stream.c | 19 | ||||
-rw-r--r-- | stream.h | 1 | ||||
-rw-r--r-- | syslogd.c | 7 |
6 files changed, 108 insertions, 5 deletions
@@ -67,6 +67,7 @@ static int iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdow static int iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ static int iActionQueueDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */ +static size_t iActionQueMaxDiskSpace = 0; /* max disk space allocated 0 ==> unlimited */ /* the counter below counts actions created. It is used to obtain unique IDs for the action. They * should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice @@ -105,6 +106,7 @@ actionResetQueueParams(void) iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ iActionQueueDeqSlowdown = 0; + iActionQueMaxDiskSpace = 0; if(pszActionQFName != NULL) free(pszActionQFName); @@ -200,6 +202,7 @@ actionConstructFinalize(action_t *pThis) } queueSetpUsr(pThis->pQueue, pThis); + setQPROP(queueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace); setQPROP(queueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); setQPROPstr(queueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); setQPROP(queueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); @@ -218,6 +221,9 @@ actionConstructFinalize(action_t *pThis) # undef setQPROP # undef setQPROPstr + dbgoprint((obj_t*) pThis->pQueue, "save on shutdown %d, max disk space allowed %ld\n", + bActionQSaveOnShutdown, iActionQueMaxDiskSpace); + CHKiRet(queueStart(pThis->pQueue)); dbgprintf("Action %p: queue %p created\n", pThis, pThis->pQueue); @@ -635,6 +641,7 @@ actionAddCfSysLineHdrl(void) CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL)); @@ -172,6 +172,14 @@ queueTurnOffDAMode(queue_t *pThis) } } + /* TODO: we have a *really biiiiig* memory leak here: if the queue could not be persisted, all of + * its data elements are still in memory. That doesn't really matter if we are terminated, but on + * HUP this memory leaks. We MUST add a loop of destructor calls here. However, this takes time + * (possibly a lot), so it is probably best to have a config variable for that. + * Something for 3.11.1! + * rgerhards, 2008-01-30 + */ + RETiRet; } @@ -235,6 +243,7 @@ queueStartDA(queue_t *pThis) pThis->pqDA->pqParent = pThis; CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr)); + CHKiRet(queueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax)); CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown)); CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); @@ -700,9 +709,7 @@ queueTryLoadPersistedInfo(queue_t *pThis) iUngottenObjs = pThis->iUngottenObjs; pThis->iUngottenObjs = 0; /* will be incremented when we add objects! */ -RUNLOG_VAR("%d", iUngottenObjs); while(iUngottenObjs > 0) { -RUNLOG_VAR("%d", iUngottenObjs); /* fill the queue from disk */ CHKiRet(objDeserialize((void*) &pUsr, OBJmsg, psQIF, NULL, NULL)); queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); @@ -811,11 +818,28 @@ static rsRetVal qDestructDisk(queue_t *pThis) static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) { DEFiRet; + size_t offsIn; + size_t offsOut; ASSERT(pThis != NULL); + CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsIn)); CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite)); CHKiRet(strmFlush(pThis->tVars.disk.pWrite)); + CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsOut)); + + if(offsIn < offsOut) { + offsIn = offsOut - offsIn; + } else { + /* we had a file switch, so the second offset is the actual number of bytes + * written. So... + */ + offsIn = offsOut; + } + + pThis->tVars.disk.sizeOnDisk += offsIn; + + dbgoprint((obj_t*) pThis, "write wrote %ld octets to disk, queue disk size now %ld octets\n", offsIn, pThis->tVars.disk.sizeOnDisk); finalize_it: RETiRet; @@ -823,7 +847,32 @@ finalize_it: static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) { - return objDeserialize(ppUsr, OBJmsg, pThis->tVars.disk.pRead, NULL, NULL); + DEFiRet; + + size_t offsIn; + size_t offsOut; + + CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsIn)); + CHKiRet(objDeserialize(ppUsr, OBJmsg, pThis->tVars.disk.pRead, NULL, NULL)); + CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsOut)); + + /* This time it is a bit tricky: we free disk space only upon file deletion. So we need + * to keep track of what we have read until we get an out-offset that is lower than the + * in-offset (which indicates file change). Then, we can subtract the whole thing from + * the on-disk size. -- rgerhards, 2008-01-30 + */ + if(offsIn < offsOut) { + pThis->tVars.disk.bytesRead += offsOut - offsIn; + } else { + pThis->tVars.disk.sizeOnDisk -= pThis->tVars.disk.bytesRead; + pThis->tVars.disk.bytesRead = offsOut; + dbgoprint((obj_t*) pThis, "a file has been deleted, now %ld octets disk space used\n", pThis->tVars.disk.sizeOnDisk); + /* awake possibly waiting enq process */ + pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */ + } + +finalize_it: + RETiRet; } /* -------------------- direct (no queueing) -------------------- */ @@ -1442,7 +1491,13 @@ queueChkStopWrkrDA(queue_t *pThis) bStopWrkr = 1; } else { if(pThis->bRunsDA) { - if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { + ASSERT(pThis->pqDA != NULL); + if( pThis->pqDA->bEnqOnly + && pThis->pqDA->sizeOnDiskMax > 0 + && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { + /* this queue can never grow, so we can give up... */ + bStopWrkr = 1; + } else if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { bStopWrkr = 1; } else { bStopWrkr = 0; @@ -1691,6 +1746,8 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint) CHKiRet(objBeginSerializePropBag(psQIF, (obj_t*) pThis)); objSerializeSCALAR(psQIF, iQueueSize, INT); objSerializeSCALAR(psQIF, iUngottenObjs, INT); + objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, LONG); + objSerializeSCALAR(psQIF, tVars.disk.bytesRead, LONG); CHKiRet(objEndSerialize(psQIF)); /* now we must persist all objects on the ungotten queue - they can not go to @@ -1898,7 +1955,10 @@ RUNLOG_VAR("%d", pThis->bRunsDA); /* wait for the queue to be ready... */ - while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) { + //while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) { + while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) + || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0 + && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); timeoutComp(&t, pThis->toEnq); if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { @@ -1996,6 +2056,7 @@ DEFpropSetMeth(queue, iMinMsgsPerWrkr, int); DEFpropSetMeth(queue, bSaveOnShutdown, int); DEFpropSetMeth(queue, pUsr, void*); DEFpropSetMeth(queue, iDeqSlowdown, int); +DEFpropSetMeth(queue, sizeOnDiskMax, long); /* This function can be used as a generic way to set properties. Only the subset @@ -2015,6 +2076,10 @@ static rsRetVal queueSetProperty(queue_t *pThis, property_t *pProp) pThis->iQueueSize = pProp->val.vInt; } else if(isProp("iUngottenObjs")) { pThis->iUngottenObjs = pProp->val.vInt; + } else if(isProp("tVars.disk.sizeOnDisk")) { + pThis->tVars.disk.sizeOnDisk = pProp->val.vLong; + } else if(isProp("tVars.disk.bytesRead")) { + pThis->tVars.disk.bytesRead = pProp->val.vLong; } else if(isProp("qType")) { if(pThis->qType != pProp->val.vLong) ABORT_FINALIZE(RS_RET_QTYPE_MISMATCH); @@ -114,6 +114,7 @@ typedef struct queue_s { size_t lenFilePrefix; int iNumberFiles; /* how many files make up the queue? */ size_t iMaxFileSize; /* max size for a single queue file */ + size_t sizeOnDiskMax; /* maximum size on disk allowed */ int bIsDA; /* is this queue disk assisted? */ int bRunsDA; /* is this queue actually *running* disk assisted? */ struct queue_s *pqDA; /* queue for disk-assisted modes */ @@ -136,6 +137,8 @@ typedef struct queue_s { qLinkedList_t *pLast; } linklist; struct { + size_t sizeOnDisk; /* current amount of disk space used */ + size_t bytesRead; /* number of bytes read from current (undeleted!) file */ strm_t *pWrite; /* current file to be written */ strm_t *pRead; /* current file to be read */ } disk; @@ -178,6 +181,7 @@ PROTOTYPEpropSetMeth(queue, iMinMsgsPerWrkr, int); PROTOTYPEpropSetMeth(queue, bSaveOnShutdown, int); PROTOTYPEpropSetMeth(queue, pUsr, void*); PROTOTYPEpropSetMeth(queue, iDeqSlowdown, int); +PROTOTYPEpropSetMeth(queue, sizeOnDiskMax, long); #define queueGetID(pThis) ((unsigned long) pThis) #endif /* #ifndef QUEUE_H_INCLUDED */ @@ -754,6 +754,25 @@ finalize_it: } #undef isProp + +/* return the current offset inside the stream. Note that on two consequtive calls, the offset + * reported on the second call may actually be lower than on the first call. This is due to + * file circulation. A caller must deal with that. -- rgerhards, 2008-01-30 + */ +rsRetVal +strmGetCurrOffset(strm_t *pThis, size_t *pOffs) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + ASSERT(pOffs != NULL); + + *pOffs = pThis->iCurrOffs; + + RETiRet; +} + + /* Initialize the stream class. Must be called as the very first method * before anything else is called inside this class. * rgerhards, 2008-01-09 @@ -107,6 +107,7 @@ rsRetVal strmRecordBegin(strm_t *pThis); rsRetVal strmRecordEnd(strm_t *pThis); rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm); rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal); +rsRetVal strmGetCurrOffset(strm_t *pThis, size_t *pOffs); PROTOTYPEObjClassInit(strm); PROTOTYPEpropSetMeth(strm, bDeleteOnClose, int); PROTOTYPEpropSetMeth(strm, iMaxFileSize, int); @@ -416,6 +416,7 @@ static int iMainMsgQtoWrkShutdown = 60000; /* timeout for worker thread shutdo static int iMainMsgQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ static int iMainMsgQDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */ static int bMainMsgQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ +static size_t iMainMsgQueMaxDiskSpace = 0; /* max disk space allocated 0 ==> unlimited */ /* This structure represents the files that will have log @@ -532,6 +533,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a iMainMsgQDeqSlowdown = 0; bMainMsgQSaveOnShutdown = 1; MainMsgQueType = QUEUETYPE_FIXED_ARRAY; + iMainMsgQueMaxDiskSpace = 0; glbliActionResumeRetryCount = 0; return RS_RET_OK; @@ -2940,12 +2942,15 @@ static void dbgPrintInitInfo(void) iMainMsgQtoQShutdown, iMainMsgQtoActShutdown, iMainMsgQtoEnq); dbgprintf("Main queue watermarks: high: %d, low: %d, discard: %d, discard-severity: %d\n", iMainMsgQHighWtrMark, iMainMsgQLowWtrMark, iMainMsgQDiscardMark, iMainMsgQDiscardSeverity); + dbgprintf("Main queue save on shutdown %d, max disk space allowed %ld\n", + bMainMsgQSaveOnShutdown, iMainMsgQueMaxDiskSpace); /* TODO: add iActionRetryCount = 0; iActionRetryInterval = 30000; static int iMainMsgQtoWrkShutdown = 60000; static int iMainMsgQtoWrkMinMsgs = 100; static int iMainMsgQbSaveOnShutdown = 1; + iMainMsgQueMaxDiskSpace = 0; setQPROP(queueSettoWrkShutdown, "$MainMsgQueueTimeoutWorkerThreadShutdown", 5000); setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100); setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1); @@ -3217,6 +3222,7 @@ init(void) } setQPROP(queueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize); + setQPROP(queueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace); setQPROPstr(queueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName); setQPROP(queueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt); setQPROP(queueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown ); @@ -4374,6 +4380,7 @@ static rsRetVal loadBuildInModules(void) CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iMainMsgQDeqSlowdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iMainMsgQWrkMinMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxDiskSpace, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL)); |