summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c31
1 files changed, 17 insertions, 14 deletions
diff --git a/queue.c b/queue.c
index 1020dfcc..a6bcff9f 100644
--- a/queue.c
+++ b/queue.c
@@ -253,8 +253,8 @@ queueStartDA(queue_t *pThis)
CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
/* give it a name */
- snprintf((char*) pszDAQName, sizeof(pszDAQName)/sizeof(uchar), "%s[DA]", objGetName((obj_t*) pThis));
- objSetName((obj_t*) pThis->pqDA, pszDAQName);
+ snprintf((char*) pszDAQName, sizeof(pszDAQName)/sizeof(uchar), "%s[DA]", obj.GetName((obj_t*) pThis));
+ obj.SetName((obj_t*) pThis->pqDA, pszDAQName);
/* as the created queue is the same object class, we take the
* liberty to access its properties directly.
@@ -337,7 +337,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
* rgerhards, 2008-01-24
*/
if(pThis->pWtpDA == NULL) {
- lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", objGetName((obj_t*) pThis));
+ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA));
@@ -720,7 +720,7 @@ queueTryLoadPersistedInfo(queue_t *pThis)
CHKiRet(strmConstructFinalize(psQIF));
/* first, we try to read the property bag for ourselfs */
- CHKiRet(objDeserializePropBag((obj_t*) pThis, psQIF));
+ CHKiRet(obj.DeserializePropBag((obj_t*) pThis, psQIF));
/* then the ungotten object queue */
iUngottenObjs = pThis->iUngottenObjs;
@@ -728,15 +728,15 @@ queueTryLoadPersistedInfo(queue_t *pThis)
while(iUngottenObjs > 0) {
/* fill the queue from disk */
- CHKiRet(objDeserialize((void*) &pUsr, OBJmsg, psQIF, NULL, NULL));
+ CHKiRet(obj.Deserialize((void*) &pUsr, OBJmsg, psQIF, NULL, NULL));
queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
--iUngottenObjs; /* one less */
}
/* and now the stream objects (some order as when persisted!) */
- CHKiRet(objDeserialize(&pThis->tVars.disk.pWrite, OBJstrm, psQIF,
+ CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, OBJstrm, psQIF,
(rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
- CHKiRet(objDeserialize(&pThis->tVars.disk.pRead, OBJstrm, psQIF,
+ CHKiRet(obj.Deserialize(&pThis->tVars.disk.pRead, OBJstrm, psQIF,
(rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite));
@@ -861,7 +861,7 @@ static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
int64 offsOut;
CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsIn));
- CHKiRet(objDeserialize(ppUsr, OBJmsg, pThis->tVars.disk.pRead, NULL, NULL));
+ CHKiRet(obj.Deserialize(ppUsr, OBJmsg, pThis->tVars.disk.pRead, NULL, NULL));
CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsOut));
/* This time it is a bit tricky: we free disk space only upon file deletion. So we need
@@ -935,7 +935,7 @@ queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex)
ISOBJ_TYPE_assert(pThis, queue);
ISOBJ_assert(pUsr); /* TODO: we aborted right at this place at least once -- race? 2008-02-28 */
- dbgoprint((obj_t*) pThis, "ungetting user object %s\n", objGetName(pUsr));
+ dbgoprint((obj_t*) pThis, "ungetting user object %s\n", obj.GetName(pUsr));
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
iRet = queueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr);
++pThis->iUngottenObjs; /* indicate one more */
@@ -962,7 +962,7 @@ queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr)
iRet = queueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
--pThis->iUngottenObjs; /* indicate one less */
- dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", objGetName(*ppUsr));
+ dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", obj.GetName(*ppUsr));
RETiRet;
}
@@ -1652,7 +1652,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
/* create worker thread pools for regular operation. The DA pool is created on an as-needed
* basis, which potentially means never under most circumstances.
*/
- lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", objGetName((obj_t*) pThis));
+ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg));
@@ -1759,12 +1759,12 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
* queue is re-created. Well, we'll also save the current queue type, just so that
* we know when somebody has changed the queue type... -- rgerhards, 2008-01-11
*/
- CHKiRet(objBeginSerializePropBag(psQIF, (obj_t*) pThis));
+ CHKiRet(obj.BeginSerializePropBag(psQIF, (obj_t*) pThis));
objSerializeSCALAR(psQIF, iQueueSize, INT);
objSerializeSCALAR(psQIF, iUngottenObjs, INT);
objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64);
objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64);
- CHKiRet(objEndSerialize(psQIF));
+ CHKiRet(obj.EndSerialize(psQIF));
/* now we must persist all objects on the ungotten queue - they can not go to
* to the regular files. -- rgerhards, 2008-01-29
@@ -2110,7 +2110,10 @@ finalize_it:
* before anything else is called inside this class.
* rgerhards, 2008-01-09
*/
-BEGINObjClassInit(queue, 1)
+BEGINObjClassInit(queue, 1, OBJ_IS_CORE_MODULE)
+ /* request objects we use */
+
+ /* now set our own handlers */
OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty);
ENDObjClassInit(queue)