summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--msg.c2
-rw-r--r--obj.c54
-rw-r--r--obj.h1
-rw-r--r--queue.c71
-rw-r--r--rsyslog.h2
-rw-r--r--stream.c42
-rw-r--r--stream.h2
7 files changed, 158 insertions, 16 deletions
diff --git a/msg.c b/msg.c
index dd635f58..9825ea77 100644
--- a/msg.c
+++ b/msg.c
@@ -394,7 +394,9 @@ static rsRetVal MsgSerialize(msg_t *pThis, strm_t *pStrm)
assert(pThis != NULL);
assert(pStrm != NULL);
+dbgprintf("MsgSerialize\n");
CHKiRet(objBeginSerialize(pStrm, (obj_t*) pThis));
+dbgprintf("post MsgSerialize\n");
objSerializeSCALAR(pStrm, iProtocolVersion, SHORT);
objSerializeSCALAR(pStrm, iSeverity, SHORT);
objSerializeSCALAR(pStrm, iFacility, SHORT);
diff --git a/obj.c b/obj.c
index e345a5d8..3d4acb06 100644
--- a/obj.c
+++ b/obj.c
@@ -89,6 +89,7 @@ rsRetVal objInfoConstruct(objInfo_t **ppThis, objID_t objID, uchar *pszName, int
pThis->pszName = pszName;
pThis->iObjVers = iObjVers;
+fprintf(stderr, "objid %d set for %s\n", objID, pszName);
pThis->objID = objID;
pThis->objMethods[0] = pConstruct;
@@ -109,7 +110,6 @@ rsRetVal objInfoSetMethod(objInfo_t *pThis, objMethod_t objMethod, rsRetVal (*pH
{
assert(pThis != NULL);
assert(objMethod > 0 && objMethod < OBJ_NUM_METHODS);
-
pThis->objMethods[objMethod] = pHandler;
return RS_RET_OK;
@@ -164,6 +164,7 @@ rsRetVal objBeginSerialize(strm_t *pStrm, obj_t *pObj)
{
DEFiRet;
+dbgprintf("objBeginSerialize obj type: %x\n", objGetObjID(pStrm));
ISOBJ_TYPE_assert(pStrm, strm);
ISOBJ_assert(pObj);
@@ -212,6 +213,7 @@ rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propertyType_t prop
assert(pszPropName != NULL);
/*dbgprintf("objSerializeProp: strm %p, propName '%s', type %d, pUsr %p\n", pStrm, pszPropName, propType, pUsr);*/
+ dbgprintf("objSerializeProp: strm %p, propName '%s', type %d, pUsr %p\n", pStrm, pszPropName, propType, pUsr);
/* if we have no user pointer, there is no need to write this property.
* TODO: think if that's the righ point of view
* rgerhards, 2008-01-06
@@ -586,13 +588,14 @@ finalize_it:
* of the trailer. Header must already have been processed.
* rgerhards, 2008-01-11
*/
-rsRetVal objDeserializeProperties(obj_t *pObj, objID_t oID, strm_t *pStrm)
+static rsRetVal objDeserializeProperties(obj_t *pObj, objID_t oID, strm_t *pStrm)
{
DEFiRet;
property_t propBuf;
ISOBJ_assert(pObj);
ISOBJ_TYPE_assert(pStrm, strm);
+ assert(oID > 0 && oID < OBJ_NUM_IDS);
iRet = objDeserializeProperty(&propBuf, pStrm);
while(iRet == RS_RET_OK) {
@@ -627,7 +630,7 @@ rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, strm_t *pStrm)
assert(ppObj != NULL);
assert(objTypeExpected > 0 && objTypeExpected < OBJ_NUM_IDS);
- assert(pStrm != NULL);
+ ISOBJ_TYPE_assert(pStrm, strm);
/* we de-serialize the header. if all goes well, we are happy. However, if
* we experience a problem, we try to recover. We do this by skipping to
@@ -662,6 +665,50 @@ finalize_it:
}
+/* De-Serialize an object, but treat it as property bag.
+ * rgerhards, 2008-01-11
+ */
+rsRetVal objDeserializeObjAsPropBag(obj_t *pObj, strm_t *pStrm)
+{
+ DEFiRet;
+ rsRetVal iRetLocal;
+ objID_t oID = 0; /* this assignment is just to supress a compiler warning - this saddens me */
+ int oVers = 0; /* after all, it is totally useless but takes up some execution time... */
+
+dbgprintf("objDese...AsPropBag 0\n");
+ ISOBJ_assert(pObj);
+dbgprintf("objDese...AsPropBag 0a\n");
+ ISOBJ_TYPE_assert(pStrm, strm);
+dbgprintf("objDese...AsPropBag 1\n");
+
+ /* we de-serialize the header. if all goes well, we are happy. However, if
+ * we experience a problem, we try to recover. We do this by skipping to
+ * the next object header. This is defined via the line-start cookies. In
+ * worst case, we exhaust the queue, but then we receive EOF return state
+ * from objDeserializeTryRecover(), what will cause us to ultimately give up.
+ * rgerhards, 2008-07-08
+ */
+ do {
+ iRetLocal = objDeserializeHeader((uchar*) "Obj", &oID, &oVers, pStrm);
+ if(iRetLocal != RS_RET_OK) {
+ dbgprintf("objDeserializeObjAsPropBag error %d during header - trying to recover\n", iRetLocal);
+ CHKiRet(objDeserializeTryRecover(pStrm));
+ }
+ } while(iRetLocal != RS_RET_OK);
+
+dbgprintf("objDese...AsPropBag 2\n");
+ if(oID != objGetObjID(pObj))
+ ABORT_FINALIZE(RS_RET_INVALID_OID);
+
+ /* we got the object, now we need to fill the properties */
+ CHKiRet(objDeserializeProperties(pObj, oID, pStrm));
+
+finalize_it:
+ return iRet;
+}
+
+
+
/* De-Serialize an object property bag. As a property bag contains only partial properties,
* it is not instanciable. Thus, the caller must provide a pointer of an already-instanciated
* object of the correct type.
@@ -719,6 +766,7 @@ rsRetVal objRegisterObj(objID_t oID, objInfo_t *pInfo)
DEFiRet;
assert(pInfo != NULL);
+ assert(arrObjInfo[oID] == NULL);
if(oID < 1 || oID > OBJ_NUM_IDS)
ABORT_FINALIZE(RS_RET_INVALID_OID);
diff --git a/obj.h b/obj.h
index 43f531b0..776d2030 100644
--- a/obj.h
+++ b/obj.h
@@ -72,6 +72,7 @@
/* the next macro MUST be called in Constructors: */
#ifndef NDEBUG /* this means if debug... */
# define objConstructSetObjInfo(pThis) \
+ assert(pThis->pObjInfo == NULL); \
((obj_t*) (pThis))->pObjInfo = pObjInfoOBJ; \
((obj_t*) (pThis))->iObjCooCKiE = 0xBADEFEE
#else
diff --git a/queue.c b/queue.c
index 99919ca2..b31173d9 100644
--- a/queue.c
+++ b/queue.c
@@ -479,6 +479,68 @@ queueWorker(void *arg)
pthread_exit(0);
}
+
+/* This method checks if there is any persistent information on the
+ * queue and, if so, tries to load it. This method can only legally be
+ * called from the destructor (I moved it out from there to keep the
+ * Constructor code somewhat smaller). -- rgerhards, 2008-01-11
+ */
+static rsRetVal
+queueTryLoadPersistedInfo(queue_t *pThis)
+{
+ DEFiRet;
+ strm_t *psQIF = NULL;
+ uchar pszQIFNam[MAXFNAME];
+ size_t lenQIFNam;
+ struct stat stat_buf;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+
+ /* Construct file name */
+ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
+ (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
+
+ /* check if the file exists */
+dbgprintf("stat '%s'\n", pszQIFNam);
+ if(stat((char*) pszQIFNam, &stat_buf) == -1) {
+ if(errno == ENOENT) {
+ dbgprintf("Queue 0x%lx: clean startup, no .qi file found\n", queueGetID(pThis));
+ ABORT_FINALIZE(RS_RET_OK);
+ } else {
+ dbgprintf("Queue 0x%lx: error %d trying to access .qi file\n", queueGetID(pThis), errno);
+ ABORT_FINALIZE(RS_RET_IO_ERROR);
+ }
+ }
+
+ /* If we reach this point, we have a .qi file */
+
+ CHKiRet(strmConstruct(&psQIF));
+ CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
+ CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_READ));
+ CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE));
+ CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam));
+ CHKiRet(strmConstructFinalize(psQIF));
+
+ /* first, we try to read the property bag for ourselfs */
+ CHKiRet(objDeserializePropBag((obj_t*) pThis, psQIF));
+
+ /* and now the stream objects (some order as when persisted!) */
+ CHKiRet(objDeserializeObjAsPropBag(pThis->tVars.disk.pWrite, psQIF));
+ CHKiRet(objDeserializeObjAsPropBag(pThis->tVars.disk.pRead, psQIF));
+
+finalize_it:
+ if(psQIF != NULL)
+ strmDestruct(psQIF);
+
+ if(iRet != RS_RET_OK) {
+ dbgprintf("Queue 0x%lx: error %d reading .qi file - can not start queue\n",
+ queueGetID(pThis), iRet);
+ }
+
+ return iRet;
+}
+
+
/* Constructor for the queue object
* This constructs the data structure, but does not yet start the queue. That
* is done by queueStart(). The reason is that we want to give the caller a chance
@@ -561,7 +623,7 @@ finalize_it:
/* start up the queue - it must have been constructed and parameters defined
* before.
*/
-rsRetVal queueStart(queue_t *pThis)
+rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
{
DEFiRet;
int iState;
@@ -569,6 +631,9 @@ rsRetVal queueStart(queue_t *pThis)
assert(pThis != NULL);
+ /* and now check if there is some persistent information that needs to be read in */
+ CHKiRet(queueTryLoadPersistedInfo(pThis));
+
dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", (unsigned long) pThis, pThis->qType,
pThis->iMaxFileSize);
@@ -650,10 +715,12 @@ static rsRetVal queuePersist(queue_t *pThis)
objSerializeSCALAR_VAR(psQIF, qType, INT, i);
objSerializeSCALAR(psQIF, iQueueSize, INT);
CHKiRet(objEndSerialize(psQIF));
+dbgprintf("queue serial 1\n");
/* this is disk specific and must be moved to a function */
CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF));
CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF));
+dbgprintf("queue serial 2\n");
/* persist queue object itself */
@@ -852,7 +919,7 @@ BEGINObjClassInit(queue, 1)
//OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize);
OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty);
//OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize);
-ENDObjClassInit(strm)
+ENDObjClassInit(queue)
/*
* vi:set ai:
diff --git a/rsyslog.h b/rsyslog.h
index 93f6fe06..2937db88 100644
--- a/rsyslog.h
+++ b/rsyslog.h
@@ -107,6 +107,8 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_FILE_PREFIX_MISSING = -2036, /**< a required file prefix (parameter?) is missing */
RS_RET_INVALID_HEADER_RECTYPE = -2037, /**< invalid record type in header or invalid header */
RS_RET_QTYPE_MISMATCH = -2038, /**< different qType when reading back a property type */
+ RS_RET_NO_FILE_ACCESS = -2039, /**< covers EACCES error on file open() */
+ RS_RET_FILE_NOT_FOUND = -2040, /**< file not found */
RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */
RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */
RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */
diff --git a/stream.c b/stream.c
index 405f858f..59067041 100644
--- a/stream.c
+++ b/stream.c
@@ -88,6 +88,15 @@ static rsRetVal strmOpenFile(strm_t *pThis)
iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND;
pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode);
+ if(pThis->fd == -1) {
+ int ierrnoSave = errno;
+ dbgprintf("Stream 0x%lx: open error %d\n", (unsigned long) pThis, errno);
+ if(ierrnoSave == ENOENT)
+ ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
+ else
+ ABORT_FINALIZE(RS_RET_IO_ERROR);
+ }
+
pThis->iCurrOffs = 0;
dbgprintf("Stream 0x%lx: opened file '%s' for %s (0x%x) as %d\n", (unsigned long) pThis,
@@ -621,28 +630,39 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
int i;
long l;
- assert(pThis != NULL);
- assert(pStrm != NULL);
+ ISOBJ_TYPE_assert(pThis, strm);
+ ISOBJ_TYPE_assert(pStrm, strm);
+dbgprintf("strmSerialize 1\n");
CHKiRet(objBeginSerialize(pStrm, (obj_t*) pThis));
- i = pThis->sType;
- objSerializeSCALAR_VAR(pStrm, sType, INT, i);
+dbgprintf("strmSerialize 2\n");
objSerializeSCALAR(pStrm, iCurrFNum, INT);
objSerializePTR(pStrm, pszFName, PSZ);
+ objSerializeSCALAR(pStrm, iMaxFiles, INT);
+ objSerializeSCALAR(pStrm, iFileNumDigits, INT);
+ objSerializeSCALAR(pStrm, bDeleteOnClose, INT);
+
+ i = pThis->sType;
+ objSerializeSCALAR_VAR(pStrm, sType, INT, i);
+
i = pThis->tOperationsMode;
objSerializeSCALAR_VAR(pStrm, tOperationsMode, INT, i);
+
i = pThis->tOpenMode;
objSerializeSCALAR_VAR(pStrm, tOpenMode, INT, i);
- l = (long) pThis->iMaxFileSize;
- objSerializeSCALAR_VAR(pStrm, iMaxFileSize, LONG, l);
- objSerializeSCALAR(pStrm, iMaxFiles, INT);
- objSerializeSCALAR(pStrm, iFileNumDigits, INT);
- objSerializeSCALAR(pStrm, bDeleteOnClose, INT);
+
+ l = (long) pThis->iCurrOffs;
+ objSerializeSCALAR_VAR(pStrm, iCurrOffs, LONG, l);
+
+ // TODO: really serialize?
+ //l = (long) pThis->iMaxFileSize;
+ //objSerializeSCALAR_VAR(pStrm, iMaxFileSize, LONG, l);
CHKiRet(objEndSerialize(pStrm));
finalize_it:
+dbgprintf("strmSerialize out %d\n", iRet);
return iRet;
}
@@ -656,7 +676,7 @@ rsRetVal strmSetProperty(strm_t *pThis, property_t *pProp)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, Msg);
+ ISOBJ_TYPE_assert(pThis, strm);
assert(pProp != NULL);
if(isProp("sType")) {
@@ -669,6 +689,8 @@ rsRetVal strmSetProperty(strm_t *pThis, property_t *pProp)
CHKiRet(strmSettOperationsMode(pThis, pProp->val.vInt));
} else if(isProp("tOpenMode")) {
CHKiRet(strmSettOpenMode(pThis, pProp->val.vInt));
+ } else if(isProp("iCurrOffs")) {
+ pThis->iCurrOffs = pProp->val.vLong;
} else if(isProp("iMaxFileSize")) {
CHKiRet(strmSetiMaxFileSize(pThis, pProp->val.vLong));
} else if(isProp("iMaxFiles")) {
diff --git a/stream.h b/stream.h
index e48d354c..d5b207ce 100644
--- a/stream.h
+++ b/stream.h
@@ -73,12 +73,12 @@ typedef struct strm_s {
int iMaxFiles; /* maximum number of files if a circular mode is in use */
int iFileNumDigits;/* min number of digits to use in file number (only in circular mode) */
int bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */
+ size_t iCurrOffs;/* current offset */
/* dynamic properties, valid only during file open, not to be persistet */
size_t sIOBufSize;/* size of IO buffer */
uchar *pszDir; /* Directory */
int lenDir;
int fd; /* the file descriptor, -1 if closed */
- size_t iCurrOffs;/* current offset */
uchar *pszCurrFName; /* name of current file (if open) */
uchar *pIOBuf; /* io Buffer */
size_t iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */