diff options
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 31 |
1 files changed, 17 insertions, 14 deletions
@@ -253,8 +253,8 @@ queueStartDA(queue_t *pThis) CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer)); /* give it a name */ - snprintf((char*) pszDAQName, sizeof(pszDAQName)/sizeof(uchar), "%s[DA]", objGetName((obj_t*) pThis)); - objSetName((obj_t*) pThis->pqDA, pszDAQName); + snprintf((char*) pszDAQName, sizeof(pszDAQName)/sizeof(uchar), "%s[DA]", obj.GetName((obj_t*) pThis)); + obj.SetName((obj_t*) pThis->pqDA, pszDAQName); /* as the created queue is the same object class, we take the * liberty to access its properties directly. @@ -337,7 +337,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) * rgerhards, 2008-01-24 */ if(pThis->pWtpDA == NULL) { - lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", objGetName((obj_t*) pThis)); + lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA)); @@ -720,7 +720,7 @@ queueTryLoadPersistedInfo(queue_t *pThis) CHKiRet(strmConstructFinalize(psQIF)); /* first, we try to read the property bag for ourselfs */ - CHKiRet(objDeserializePropBag((obj_t*) pThis, psQIF)); + CHKiRet(obj.DeserializePropBag((obj_t*) pThis, psQIF)); /* then the ungotten object queue */ iUngottenObjs = pThis->iUngottenObjs; @@ -728,15 +728,15 @@ queueTryLoadPersistedInfo(queue_t *pThis) while(iUngottenObjs > 0) { /* fill the queue from disk */ - CHKiRet(objDeserialize((void*) &pUsr, OBJmsg, psQIF, NULL, NULL)); + CHKiRet(obj.Deserialize((void*) &pUsr, OBJmsg, psQIF, NULL, NULL)); queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); --iUngottenObjs; /* one less */ } /* and now the stream objects (some order as when persisted!) */ - CHKiRet(objDeserialize(&pThis->tVars.disk.pWrite, OBJstrm, psQIF, + CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, OBJstrm, psQIF, (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis)); - CHKiRet(objDeserialize(&pThis->tVars.disk.pRead, OBJstrm, psQIF, + CHKiRet(obj.Deserialize(&pThis->tVars.disk.pRead, OBJstrm, psQIF, (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis)); CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite)); @@ -861,7 +861,7 @@ static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) int64 offsOut; CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsIn)); - CHKiRet(objDeserialize(ppUsr, OBJmsg, pThis->tVars.disk.pRead, NULL, NULL)); + CHKiRet(obj.Deserialize(ppUsr, OBJmsg, pThis->tVars.disk.pRead, NULL, NULL)); CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsOut)); /* This time it is a bit tricky: we free disk space only upon file deletion. So we need @@ -935,7 +935,7 @@ queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex) ISOBJ_TYPE_assert(pThis, queue); ISOBJ_assert(pUsr); /* TODO: we aborted right at this place at least once -- race? 2008-02-28 */ - dbgoprint((obj_t*) pThis, "ungetting user object %s\n", objGetName(pUsr)); + dbgoprint((obj_t*) pThis, "ungetting user object %s\n", obj.GetName(pUsr)); BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex); iRet = queueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr); ++pThis->iUngottenObjs; /* indicate one more */ @@ -962,7 +962,7 @@ queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr) iRet = queueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr); --pThis->iUngottenObjs; /* indicate one less */ - dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", objGetName(*ppUsr)); + dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", obj.GetName(*ppUsr)); RETiRet; } @@ -1652,7 +1652,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ /* create worker thread pools for regular operation. The DA pool is created on an as-needed * basis, which potentially means never under most circumstances. */ - lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", objGetName((obj_t*) pThis)); + lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpReg)); CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg)); @@ -1759,12 +1759,12 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint) * queue is re-created. Well, we'll also save the current queue type, just so that * we know when somebody has changed the queue type... -- rgerhards, 2008-01-11 */ - CHKiRet(objBeginSerializePropBag(psQIF, (obj_t*) pThis)); + CHKiRet(obj.BeginSerializePropBag(psQIF, (obj_t*) pThis)); objSerializeSCALAR(psQIF, iQueueSize, INT); objSerializeSCALAR(psQIF, iUngottenObjs, INT); objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64); objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64); - CHKiRet(objEndSerialize(psQIF)); + CHKiRet(obj.EndSerialize(psQIF)); /* now we must persist all objects on the ungotten queue - they can not go to * to the regular files. -- rgerhards, 2008-01-29 @@ -2110,7 +2110,10 @@ finalize_it: * before anything else is called inside this class. * rgerhards, 2008-01-09 */ -BEGINObjClassInit(queue, 1) +BEGINObjClassInit(queue, 1, OBJ_IS_CORE_MODULE) + /* request objects we use */ + + /* now set our own handlers */ OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty); ENDObjClassInit(queue) |