summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-11 14:12:25 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-11 14:12:25 +0000
commit8dad3997505f71e6e9962892f79d7b7dad0a89ce (patch)
treece49da850c9d4deaf6143ef2232766e8c1ccdd35
parente095d1ab45b205b4849151b15592c2824f04373a (diff)
downloadrsyslog-8dad3997505f71e6e9962892f79d7b7dad0a89ce.tar.gz
rsyslog-8dad3997505f71e6e9962892f79d7b7dad0a89ce.tar.xz
rsyslog-8dad3997505f71e6e9962892f79d7b7dad0a89ce.zip
file stream objects are now persistet on immediate queue shutdown (queue
itself is not yet fully persisted)
-rw-r--r--obj-types.h25
-rw-r--r--obj.c8
-rw-r--r--obj.h10
-rw-r--r--queue.c90
-rw-r--r--queue.h3
-rw-r--r--stream.c43
-rw-r--r--stream.h10
7 files changed, 161 insertions, 28 deletions
diff --git a/obj-types.h b/obj-types.h
index 6da56db0..ca7e1c67 100644
--- a/obj-types.h
+++ b/obj-types.h
@@ -83,12 +83,34 @@ typedef struct objInfo_s {
typedef struct obj { /* the dummy struct that each derived class can be casted to */
objInfo_t *pObjInfo;
+#ifndef NDEBUG /* this means if debug... */
+ unsigned int iObjCooCKiE; /* must always be 0xBADEFEE for a valid object */
+#endif
} obj_t;
/* macros which must be gloablly-visible (because they are used during definition of
* other objects.
*/
-#define BEGINobjInstance objInfo_t *pObjInfo
+#ifndef NDEBUG /* this means if debug... */
+# define BEGINobjInstance \
+ objInfo_t *pObjInfo; \
+ unsigned int iObjCooCKiE; /* prevent name conflict, thus the strange name */
+# define ISOBJ_assert(pObj) \
+ { \
+ assert(pObj != NULL); \
+ assert((unsigned) pObj->iObjCooCKiE == (unsigned) 0xBADEFEE); \
+ }
+# define ISOBJ_TYPE_assert(pObj, objType) \
+ { \
+ assert(pObj != NULL); \
+ assert((unsigned) pObj->iObjCooCKiE == (unsigned) 0xBADEFEE); \
+ assert(objGetObjID(pObj) == OBJ##objType); \
+ }
+#else /* non-debug mode, no checks but much faster */
+# define BEGINobjInstance objInfo_t *pObjInfo;
+# define ISOBJ_TYPE_assert(pObj, objType)
+# define ISOBJ_assert(pObj, objType)
+#endif
#define DEFpropSetMeth(obj, prop, dataType)\
rsRetVal obj##Set##prop(obj##_t *pThis, dataType pVal)\
@@ -135,6 +157,7 @@ finalize_it: \
if((pThis = (obj##_t *)calloc(1, sizeof(obj##_t))) == NULL) { \
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); \
} \
+ objConstructSetObjInfo(pThis); \
\
obj##Initialize(pThis); \
\
diff --git a/obj.c b/obj.c
index 938e1ead..97fb3373 100644
--- a/obj.c
+++ b/obj.c
@@ -125,7 +125,7 @@ static rsRetVal objSerializeHeader(strm_t *pStrm, obj_t *pObj)
DEFiRet;
assert(pStrm != NULL);
- assert(pObj != NULL);
+ ISOBJ_assert(pObj);
/* object cookie and serializer version (so far always 1) */
CHKiRet(strmWriteChar(pStrm, COOKIE_OBJLINE));
@@ -186,6 +186,7 @@ rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propertyType_t prop
assert(pStrm != NULL);
assert(pszPropName != NULL);
+ dbgprintf("objSerializeProp: strm %p, propName '%s', type %d, pUsr %p\n", pStrm, pszPropName, propType, pUsr);
/* if we have no user pointer, there is no need to write this property.
* TODO: think if that's the righ point of view
* rgerhards, 2008-01-06
@@ -194,6 +195,8 @@ rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propertyType_t prop
ABORT_FINALIZE(RS_RET_OK);
}
+ /* TODO: use the stream functions for data conversion here - should be quicker */
+
switch(propType) {
case PROPTYPE_PSZ:
pszBuf = (uchar*) pUsr;
@@ -413,7 +416,6 @@ static rsRetVal objDeserializeHeader(objID_t *poID, int* poVers, strm_t *pStrm)
*poVers = oVers;
finalize_it:
-dbgprintf("DeserializeHeader oid: %ld, vers: %ld, iRet: %d\n", ioID, oVers, iRet);
return iRet;
}
@@ -632,7 +634,7 @@ rsRetVal objRegisterObj(objID_t oID, objInfo_t *pInfo)
assert(pInfo != NULL);
if(oID < 1 || oID > OBJ_NUM_IDS)
ABORT_FINALIZE(RS_RET_INVALID_OID);
-
+
arrObjInfo[oID] = pInfo;
finalize_it:
diff --git a/obj.h b/obj.h
index 31a08004..ae395a53 100644
--- a/obj.h
+++ b/obj.h
@@ -69,8 +69,14 @@
#define objGetName(pThis) (((obj_t*) (pThis))->pObjInfo->pszName)
#define objGetObjID(pThis) (((obj_t*) (pThis))->pObjInfo->objID)
#define objGetVersion(pThis) (((obj_t*) (pThis))->pObjInfo->iObjVers)
-/* must be called in Constructor: */
-#define objConstructSetObjInfo(pThis) ((obj_t*) (pThis))->pObjInfo = pObjInfoOBJ;
+/* the next macro MUST be called in Constructors: */
+#ifndef NDEBUG /* this means if debug... */
+# define objConstructSetObjInfo(pThis) \
+ ((obj_t*) (pThis))->pObjInfo = pObjInfoOBJ; \
+ ((obj_t*) (pThis))->iObjCooCKiE = 0xBADEFEE
+#else
+# define objConstructSetObjInfo(pThis) ((obj_t*) (pThis))->pObjInfo = pObjInfoOBJ
+#endif
#define objDestruct(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_DESTRUCT])(pThis)
#define objSerialize(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_SERIALIZE])
diff --git a/queue.c b/queue.c
index b582fb13..0d59a068 100644
--- a/queue.c
+++ b/queue.c
@@ -28,6 +28,7 @@
*/
#include "config.h"
+#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
@@ -206,6 +207,7 @@ static rsRetVal qConstructDisk(queue_t *pThis)
CHKiRet(strmSetDir(pThis->tVars.disk.pWrite, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pWrite, 10000000));
CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE));
+ CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR));
CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite));
CHKiRet(strmConstruct(&pThis->tVars.disk.pRead));
@@ -213,6 +215,7 @@ static rsRetVal qConstructDisk(queue_t *pThis)
CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000));
CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ));
+ CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR));
CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead));
finalize_it:
@@ -584,6 +587,70 @@ finalize_it:
return iRet;
}
+
+#if 0
+/* persist disk status on disk. This is necessary if we run either
+ * a disk queue or in a disk assisted mode.
+ */
+static rsRetVal queuePersistDskFilInfo(queue_t *pThis)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+
+
+finalize_it:
+ return iRet;
+}
+#endif
+
+
+
+/* persist the queue to disk. If we have something to persist, we first
+ * save the information on the queue properties itself and then we call
+ * the queue-type specific drivers.
+ * rgerhards, 2008-01-10
+ */
+static rsRetVal queuePersist(queue_t *pThis)
+{
+ DEFiRet;
+ strm_t *psQIF; /* Queue Info File */
+ uchar pszQIFNam[MAXFNAME];
+ size_t lenQIFNam;
+
+ assert(pThis != NULL);
+ if(pThis->iQueueSize == 0)
+ FINALIZE; /* nothing left to do, so be happy */
+
+ dbgprintf("Queue 0x%lx: persisting queue to disk, %d entries...\n", queueGetID(pThis), pThis->iQueueSize);
+ /* Construct file name */
+ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
+ (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
+ CHKiRet(strmConstruct(&psQIF));
+ CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
+ CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE));
+ CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE));
+ CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam));
+ CHKiRet(strmConstructFinalize(psQIF));
+
+ /* this is disk specific and must be moved to a function */
+ CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF));
+ CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF));
+
+ /* persist queue object itself */
+
+ /* ready with the queue, now call driver to persist queue data */
+ //iRet = ;
+
+ /* the the input file object that it must not delete the file on close */
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0));
+
+finalize_it:
+ strmDestruct(psQIF);
+ return iRet;
+}
+
+
/* destructor for the queue object */
rsRetVal queueDestruct(queue_t *pThis)
{
@@ -598,6 +665,13 @@ rsRetVal queueDestruct(queue_t *pThis)
pThis->pWrkThrds = NULL;
}
+ /* now check if we need to persist the queue */
+ if(pThis->bImmediateShutdown) {
+ CHKiRet_Hdlr(queuePersist(pThis)) {
+ dbgprintf("Queue 0x%lx: error %d persisting queue - data lost!\n", (unsigned long) pThis, iRet);
+ }
+ }
+
/* ... then free resources */
pthread_mutex_destroy(pThis->mut);
free(pThis->mut);
@@ -608,6 +682,9 @@ rsRetVal queueDestruct(queue_t *pThis)
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
+ if(pThis->pszFilePrefix != NULL)
+ free(pThis->pszFilePrefix);
+
/* and finally delete the queue objet itself */
free(pThis);
@@ -624,9 +701,18 @@ rsRetVal
queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
{
DEFiRet;
+
+ if(pThis->pszFilePrefix != NULL)
+ free(pThis->pszFilePrefix);
+
+ if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1);
+ pThis->lenFilePrefix = iLenPrefix;
+
if(pThis->qType == QUEUETYPE_DISK) {
- CHKiRet(strmSetFilePrefix(pThis->tVars.disk.pWrite, pszPrefix, iLenPrefix));
- CHKiRet(strmSetFilePrefix(pThis->tVars.disk.pRead, pszPrefix, iLenPrefix));
+ CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pszPrefix, iLenPrefix));
+ CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pszPrefix, iLenPrefix));
}
finalize_it:
return iRet;
diff --git a/queue.h b/queue.h
index 56a9daaa..56841faf 100644
--- a/queue.h
+++ b/queue.h
@@ -69,7 +69,7 @@ typedef enum {
typedef struct qWrkThrd_s {
pthread_t thrdID; /* thread ID */
- volatile qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
+ qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
} qWrkThrd_t; /* type for queue worker threads */
/* the queue object */
@@ -128,5 +128,6 @@ rsRetVal queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*));
PROTOTYPEpropSetMeth(queue, bImmediateShutdown, int);
+#define queueGetID(pThis) ((unsigned long) pThis)
#endif /* #ifndef QUEUE_H_INCLUDED */
diff --git a/stream.c b/stream.c
index 740ca9fb..7fb6aa74 100644
--- a/stream.c
+++ b/stream.c
@@ -47,7 +47,6 @@
/* static data */
DEFobjStaticHelpers
-
/* methods */
/* first, we define type-specific handlers. The provide a generic functionality,
@@ -74,8 +73,13 @@ static rsRetVal strmOpenFile(strm_t *pThis)
if(pThis->pszFName == NULL)
ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
- CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir,
- pThis->pszFName, pThis->lenFilePrefix, pThis->iCurrFNum, pThis->iFileNumDigits));
+ if(pThis->sType == STREAMTYPE_FILE_CIRCULAR) {
+ CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir,
+ pThis->pszFName, pThis->lenFName, pThis->iCurrFNum, pThis->iFileNumDigits));
+ } else {
+ if((pThis->pszCurrFName = (uchar*) strdup((char*) pThis->pszFName)) == NULL)
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
/* compute which flags we need to provide to open */
if(pThis->tOperationsMode == STREAMMODE_READ)
@@ -274,7 +278,7 @@ BEGINobjConstruct(strm)
pThis->iCurrFNum = 1;
pThis->fd = -1;
pThis->iUngetC = -1;
- pThis->sType = STREAMTYPE_FILE;
+ pThis->sType = STREAMTYPE_FILE_SINGLE;
pThis->sIOBufSize = glblGetIOBufSize();
pThis->tOpenMode = 0600;
ENDobjConstruct(strm)
@@ -305,7 +309,10 @@ rsRetVal strmDestruct(strm_t *pThis)
{
DEFiRet;
- assert(pThis != NULL);
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ if(pThis->tOperationsMode == STREAMMODE_WRITE && pThis->iBufPtr > 0)
+ strmFlush(pThis);
/* ... then free resources */
if(pThis->fd != -1)
@@ -362,8 +369,8 @@ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
CHKiRet(strmOpenFile(pThis));
iWritten = write(pThis->fd, pBuf, lenBuf);
- dbgprintf("Stream 0x%lx: write wrote %d bytes, errno: %d, err %s\n", (unsigned long) pThis,
- iWritten, errno, strerror(errno));
+ dbgprintf("Stream 0x%lx: write wrote %d bytes to file %d, errno: %d, err %s\n", (unsigned long) pThis,
+ iWritten, pThis->fd, errno, strerror(errno));
/* TODO: handle error case -- rgerhards, 2008-01-07 */
/* Now indicate buffer empty again. We do this in any case, because there
@@ -376,7 +383,9 @@ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
*/
pThis->iBufPtr = 0;
pThis->iCurrOffs += iWritten;
- CHKiRet(strmCheckNextOutputFile(pThis));
+
+ if(pThis->sType == STREAMTYPE_FILE_CIRCULAR)
+ CHKiRet(strmCheckNextOutputFile(pThis));
finalize_it:
pThis->iBufPtr = 0; /* see comment above */
@@ -490,6 +499,7 @@ DEFpropSetMeth(strm, iMaxFileSize, int)
DEFpropSetMeth(strm, iFileNumDigits, int)
DEFpropSetMeth(strm, tOperationsMode, int)
DEFpropSetMeth(strm, tOpenMode, mode_t)
+DEFpropSetMeth(strm, sType, strmType_t);
rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal)
{
@@ -505,21 +515,21 @@ rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal)
* rgerhards, 2008-01-09
*/
rsRetVal
-strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
+strmSetFName(strm_t *pThis, uchar *pszName, size_t iLenName)
{
DEFiRet;
assert(pThis != NULL);
- assert(pszPrefix != NULL);
+ assert(pszName != NULL);
- if(iLenPrefix < 1)
+ if(iLenName < 1)
ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING);
- if((pThis->pszFName = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL)
+ if((pThis->pszFName = malloc(sizeof(uchar) * iLenName + 1)) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- memcpy(pThis->pszFName, pszPrefix, iLenPrefix + 1); /* always think about the \0! */
- pThis->lenFilePrefix = iLenPrefix;
+ memcpy(pThis->pszFName, pszName, iLenName + 1); /* always think about the \0! */
+ pThis->lenFName = iLenName;
finalize_it:
return iRet;
@@ -616,6 +626,7 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
CHKiRet(objBeginSerialize(pStrm, (obj_t*) pThis));
+dbgprintf("strmSerialize in %p\n", pThis);
i = pThis->sType;
objSerializeSCALAR_VAR(pStrm, sType, INT, i);
objSerializeSCALAR(pStrm, iCurrFNum, INT);
@@ -624,7 +635,8 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
objSerializeSCALAR_VAR(pStrm, tOperationsMode, INT, i);
i = pThis->tOpenMode;
objSerializeSCALAR_VAR(pStrm, tOpenMode, INT, i);
- i = (long) pThis->iMaxFileSize;
+ l = (long) pThis->iMaxFileSize;
+dbgprintf("max file size %ld, l: %ld\n", pThis->iMaxFileSize, l);
objSerializeSCALAR_VAR(pStrm, iMaxFileSize, LONG, l);
objSerializeSCALAR(pStrm, iMaxFiles, INT);
objSerializeSCALAR(pStrm, iFileNumDigits, INT);
@@ -632,6 +644,7 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
CHKiRet(objEndSerialize(pStrm));
finalize_it:
+dbgprintf("strmSerialize out, iret %d\n", iRet);
return iRet;
}
diff --git a/stream.h b/stream.h
index ac02e66c..b1b93355 100644
--- a/stream.h
+++ b/stream.h
@@ -49,7 +49,8 @@
/* stream types */
typedef enum {
- STREAMTYPE_FILE = 0
+ STREAMTYPE_FILE_SINGLE = 0,
+ STREAMTYPE_FILE_CIRCULAR = 1
} strmType_t;
typedef enum {
@@ -65,7 +66,7 @@ typedef struct strm_s {
/* descriptive properties */
int iCurrFNum;/* current file number (NOT descriptor, but the number in the file name!) */
uchar *pszFName; /* prefix for generated filenames */
- int lenFilePrefix;
+ int lenFName;
strmMode_t tOperationsMode;
mode_t tOpenMode;
size_t iMaxFileSize;/* maximum size a file may grow to */
@@ -91,13 +92,13 @@ rsRetVal strmConstruct(strm_t **ppThis);
rsRetVal strmConstructFinalize(strm_t __attribute__((unused)) *pThis);
rsRetVal strmDestruct(strm_t *pThis);
rsRetVal strmSetMaxFileSize(strm_t *pThis, size_t iMaxFileSize);
-rsRetVal strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
+rsRetVal strmSetFileName(strm_t *pThis, uchar *pszName, size_t iLenName);
rsRetVal strmReadChar(strm_t *pThis, uchar *pC);
rsRetVal strmUnreadChar(strm_t *pThis, uchar c);
rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
rsRetVal strmWriteChar(strm_t *pThis, uchar c);
rsRetVal strmWriteLong(strm_t *pThis, long i);
-rsRetVal strmSetFilePrefix(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
+rsRetVal strmSetFName(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir);
rsRetVal strmFlush(strm_t *pThis);
rsRetVal strmRecordBegin(strm_t *pThis);
@@ -110,5 +111,6 @@ PROTOTYPEpropSetMeth(strm, iMaxFiles, int);
PROTOTYPEpropSetMeth(strm, iFileNumDigits, int);
PROTOTYPEpropSetMeth(strm, tOperationsMode, int);
PROTOTYPEpropSetMeth(strm, tOpenMode, mode_t);
+PROTOTYPEpropSetMeth(strm, sType, strmType_t);
#endif /* #ifndef STREAM_H_INCLUDED */