summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-05-18 18:39:52 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-05-18 18:39:52 +0200
commitfe5bea77ac2533faab3b7b73bc253c4dc7d702bc (patch)
tree66df98c9369b733b023bfadcac3c52bafccaa676
parent93f873277bfe5ebb309ff5e92f5dc7244ebd9f1a (diff)
downloadrsyslog-fe5bea77ac2533faab3b7b73bc253c4dc7d702bc.tar.gz
rsyslog-fe5bea77ac2533faab3b7b73bc253c4dc7d702bc.tar.xz
rsyslog-fe5bea77ac2533faab3b7b73bc253c4dc7d702bc.zip
removed queue's UngetObj() call
... which is no longer needed thanks to the new queue design.
-rw-r--r--doc/design.tex8
-rw-r--r--runtime/queue.c110
-rw-r--r--runtime/queue.h6
3 files changed, 15 insertions, 109 deletions
diff --git a/doc/design.tex b/doc/design.tex
index c03e1fab..53d25313 100644
--- a/doc/design.tex
+++ b/doc/design.tex
@@ -433,11 +433,11 @@ message-caused. This is under the assumption that any reasonable responsive
admin will hopefully test his configuration at least once before turning it
into production. And config SQL errors should manifest immediately, so I
expect these to be fixed before a configuration runs in production. So it is
-the chore of the output module to interpret the return code it received from
-its API and decide whether this is more likely action-caused or
+the duty of the output module to interpret the return code it received from
+the API call and decide whether the failure is more likely action-caused or
message-caused. For database outputs, I would assume that it is always easy
-to classify failures that can only be action-caused, especially in the
-dominating case of a failed network connection or a failed server.
+to classify failures that must be action-caused, especially in the
+dominating cases of failed network connections or failed servers.
For other outputs it may not be as easy. But, for example, all stream network
outputs can detect a broken connection, so this also is a sure fit.
diff --git a/runtime/queue.c b/runtime/queue.c
index dc399066..8ef3e7db 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -73,7 +73,6 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static int qqueueIsIdleDA(qqueue_t *pThis);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2);
-static rsRetVal UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -169,7 +168,7 @@ finalize_it:
/* methods */
-/* get the overall queue size, which includes ungotten objects. Must only be called
+/* get the overall queue size. Must only be called
* while mutex is locked!
* rgerhards, 2008-01-29
*/
@@ -178,11 +177,11 @@ qqueueGetOverallQueueSize(qqueue_t *pThis)
{
#if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */
BEGINfunc
-dbgoprint((obj_t*) pThis, "queue size: %d (regular %d, ungotten %d)\n",
- pThis->iQueueSize + pThis->iUngottenObjs, pThis->iQueueSize, pThis->iUngottenObjs);
+dbgoprint((obj_t*) pThis, "queue size: %d (regular %d)\n",
+ pThis->iQueueSize, pThis->iQueueSize);
ENDfunc
#endif
- return pThis->iQueueSize + pThis->iUngottenObjs;
+ return pThis->iQueueSize;
}
@@ -837,8 +836,6 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
struct stat stat_buf;
- int iUngottenObjs;
- obj_t *pUsr;
ISOBJ_TYPE_assert(pThis, qqueue);
@@ -868,18 +865,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
/* first, we try to read the property bag for ourselfs */
CHKiRet(obj.DeserializePropBag((obj_t*) pThis, psQIF));
- /* then the ungotten object queue */
- iUngottenObjs = pThis->iUngottenObjs;
- pThis->iUngottenObjs = 0; /* will be incremented when we add objects! */
-
- while(iUngottenObjs > 0) {
- /* fill the queue from disk */
- CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL));
- UngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
- --iUngottenObjs; /* one less */
- }
-
- /* and now the stream objects (some order as when persisted!) */
+ /* then the stream objects (same order as when persisted!) */
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF,
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF,
@@ -1122,57 +1108,6 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
/* --------------- end type-specific handlers -------------------- */
-/* unget a user pointer that has been dequeued. This functionality is especially important
- * for consumer cancel cleanup handlers. To support it, a short list of ungotten user pointers
- * is maintened in memory.
- * rgerhards, 2008-01-20
- */
-static rsRetVal
-UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
-{
- DEFiRet;
- DEFVARS_mutexProtection;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
- ISOBJ_assert(pUsr); /* TODO: we aborted right at this place at least 3 times -- race? 2008-02-28, -03-10, -03-15
- The second time I noticed it the queue was in destruction with NO worker threads
- running. The pUsr ptr was totally off and provided no clue what it may be pointing
- at (except that it looked like the static data pool). Both times, the abort happend
- inside an action queue */
-
- dbgoprint((obj_t*) pThis, "ungetting user object %s\n", obj.GetName(pUsr));
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
- iRet = qqueueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr);
- ++pThis->iUngottenObjs; /* indicate one more */
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
-
- RETiRet;
-}
-
-
-/* dequeues a user pointer from the ungotten queue. Pointers from there should always be
- * dequeued first.
- *
- * This function must only be called when the mutex is locked!
- *
- * rgerhards, 2008-01-29
- */
-static rsRetVal
-GetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
- ASSERT(ppUsr != NULL);
-
- iRet = qqueueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
- --pThis->iUngottenObjs; /* indicate one less */
- dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", obj.GetName(*ppUsr));
-
- RETiRet;
-}
-
-
/* generic code to add a queue entry
* We use some specific code to most efficiently support direct mode
* queues. This is justified in spite of the gain and the need to do some
@@ -1198,8 +1133,6 @@ finalize_it:
/* generic code to remove a queue entry
- * rgerhards, 2008-01-29: we must first see if there is any object in the
- * ungotten list and, if so, dequeue it first.
*/
static rsRetVal
qqueueDel(qqueue_t *pThis, void *pUsr)
@@ -1213,13 +1146,8 @@ qqueueDel(qqueue_t *pThis, void *pUsr)
* If we decrement, however, we may lose a message. But that is better than
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
- if(pThis->iUngottenObjs > 0) {
- iRet = GetUngottenObj(pThis, (obj_t**) pUsr);
- } else {
- iRet = pThis->qDeq(pThis, pUsr);
- // TODO: ULTRA iRet = pThis->qDel(pThis, pUsr);
- ATOMIC_DEC(pThis->iQueueSize);
- }
+ iRet = pThis->qDeq(pThis, pUsr);
+ ATOMIC_DEC(pThis->iQueueSize);
dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
iRet, pThis->iQueueSize);
@@ -1528,6 +1456,8 @@ finalize_it:
static rsRetVal
ConsumerCancelCleanup(void *arg1, void *arg2)
{
+ //TODO: looks like we no longer need it!
+ /*
DEFiRet;
qqueue_t *pThis = (qqueue_t*) arg1;
@@ -1535,14 +1465,9 @@ ConsumerCancelCleanup(void *arg1, void *arg2)
ISOBJ_TYPE_assert(pThis, qqueue);
- if(pUsr != NULL) {
- /* make sure the data element is not lost */
- dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
- CHKiRet(UngetObj(pThis, pUsr, LOCK_MUTEX));
- }
-
-finalize_it:
RETiRet;
+ */
+ return RS_RET_OK;
}
@@ -2188,7 +2113,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
strm_t *psQIF = NULL; /* Queue Info File */
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
- obj_t *pUsr;
ASSERT(pThis != NULL);
@@ -2235,20 +2159,10 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
*/
CHKiRet(obj.BeginSerializePropBag(psQIF, (obj_t*) pThis));
objSerializeSCALAR(psQIF, iQueueSize, INT);
- objSerializeSCALAR(psQIF, iUngottenObjs, INT);
objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64);
objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64);
CHKiRet(obj.EndSerialize(psQIF));
- /* now we must persist all objects on the ungotten queue - they can not go to
- * to the regular files. -- rgerhards, 2008-01-29
- */
- while(pThis->iUngottenObjs > 0) {
- CHKiRet(GetUngottenObj(pThis, &pUsr));
- CHKiRet((objSerialize(pUsr))(pUsr, psQIF));
- objDestruct(pUsr);
- }
-
/* now persist the stream info */
CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF));
CHKiRet(strmSerialize(pThis->tVars.disk.pReadDel, psQIF));
@@ -2615,8 +2529,6 @@ static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp)
if(isProp("iQueueSize")) {
pThis->iQueueSize = pProp->val.num;
- } else if(isProp("iUngottenObjs")) {
- pThis->iUngottenObjs = pProp->val.num;
} else if(isProp("tVars.disk.sizeOnDisk")) {
pThis->tVars.disk.sizeOnDisk = pProp->val.num;
} else if(isProp("tVars.disk.bytesRead")) {
diff --git a/runtime/queue.h b/runtime/queue.h
index 00cee419..e47b8762 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -152,12 +152,6 @@ typedef struct queue_s {
struct queue_s *pqDA; /* queue for disk-assisted modes */
struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */
int bDAEnqOnly; /* EnqOnly setting for DA queue */
- /* some data elements for the queueUngetObj() functionality. This list should always be short
- * and is always kept in memory
- */
- qLinkedList_t *pUngetRoot;
- qLinkedList_t *pUngetLast;
- int iUngottenObjs; /* number of objects currently in the "ungotten" list */
/* now follow queueing mode specific data elements */
union { /* different data elements based on queue type (qType) */
struct {