diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/queue.c | 110 | ||||
-rw-r--r-- | runtime/queue.h | 6 |
2 files changed, 11 insertions, 105 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index dc399066..8ef3e7db 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -73,7 +73,6 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static int qqueueIsIdleDA(qqueue_t *pThis); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2); -static rsRetVal UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -169,7 +168,7 @@ finalize_it: /* methods */ -/* get the overall queue size, which includes ungotten objects. Must only be called +/* get the overall queue size. Must only be called * while mutex is locked! * rgerhards, 2008-01-29 */ @@ -178,11 +177,11 @@ qqueueGetOverallQueueSize(qqueue_t *pThis) { #if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */ BEGINfunc -dbgoprint((obj_t*) pThis, "queue size: %d (regular %d, ungotten %d)\n", - pThis->iQueueSize + pThis->iUngottenObjs, pThis->iQueueSize, pThis->iUngottenObjs); +dbgoprint((obj_t*) pThis, "queue size: %d (regular %d)\n", + pThis->iQueueSize, pThis->iQueueSize); ENDfunc #endif - return pThis->iQueueSize + pThis->iUngottenObjs; + return pThis->iQueueSize; } @@ -837,8 +836,6 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) uchar pszQIFNam[MAXFNAME]; size_t lenQIFNam; struct stat stat_buf; - int iUngottenObjs; - obj_t *pUsr; ISOBJ_TYPE_assert(pThis, qqueue); @@ -868,18 +865,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) /* first, we try to read the property bag for ourselfs */ CHKiRet(obj.DeserializePropBag((obj_t*) pThis, psQIF)); - /* then the ungotten object queue */ - iUngottenObjs = pThis->iUngottenObjs; - pThis->iUngottenObjs = 0; /* will be incremented when we add objects! */ - - while(iUngottenObjs > 0) { - /* fill the queue from disk */ - CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL)); - UngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); - --iUngottenObjs; /* one less */ - } - - /* and now the stream objects (some order as when persisted!) */ + /* then the stream objects (same order as when persisted!) */ CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF, (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF, @@ -1122,57 +1108,6 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) /* --------------- end type-specific handlers -------------------- */ -/* unget a user pointer that has been dequeued. This functionality is especially important - * for consumer cancel cleanup handlers. To support it, a short list of ungotten user pointers - * is maintened in memory. - * rgerhards, 2008-01-20 - */ -static rsRetVal -UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex) -{ - DEFiRet; - DEFVARS_mutexProtection; - - ISOBJ_TYPE_assert(pThis, qqueue); - ISOBJ_assert(pUsr); /* TODO: we aborted right at this place at least 3 times -- race? 2008-02-28, -03-10, -03-15 - The second time I noticed it the queue was in destruction with NO worker threads - running. The pUsr ptr was totally off and provided no clue what it may be pointing - at (except that it looked like the static data pool). Both times, the abort happend - inside an action queue */ - - dbgoprint((obj_t*) pThis, "ungetting user object %s\n", obj.GetName(pUsr)); - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex); - iRet = qqueueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr); - ++pThis->iUngottenObjs; /* indicate one more */ - END_MTX_PROTECTED_OPERATIONS(pThis->mut); - - RETiRet; -} - - -/* dequeues a user pointer from the ungotten queue. Pointers from there should always be - * dequeued first. - * - * This function must only be called when the mutex is locked! - * - * rgerhards, 2008-01-29 - */ -static rsRetVal -GetUngottenObj(qqueue_t *pThis, obj_t **ppUsr) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, qqueue); - ASSERT(ppUsr != NULL); - - iRet = qqueueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr); - --pThis->iUngottenObjs; /* indicate one less */ - dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", obj.GetName(*ppUsr)); - - RETiRet; -} - - /* generic code to add a queue entry * We use some specific code to most efficiently support direct mode * queues. This is justified in spite of the gain and the need to do some @@ -1198,8 +1133,6 @@ finalize_it: /* generic code to remove a queue entry - * rgerhards, 2008-01-29: we must first see if there is any object in the - * ungotten list and, if so, dequeue it first. */ static rsRetVal qqueueDel(qqueue_t *pThis, void *pUsr) @@ -1213,13 +1146,8 @@ qqueueDel(qqueue_t *pThis, void *pUsr) * If we decrement, however, we may lose a message. But that is better than * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ - if(pThis->iUngottenObjs > 0) { - iRet = GetUngottenObj(pThis, (obj_t**) pUsr); - } else { - iRet = pThis->qDeq(pThis, pUsr); - // TODO: ULTRA iRet = pThis->qDel(pThis, pUsr); - ATOMIC_DEC(pThis->iQueueSize); - } + iRet = pThis->qDeq(pThis, pUsr); + ATOMIC_DEC(pThis->iQueueSize); dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n", iRet, pThis->iQueueSize); @@ -1528,6 +1456,8 @@ finalize_it: static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2) { + //TODO: looks like we no longer need it! + /* DEFiRet; qqueue_t *pThis = (qqueue_t*) arg1; @@ -1535,14 +1465,9 @@ ConsumerCancelCleanup(void *arg1, void *arg2) ISOBJ_TYPE_assert(pThis, qqueue); - if(pUsr != NULL) { - /* make sure the data element is not lost */ - dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n"); - CHKiRet(UngetObj(pThis, pUsr, LOCK_MUTEX)); - } - -finalize_it: RETiRet; + */ + return RS_RET_OK; } @@ -2188,7 +2113,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) strm_t *psQIF = NULL; /* Queue Info File */ uchar pszQIFNam[MAXFNAME]; size_t lenQIFNam; - obj_t *pUsr; ASSERT(pThis != NULL); @@ -2235,20 +2159,10 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) */ CHKiRet(obj.BeginSerializePropBag(psQIF, (obj_t*) pThis)); objSerializeSCALAR(psQIF, iQueueSize, INT); - objSerializeSCALAR(psQIF, iUngottenObjs, INT); objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64); objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64); CHKiRet(obj.EndSerialize(psQIF)); - /* now we must persist all objects on the ungotten queue - they can not go to - * to the regular files. -- rgerhards, 2008-01-29 - */ - while(pThis->iUngottenObjs > 0) { - CHKiRet(GetUngottenObj(pThis, &pUsr)); - CHKiRet((objSerialize(pUsr))(pUsr, psQIF)); - objDestruct(pUsr); - } - /* now persist the stream info */ CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF)); CHKiRet(strmSerialize(pThis->tVars.disk.pReadDel, psQIF)); @@ -2615,8 +2529,6 @@ static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp) if(isProp("iQueueSize")) { pThis->iQueueSize = pProp->val.num; - } else if(isProp("iUngottenObjs")) { - pThis->iUngottenObjs = pProp->val.num; } else if(isProp("tVars.disk.sizeOnDisk")) { pThis->tVars.disk.sizeOnDisk = pProp->val.num; } else if(isProp("tVars.disk.bytesRead")) { diff --git a/runtime/queue.h b/runtime/queue.h index 00cee419..e47b8762 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -152,12 +152,6 @@ typedef struct queue_s { struct queue_s *pqDA; /* queue for disk-assisted modes */ struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */ int bDAEnqOnly; /* EnqOnly setting for DA queue */ - /* some data elements for the queueUngetObj() functionality. This list should always be short - * and is always kept in memory - */ - qLinkedList_t *pUngetRoot; - qLinkedList_t *pUngetLast; - int iUngottenObjs; /* number of objects currently in the "ungotten" list */ /* now follow queueing mode specific data elements */ union { /* different data elements based on queue type (qType) */ struct { |