From 366060a51de60c717886636d6ef646bf1959972c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 11 Jan 2008 19:38:09 +0000 Subject: partial ability to read a disk queue back in (not completed, but would like to save source for the weekend) --- msg.c | 2 ++ obj.c | 54 +++++++++++++++++++++++++++++++++++++++++++++--- obj.h | 1 + queue.c | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- rsyslog.h | 2 ++ stream.c | 42 ++++++++++++++++++++++++++++--------- stream.h | 2 +- 7 files changed, 158 insertions(+), 16 deletions(-) diff --git a/msg.c b/msg.c index dd635f58..9825ea77 100644 --- a/msg.c +++ b/msg.c @@ -394,7 +394,9 @@ static rsRetVal MsgSerialize(msg_t *pThis, strm_t *pStrm) assert(pThis != NULL); assert(pStrm != NULL); +dbgprintf("MsgSerialize\n"); CHKiRet(objBeginSerialize(pStrm, (obj_t*) pThis)); +dbgprintf("post MsgSerialize\n"); objSerializeSCALAR(pStrm, iProtocolVersion, SHORT); objSerializeSCALAR(pStrm, iSeverity, SHORT); objSerializeSCALAR(pStrm, iFacility, SHORT); diff --git a/obj.c b/obj.c index e345a5d8..3d4acb06 100644 --- a/obj.c +++ b/obj.c @@ -89,6 +89,7 @@ rsRetVal objInfoConstruct(objInfo_t **ppThis, objID_t objID, uchar *pszName, int pThis->pszName = pszName; pThis->iObjVers = iObjVers; +fprintf(stderr, "objid %d set for %s\n", objID, pszName); pThis->objID = objID; pThis->objMethods[0] = pConstruct; @@ -109,7 +110,6 @@ rsRetVal objInfoSetMethod(objInfo_t *pThis, objMethod_t objMethod, rsRetVal (*pH { assert(pThis != NULL); assert(objMethod > 0 && objMethod < OBJ_NUM_METHODS); - pThis->objMethods[objMethod] = pHandler; return RS_RET_OK; @@ -164,6 +164,7 @@ rsRetVal objBeginSerialize(strm_t *pStrm, obj_t *pObj) { DEFiRet; +dbgprintf("objBeginSerialize obj type: %x\n", objGetObjID(pStrm)); ISOBJ_TYPE_assert(pStrm, strm); ISOBJ_assert(pObj); @@ -212,6 +213,7 @@ rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propertyType_t prop assert(pszPropName != NULL); /*dbgprintf("objSerializeProp: strm %p, propName '%s', type %d, pUsr %p\n", pStrm, pszPropName, propType, pUsr);*/ + dbgprintf("objSerializeProp: strm %p, propName '%s', type %d, pUsr %p\n", pStrm, pszPropName, propType, pUsr); /* if we have no user pointer, there is no need to write this property. * TODO: think if that's the righ point of view * rgerhards, 2008-01-06 @@ -586,13 +588,14 @@ finalize_it: * of the trailer. Header must already have been processed. * rgerhards, 2008-01-11 */ -rsRetVal objDeserializeProperties(obj_t *pObj, objID_t oID, strm_t *pStrm) +static rsRetVal objDeserializeProperties(obj_t *pObj, objID_t oID, strm_t *pStrm) { DEFiRet; property_t propBuf; ISOBJ_assert(pObj); ISOBJ_TYPE_assert(pStrm, strm); + assert(oID > 0 && oID < OBJ_NUM_IDS); iRet = objDeserializeProperty(&propBuf, pStrm); while(iRet == RS_RET_OK) { @@ -627,7 +630,7 @@ rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, strm_t *pStrm) assert(ppObj != NULL); assert(objTypeExpected > 0 && objTypeExpected < OBJ_NUM_IDS); - assert(pStrm != NULL); + ISOBJ_TYPE_assert(pStrm, strm); /* we de-serialize the header. if all goes well, we are happy. However, if * we experience a problem, we try to recover. We do this by skipping to @@ -662,6 +665,50 @@ finalize_it: } +/* De-Serialize an object, but treat it as property bag. + * rgerhards, 2008-01-11 + */ +rsRetVal objDeserializeObjAsPropBag(obj_t *pObj, strm_t *pStrm) +{ + DEFiRet; + rsRetVal iRetLocal; + objID_t oID = 0; /* this assignment is just to supress a compiler warning - this saddens me */ + int oVers = 0; /* after all, it is totally useless but takes up some execution time... */ + +dbgprintf("objDese...AsPropBag 0\n"); + ISOBJ_assert(pObj); +dbgprintf("objDese...AsPropBag 0a\n"); + ISOBJ_TYPE_assert(pStrm, strm); +dbgprintf("objDese...AsPropBag 1\n"); + + /* we de-serialize the header. if all goes well, we are happy. However, if + * we experience a problem, we try to recover. We do this by skipping to + * the next object header. This is defined via the line-start cookies. In + * worst case, we exhaust the queue, but then we receive EOF return state + * from objDeserializeTryRecover(), what will cause us to ultimately give up. + * rgerhards, 2008-07-08 + */ + do { + iRetLocal = objDeserializeHeader((uchar*) "Obj", &oID, &oVers, pStrm); + if(iRetLocal != RS_RET_OK) { + dbgprintf("objDeserializeObjAsPropBag error %d during header - trying to recover\n", iRetLocal); + CHKiRet(objDeserializeTryRecover(pStrm)); + } + } while(iRetLocal != RS_RET_OK); + +dbgprintf("objDese...AsPropBag 2\n"); + if(oID != objGetObjID(pObj)) + ABORT_FINALIZE(RS_RET_INVALID_OID); + + /* we got the object, now we need to fill the properties */ + CHKiRet(objDeserializeProperties(pObj, oID, pStrm)); + +finalize_it: + return iRet; +} + + + /* De-Serialize an object property bag. As a property bag contains only partial properties, * it is not instanciable. Thus, the caller must provide a pointer of an already-instanciated * object of the correct type. @@ -719,6 +766,7 @@ rsRetVal objRegisterObj(objID_t oID, objInfo_t *pInfo) DEFiRet; assert(pInfo != NULL); + assert(arrObjInfo[oID] == NULL); if(oID < 1 || oID > OBJ_NUM_IDS) ABORT_FINALIZE(RS_RET_INVALID_OID); diff --git a/obj.h b/obj.h index 43f531b0..776d2030 100644 --- a/obj.h +++ b/obj.h @@ -72,6 +72,7 @@ /* the next macro MUST be called in Constructors: */ #ifndef NDEBUG /* this means if debug... */ # define objConstructSetObjInfo(pThis) \ + assert(pThis->pObjInfo == NULL); \ ((obj_t*) (pThis))->pObjInfo = pObjInfoOBJ; \ ((obj_t*) (pThis))->iObjCooCKiE = 0xBADEFEE #else diff --git a/queue.c b/queue.c index 99919ca2..b31173d9 100644 --- a/queue.c +++ b/queue.c @@ -479,6 +479,68 @@ queueWorker(void *arg) pthread_exit(0); } + +/* This method checks if there is any persistent information on the + * queue and, if so, tries to load it. This method can only legally be + * called from the destructor (I moved it out from there to keep the + * Constructor code somewhat smaller). -- rgerhards, 2008-01-11 + */ +static rsRetVal +queueTryLoadPersistedInfo(queue_t *pThis) +{ + DEFiRet; + strm_t *psQIF = NULL; + uchar pszQIFNam[MAXFNAME]; + size_t lenQIFNam; + struct stat stat_buf; + + ISOBJ_TYPE_assert(pThis, queue); + + /* Construct file name */ + lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", + (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix); + + /* check if the file exists */ +dbgprintf("stat '%s'\n", pszQIFNam); + if(stat((char*) pszQIFNam, &stat_buf) == -1) { + if(errno == ENOENT) { + dbgprintf("Queue 0x%lx: clean startup, no .qi file found\n", queueGetID(pThis)); + ABORT_FINALIZE(RS_RET_OK); + } else { + dbgprintf("Queue 0x%lx: error %d trying to access .qi file\n", queueGetID(pThis), errno); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + } + + /* If we reach this point, we have a .qi file */ + + CHKiRet(strmConstruct(&psQIF)); + CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); + CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_READ)); + CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE)); + CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam)); + CHKiRet(strmConstructFinalize(psQIF)); + + /* first, we try to read the property bag for ourselfs */ + CHKiRet(objDeserializePropBag((obj_t*) pThis, psQIF)); + + /* and now the stream objects (some order as when persisted!) */ + CHKiRet(objDeserializeObjAsPropBag(pThis->tVars.disk.pWrite, psQIF)); + CHKiRet(objDeserializeObjAsPropBag(pThis->tVars.disk.pRead, psQIF)); + +finalize_it: + if(psQIF != NULL) + strmDestruct(psQIF); + + if(iRet != RS_RET_OK) { + dbgprintf("Queue 0x%lx: error %d reading .qi file - can not start queue\n", + queueGetID(pThis), iRet); + } + + return iRet; +} + + /* Constructor for the queue object * This constructs the data structure, but does not yet start the queue. That * is done by queueStart(). The reason is that we want to give the caller a chance @@ -561,7 +623,7 @@ finalize_it: /* start up the queue - it must have been constructed and parameters defined * before. */ -rsRetVal queueStart(queue_t *pThis) +rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; int iState; @@ -569,6 +631,9 @@ rsRetVal queueStart(queue_t *pThis) assert(pThis != NULL); + /* and now check if there is some persistent information that needs to be read in */ + CHKiRet(queueTryLoadPersistedInfo(pThis)); + dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", (unsigned long) pThis, pThis->qType, pThis->iMaxFileSize); @@ -650,10 +715,12 @@ static rsRetVal queuePersist(queue_t *pThis) objSerializeSCALAR_VAR(psQIF, qType, INT, i); objSerializeSCALAR(psQIF, iQueueSize, INT); CHKiRet(objEndSerialize(psQIF)); +dbgprintf("queue serial 1\n"); /* this is disk specific and must be moved to a function */ CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF)); CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF)); +dbgprintf("queue serial 2\n"); /* persist queue object itself */ @@ -852,7 +919,7 @@ BEGINObjClassInit(queue, 1) //OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize); OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty); //OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize); -ENDObjClassInit(strm) +ENDObjClassInit(queue) /* * vi:set ai: diff --git a/rsyslog.h b/rsyslog.h index 93f6fe06..2937db88 100644 --- a/rsyslog.h +++ b/rsyslog.h @@ -107,6 +107,8 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_FILE_PREFIX_MISSING = -2036, /**< a required file prefix (parameter?) is missing */ RS_RET_INVALID_HEADER_RECTYPE = -2037, /**< invalid record type in header or invalid header */ RS_RET_QTYPE_MISMATCH = -2038, /**< different qType when reading back a property type */ + RS_RET_NO_FILE_ACCESS = -2039, /**< covers EACCES error on file open() */ + RS_RET_FILE_NOT_FOUND = -2040, /**< file not found */ RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */ RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */ RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */ diff --git a/stream.c b/stream.c index 405f858f..59067041 100644 --- a/stream.c +++ b/stream.c @@ -88,6 +88,15 @@ static rsRetVal strmOpenFile(strm_t *pThis) iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND; pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode); + if(pThis->fd == -1) { + int ierrnoSave = errno; + dbgprintf("Stream 0x%lx: open error %d\n", (unsigned long) pThis, errno); + if(ierrnoSave == ENOENT) + ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); + else + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + pThis->iCurrOffs = 0; dbgprintf("Stream 0x%lx: opened file '%s' for %s (0x%x) as %d\n", (unsigned long) pThis, @@ -621,28 +630,39 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) int i; long l; - assert(pThis != NULL); - assert(pStrm != NULL); + ISOBJ_TYPE_assert(pThis, strm); + ISOBJ_TYPE_assert(pStrm, strm); +dbgprintf("strmSerialize 1\n"); CHKiRet(objBeginSerialize(pStrm, (obj_t*) pThis)); - i = pThis->sType; - objSerializeSCALAR_VAR(pStrm, sType, INT, i); +dbgprintf("strmSerialize 2\n"); objSerializeSCALAR(pStrm, iCurrFNum, INT); objSerializePTR(pStrm, pszFName, PSZ); + objSerializeSCALAR(pStrm, iMaxFiles, INT); + objSerializeSCALAR(pStrm, iFileNumDigits, INT); + objSerializeSCALAR(pStrm, bDeleteOnClose, INT); + + i = pThis->sType; + objSerializeSCALAR_VAR(pStrm, sType, INT, i); + i = pThis->tOperationsMode; objSerializeSCALAR_VAR(pStrm, tOperationsMode, INT, i); + i = pThis->tOpenMode; objSerializeSCALAR_VAR(pStrm, tOpenMode, INT, i); - l = (long) pThis->iMaxFileSize; - objSerializeSCALAR_VAR(pStrm, iMaxFileSize, LONG, l); - objSerializeSCALAR(pStrm, iMaxFiles, INT); - objSerializeSCALAR(pStrm, iFileNumDigits, INT); - objSerializeSCALAR(pStrm, bDeleteOnClose, INT); + + l = (long) pThis->iCurrOffs; + objSerializeSCALAR_VAR(pStrm, iCurrOffs, LONG, l); + + // TODO: really serialize? + //l = (long) pThis->iMaxFileSize; + //objSerializeSCALAR_VAR(pStrm, iMaxFileSize, LONG, l); CHKiRet(objEndSerialize(pStrm)); finalize_it: +dbgprintf("strmSerialize out %d\n", iRet); return iRet; } @@ -656,7 +676,7 @@ rsRetVal strmSetProperty(strm_t *pThis, property_t *pProp) { DEFiRet; - ISOBJ_TYPE_assert(pThis, Msg); + ISOBJ_TYPE_assert(pThis, strm); assert(pProp != NULL); if(isProp("sType")) { @@ -669,6 +689,8 @@ rsRetVal strmSetProperty(strm_t *pThis, property_t *pProp) CHKiRet(strmSettOperationsMode(pThis, pProp->val.vInt)); } else if(isProp("tOpenMode")) { CHKiRet(strmSettOpenMode(pThis, pProp->val.vInt)); + } else if(isProp("iCurrOffs")) { + pThis->iCurrOffs = pProp->val.vLong; } else if(isProp("iMaxFileSize")) { CHKiRet(strmSetiMaxFileSize(pThis, pProp->val.vLong)); } else if(isProp("iMaxFiles")) { diff --git a/stream.h b/stream.h index e48d354c..d5b207ce 100644 --- a/stream.h +++ b/stream.h @@ -73,12 +73,12 @@ typedef struct strm_s { int iMaxFiles; /* maximum number of files if a circular mode is in use */ int iFileNumDigits;/* min number of digits to use in file number (only in circular mode) */ int bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */ + size_t iCurrOffs;/* current offset */ /* dynamic properties, valid only during file open, not to be persistet */ size_t sIOBufSize;/* size of IO buffer */ uchar *pszDir; /* Directory */ int lenDir; int fd; /* the file descriptor, -1 if closed */ - size_t iCurrOffs;/* current offset */ uchar *pszCurrFName; /* name of current file (if open) */ uchar *pIOBuf; /* io Buffer */ size_t iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */ -- cgit