diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 90 |
1 files changed, 46 insertions, 44 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 4e017e84..3f14b535 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -59,6 +59,7 @@ /* static data */ DEFobjStaticHelpers DEFobjCurrIf(glbl) +DEFobjCurrIf(strm) /* forward-definitions */ rsRetVal qqueueChkPersist(qqueue_t *pThis); @@ -667,7 +668,7 @@ qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pTh DEFiRet; ISOBJ_TYPE_assert(pStrm, strm); ISOBJ_TYPE_assert(pThis, qqueue); - CHKiRet(strmSetDir(pStrm, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); + CHKiRet(strm.SetDir(pStrm, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); finalize_it: RETiRet; } @@ -744,11 +745,11 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) /* If we reach this point, we have a .qi file */ - CHKiRet(strmConstruct(&psQIF)); - CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_READ)); - CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE)); - CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam)); - CHKiRet(strmConstructFinalize(psQIF)); + CHKiRet(strm.Construct(&psQIF)); + CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_READ)); + CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE)); + CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam)); + CHKiRet(strm.ConstructFinalize(psQIF)); /* first, we try to read the property bag for ourselfs */ CHKiRet(obj.DeserializePropBag((obj_t*) pThis, psQIF)); @@ -770,8 +771,8 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) CHKiRet(obj.Deserialize(&pThis->tVars.disk.pRead, (uchar*) "strm", psQIF, (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); - CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite)); - CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead)); + CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pWrite)); + CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pRead)); /* OK, we could successfully read the file, so we now can request that it be * deleted when we are done with the persisted information. @@ -780,7 +781,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) finalize_it: if(psQIF != NULL) - strmDestruct(&psQIF); + strm.Destruct(&psQIF); if(iRet != RS_RET_OK) { dbgoprint((obj_t*) pThis, "error %d reading .qi file - can not read persisted info (if any)\n", @@ -815,24 +816,24 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) if(bRestarted == 1) { ; } else { - CHKiRet(strmConstruct(&pThis->tVars.disk.pWrite)); - CHKiRet(strmSetDir(pThis->tVars.disk.pWrite, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); - CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pWrite, 10000000)); - CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE)); - CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR)); - CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite)); + CHKiRet(strm.Construct(&pThis->tVars.disk.pWrite)); + CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); + CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pWrite, 10000000)); + CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE)); + CHKiRet(strm.SetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR)); + CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pWrite)); - CHKiRet(strmConstruct(&pThis->tVars.disk.pRead)); - CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); - CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); - CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000)); - CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ)); - CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR)); - CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead)); + CHKiRet(strm.Construct(&pThis->tVars.disk.pRead)); + CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); + CHKiRet(strm.SetDir(pThis->tVars.disk.pRead, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); + CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pRead, 10000000)); + CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ)); + CHKiRet(strm.SetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR)); + CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pRead)); - CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix)); - CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(strm.SetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(strm.SetFName(pThis->tVars.disk.pRead, pThis->pszFilePrefix, pThis->lenFilePrefix)); } /* now we set (and overwrite in case of a persisted restart) some parameters which @@ -840,8 +841,8 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) * for example file name generation must not be changed as that would break the * ability to read existing queue files. -- rgerhards, 2008-01-12 */ - CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize)); - CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize)); + CHKiRet(strm.SetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize)); + CHKiRet(strm.SetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize)); finalize_it: RETiRet; @@ -854,8 +855,8 @@ static rsRetVal qDestructDisk(qqueue_t *pThis) ASSERT(pThis != NULL); - strmDestruct(&pThis->tVars.disk.pWrite); - strmDestruct(&pThis->tVars.disk.pRead); + strm.Destruct(&pThis->tVars.disk.pWrite); + strm.Destruct(&pThis->tVars.disk.pRead); RETiRet; } @@ -867,10 +868,10 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr) ASSERT(pThis != NULL); - CHKiRet(strmSetWCntr(pThis->tVars.disk.pWrite, &nWriteCount)); + CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, &nWriteCount)); CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite)); - CHKiRet(strmFlush(pThis->tVars.disk.pWrite)); - CHKiRet(strmSetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */ + CHKiRet(strm.Flush(pThis->tVars.disk.pWrite)); + CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */ pThis->tVars.disk.sizeOnDisk += nWriteCount; @@ -894,9 +895,9 @@ static rsRetVal qDelDisk(qqueue_t *pThis, void **ppUsr) int64 offsIn; int64 offsOut; - CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsIn)); + CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pRead, &offsIn)); CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pRead, NULL, NULL)); - CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsOut)); + CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pRead, &offsOut)); /* This time it is a bit tricky: we free disk space only upon file deletion. So we need * to keep track of what we have read until we get an out-offset that is lower than the @@ -1917,16 +1918,16 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) pThis->bNeedDelQIF = 0; } /* indicate spool file needs to be deleted */ - CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); + CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); FINALIZE; /* nothing left to do, so be happy */ } - CHKiRet(strmConstruct(&psQIF)); - CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE)); - CHKiRet(strmSetiAddtlOpenFlags(psQIF, O_TRUNC)); - CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE)); - CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam)); - CHKiRet(strmConstructFinalize(psQIF)); + CHKiRet(strm.Construct(&psQIF)); + CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_WRITE)); + CHKiRet(strm.SetiAddtlOpenFlags(psQIF, O_TRUNC)); + CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE)); + CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam)); + CHKiRet(strm.ConstructFinalize(psQIF)); /* first, write the property bag for ourselfs * And, surprisingly enough, we currently need to persist only the size of the @@ -1951,14 +1952,14 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) } /* now persist the stream info */ - CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF)); - CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF)); + CHKiRet(strm.Serialize(pThis->tVars.disk.pWrite, psQIF)); + CHKiRet(strm.Serialize(pThis->tVars.disk.pRead, psQIF)); /* tell the input file object that it must not delete the file on close if the queue * is non-empty - but only if we are not during a simple checkpoint */ if(bIsCheckpoint != QUEUE_CHECKPOINT) { - CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0)); + CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pRead, 0)); } /* we have persisted the queue object. So whenever it comes to an empty queue, @@ -1968,7 +1969,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) finalize_it: if(psQIF != NULL) - strmDestruct(&psQIF); + strm.Destruct(&psQIF); RETiRet; } @@ -2340,6 +2341,7 @@ rsRetVal qqueueQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; } BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE) /* request objects we use */ CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(strm, CORE_COMPONENT)); /* now set our own handlers */ OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty); |