summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-13 17:16:59 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-13 17:16:59 +0000
commitbbf0aecbbd3d4adf2f952011418701dce0236234 (patch)
treec84a9cfb128177c596b0d4173194bf2bf2b50e66 /queue.c
parent7791cf780f348c18de06bd06627bb986078db063 (diff)
downloadrsyslog-bbf0aecbbd3d4adf2f952011418701dce0236234.tar.gz
rsyslog-bbf0aecbbd3d4adf2f952011418701dce0236234.tar.xz
rsyslog-bbf0aecbbd3d4adf2f952011418701dce0236234.zip
added $MainMsgQueuePersistUpdateCount config file directive
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c49
1 files changed, 45 insertions, 4 deletions
diff --git a/queue.c b/queue.c
index d9a3f130..e3f10078 100644
--- a/queue.c
+++ b/queue.c
@@ -48,6 +48,9 @@
/* static data */
DEFobjStaticHelpers
+/* forward-definitions */
+rsRetVal queueChkPersist(queue_t *pThis);
+
/* methods */
/* first, we define type-specific handlers. The provide a generic functionality,
@@ -550,6 +553,7 @@ queueWorker(void *arg)
if(pThis->iQueueSize > 0) {
/* dequeue element (still protected from mutex) */
iRet = queueDel(pThis, &pUsr);
+ queueChkPersist(pThis); // when we support peek(), we must do this down after the del!
pthread_mutex_unlock(pThis->mut);
pthread_cond_signal (pThis->notFull);
/* do actual processing (the lengthy part, runs in parallel)
@@ -743,7 +747,6 @@ static rsRetVal queuePersist(queue_t *pThis)
strm_t *psQIF = NULL;; /* Queue Info File */
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
- int i;
assert(pThis != NULL);
if(pThis->qType != QUEUETYPE_DISK)
@@ -758,6 +761,8 @@ static rsRetVal queuePersist(queue_t *pThis)
unlink((char*)pszQIFNam);
pThis->bNeedDelQIF = 0;
}
+ /* indicate spool file needs to be deleted */
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
FINALIZE; /* nothing left to do, so be happy */
}
@@ -775,8 +780,6 @@ static rsRetVal queuePersist(queue_t *pThis)
* we know when somebody has changed the queue type... -- rgerhards, 2008-01-11
*/
CHKiRet(objBeginSerializePropBag(psQIF, (obj_t*) pThis));
- i = pThis->qType;
- objSerializeSCALAR_VAR(psQIF, qType, INT, i);
objSerializeSCALAR(psQIF, iQueueSize, INT);
CHKiRet(objEndSerialize(psQIF));
@@ -786,9 +789,14 @@ static rsRetVal queuePersist(queue_t *pThis)
/* persist queue object itself */
- /* tell the input file object that it must not delete the file on close */
+ /* tell the input file object that it must not delete the file on close if the queue is non-empty */
CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0));
+ /* we have persisted the queue object. So whenever it comes to an empty queue,
+ * we need to delete the QIF. Thus, we indicte that need.
+ */
+ pThis->bNeedDelQIF = 1;
+
finalize_it:
if(psQIF != NULL)
strmDestruct(psQIF);
@@ -797,6 +805,28 @@ finalize_it:
}
+/* check if we need to persist the current queue info. If an
+ * error occurs, thus should be ignored by caller (but we still
+ * abide to our regular call interface)...
+ * rgerhards, 2008-01-13
+ */
+rsRetVal queueChkPersist(queue_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+
+dbgprintf("chkPersist: PersUpdCnt %d, UpdsSincePers %d\n", pThis->iPersistUpdCnt, pThis->iUpdsSincePersist);
+ if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
+dbgprintf("persistintg queue info!\n");
+ queuePersist(pThis);
+ pThis->iUpdsSincePersist = 0;
+ }
+
+ return iRet;
+}
+
+
/* destructor for the queue object */
rsRetVal queueDestruct(queue_t *pThis)
{
@@ -922,6 +952,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
}
}
CHKiRet(queueAdd(pThis, pUsr));
+ queueChkPersist(pThis);
finalize_it:
/* now activate the worker thread */
@@ -938,6 +969,16 @@ finalize_it:
/* some simple object access methods */
DEFpropSetMeth(queue, bImmediateShutdown, int);
+DEFpropSetMeth(queue, iPersistUpdCnt, int);
+#if 0
+rsRetVal queueSetiPersistUpdCnt(queue_t *pThis, int pVal)
+{
+ dbgprintf("queueSetiPersistUpdCnt(), val %d\n", pVal);
+ pThis->iPersistUpdCnt = pVal;
+dbgprintf("queSetiPersist..(): PersUpdCnt %d, UpdsSincePers %d\n", pThis->iPersistUpdCnt, pThis->iUpdsSincePersist);
+ return RS_RET_OK;
+}
+#endif
/* This function can be used as a generic way to set properties. Only the subset