From 8d0a174a86d29dbec6412cb1bd38f87b3b3c059b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 8 Jan 2008 13:37:19 +0000 Subject: - first implementation of "disk" queue mode finished. It still needs some work and the deserializer needs also to be expanded, but the queue at least performs well now. - fixed a race condition that could occur when input modules were terminated --- obj.c | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 73 insertions(+), 9 deletions(-) (limited to 'obj.c') diff --git a/obj.c b/obj.c index ca4481aa..b836c691 100644 --- a/obj.c +++ b/obj.c @@ -44,9 +44,10 @@ static objInfo_t *arrObjInfo[OBJ_NUM_IDS]; /* array with object information poin /* some defines */ /* cookies for serialized lines */ -#define COOKIE_OBJLINE '<' -#define COOKIE_PROPLINE '+' -#define COOKIE_ENDLINE '>' +#define COOKIE_OBJLINE '<' +#define COOKIE_PROPLINE '+' +#define COOKIE_ENDLINE '>' +#define COOKIE_BLANKLINE '.' /* methods */ @@ -60,6 +61,11 @@ static rsRetVal objInfoNotImplementedDummy(void __attribute__((unused)) *pThis) return RS_RET_NOT_IMPLEMENTED; } +/* and now the macro to check if something is not implemented + * must be provided an objInfo_t pointer. + */ +#define objInfoIsImplemented(pThis, method) \ + (pThis->objMethods[method] != objInfoNotImplementedDummy) /* construct an object Info object. Each class shall do this on init. The * resulting object shall be cached during the lifetime of the class and each @@ -182,6 +188,7 @@ rsRetVal objSerializeProp(rsCStrObj *pCStr, uchar *pszPropName, propertyType_t p lenBuf = rsCStrLen((rsCStrObj*) pUsr); break; case PROPTYPE_SYSLOGTIME: + //lenBuf = snprintf((char*) szBuf, sizeof(szBuf), "%d:%d:%d:%d:%d:%d:%d:%d:%d:%c:%d:%d", lenBuf = snprintf((char*) szBuf, sizeof(szBuf), "%d %d %d %d %d %d %d %d %d %c %d %d", ((struct syslogTime*)pUsr)->timeType, ((struct syslogTime*)pUsr)->year, @@ -279,7 +286,9 @@ rsRetVal objEndSerialize(rsCStrObj **ppCStr, obj_t *pObj) CHKiRet(rsCStrAppendStrWithLen(pCStr, rsCStrGetBufBeg(*ppCStr), rsCStrLen(*ppCStr))); CHKiRet(rsCStrAppendChar(pCStr, COOKIE_ENDLINE)); - CHKiRet(rsCStrAppendStr(pCStr, (uchar*) "EndObj\n\n")); + CHKiRet(rsCStrAppendStr(pCStr, (uchar*) "EndObj\n")); + CHKiRet(rsCStrAppendChar(pCStr, COOKIE_BLANKLINE)); + CHKiRet(rsCStrAppendChar(pCStr, '\n')); CHKiRet(rsCStrFinish(pCStr)); rsCStrDestruct(*ppCStr); @@ -485,14 +494,54 @@ static rsRetVal objDeserializeTrailer(serialStore_t *pSerStore) NEXTC; if(c != 'b') ABORT_FINALIZE(RS_RET_INVALID_TRAILER); NEXTC; if(c != 'j') ABORT_FINALIZE(RS_RET_INVALID_TRAILER); NEXTC; if(c != '\n') ABORT_FINALIZE(RS_RET_INVALID_TRAILER); + NEXTC; if(c != COOKIE_BLANKLINE) ABORT_FINALIZE(RS_RET_INVALID_TRAILER); NEXTC; if(c != '\n') ABORT_FINALIZE(RS_RET_INVALID_TRAILER); -dbgprintf("obj trailer OK\n"); finalize_it: return iRet; } + +/* This method tries to recover a serial store if it got out of sync. + * To do so, it scans the line beginning cookies and waits for the object + * cookie. If that is found, control is returned. If the store is exhausted, + * we will receive an RS_RET_EOF error as part of NEXTC, which will also + * terminate this function. So we may either return with somehting that + * looks like a valid object or end of store. + * rgerhards, 2008-01-07 + */ +static rsRetVal objDeserializeTryRecover(serialStore_t *pSerStore) +{ + DEFiRet; + uchar c; + int bWasNL; + int bRun; + + assert(pSerStore != NULL); + bRun = 1; + bWasNL = 0; + + while(bRun) { + NEXTC; + if(c == '\n') + bWasNL = 1; + else { + if(bWasNL == 1 && c == COOKIE_OBJLINE) + bRun = 0; /* we found it! */ + else + bWasNL = 0; + } + } + + CHKiRet(serialStoreUngetChar(pSerStore, c)); + +finalize_it: + dbgprintf("deserializer has possibly been able to re-sync and recover, state %d\n", iRet); + return iRet; +} + + /* De-Serialize an object. * Params: Pointer to object Pointer (pObj) (like a obj_t**, but can not do that due to compiler warning) * expected object ID (to check against) @@ -504,6 +553,7 @@ finalize_it: rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, serialStore_t *pSerStore) { DEFiRet; + rsRetVal iRetLocal; obj_t *pObj = NULL; property_t propBuf; objID_t oID = 0; /* this assignment is just to supress a compiler warning - this saddens me */ @@ -513,7 +563,20 @@ rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, serialStore_t *pSe assert(objTypeExpected > 0 && objTypeExpected < OBJ_NUM_IDS); assert(pSerStore != NULL); - CHKiRet(objDeserializeHeader(&oID, &oVers, pSerStore)); + /* we de-serialize the header. if all goes well, we are happy. However, if + * we experience a problem, we try to recover. We do this by skipping to + * the next object header. This is defined via the line-start cookies. In + * worst case, we exhaust the queue, but then we receive EOF return state, + * from objDeserializeTryRecover(), what will cause us to ultimately give up. + * rgerhards, 2008-07-08 + */ + do { + iRetLocal = objDeserializeHeader(&oID, &oVers, pSerStore); + if(iRetLocal != RS_RET_OK) { + dbgprintf("objDeserialize error %d during header processing - trying to recover\n", iRetLocal); + CHKiRet(objDeserializeTryRecover(pSerStore)); + } + } while(iRetLocal != RS_RET_OK); if(oID != objTypeExpected) ABORT_FINALIZE(RS_RET_INVALID_OID); @@ -529,12 +592,13 @@ rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, serialStore_t *pSe if(iRet != RS_RET_NO_PROPLINE) FINALIZE; -dbgprintf("good propline loop exit\n"); CHKiRet(objDeserializeTrailer(pSerStore)); /* do trailer checks */ -// TODO: call constuction finalizer! -// + /* we have a valid object, let's finalize our work and return */ + if(objInfoIsImplemented(arrObjInfo[oID], objMethod_CONSTRUCTION_FINALIZER)) + CHKiRet(arrObjInfo[oID]->objMethods[objMethod_CONSTRUCTION_FINALIZER](pObj)); + *((obj_t**) ppObj) = pObj; finalize_it: -- cgit