summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c7
-rw-r--r--queue.c75
-rw-r--r--queue.h4
-rw-r--r--stream.c19
-rw-r--r--stream.h1
-rw-r--r--syslogd.c7
6 files changed, 108 insertions, 5 deletions
diff --git a/action.c b/action.c
index bdaae9c7..d7016c42 100644
--- a/action.c
+++ b/action.c
@@ -67,6 +67,7 @@ static int iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdow
static int iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
static int iActionQueueDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */
+static size_t iActionQueMaxDiskSpace = 0; /* max disk space allocated 0 ==> unlimited */
/* the counter below counts actions created. It is used to obtain unique IDs for the action. They
* should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice
@@ -105,6 +106,7 @@ actionResetQueueParams(void)
iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
iActionQueueDeqSlowdown = 0;
+ iActionQueMaxDiskSpace = 0;
if(pszActionQFName != NULL)
free(pszActionQFName);
@@ -200,6 +202,7 @@ actionConstructFinalize(action_t *pThis)
}
queueSetpUsr(pThis->pQueue, pThis);
+ setQPROP(queueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace);
setQPROP(queueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
setQPROPstr(queueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
setQPROP(queueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
@@ -218,6 +221,9 @@ actionConstructFinalize(action_t *pThis)
# undef setQPROP
# undef setQPROPstr
+ dbgoprint((obj_t*) pThis->pQueue, "save on shutdown %d, max disk space allowed %ld\n",
+ bActionQSaveOnShutdown, iActionQueMaxDiskSpace);
+
CHKiRet(queueStart(pThis->pQueue));
dbgprintf("Action %p: queue %p created\n", pThis, pThis->pQueue);
@@ -635,6 +641,7 @@ actionAddCfSysLineHdrl(void)
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL));
diff --git a/queue.c b/queue.c
index d829b9bd..6546efea 100644
--- a/queue.c
+++ b/queue.c
@@ -172,6 +172,14 @@ queueTurnOffDAMode(queue_t *pThis)
}
}
+ /* TODO: we have a *really biiiiig* memory leak here: if the queue could not be persisted, all of
+ * its data elements are still in memory. That doesn't really matter if we are terminated, but on
+ * HUP this memory leaks. We MUST add a loop of destructor calls here. However, this takes time
+ * (possibly a lot), so it is probably best to have a config variable for that.
+ * Something for 3.11.1!
+ * rgerhards, 2008-01-30
+ */
+
RETiRet;
}
@@ -235,6 +243,7 @@ queueStartDA(queue_t *pThis)
pThis->pqDA->pqParent = pThis;
CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr));
+ CHKiRet(queueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax));
CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
@@ -700,9 +709,7 @@ queueTryLoadPersistedInfo(queue_t *pThis)
iUngottenObjs = pThis->iUngottenObjs;
pThis->iUngottenObjs = 0; /* will be incremented when we add objects! */
-RUNLOG_VAR("%d", iUngottenObjs);
while(iUngottenObjs > 0) {
-RUNLOG_VAR("%d", iUngottenObjs);
/* fill the queue from disk */
CHKiRet(objDeserialize((void*) &pUsr, OBJmsg, psQIF, NULL, NULL));
queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
@@ -811,11 +818,28 @@ static rsRetVal qDestructDisk(queue_t *pThis)
static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
{
DEFiRet;
+ size_t offsIn;
+ size_t offsOut;
ASSERT(pThis != NULL);
+ CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsIn));
CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite));
CHKiRet(strmFlush(pThis->tVars.disk.pWrite));
+ CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsOut));
+
+ if(offsIn < offsOut) {
+ offsIn = offsOut - offsIn;
+ } else {
+ /* we had a file switch, so the second offset is the actual number of bytes
+ * written. So...
+ */
+ offsIn = offsOut;
+ }
+
+ pThis->tVars.disk.sizeOnDisk += offsIn;
+
+ dbgoprint((obj_t*) pThis, "write wrote %ld octets to disk, queue disk size now %ld octets\n", offsIn, pThis->tVars.disk.sizeOnDisk);
finalize_it:
RETiRet;
@@ -823,7 +847,32 @@ finalize_it:
static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
{
- return objDeserialize(ppUsr, OBJmsg, pThis->tVars.disk.pRead, NULL, NULL);
+ DEFiRet;
+
+ size_t offsIn;
+ size_t offsOut;
+
+ CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsIn));
+ CHKiRet(objDeserialize(ppUsr, OBJmsg, pThis->tVars.disk.pRead, NULL, NULL));
+ CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsOut));
+
+ /* This time it is a bit tricky: we free disk space only upon file deletion. So we need
+ * to keep track of what we have read until we get an out-offset that is lower than the
+ * in-offset (which indicates file change). Then, we can subtract the whole thing from
+ * the on-disk size. -- rgerhards, 2008-01-30
+ */
+ if(offsIn < offsOut) {
+ pThis->tVars.disk.bytesRead += offsOut - offsIn;
+ } else {
+ pThis->tVars.disk.sizeOnDisk -= pThis->tVars.disk.bytesRead;
+ pThis->tVars.disk.bytesRead = offsOut;
+ dbgoprint((obj_t*) pThis, "a file has been deleted, now %ld octets disk space used\n", pThis->tVars.disk.sizeOnDisk);
+ /* awake possibly waiting enq process */
+ pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */
+ }
+
+finalize_it:
+ RETiRet;
}
/* -------------------- direct (no queueing) -------------------- */
@@ -1442,7 +1491,13 @@ queueChkStopWrkrDA(queue_t *pThis)
bStopWrkr = 1;
} else {
if(pThis->bRunsDA) {
- if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
+ ASSERT(pThis->pqDA != NULL);
+ if( pThis->pqDA->bEnqOnly
+ && pThis->pqDA->sizeOnDiskMax > 0
+ && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
+ /* this queue can never grow, so we can give up... */
+ bStopWrkr = 1;
+ } else if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
bStopWrkr = 1;
} else {
bStopWrkr = 0;
@@ -1691,6 +1746,8 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
CHKiRet(objBeginSerializePropBag(psQIF, (obj_t*) pThis));
objSerializeSCALAR(psQIF, iQueueSize, INT);
objSerializeSCALAR(psQIF, iUngottenObjs, INT);
+ objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, LONG);
+ objSerializeSCALAR(psQIF, tVars.disk.bytesRead, LONG);
CHKiRet(objEndSerialize(psQIF));
/* now we must persist all objects on the ungotten queue - they can not go to
@@ -1898,7 +1955,10 @@ RUNLOG_VAR("%d", pThis->bRunsDA);
/* wait for the queue to be ready... */
- while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) {
+ //while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) {
+ while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize)
+ || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0
+ && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
timeoutComp(&t, pThis->toEnq);
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
@@ -1996,6 +2056,7 @@ DEFpropSetMeth(queue, iMinMsgsPerWrkr, int);
DEFpropSetMeth(queue, bSaveOnShutdown, int);
DEFpropSetMeth(queue, pUsr, void*);
DEFpropSetMeth(queue, iDeqSlowdown, int);
+DEFpropSetMeth(queue, sizeOnDiskMax, long);
/* This function can be used as a generic way to set properties. Only the subset
@@ -2015,6 +2076,10 @@ static rsRetVal queueSetProperty(queue_t *pThis, property_t *pProp)
pThis->iQueueSize = pProp->val.vInt;
} else if(isProp("iUngottenObjs")) {
pThis->iUngottenObjs = pProp->val.vInt;
+ } else if(isProp("tVars.disk.sizeOnDisk")) {
+ pThis->tVars.disk.sizeOnDisk = pProp->val.vLong;
+ } else if(isProp("tVars.disk.bytesRead")) {
+ pThis->tVars.disk.bytesRead = pProp->val.vLong;
} else if(isProp("qType")) {
if(pThis->qType != pProp->val.vLong)
ABORT_FINALIZE(RS_RET_QTYPE_MISMATCH);
diff --git a/queue.h b/queue.h
index 090ff4c5..e75c26a1 100644
--- a/queue.h
+++ b/queue.h
@@ -114,6 +114,7 @@ typedef struct queue_s {
size_t lenFilePrefix;
int iNumberFiles; /* how many files make up the queue? */
size_t iMaxFileSize; /* max size for a single queue file */
+ size_t sizeOnDiskMax; /* maximum size on disk allowed */
int bIsDA; /* is this queue disk assisted? */
int bRunsDA; /* is this queue actually *running* disk assisted? */
struct queue_s *pqDA; /* queue for disk-assisted modes */
@@ -136,6 +137,8 @@ typedef struct queue_s {
qLinkedList_t *pLast;
} linklist;
struct {
+ size_t sizeOnDisk; /* current amount of disk space used */
+ size_t bytesRead; /* number of bytes read from current (undeleted!) file */
strm_t *pWrite; /* current file to be written */
strm_t *pRead; /* current file to be read */
} disk;
@@ -178,6 +181,7 @@ PROTOTYPEpropSetMeth(queue, iMinMsgsPerWrkr, int);
PROTOTYPEpropSetMeth(queue, bSaveOnShutdown, int);
PROTOTYPEpropSetMeth(queue, pUsr, void*);
PROTOTYPEpropSetMeth(queue, iDeqSlowdown, int);
+PROTOTYPEpropSetMeth(queue, sizeOnDiskMax, long);
#define queueGetID(pThis) ((unsigned long) pThis)
#endif /* #ifndef QUEUE_H_INCLUDED */
diff --git a/stream.c b/stream.c
index 0f756d1c..10131bf3 100644
--- a/stream.c
+++ b/stream.c
@@ -754,6 +754,25 @@ finalize_it:
}
#undef isProp
+
+/* return the current offset inside the stream. Note that on two consequtive calls, the offset
+ * reported on the second call may actually be lower than on the first call. This is due to
+ * file circulation. A caller must deal with that. -- rgerhards, 2008-01-30
+ */
+rsRetVal
+strmGetCurrOffset(strm_t *pThis, size_t *pOffs)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, strm);
+ ASSERT(pOffs != NULL);
+
+ *pOffs = pThis->iCurrOffs;
+
+ RETiRet;
+}
+
+
/* Initialize the stream class. Must be called as the very first method
* before anything else is called inside this class.
* rgerhards, 2008-01-09
diff --git a/stream.h b/stream.h
index 0ac32b12..b5a8e518 100644
--- a/stream.h
+++ b/stream.h
@@ -107,6 +107,7 @@ rsRetVal strmRecordBegin(strm_t *pThis);
rsRetVal strmRecordEnd(strm_t *pThis);
rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm);
rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal);
+rsRetVal strmGetCurrOffset(strm_t *pThis, size_t *pOffs);
PROTOTYPEObjClassInit(strm);
PROTOTYPEpropSetMeth(strm, bDeleteOnClose, int);
PROTOTYPEpropSetMeth(strm, iMaxFileSize, int);
diff --git a/syslogd.c b/syslogd.c
index 4aabba5d..a8b032cb 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -416,6 +416,7 @@ static int iMainMsgQtoWrkShutdown = 60000; /* timeout for worker thread shutdo
static int iMainMsgQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
static int iMainMsgQDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */
static int bMainMsgQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
+static size_t iMainMsgQueMaxDiskSpace = 0; /* max disk space allocated 0 ==> unlimited */
/* This structure represents the files that will have log
@@ -532,6 +533,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
iMainMsgQDeqSlowdown = 0;
bMainMsgQSaveOnShutdown = 1;
MainMsgQueType = QUEUETYPE_FIXED_ARRAY;
+ iMainMsgQueMaxDiskSpace = 0;
glbliActionResumeRetryCount = 0;
return RS_RET_OK;
@@ -2940,12 +2942,15 @@ static void dbgPrintInitInfo(void)
iMainMsgQtoQShutdown, iMainMsgQtoActShutdown, iMainMsgQtoEnq);
dbgprintf("Main queue watermarks: high: %d, low: %d, discard: %d, discard-severity: %d\n",
iMainMsgQHighWtrMark, iMainMsgQLowWtrMark, iMainMsgQDiscardMark, iMainMsgQDiscardSeverity);
+ dbgprintf("Main queue save on shutdown %d, max disk space allowed %ld\n",
+ bMainMsgQSaveOnShutdown, iMainMsgQueMaxDiskSpace);
/* TODO: add
iActionRetryCount = 0;
iActionRetryInterval = 30000;
static int iMainMsgQtoWrkShutdown = 60000;
static int iMainMsgQtoWrkMinMsgs = 100;
static int iMainMsgQbSaveOnShutdown = 1;
+ iMainMsgQueMaxDiskSpace = 0;
setQPROP(queueSettoWrkShutdown, "$MainMsgQueueTimeoutWorkerThreadShutdown", 5000);
setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100);
setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1);
@@ -3217,6 +3222,7 @@ init(void)
}
setQPROP(queueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize);
+ setQPROP(queueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace);
setQPROPstr(queueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName);
setQPROP(queueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt);
setQPROP(queueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown );
@@ -4374,6 +4380,7 @@ static rsRetVal loadBuildInModules(void)
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iMainMsgQDeqSlowdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iMainMsgQWrkMinMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxDiskSpace, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL));