diff options
-rw-r--r-- | debug.c | 2 | ||||
-rw-r--r-- | doc/rsyslog_high_database_rate.html | 1 | ||||
-rw-r--r-- | obj.c | 4 | ||||
-rw-r--r-- | queue.c | 192 | ||||
-rw-r--r-- | queue.h | 6 |
5 files changed, 185 insertions, 20 deletions
@@ -59,7 +59,7 @@ int Debug; /* debug flag - read-only after startup */ int debugging_on = 0; /* read-only, except on sig USR1 */ static int bLogFuncFlow = 0; /* shall the function entry and exit be logged to the debug log? */ static int bPrintFuncDBOnExit = 0; /* shall the function entry and exit be logged to the debug log? */ -static int bPrintMutexAction = 0; /* shall mutex calls be printed to the debug log? */ +static int bPrintMutexAction = 1; /* shall mutex calls be printed to the debug log? */ static int bPrintTime = 1; /* print a timestamp together with debug message */ static char *pszAltDbgFileName = NULL; /* if set, debug output is *also* sent to here */ static FILE *altdbg = NULL; /* and the handle for alternate debug output */ diff --git a/doc/rsyslog_high_database_rate.html b/doc/rsyslog_high_database_rate.html index fe355c35..7b1f984f 100644 --- a/doc/rsyslog_high_database_rate.html +++ b/doc/rsyslog_high_database_rate.html @@ -74,6 +74,7 @@ following simple config file, you log anything you receive to a MySQL database and have buffering applied automatically.</p> <textarea rows="11" cols="80">$ModLoad ommysql.so # load the output driver (use ompgsql.so for PostgreSQL) $ModLoad imudp.so # network reception +$UDPServerRun 514 # start a udp server at port 514 $ModLoad imuxsock.so # local message reception $WorkDirectory /rsyslog/work # default location for work (spool) files @@ -616,7 +616,9 @@ finalize_it: /* De-Serialize an object. * Params: Pointer to object Pointer (pObj) (like a obj_t**, but can not do that due to compiler warning) - * expected object ID (to check against) + * expected object ID (to check against), a fixup function that can modify the object before it is finalized + * and a user pointer that is to be passed to that function in addition to the object. The fixup function + * pointer may be NULL, in which case none is called. * The caller must destruct the created object. * rgerhards, 2008-01-07 */ @@ -58,6 +58,7 @@ static int queueChkStopWrkrDA(queue_t *pThis); static int queueIsIdleDA(queue_t *pThis); static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave); static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2); +static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex); /* methods */ @@ -135,8 +136,6 @@ queueTurnOffDAMode(queue_t *pThis) * during the lifetime of DA-mode, depending on how often the DA worker receives an * inactivity timeout. -- rgerhards, 2008-01-25 */ -RUNLOG_VAR("%p", pThis->pqDA); -RUNLOG_VAR("%d", pThis->pqDA->iQueueSize); if(pThis->pqDA->iQueueSize == 0) { pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, @@ -171,7 +170,6 @@ queueChkIsDA(queue_t *pThis) DEFiRet; ISOBJ_TYPE_assert(pThis, queue); -RUNLOG_VAR("%s", pThis->pszFilePrefix); if(pThis->pszFilePrefix != NULL) { pThis->bIsDA = 1; dbgoprint((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n"); @@ -444,6 +442,62 @@ static rsRetVal qDelFixedArray(queue_t *pThis, void **out) /* -------------------- linked list -------------------- */ + +/* first some generic functions which are also used for the unget linked list */ + +static inline rsRetVal queueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr) +{ + DEFiRet; + qLinkedList_t *pEntry; + + ASSERT(ppRoot != NULL); + ASSERT(ppLast != NULL); + + if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) { + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } + + pEntry->pNext = NULL; + pEntry->pUsr = pUsr; + + if(*ppRoot == NULL) { + *ppRoot = *ppLast = pEntry; + } else { + (*ppLast)->pNext = pEntry; + *ppLast = pEntry; + } + +finalize_it: + RETiRet; +} + +static inline rsRetVal queueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr) +{ + DEFiRet; + qLinkedList_t *pEntry; + + ASSERT(ppRoot != NULL); + ASSERT(ppLast != NULL); + ASSERT(ppUsr != NULL); + ASSERT(*ppRoot != NULL); + + pEntry = *ppRoot; + *ppUsr = pEntry->pUsr; + + if(*ppRoot == *ppLast) { + *ppRoot = NULL; + *ppLast = NULL; + } else { + *ppRoot = pEntry->pNext; + } + free(pEntry); + + RETiRet; +} + +/* end generic functions which are also used for the unget linked list */ + + static rsRetVal qConstructLinkedList(queue_t *pThis) { DEFiRet; @@ -475,6 +529,9 @@ static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis) static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr) { DEFiRet; + + iRet = queueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr); +#if 0 qLinkedList_t *pEntry; ASSERT(pThis != NULL); @@ -493,12 +550,15 @@ static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr) } finalize_it: +#endif RETiRet; } -static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) +static rsRetVal qDelLinkedList(queue_t *pThis, obj_t **ppUsr) { DEFiRet; + iRet = queueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr); +#if 0 qLinkedList_t *pEntry; ASSERT(pThis != NULL); @@ -515,6 +575,7 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) } free(pEntry); +#endif RETiRet; } @@ -583,6 +644,8 @@ queueTryLoadPersistedInfo(queue_t *pThis) uchar pszQIFNam[MAXFNAME]; size_t lenQIFNam; struct stat stat_buf; + int iUngottenObjs; + obj_t *pUsr; ISOBJ_TYPE_assert(pThis, queue); @@ -613,6 +676,19 @@ queueTryLoadPersistedInfo(queue_t *pThis) /* first, we try to read the property bag for ourselfs */ CHKiRet(objDeserializePropBag((obj_t*) pThis, psQIF)); + /* then the ungotten object queue */ + iUngottenObjs = pThis->iUngottenObjs; + pThis->iUngottenObjs = 0; /* will be incremented when we add objects! */ + +RUNLOG_VAR("%d", iUngottenObjs); + while(iUngottenObjs > 0) { +RUNLOG_VAR("%d", iUngottenObjs); + /* fill the queue from disk */ + CHKiRet(objDeserialize((void*) &pUsr, OBJMsg, psQIF, NULL, NULL)); + queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); + --iUngottenObjs; /* one less */ + } + /* and now the stream objects (some order as when persisted!) */ CHKiRet(objDeserialize(&pThis->tVars.disk.pWrite, OBJstrm, psQIF, (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis)); @@ -771,6 +847,67 @@ static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__ /* --------------- end type-specific handlers -------------------- */ +/* get the overall queue size, which includes ungotten objects. Must only be called + * while mutex is locked! + * rgerhards, 2008-01-29 + */ +static inline int +queueGetOverallQueueSize(queue_t *pThis) +{ +BEGINfunc +RUNLOG_VAR("%d", pThis->iQueueSize); +RUNLOG_VAR("%d", pThis->iUngottenObjs); +ENDfunc + return pThis->iQueueSize + pThis->iUngottenObjs; +} + + +/* unget a user pointer that has been dequeued. This functionality is especially important + * for consumer cancel cleanup handlers. To support it, a short list of ungotten user pointers + * is maintened in memory. + * rgerhards, 2008-01-20 + */ +static rsRetVal +queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex) +{ + DEFiRet; + DEFVARS_mutexProtection; + + ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_assert(pUsr); + + dbgoprint((obj_t*) pThis, "ungetting user object %s\n", objGetName(pUsr)); + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex); + iRet = queueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr); + ++pThis->iUngottenObjs; /* indicate one more */ + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + + RETiRet; +} + + +/* dequeues a user pointer from the ungotten queue. Pointers from there should always be + * dequeued first. + * + * This function must only be called when the mutex is locked! + * + * rgerhards, 2008-01-29 + */ +static rsRetVal +queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + ASSERT(ppUsr != NULL); + + iRet = queueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr); + --pThis->iUngottenObjs; /* indicate one less */ + dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", objGetName(*ppUsr)); + + RETiRet; +} + /* generic code to add a queue entry */ static rsRetVal @@ -790,7 +927,10 @@ finalize_it: } -/* generic code to remove a queue entry */ +/* generic code to remove a queue entry + * rgerhards, 2008-01-29: we must first see if there is any object in the + * ungotten list and, if so, dequeue it first. + */ static rsRetVal queueDel(queue_t *pThis, void *pUsr) { @@ -803,8 +943,12 @@ queueDel(queue_t *pThis, void *pUsr) * If we decrement, however, we may lose a message. But that is better than * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ - iRet = pThis->qDel(pThis, pUsr); - --pThis->iQueueSize; + if(pThis->iUngottenObjs > 0) { + iRet = queueGetUngottenObj(pThis, (obj_t**) pUsr); + } else { + iRet = pThis->qDel(pThis, pUsr); + --pThis->iQueueSize; + } dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n", iRet, pThis->iQueueSize); @@ -1117,10 +1261,13 @@ queueConsumerCancelCleanup(void *arg1, void *arg2) ISOBJ_TYPE_assert(pThis, queue); - dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n"); + if(pUsr != NULL) { + /* make sure the data element is not lost */ + dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n"); + CHKiRet(queueUngetObj(pThis, pUsr, LOCK_MUTEX)); + } - /* TODO: re-enqueue the data element! This will also make the compiler warning go away... */ - +finalize_it: RETiRet; } @@ -1181,7 +1328,7 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave) /* dequeue element (still protected from mutex) */ iRet = queueDel(pThis, &pUsr); queueChkPersist(pThis); - iQueueSize = pThis->iQueueSize; /* cache this for after mutex release */ + iQueueSize = queueGetOverallQueueSize(pThis); /* cache this for after mutex release */ bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ pWti->pUsrp = pUsr; /* save it for the cancel cleanup handler */ d_pthread_mutex_unlock(pThis->mut); @@ -1318,10 +1465,8 @@ static int queueIsIdleDA(queue_t *pThis) { /* remember: iQueueSize is the DA queue size, not the main queue! */ - BEGINfunc /* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */ - ENDfunc - return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk)); + return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); } /* must only be called when the queue mutex is locked, else results * are not stable! Regular queue version @@ -1329,7 +1474,7 @@ queueIsIdleDA(queue_t *pThis) static int queueIsIdleReg(queue_t *pThis) { - return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk)); + return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); } @@ -1483,6 +1628,7 @@ static rsRetVal queuePersist(queue_t *pThis) strm_t *psQIF = NULL; /* Queue Info File */ uchar pszQIFNam[MAXFNAME]; size_t lenQIFNam; + obj_t *pUsr; ASSERT(pThis != NULL); @@ -1529,14 +1675,22 @@ static rsRetVal queuePersist(queue_t *pThis) */ CHKiRet(objBeginSerializePropBag(psQIF, (obj_t*) pThis)); objSerializeSCALAR(psQIF, iQueueSize, INT); + objSerializeSCALAR(psQIF, iUngottenObjs, INT); CHKiRet(objEndSerialize(psQIF)); - /* this is disk specific and must be moved to a function */ + /* now we must persist all objects on the ungotten queue - they can not go to + * to the regular files. -- rgerhards, 2008-01-29 + */ + while(pThis->iUngottenObjs > 0) { + CHKiRet(queueGetUngottenObj(pThis, &pUsr)); + CHKiRet((objSerialize(pUsr))(pUsr, psQIF)); + objDestruct(pUsr); + } + + /* now persist the stream info */ CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF)); CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF)); - /* persist queue object itself */ - /* tell the input file object that it must not delete the file on close if the queue is non-empty */ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0)); @@ -1859,6 +2013,8 @@ static rsRetVal queueSetProperty(queue_t *pThis, property_t *pProp) if(isProp("iQueueSize")) { pThis->iQueueSize = pProp->val.vInt; + } else if(isProp("iUngottenObjs")) { + pThis->iUngottenObjs = pProp->val.vInt; } else if(isProp("qType")) { if(pThis->qType != pProp->val.vLong) ABORT_FINALIZE(RS_RET_QTYPE_MISMATCH); @@ -116,6 +116,12 @@ typedef struct queue_s { struct queue_s *pqDA; /* queue for disk-assisted modes */ struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */ int bDAEnqOnly; /* EnqOnly setting for DA queue */ + /* some data elements for the queueUngetObj() functionality. This list should always be short + * and is always kept in memory + */ + qLinkedList_t *pUngetRoot; + qLinkedList_t *pUngetLast; + int iUngottenObjs; /* number of objects currently in the "ungotten" list */ /* now follow queueing mode specific data elements */ union { /* different data elements based on queue type (qType) */ struct { |