summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-06-04 12:15:59 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-06-04 12:15:59 +0200
commit9e434f19a9baa4a6f411808b5cb6bc22d6a32781 (patch)
tree36b6ee12c665a9eb781e1984d479002bf4680ef1 /runtime/queue.c
parent768836ab79268e1091fc3af8bb920c03287db91a (diff)
downloadrsyslog-9e434f19a9baa4a6f411808b5cb6bc22d6a32781.tar.gz
rsyslog-9e434f19a9baa4a6f411808b5cb6bc22d6a32781.tar.xz
rsyslog-9e434f19a9baa4a6f411808b5cb6bc22d6a32781.zip
cleaned up stream class ...
... and also made it callable via an rsyslog interface rather then relying on the OS loader (important if we go for using it inside loadbale modules, which we soon possible will)
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c90
1 files changed, 46 insertions, 44 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 4e017e84..3f14b535 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -59,6 +59,7 @@
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(glbl)
+DEFobjCurrIf(strm)
/* forward-definitions */
rsRetVal qqueueChkPersist(qqueue_t *pThis);
@@ -667,7 +668,7 @@ qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pTh
DEFiRet;
ISOBJ_TYPE_assert(pStrm, strm);
ISOBJ_TYPE_assert(pThis, qqueue);
- CHKiRet(strmSetDir(pStrm, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strm.SetDir(pStrm, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
finalize_it:
RETiRet;
}
@@ -744,11 +745,11 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
/* If we reach this point, we have a .qi file */
- CHKiRet(strmConstruct(&psQIF));
- CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_READ));
- CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE));
- CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam));
- CHKiRet(strmConstructFinalize(psQIF));
+ CHKiRet(strm.Construct(&psQIF));
+ CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_READ));
+ CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE));
+ CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam));
+ CHKiRet(strm.ConstructFinalize(psQIF));
/* first, we try to read the property bag for ourselfs */
CHKiRet(obj.DeserializePropBag((obj_t*) pThis, psQIF));
@@ -770,8 +771,8 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pRead, (uchar*) "strm", psQIF,
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
- CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite));
- CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead));
+ CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pWrite));
+ CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pRead));
/* OK, we could successfully read the file, so we now can request that it be
* deleted when we are done with the persisted information.
@@ -780,7 +781,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
finalize_it:
if(psQIF != NULL)
- strmDestruct(&psQIF);
+ strm.Destruct(&psQIF);
if(iRet != RS_RET_OK) {
dbgoprint((obj_t*) pThis, "error %d reading .qi file - can not read persisted info (if any)\n",
@@ -815,24 +816,24 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
if(bRestarted == 1) {
;
} else {
- CHKiRet(strmConstruct(&pThis->tVars.disk.pWrite));
- CHKiRet(strmSetDir(pThis->tVars.disk.pWrite, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
- CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pWrite, 10000000));
- CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE));
- CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR));
- CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite));
+ CHKiRet(strm.Construct(&pThis->tVars.disk.pWrite));
+ CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pWrite, 10000000));
+ CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE));
+ CHKiRet(strm.SetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR));
+ CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pWrite));
- CHKiRet(strmConstruct(&pThis->tVars.disk.pRead));
- CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
- CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
- CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000));
- CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ));
- CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR));
- CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead));
+ CHKiRet(strm.Construct(&pThis->tVars.disk.pRead));
+ CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
+ CHKiRet(strm.SetDir(pThis->tVars.disk.pRead, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pRead, 10000000));
+ CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ));
+ CHKiRet(strm.SetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR));
+ CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pRead));
- CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix));
- CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(strm.SetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(strm.SetFName(pThis->tVars.disk.pRead, pThis->pszFilePrefix, pThis->lenFilePrefix));
}
/* now we set (and overwrite in case of a persisted restart) some parameters which
@@ -840,8 +841,8 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
* for example file name generation must not be changed as that would break the
* ability to read existing queue files. -- rgerhards, 2008-01-12
*/
- CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize));
- CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize));
+ CHKiRet(strm.SetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize));
+ CHKiRet(strm.SetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize));
finalize_it:
RETiRet;
@@ -854,8 +855,8 @@ static rsRetVal qDestructDisk(qqueue_t *pThis)
ASSERT(pThis != NULL);
- strmDestruct(&pThis->tVars.disk.pWrite);
- strmDestruct(&pThis->tVars.disk.pRead);
+ strm.Destruct(&pThis->tVars.disk.pWrite);
+ strm.Destruct(&pThis->tVars.disk.pRead);
RETiRet;
}
@@ -867,10 +868,10 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
ASSERT(pThis != NULL);
- CHKiRet(strmSetWCntr(pThis->tVars.disk.pWrite, &nWriteCount));
+ CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, &nWriteCount));
CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite));
- CHKiRet(strmFlush(pThis->tVars.disk.pWrite));
- CHKiRet(strmSetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */
+ CHKiRet(strm.Flush(pThis->tVars.disk.pWrite));
+ CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */
pThis->tVars.disk.sizeOnDisk += nWriteCount;
@@ -894,9 +895,9 @@ static rsRetVal qDelDisk(qqueue_t *pThis, void **ppUsr)
int64 offsIn;
int64 offsOut;
- CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsIn));
+ CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pRead, &offsIn));
CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pRead, NULL, NULL));
- CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsOut));
+ CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pRead, &offsOut));
/* This time it is a bit tricky: we free disk space only upon file deletion. So we need
* to keep track of what we have read until we get an out-offset that is lower than the
@@ -1917,16 +1918,16 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
pThis->bNeedDelQIF = 0;
}
/* indicate spool file needs to be deleted */
- CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
+ CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
FINALIZE; /* nothing left to do, so be happy */
}
- CHKiRet(strmConstruct(&psQIF));
- CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE));
- CHKiRet(strmSetiAddtlOpenFlags(psQIF, O_TRUNC));
- CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE));
- CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam));
- CHKiRet(strmConstructFinalize(psQIF));
+ CHKiRet(strm.Construct(&psQIF));
+ CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_WRITE));
+ CHKiRet(strm.SetiAddtlOpenFlags(psQIF, O_TRUNC));
+ CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE));
+ CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam));
+ CHKiRet(strm.ConstructFinalize(psQIF));
/* first, write the property bag for ourselfs
* And, surprisingly enough, we currently need to persist only the size of the
@@ -1951,14 +1952,14 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
}
/* now persist the stream info */
- CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF));
- CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF));
+ CHKiRet(strm.Serialize(pThis->tVars.disk.pWrite, psQIF));
+ CHKiRet(strm.Serialize(pThis->tVars.disk.pRead, psQIF));
/* tell the input file object that it must not delete the file on close if the queue
* is non-empty - but only if we are not during a simple checkpoint
*/
if(bIsCheckpoint != QUEUE_CHECKPOINT) {
- CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0));
+ CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pRead, 0));
}
/* we have persisted the queue object. So whenever it comes to an empty queue,
@@ -1968,7 +1969,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
finalize_it:
if(psQIF != NULL)
- strmDestruct(&psQIF);
+ strm.Destruct(&psQIF);
RETiRet;
}
@@ -2340,6 +2341,7 @@ rsRetVal qqueueQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE)
/* request objects we use */
CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(strm, CORE_COMPONENT));
/* now set our own handlers */
OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty);