summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-29 17:36:19 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-29 17:36:19 +0000
commite12e53cfbb8b8764dcbd4da3ff143ad9d46a8795 (patch)
tree18d6567c396b258ca4fb373421168de1588a640a /queue.c
parent7adc80509c80239802716233f8168d566e278873 (diff)
downloadrsyslog-e12e53cfbb8b8764dcbd4da3ff143ad9d46a8795.tar.gz
rsyslog-e12e53cfbb8b8764dcbd4da3ff143ad9d46a8795.tar.xz
rsyslog-e12e53cfbb8b8764dcbd4da3ff143ad9d46a8795.zip
added ability to re-enqueue objects into the queue when a worker thread is
cancelled
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c192
1 files changed, 174 insertions, 18 deletions
diff --git a/queue.c b/queue.c
index f69b0042..ce5ed9d9 100644
--- a/queue.c
+++ b/queue.c
@@ -58,6 +58,7 @@ static int queueChkStopWrkrDA(queue_t *pThis);
static int queueIsIdleDA(queue_t *pThis);
static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave);
static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2);
+static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex);
/* methods */
@@ -135,8 +136,6 @@ queueTurnOffDAMode(queue_t *pThis)
* during the lifetime of DA-mode, depending on how often the DA worker receives an
* inactivity timeout. -- rgerhards, 2008-01-25
*/
-RUNLOG_VAR("%p", pThis->pqDA);
-RUNLOG_VAR("%d", pThis->pqDA->iQueueSize);
if(pThis->pqDA->iQueueSize == 0) {
pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
@@ -171,7 +170,6 @@ queueChkIsDA(queue_t *pThis)
DEFiRet;
ISOBJ_TYPE_assert(pThis, queue);
-RUNLOG_VAR("%s", pThis->pszFilePrefix);
if(pThis->pszFilePrefix != NULL) {
pThis->bIsDA = 1;
dbgoprint((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n");
@@ -444,6 +442,62 @@ static rsRetVal qDelFixedArray(queue_t *pThis, void **out)
/* -------------------- linked list -------------------- */
+
+/* first some generic functions which are also used for the unget linked list */
+
+static inline rsRetVal queueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr)
+{
+ DEFiRet;
+ qLinkedList_t *pEntry;
+
+ ASSERT(ppRoot != NULL);
+ ASSERT(ppLast != NULL);
+
+ if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) {
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+
+ pEntry->pNext = NULL;
+ pEntry->pUsr = pUsr;
+
+ if(*ppRoot == NULL) {
+ *ppRoot = *ppLast = pEntry;
+ } else {
+ (*ppLast)->pNext = pEntry;
+ *ppLast = pEntry;
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+static inline rsRetVal queueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr)
+{
+ DEFiRet;
+ qLinkedList_t *pEntry;
+
+ ASSERT(ppRoot != NULL);
+ ASSERT(ppLast != NULL);
+ ASSERT(ppUsr != NULL);
+ ASSERT(*ppRoot != NULL);
+
+ pEntry = *ppRoot;
+ *ppUsr = pEntry->pUsr;
+
+ if(*ppRoot == *ppLast) {
+ *ppRoot = NULL;
+ *ppLast = NULL;
+ } else {
+ *ppRoot = pEntry->pNext;
+ }
+ free(pEntry);
+
+ RETiRet;
+}
+
+/* end generic functions which are also used for the unget linked list */
+
+
static rsRetVal qConstructLinkedList(queue_t *pThis)
{
DEFiRet;
@@ -475,6 +529,9 @@ static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis)
static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr)
{
DEFiRet;
+
+ iRet = queueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr);
+#if 0
qLinkedList_t *pEntry;
ASSERT(pThis != NULL);
@@ -493,12 +550,15 @@ static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr)
}
finalize_it:
+#endif
RETiRet;
}
-static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
+static rsRetVal qDelLinkedList(queue_t *pThis, obj_t **ppUsr)
{
DEFiRet;
+ iRet = queueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr);
+#if 0
qLinkedList_t *pEntry;
ASSERT(pThis != NULL);
@@ -515,6 +575,7 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
}
free(pEntry);
+#endif
RETiRet;
}
@@ -583,6 +644,8 @@ queueTryLoadPersistedInfo(queue_t *pThis)
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
struct stat stat_buf;
+ int iUngottenObjs;
+ obj_t *pUsr;
ISOBJ_TYPE_assert(pThis, queue);
@@ -613,6 +676,19 @@ queueTryLoadPersistedInfo(queue_t *pThis)
/* first, we try to read the property bag for ourselfs */
CHKiRet(objDeserializePropBag((obj_t*) pThis, psQIF));
+ /* then the ungotten object queue */
+ iUngottenObjs = pThis->iUngottenObjs;
+ pThis->iUngottenObjs = 0; /* will be incremented when we add objects! */
+
+RUNLOG_VAR("%d", iUngottenObjs);
+ while(iUngottenObjs > 0) {
+RUNLOG_VAR("%d", iUngottenObjs);
+ /* fill the queue from disk */
+ CHKiRet(objDeserialize((void*) &pUsr, OBJMsg, psQIF, NULL, NULL));
+ queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
+ --iUngottenObjs; /* one less */
+ }
+
/* and now the stream objects (some order as when persisted!) */
CHKiRet(objDeserialize(&pThis->tVars.disk.pWrite, OBJstrm, psQIF,
(rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
@@ -771,6 +847,67 @@ static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__
/* --------------- end type-specific handlers -------------------- */
+/* get the overall queue size, which includes ungotten objects. Must only be called
+ * while mutex is locked!
+ * rgerhards, 2008-01-29
+ */
+static inline int
+queueGetOverallQueueSize(queue_t *pThis)
+{
+BEGINfunc
+RUNLOG_VAR("%d", pThis->iQueueSize);
+RUNLOG_VAR("%d", pThis->iUngottenObjs);
+ENDfunc
+ return pThis->iQueueSize + pThis->iUngottenObjs;
+}
+
+
+/* unget a user pointer that has been dequeued. This functionality is especially important
+ * for consumer cancel cleanup handlers. To support it, a short list of ungotten user pointers
+ * is maintened in memory.
+ * rgerhards, 2008-01-20
+ */
+static rsRetVal
+queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex)
+{
+ DEFiRet;
+ DEFVARS_mutexProtection;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_assert(pUsr);
+
+ dbgoprint((obj_t*) pThis, "ungetting user object %s\n", objGetName(pUsr));
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
+ iRet = queueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr);
+ ++pThis->iUngottenObjs; /* indicate one more */
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+
+ RETiRet;
+}
+
+
+/* dequeues a user pointer from the ungotten queue. Pointers from there should always be
+ * dequeued first.
+ *
+ * This function must only be called when the mutex is locked!
+ *
+ * rgerhards, 2008-01-29
+ */
+static rsRetVal
+queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ ASSERT(ppUsr != NULL);
+
+ iRet = queueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
+ --pThis->iUngottenObjs; /* indicate one less */
+ dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", objGetName(*ppUsr));
+
+ RETiRet;
+}
+
/* generic code to add a queue entry */
static rsRetVal
@@ -790,7 +927,10 @@ finalize_it:
}
-/* generic code to remove a queue entry */
+/* generic code to remove a queue entry
+ * rgerhards, 2008-01-29: we must first see if there is any object in the
+ * ungotten list and, if so, dequeue it first.
+ */
static rsRetVal
queueDel(queue_t *pThis, void *pUsr)
{
@@ -803,8 +943,12 @@ queueDel(queue_t *pThis, void *pUsr)
* If we decrement, however, we may lose a message. But that is better than
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
- iRet = pThis->qDel(pThis, pUsr);
- --pThis->iQueueSize;
+ if(pThis->iUngottenObjs > 0) {
+ iRet = queueGetUngottenObj(pThis, (obj_t**) pUsr);
+ } else {
+ iRet = pThis->qDel(pThis, pUsr);
+ --pThis->iQueueSize;
+ }
dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
iRet, pThis->iQueueSize);
@@ -1117,10 +1261,13 @@ queueConsumerCancelCleanup(void *arg1, void *arg2)
ISOBJ_TYPE_assert(pThis, queue);
- dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n");
+ if(pUsr != NULL) {
+ /* make sure the data element is not lost */
+ dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
+ CHKiRet(queueUngetObj(pThis, pUsr, LOCK_MUTEX));
+ }
- /* TODO: re-enqueue the data element! This will also make the compiler warning go away... */
-
+finalize_it:
RETiRet;
}
@@ -1181,7 +1328,7 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
/* dequeue element (still protected from mutex) */
iRet = queueDel(pThis, &pUsr);
queueChkPersist(pThis);
- iQueueSize = pThis->iQueueSize; /* cache this for after mutex release */
+ iQueueSize = queueGetOverallQueueSize(pThis); /* cache this for after mutex release */
bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
pWti->pUsrp = pUsr; /* save it for the cancel cleanup handler */
d_pthread_mutex_unlock(pThis->mut);
@@ -1318,10 +1465,8 @@ static int
queueIsIdleDA(queue_t *pThis)
{
/* remember: iQueueSize is the DA queue size, not the main queue! */
- BEGINfunc
/* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */
- ENDfunc
- return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk));
+ return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
}
/* must only be called when the queue mutex is locked, else results
* are not stable! Regular queue version
@@ -1329,7 +1474,7 @@ queueIsIdleDA(queue_t *pThis)
static int
queueIsIdleReg(queue_t *pThis)
{
- return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk));
+ return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
}
@@ -1483,6 +1628,7 @@ static rsRetVal queuePersist(queue_t *pThis)
strm_t *psQIF = NULL; /* Queue Info File */
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
+ obj_t *pUsr;
ASSERT(pThis != NULL);
@@ -1529,14 +1675,22 @@ static rsRetVal queuePersist(queue_t *pThis)
*/
CHKiRet(objBeginSerializePropBag(psQIF, (obj_t*) pThis));
objSerializeSCALAR(psQIF, iQueueSize, INT);
+ objSerializeSCALAR(psQIF, iUngottenObjs, INT);
CHKiRet(objEndSerialize(psQIF));
- /* this is disk specific and must be moved to a function */
+ /* now we must persist all objects on the ungotten queue - they can not go to
+ * to the regular files. -- rgerhards, 2008-01-29
+ */
+ while(pThis->iUngottenObjs > 0) {
+ CHKiRet(queueGetUngottenObj(pThis, &pUsr));
+ CHKiRet((objSerialize(pUsr))(pUsr, psQIF));
+ objDestruct(pUsr);
+ }
+
+ /* now persist the stream info */
CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF));
CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF));
- /* persist queue object itself */
-
/* tell the input file object that it must not delete the file on close if the queue is non-empty */
CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0));
@@ -1859,6 +2013,8 @@ static rsRetVal queueSetProperty(queue_t *pThis, property_t *pProp)
if(isProp("iQueueSize")) {
pThis->iQueueSize = pProp->val.vInt;
+ } else if(isProp("iUngottenObjs")) {
+ pThis->iUngottenObjs = pProp->val.vInt;
} else if(isProp("qType")) {
if(pThis->qType != pProp->val.vLong)
ABORT_FINALIZE(RS_RET_QTYPE_MISMATCH);