summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-05-18 18:39:52 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-05-18 18:39:52 +0200
commitfe5bea77ac2533faab3b7b73bc253c4dc7d702bc (patch)
tree66df98c9369b733b023bfadcac3c52bafccaa676 /runtime/queue.c
parent93f873277bfe5ebb309ff5e92f5dc7244ebd9f1a (diff)
downloadrsyslog-fe5bea77ac2533faab3b7b73bc253c4dc7d702bc.tar.gz
rsyslog-fe5bea77ac2533faab3b7b73bc253c4dc7d702bc.tar.xz
rsyslog-fe5bea77ac2533faab3b7b73bc253c4dc7d702bc.zip
removed queue's UngetObj() call
... which is no longer needed thanks to the new queue design.
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c110
1 files changed, 11 insertions, 99 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index dc399066..8ef3e7db 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -73,7 +73,6 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static int qqueueIsIdleDA(qqueue_t *pThis);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2);
-static rsRetVal UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -169,7 +168,7 @@ finalize_it:
/* methods */
-/* get the overall queue size, which includes ungotten objects. Must only be called
+/* get the overall queue size. Must only be called
* while mutex is locked!
* rgerhards, 2008-01-29
*/
@@ -178,11 +177,11 @@ qqueueGetOverallQueueSize(qqueue_t *pThis)
{
#if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */
BEGINfunc
-dbgoprint((obj_t*) pThis, "queue size: %d (regular %d, ungotten %d)\n",
- pThis->iQueueSize + pThis->iUngottenObjs, pThis->iQueueSize, pThis->iUngottenObjs);
+dbgoprint((obj_t*) pThis, "queue size: %d (regular %d)\n",
+ pThis->iQueueSize, pThis->iQueueSize);
ENDfunc
#endif
- return pThis->iQueueSize + pThis->iUngottenObjs;
+ return pThis->iQueueSize;
}
@@ -837,8 +836,6 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
struct stat stat_buf;
- int iUngottenObjs;
- obj_t *pUsr;
ISOBJ_TYPE_assert(pThis, qqueue);
@@ -868,18 +865,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
/* first, we try to read the property bag for ourselfs */
CHKiRet(obj.DeserializePropBag((obj_t*) pThis, psQIF));
- /* then the ungotten object queue */
- iUngottenObjs = pThis->iUngottenObjs;
- pThis->iUngottenObjs = 0; /* will be incremented when we add objects! */
-
- while(iUngottenObjs > 0) {
- /* fill the queue from disk */
- CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL));
- UngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
- --iUngottenObjs; /* one less */
- }
-
- /* and now the stream objects (some order as when persisted!) */
+ /* then the stream objects (same order as when persisted!) */
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF,
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF,
@@ -1122,57 +1108,6 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
/* --------------- end type-specific handlers -------------------- */
-/* unget a user pointer that has been dequeued. This functionality is especially important
- * for consumer cancel cleanup handlers. To support it, a short list of ungotten user pointers
- * is maintened in memory.
- * rgerhards, 2008-01-20
- */
-static rsRetVal
-UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
-{
- DEFiRet;
- DEFVARS_mutexProtection;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
- ISOBJ_assert(pUsr); /* TODO: we aborted right at this place at least 3 times -- race? 2008-02-28, -03-10, -03-15
- The second time I noticed it the queue was in destruction with NO worker threads
- running. The pUsr ptr was totally off and provided no clue what it may be pointing
- at (except that it looked like the static data pool). Both times, the abort happend
- inside an action queue */
-
- dbgoprint((obj_t*) pThis, "ungetting user object %s\n", obj.GetName(pUsr));
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
- iRet = qqueueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr);
- ++pThis->iUngottenObjs; /* indicate one more */
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
-
- RETiRet;
-}
-
-
-/* dequeues a user pointer from the ungotten queue. Pointers from there should always be
- * dequeued first.
- *
- * This function must only be called when the mutex is locked!
- *
- * rgerhards, 2008-01-29
- */
-static rsRetVal
-GetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
- ASSERT(ppUsr != NULL);
-
- iRet = qqueueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
- --pThis->iUngottenObjs; /* indicate one less */
- dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", obj.GetName(*ppUsr));
-
- RETiRet;
-}
-
-
/* generic code to add a queue entry
* We use some specific code to most efficiently support direct mode
* queues. This is justified in spite of the gain and the need to do some
@@ -1198,8 +1133,6 @@ finalize_it:
/* generic code to remove a queue entry
- * rgerhards, 2008-01-29: we must first see if there is any object in the
- * ungotten list and, if so, dequeue it first.
*/
static rsRetVal
qqueueDel(qqueue_t *pThis, void *pUsr)
@@ -1213,13 +1146,8 @@ qqueueDel(qqueue_t *pThis, void *pUsr)
* If we decrement, however, we may lose a message. But that is better than
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
- if(pThis->iUngottenObjs > 0) {
- iRet = GetUngottenObj(pThis, (obj_t**) pUsr);
- } else {
- iRet = pThis->qDeq(pThis, pUsr);
- // TODO: ULTRA iRet = pThis->qDel(pThis, pUsr);
- ATOMIC_DEC(pThis->iQueueSize);
- }
+ iRet = pThis->qDeq(pThis, pUsr);
+ ATOMIC_DEC(pThis->iQueueSize);
dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
iRet, pThis->iQueueSize);
@@ -1528,6 +1456,8 @@ finalize_it:
static rsRetVal
ConsumerCancelCleanup(void *arg1, void *arg2)
{
+ //TODO: looks like we no longer need it!
+ /*
DEFiRet;
qqueue_t *pThis = (qqueue_t*) arg1;
@@ -1535,14 +1465,9 @@ ConsumerCancelCleanup(void *arg1, void *arg2)
ISOBJ_TYPE_assert(pThis, qqueue);
- if(pUsr != NULL) {
- /* make sure the data element is not lost */
- dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
- CHKiRet(UngetObj(pThis, pUsr, LOCK_MUTEX));
- }
-
-finalize_it:
RETiRet;
+ */
+ return RS_RET_OK;
}
@@ -2188,7 +2113,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
strm_t *psQIF = NULL; /* Queue Info File */
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
- obj_t *pUsr;
ASSERT(pThis != NULL);
@@ -2235,20 +2159,10 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
*/
CHKiRet(obj.BeginSerializePropBag(psQIF, (obj_t*) pThis));
objSerializeSCALAR(psQIF, iQueueSize, INT);
- objSerializeSCALAR(psQIF, iUngottenObjs, INT);
objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64);
objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64);
CHKiRet(obj.EndSerialize(psQIF));
- /* now we must persist all objects on the ungotten queue - they can not go to
- * to the regular files. -- rgerhards, 2008-01-29
- */
- while(pThis->iUngottenObjs > 0) {
- CHKiRet(GetUngottenObj(pThis, &pUsr));
- CHKiRet((objSerialize(pUsr))(pUsr, psQIF));
- objDestruct(pUsr);
- }
-
/* now persist the stream info */
CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF));
CHKiRet(strmSerialize(pThis->tVars.disk.pReadDel, psQIF));
@@ -2615,8 +2529,6 @@ static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp)
if(isProp("iQueueSize")) {
pThis->iQueueSize = pProp->val.num;
- } else if(isProp("iUngottenObjs")) {
- pThis->iUngottenObjs = pProp->val.num;
} else if(isProp("tVars.disk.sizeOnDisk")) {
pThis->tVars.disk.sizeOnDisk = pProp->val.num;
} else if(isProp("tVars.disk.bytesRead")) {