summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--debug.c2
-rw-r--r--doc/rsyslog_high_database_rate.html1
-rw-r--r--obj.c4
-rw-r--r--queue.c192
-rw-r--r--queue.h6
5 files changed, 185 insertions, 20 deletions
diff --git a/debug.c b/debug.c
index 82ed2442..06140b69 100644
--- a/debug.c
+++ b/debug.c
@@ -59,7 +59,7 @@ int Debug; /* debug flag - read-only after startup */
int debugging_on = 0; /* read-only, except on sig USR1 */
static int bLogFuncFlow = 0; /* shall the function entry and exit be logged to the debug log? */
static int bPrintFuncDBOnExit = 0; /* shall the function entry and exit be logged to the debug log? */
-static int bPrintMutexAction = 0; /* shall mutex calls be printed to the debug log? */
+static int bPrintMutexAction = 1; /* shall mutex calls be printed to the debug log? */
static int bPrintTime = 1; /* print a timestamp together with debug message */
static char *pszAltDbgFileName = NULL; /* if set, debug output is *also* sent to here */
static FILE *altdbg = NULL; /* and the handle for alternate debug output */
diff --git a/doc/rsyslog_high_database_rate.html b/doc/rsyslog_high_database_rate.html
index fe355c35..7b1f984f 100644
--- a/doc/rsyslog_high_database_rate.html
+++ b/doc/rsyslog_high_database_rate.html
@@ -74,6 +74,7 @@ following simple config file, you log anything you receive to a MySQL database
and have buffering applied automatically.</p>
<textarea rows="11" cols="80">$ModLoad ommysql.so # load the output driver (use ompgsql.so for PostgreSQL)
$ModLoad imudp.so # network reception
+$UDPServerRun 514 # start a udp server at port 514
$ModLoad imuxsock.so # local message reception
$WorkDirectory /rsyslog/work # default location for work (spool) files
diff --git a/obj.c b/obj.c
index 1559628b..9181dec9 100644
--- a/obj.c
+++ b/obj.c
@@ -616,7 +616,9 @@ finalize_it:
/* De-Serialize an object.
* Params: Pointer to object Pointer (pObj) (like a obj_t**, but can not do that due to compiler warning)
- * expected object ID (to check against)
+ * expected object ID (to check against), a fixup function that can modify the object before it is finalized
+ * and a user pointer that is to be passed to that function in addition to the object. The fixup function
+ * pointer may be NULL, in which case none is called.
* The caller must destruct the created object.
* rgerhards, 2008-01-07
*/
diff --git a/queue.c b/queue.c
index f69b0042..ce5ed9d9 100644
--- a/queue.c
+++ b/queue.c
@@ -58,6 +58,7 @@ static int queueChkStopWrkrDA(queue_t *pThis);
static int queueIsIdleDA(queue_t *pThis);
static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave);
static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2);
+static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex);
/* methods */
@@ -135,8 +136,6 @@ queueTurnOffDAMode(queue_t *pThis)
* during the lifetime of DA-mode, depending on how often the DA worker receives an
* inactivity timeout. -- rgerhards, 2008-01-25
*/
-RUNLOG_VAR("%p", pThis->pqDA);
-RUNLOG_VAR("%d", pThis->pqDA->iQueueSize);
if(pThis->pqDA->iQueueSize == 0) {
pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
@@ -171,7 +170,6 @@ queueChkIsDA(queue_t *pThis)
DEFiRet;
ISOBJ_TYPE_assert(pThis, queue);
-RUNLOG_VAR("%s", pThis->pszFilePrefix);
if(pThis->pszFilePrefix != NULL) {
pThis->bIsDA = 1;
dbgoprint((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n");
@@ -444,6 +442,62 @@ static rsRetVal qDelFixedArray(queue_t *pThis, void **out)
/* -------------------- linked list -------------------- */
+
+/* first some generic functions which are also used for the unget linked list */
+
+static inline rsRetVal queueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr)
+{
+ DEFiRet;
+ qLinkedList_t *pEntry;
+
+ ASSERT(ppRoot != NULL);
+ ASSERT(ppLast != NULL);
+
+ if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) {
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+
+ pEntry->pNext = NULL;
+ pEntry->pUsr = pUsr;
+
+ if(*ppRoot == NULL) {
+ *ppRoot = *ppLast = pEntry;
+ } else {
+ (*ppLast)->pNext = pEntry;
+ *ppLast = pEntry;
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+static inline rsRetVal queueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr)
+{
+ DEFiRet;
+ qLinkedList_t *pEntry;
+
+ ASSERT(ppRoot != NULL);
+ ASSERT(ppLast != NULL);
+ ASSERT(ppUsr != NULL);
+ ASSERT(*ppRoot != NULL);
+
+ pEntry = *ppRoot;
+ *ppUsr = pEntry->pUsr;
+
+ if(*ppRoot == *ppLast) {
+ *ppRoot = NULL;
+ *ppLast = NULL;
+ } else {
+ *ppRoot = pEntry->pNext;
+ }
+ free(pEntry);
+
+ RETiRet;
+}
+
+/* end generic functions which are also used for the unget linked list */
+
+
static rsRetVal qConstructLinkedList(queue_t *pThis)
{
DEFiRet;
@@ -475,6 +529,9 @@ static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis)
static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr)
{
DEFiRet;
+
+ iRet = queueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr);
+#if 0
qLinkedList_t *pEntry;
ASSERT(pThis != NULL);
@@ -493,12 +550,15 @@ static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr)
}
finalize_it:
+#endif
RETiRet;
}
-static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
+static rsRetVal qDelLinkedList(queue_t *pThis, obj_t **ppUsr)
{
DEFiRet;
+ iRet = queueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr);
+#if 0
qLinkedList_t *pEntry;
ASSERT(pThis != NULL);
@@ -515,6 +575,7 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
}
free(pEntry);
+#endif
RETiRet;
}
@@ -583,6 +644,8 @@ queueTryLoadPersistedInfo(queue_t *pThis)
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
struct stat stat_buf;
+ int iUngottenObjs;
+ obj_t *pUsr;
ISOBJ_TYPE_assert(pThis, queue);
@@ -613,6 +676,19 @@ queueTryLoadPersistedInfo(queue_t *pThis)
/* first, we try to read the property bag for ourselfs */
CHKiRet(objDeserializePropBag((obj_t*) pThis, psQIF));
+ /* then the ungotten object queue */
+ iUngottenObjs = pThis->iUngottenObjs;
+ pThis->iUngottenObjs = 0; /* will be incremented when we add objects! */
+
+RUNLOG_VAR("%d", iUngottenObjs);
+ while(iUngottenObjs > 0) {
+RUNLOG_VAR("%d", iUngottenObjs);
+ /* fill the queue from disk */
+ CHKiRet(objDeserialize((void*) &pUsr, OBJMsg, psQIF, NULL, NULL));
+ queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
+ --iUngottenObjs; /* one less */
+ }
+
/* and now the stream objects (some order as when persisted!) */
CHKiRet(objDeserialize(&pThis->tVars.disk.pWrite, OBJstrm, psQIF,
(rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
@@ -771,6 +847,67 @@ static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__
/* --------------- end type-specific handlers -------------------- */
+/* get the overall queue size, which includes ungotten objects. Must only be called
+ * while mutex is locked!
+ * rgerhards, 2008-01-29
+ */
+static inline int
+queueGetOverallQueueSize(queue_t *pThis)
+{
+BEGINfunc
+RUNLOG_VAR("%d", pThis->iQueueSize);
+RUNLOG_VAR("%d", pThis->iUngottenObjs);
+ENDfunc
+ return pThis->iQueueSize + pThis->iUngottenObjs;
+}
+
+
+/* unget a user pointer that has been dequeued. This functionality is especially important
+ * for consumer cancel cleanup handlers. To support it, a short list of ungotten user pointers
+ * is maintened in memory.
+ * rgerhards, 2008-01-20
+ */
+static rsRetVal
+queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex)
+{
+ DEFiRet;
+ DEFVARS_mutexProtection;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_assert(pUsr);
+
+ dbgoprint((obj_t*) pThis, "ungetting user object %s\n", objGetName(pUsr));
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
+ iRet = queueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr);
+ ++pThis->iUngottenObjs; /* indicate one more */
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+
+ RETiRet;
+}
+
+
+/* dequeues a user pointer from the ungotten queue. Pointers from there should always be
+ * dequeued first.
+ *
+ * This function must only be called when the mutex is locked!
+ *
+ * rgerhards, 2008-01-29
+ */
+static rsRetVal
+queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ ASSERT(ppUsr != NULL);
+
+ iRet = queueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
+ --pThis->iUngottenObjs; /* indicate one less */
+ dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", objGetName(*ppUsr));
+
+ RETiRet;
+}
+
/* generic code to add a queue entry */
static rsRetVal
@@ -790,7 +927,10 @@ finalize_it:
}
-/* generic code to remove a queue entry */
+/* generic code to remove a queue entry
+ * rgerhards, 2008-01-29: we must first see if there is any object in the
+ * ungotten list and, if so, dequeue it first.
+ */
static rsRetVal
queueDel(queue_t *pThis, void *pUsr)
{
@@ -803,8 +943,12 @@ queueDel(queue_t *pThis, void *pUsr)
* If we decrement, however, we may lose a message. But that is better than
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
- iRet = pThis->qDel(pThis, pUsr);
- --pThis->iQueueSize;
+ if(pThis->iUngottenObjs > 0) {
+ iRet = queueGetUngottenObj(pThis, (obj_t**) pUsr);
+ } else {
+ iRet = pThis->qDel(pThis, pUsr);
+ --pThis->iQueueSize;
+ }
dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
iRet, pThis->iQueueSize);
@@ -1117,10 +1261,13 @@ queueConsumerCancelCleanup(void *arg1, void *arg2)
ISOBJ_TYPE_assert(pThis, queue);
- dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n");
+ if(pUsr != NULL) {
+ /* make sure the data element is not lost */
+ dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
+ CHKiRet(queueUngetObj(pThis, pUsr, LOCK_MUTEX));
+ }
- /* TODO: re-enqueue the data element! This will also make the compiler warning go away... */
-
+finalize_it:
RETiRet;
}
@@ -1181,7 +1328,7 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
/* dequeue element (still protected from mutex) */
iRet = queueDel(pThis, &pUsr);
queueChkPersist(pThis);
- iQueueSize = pThis->iQueueSize; /* cache this for after mutex release */
+ iQueueSize = queueGetOverallQueueSize(pThis); /* cache this for after mutex release */
bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
pWti->pUsrp = pUsr; /* save it for the cancel cleanup handler */
d_pthread_mutex_unlock(pThis->mut);
@@ -1318,10 +1465,8 @@ static int
queueIsIdleDA(queue_t *pThis)
{
/* remember: iQueueSize is the DA queue size, not the main queue! */
- BEGINfunc
/* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */
- ENDfunc
- return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk));
+ return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
}
/* must only be called when the queue mutex is locked, else results
* are not stable! Regular queue version
@@ -1329,7 +1474,7 @@ queueIsIdleDA(queue_t *pThis)
static int
queueIsIdleReg(queue_t *pThis)
{
- return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk));
+ return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
}
@@ -1483,6 +1628,7 @@ static rsRetVal queuePersist(queue_t *pThis)
strm_t *psQIF = NULL; /* Queue Info File */
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
+ obj_t *pUsr;
ASSERT(pThis != NULL);
@@ -1529,14 +1675,22 @@ static rsRetVal queuePersist(queue_t *pThis)
*/
CHKiRet(objBeginSerializePropBag(psQIF, (obj_t*) pThis));
objSerializeSCALAR(psQIF, iQueueSize, INT);
+ objSerializeSCALAR(psQIF, iUngottenObjs, INT);
CHKiRet(objEndSerialize(psQIF));
- /* this is disk specific and must be moved to a function */
+ /* now we must persist all objects on the ungotten queue - they can not go to
+ * to the regular files. -- rgerhards, 2008-01-29
+ */
+ while(pThis->iUngottenObjs > 0) {
+ CHKiRet(queueGetUngottenObj(pThis, &pUsr));
+ CHKiRet((objSerialize(pUsr))(pUsr, psQIF));
+ objDestruct(pUsr);
+ }
+
+ /* now persist the stream info */
CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF));
CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF));
- /* persist queue object itself */
-
/* tell the input file object that it must not delete the file on close if the queue is non-empty */
CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0));
@@ -1859,6 +2013,8 @@ static rsRetVal queueSetProperty(queue_t *pThis, property_t *pProp)
if(isProp("iQueueSize")) {
pThis->iQueueSize = pProp->val.vInt;
+ } else if(isProp("iUngottenObjs")) {
+ pThis->iUngottenObjs = pProp->val.vInt;
} else if(isProp("qType")) {
if(pThis->qType != pProp->val.vLong)
ABORT_FINALIZE(RS_RET_QTYPE_MISMATCH);
diff --git a/queue.h b/queue.h
index e48d9796..12e4fcb6 100644
--- a/queue.h
+++ b/queue.h
@@ -116,6 +116,12 @@ typedef struct queue_s {
struct queue_s *pqDA; /* queue for disk-assisted modes */
struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */
int bDAEnqOnly; /* EnqOnly setting for DA queue */
+ /* some data elements for the queueUngetObj() functionality. This list should always be short
+ * and is always kept in memory
+ */
+ qLinkedList_t *pUngetRoot;
+ qLinkedList_t *pUngetLast;
+ int iUngottenObjs; /* number of objects currently in the "ungotten" list */
/* now follow queueing mode specific data elements */
union { /* different data elements based on queue type (qType) */
struct {