summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/rsyslog_conf.html1
-rw-r--r--queue.c49
-rw-r--r--queue.h4
-rw-r--r--syslogd.c8
4 files changed, 55 insertions, 7 deletions
diff --git a/doc/rsyslog_conf.html b/doc/rsyslog_conf.html
index 073d9697..a8eaf062 100644
--- a/doc/rsyslog_conf.html
+++ b/doc/rsyslog_conf.html
@@ -63,6 +63,7 @@ development and quite unstable...). So you have been warned ;)</p>
<li>$MainMsgQueueImmediateShutdown [on/<b>off</b>]</li>
<li><a href="rsconf1_mainmsgqueuesize.html">$MainMsgQueueSize</a></li>
<li>$MainMsgQueueMaxFileSize &lt;size_nbr&gt;, default 1m</li>
+ <li>$MainMsgQueuePersistUpdateCount &lt;number&gt;</li>
<li>$MainMsgQueueType [<b>FixedArray</b>/LinkedList/Direct/Disk]</li>
<li>$MainMsgQueueWorkerThreads &lt;number&gt;, num worker threads, default 1,
recommended 1</li>
diff --git a/queue.c b/queue.c
index d9a3f130..e3f10078 100644
--- a/queue.c
+++ b/queue.c
@@ -48,6 +48,9 @@
/* static data */
DEFobjStaticHelpers
+/* forward-definitions */
+rsRetVal queueChkPersist(queue_t *pThis);
+
/* methods */
/* first, we define type-specific handlers. The provide a generic functionality,
@@ -550,6 +553,7 @@ queueWorker(void *arg)
if(pThis->iQueueSize > 0) {
/* dequeue element (still protected from mutex) */
iRet = queueDel(pThis, &pUsr);
+ queueChkPersist(pThis); // when we support peek(), we must do this down after the del!
pthread_mutex_unlock(pThis->mut);
pthread_cond_signal (pThis->notFull);
/* do actual processing (the lengthy part, runs in parallel)
@@ -743,7 +747,6 @@ static rsRetVal queuePersist(queue_t *pThis)
strm_t *psQIF = NULL;; /* Queue Info File */
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
- int i;
assert(pThis != NULL);
if(pThis->qType != QUEUETYPE_DISK)
@@ -758,6 +761,8 @@ static rsRetVal queuePersist(queue_t *pThis)
unlink((char*)pszQIFNam);
pThis->bNeedDelQIF = 0;
}
+ /* indicate spool file needs to be deleted */
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
FINALIZE; /* nothing left to do, so be happy */
}
@@ -775,8 +780,6 @@ static rsRetVal queuePersist(queue_t *pThis)
* we know when somebody has changed the queue type... -- rgerhards, 2008-01-11
*/
CHKiRet(objBeginSerializePropBag(psQIF, (obj_t*) pThis));
- i = pThis->qType;
- objSerializeSCALAR_VAR(psQIF, qType, INT, i);
objSerializeSCALAR(psQIF, iQueueSize, INT);
CHKiRet(objEndSerialize(psQIF));
@@ -786,9 +789,14 @@ static rsRetVal queuePersist(queue_t *pThis)
/* persist queue object itself */
- /* tell the input file object that it must not delete the file on close */
+ /* tell the input file object that it must not delete the file on close if the queue is non-empty */
CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0));
+ /* we have persisted the queue object. So whenever it comes to an empty queue,
+ * we need to delete the QIF. Thus, we indicte that need.
+ */
+ pThis->bNeedDelQIF = 1;
+
finalize_it:
if(psQIF != NULL)
strmDestruct(psQIF);
@@ -797,6 +805,28 @@ finalize_it:
}
+/* check if we need to persist the current queue info. If an
+ * error occurs, thus should be ignored by caller (but we still
+ * abide to our regular call interface)...
+ * rgerhards, 2008-01-13
+ */
+rsRetVal queueChkPersist(queue_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+
+dbgprintf("chkPersist: PersUpdCnt %d, UpdsSincePers %d\n", pThis->iPersistUpdCnt, pThis->iUpdsSincePersist);
+ if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
+dbgprintf("persistintg queue info!\n");
+ queuePersist(pThis);
+ pThis->iUpdsSincePersist = 0;
+ }
+
+ return iRet;
+}
+
+
/* destructor for the queue object */
rsRetVal queueDestruct(queue_t *pThis)
{
@@ -922,6 +952,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
}
}
CHKiRet(queueAdd(pThis, pUsr));
+ queueChkPersist(pThis);
finalize_it:
/* now activate the worker thread */
@@ -938,6 +969,16 @@ finalize_it:
/* some simple object access methods */
DEFpropSetMeth(queue, bImmediateShutdown, int);
+DEFpropSetMeth(queue, iPersistUpdCnt, int);
+#if 0
+rsRetVal queueSetiPersistUpdCnt(queue_t *pThis, int pVal)
+{
+ dbgprintf("queueSetiPersistUpdCnt(), val %d\n", pVal);
+ pThis->iPersistUpdCnt = pVal;
+dbgprintf("queSetiPersist..(): PersUpdCnt %d, UpdsSincePers %d\n", pThis->iPersistUpdCnt, pThis->iUpdsSincePersist);
+ return RS_RET_OK;
+}
+#endif
/* This function can be used as a generic way to set properties. Only the subset
diff --git a/queue.h b/queue.h
index 6593ce84..e5b64ff8 100644
--- a/queue.h
+++ b/queue.h
@@ -81,7 +81,8 @@ typedef struct queue_s {
int iNumWorkerThreads;/* number of worker threads to use */
qWrkThrd_t *pWrkThrds;/* array with control structure for the worker thread(s) associated with this queue */
int bImmediateShutdown;/* on shutdown, drain the queue --> 0 / do NOT drain the queue --> 1 */
- //int bNeedPersist; /* does the queue need to be persisted on disk (updated since last persist?) */
+ int iUpdsSincePersist;/* nbr of queue updates since the last persist call */
+ int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */
int bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */
rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dequeued messages */
/* type-specific handlers (set during construction) */
@@ -132,6 +133,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*));
PROTOTYPEObjClassInit(queue);
PROTOTYPEpropSetMeth(queue, bImmediateShutdown, int);
+PROTOTYPEpropSetMeth(queue, iPersistUpdCnt, int);
#define queueGetID(pThis) ((unsigned long) pThis)
#endif /* #ifndef QUEUE_H_INCLUDED */
diff --git a/syslogd.c b/syslogd.c
index 4950a8f9..c3135237 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -421,6 +421,7 @@ static queueType_t MainMsgQueType = QUEUETYPE_FIXED_ARRAY; /* type of the main m
static uchar *pszMainMsgQFName = NULL; /* prefix for the main message queue file */
static size_t iMainMsgQueMaxFileSize = 1024*1024;
static int bMainMsgQImmediateShutdown = 0; /* shut down the queue immediately? */
+static int iMainMsgQPersistUpdCnt = 0; /* persist queue info every n updates */
/* This structure represents the files that will have log
@@ -525,6 +526,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
iMainMsgQueMaxFileSize = 1024 * 1024;
iMainMsgQueueNumWorkers = 1;
bMainMsgQImmediateShutdown = 0;
+ iMainMsgQPersistUpdCnt = 0;
MainMsgQueType = QUEUETYPE_FIXED_ARRAY;
return RS_RET_OK;
@@ -3138,8 +3140,8 @@ static void dbgPrintInitInfo(void)
cCCEscapeChar);
dbgprintf("Main queue size %d messages.\n", iMainMsgQueueSize);
- dbgprintf("Main queue worker threads: %d, ImmediateShutdown: %d\n",
- iMainMsgQueueNumWorkers, bMainMsgQImmediateShutdown);
+ dbgprintf("Main queue worker threads: %d, ImmediateShutdown: %d, Perists every %d updates.\n",
+ iMainMsgQueueNumWorkers, bMainMsgQImmediateShutdown, iMainMsgQPersistUpdCnt);
dbgprintf("Work Directory: '%s'.\n", pszWorkDir);
}
@@ -3398,6 +3400,7 @@ init(void)
setQPROP(queueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize);
setQPROPstr(queueSetFilePrefix, "$MainMsgQueueFileName",
(pszMainMsgQFName == NULL ? (uchar*) "mainq" : pszMainMsgQFName));
+ setQPROP(queueSetiPersistUpdCnt, "$MainMsgQueuePersistUpdateCount", iMainMsgQPersistUpdCnt);
# undef setQPROP
# undef setQPROPstr
@@ -4559,6 +4562,7 @@ static rsRetVal loadBuildInModules(void)
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszMainMsgQFName, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesize", 0, eCmdHdlrInt, NULL, &iMainMsgQueueSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueimmediateshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQImmediateShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuepersistupdatecount", 0, eCmdHdlrInt, NULL, &iMainMsgQPersistUpdCnt, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetype", 0, eCmdHdlrGetWord, setMainMsgQueType, NULL, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iMainMsgQueueNumWorkers, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL));