diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-08 13:37:19 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-08 13:37:19 +0000 |
commit | 8d0a174a86d29dbec6412cb1bd38f87b3b3c059b (patch) | |
tree | bcaf1c0cdbdd015eb3366d9af5ec4f9a2ddf9e4a | |
parent | 47ccbe9c67c0b3ca518449d80be387ca09904026 (diff) | |
download | rsyslog-8d0a174a86d29dbec6412cb1bd38f87b3b3c059b.tar.gz rsyslog-8d0a174a86d29dbec6412cb1bd38f87b3b3c059b.tar.xz rsyslog-8d0a174a86d29dbec6412cb1bd38f87b3b3c059b.zip |
- first implementation of "disk" queue mode finished. It still needs some
work and the deserializer needs also to be expanded, but the queue at
least performs well now.
- fixed a race condition that could occur when input modules were
terminated
-rw-r--r-- | ChangeLog | 2 | ||||
-rw-r--r-- | msg.c | 13 | ||||
-rw-r--r-- | obj.c | 82 | ||||
-rw-r--r-- | obj.h | 5 | ||||
-rw-r--r-- | queue.c | 37 | ||||
-rw-r--r-- | queue.h | 1 | ||||
-rw-r--r-- | syslogd.c | 13 |
7 files changed, 128 insertions, 25 deletions
@@ -2,6 +2,8 @@ Version 3.10.1 (rgerhards), 2008-01-?? - performance-optimized string class, should bring an overall improvement - fixed a memory leak in imudp -- thanks to varmojfekoj for the patch +- fixed a race condition that could lead to a rsyslogd hang when during + HUP or termination --------------------------------------------------------------------------- Version 3.10.0 (rgerhards), 2008-01-07 - implemented input module interface and initial input modules @@ -2122,6 +2122,18 @@ rsRetVal MsgSetProperty(msg_t *pThis, property_t *pProp) #undef isProp +/* This is a construction finalizer that must be called after all properties + * have been set. It does some final work on the message object. After this + * is done, the object is considered ready for full processing. + * rgerhards, 2008-07-08 + */ +static rsRetVal MsgConstructFinalizer(msg_t *pThis) +{ + MsgPrepareEnqueue(pThis); + return RS_RET_OK; +} + + /* Initialize the message class. Must be called as the very first method * before anything else is called inside this class. * rgerhards, 2008-01-04 @@ -2129,6 +2141,7 @@ rsRetVal MsgSetProperty(msg_t *pThis, property_t *pProp) BEGINObjClassInit(Msg, 1) OBJSetMethodHandler(objMethod_SERIALIZE, MsgSerialize); OBJSetMethodHandler(objMethod_SETPROPERTY, MsgSetProperty); + OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, MsgConstructFinalizer); /* initially, we have no need to lock message objects */ funcLock = MsgLockingDummy; funcUnlock = MsgLockingDummy; @@ -44,9 +44,10 @@ static objInfo_t *arrObjInfo[OBJ_NUM_IDS]; /* array with object information poin /* some defines */ /* cookies for serialized lines */ -#define COOKIE_OBJLINE '<' -#define COOKIE_PROPLINE '+' -#define COOKIE_ENDLINE '>' +#define COOKIE_OBJLINE '<' +#define COOKIE_PROPLINE '+' +#define COOKIE_ENDLINE '>' +#define COOKIE_BLANKLINE '.' /* methods */ @@ -60,6 +61,11 @@ static rsRetVal objInfoNotImplementedDummy(void __attribute__((unused)) *pThis) return RS_RET_NOT_IMPLEMENTED; } +/* and now the macro to check if something is not implemented + * must be provided an objInfo_t pointer. + */ +#define objInfoIsImplemented(pThis, method) \ + (pThis->objMethods[method] != objInfoNotImplementedDummy) /* construct an object Info object. Each class shall do this on init. The * resulting object shall be cached during the lifetime of the class and each @@ -182,6 +188,7 @@ rsRetVal objSerializeProp(rsCStrObj *pCStr, uchar *pszPropName, propertyType_t p lenBuf = rsCStrLen((rsCStrObj*) pUsr); break; case PROPTYPE_SYSLOGTIME: + //lenBuf = snprintf((char*) szBuf, sizeof(szBuf), "%d:%d:%d:%d:%d:%d:%d:%d:%d:%c:%d:%d", lenBuf = snprintf((char*) szBuf, sizeof(szBuf), "%d %d %d %d %d %d %d %d %d %c %d %d", ((struct syslogTime*)pUsr)->timeType, ((struct syslogTime*)pUsr)->year, @@ -279,7 +286,9 @@ rsRetVal objEndSerialize(rsCStrObj **ppCStr, obj_t *pObj) CHKiRet(rsCStrAppendStrWithLen(pCStr, rsCStrGetBufBeg(*ppCStr), rsCStrLen(*ppCStr))); CHKiRet(rsCStrAppendChar(pCStr, COOKIE_ENDLINE)); - CHKiRet(rsCStrAppendStr(pCStr, (uchar*) "EndObj\n\n")); + CHKiRet(rsCStrAppendStr(pCStr, (uchar*) "EndObj\n")); + CHKiRet(rsCStrAppendChar(pCStr, COOKIE_BLANKLINE)); + CHKiRet(rsCStrAppendChar(pCStr, '\n')); CHKiRet(rsCStrFinish(pCStr)); rsCStrDestruct(*ppCStr); @@ -485,14 +494,54 @@ static rsRetVal objDeserializeTrailer(serialStore_t *pSerStore) NEXTC; if(c != 'b') ABORT_FINALIZE(RS_RET_INVALID_TRAILER); NEXTC; if(c != 'j') ABORT_FINALIZE(RS_RET_INVALID_TRAILER); NEXTC; if(c != '\n') ABORT_FINALIZE(RS_RET_INVALID_TRAILER); + NEXTC; if(c != COOKIE_BLANKLINE) ABORT_FINALIZE(RS_RET_INVALID_TRAILER); NEXTC; if(c != '\n') ABORT_FINALIZE(RS_RET_INVALID_TRAILER); -dbgprintf("obj trailer OK\n"); finalize_it: return iRet; } + +/* This method tries to recover a serial store if it got out of sync. + * To do so, it scans the line beginning cookies and waits for the object + * cookie. If that is found, control is returned. If the store is exhausted, + * we will receive an RS_RET_EOF error as part of NEXTC, which will also + * terminate this function. So we may either return with somehting that + * looks like a valid object or end of store. + * rgerhards, 2008-01-07 + */ +static rsRetVal objDeserializeTryRecover(serialStore_t *pSerStore) +{ + DEFiRet; + uchar c; + int bWasNL; + int bRun; + + assert(pSerStore != NULL); + bRun = 1; + bWasNL = 0; + + while(bRun) { + NEXTC; + if(c == '\n') + bWasNL = 1; + else { + if(bWasNL == 1 && c == COOKIE_OBJLINE) + bRun = 0; /* we found it! */ + else + bWasNL = 0; + } + } + + CHKiRet(serialStoreUngetChar(pSerStore, c)); + +finalize_it: + dbgprintf("deserializer has possibly been able to re-sync and recover, state %d\n", iRet); + return iRet; +} + + /* De-Serialize an object. * Params: Pointer to object Pointer (pObj) (like a obj_t**, but can not do that due to compiler warning) * expected object ID (to check against) @@ -504,6 +553,7 @@ finalize_it: rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, serialStore_t *pSerStore) { DEFiRet; + rsRetVal iRetLocal; obj_t *pObj = NULL; property_t propBuf; objID_t oID = 0; /* this assignment is just to supress a compiler warning - this saddens me */ @@ -513,7 +563,20 @@ rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, serialStore_t *pSe assert(objTypeExpected > 0 && objTypeExpected < OBJ_NUM_IDS); assert(pSerStore != NULL); - CHKiRet(objDeserializeHeader(&oID, &oVers, pSerStore)); + /* we de-serialize the header. if all goes well, we are happy. However, if + * we experience a problem, we try to recover. We do this by skipping to + * the next object header. This is defined via the line-start cookies. In + * worst case, we exhaust the queue, but then we receive EOF return state, + * from objDeserializeTryRecover(), what will cause us to ultimately give up. + * rgerhards, 2008-07-08 + */ + do { + iRetLocal = objDeserializeHeader(&oID, &oVers, pSerStore); + if(iRetLocal != RS_RET_OK) { + dbgprintf("objDeserialize error %d during header processing - trying to recover\n", iRetLocal); + CHKiRet(objDeserializeTryRecover(pSerStore)); + } + } while(iRetLocal != RS_RET_OK); if(oID != objTypeExpected) ABORT_FINALIZE(RS_RET_INVALID_OID); @@ -529,12 +592,13 @@ rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, serialStore_t *pSe if(iRet != RS_RET_NO_PROPLINE) FINALIZE; -dbgprintf("good propline loop exit\n"); CHKiRet(objDeserializeTrailer(pSerStore)); /* do trailer checks */ -// TODO: call constuction finalizer! -// + /* we have a valid object, let's finalize our work and return */ + if(objInfoIsImplemented(arrObjInfo[oID], objMethod_CONSTRUCTION_FINALIZER)) + CHKiRet(arrObjInfo[oID]->objMethods[objMethod_CONSTRUCTION_FINALIZER](pObj)); + *((obj_t**) ppObj) = pObj; finalize_it: @@ -66,9 +66,10 @@ typedef enum { /* IDs of base methods supported by all objects - used for jump t objMethod_SERIALIZE = 2, objMethod_DESERIALIZE = 3, objMethod_SETPROPERTY = 4, - objMethod_DEBUGPRINT = 5 + objMethod_CONSTRUCTION_FINALIZER = 5, + objMethod_DEBUGPRINT = 6 } objMethod_t; -#define OBJ_NUM_METHODS 6 /* must be updated to contain the max number of methods supported */ +#define OBJ_NUM_METHODS 7 /* must be updated to contain the max number of methods supported */ typedef struct objInfo_s { objID_t objID; @@ -1,3 +1,4 @@ +#include <stdio.h> // TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in // call consumer state. Facilitates retaining messages in queue until action could // be called! @@ -127,12 +128,10 @@ static rsRetVal qConstructLinkedList(queue_t *pThis) } -static rsRetVal qDestructLinkedList(queue_t *pThis) +static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis) { DEFiRet; - assert(pThis != NULL); - /* with the linked list type, there is nothing to do here. The * reason is that the Destructor is only called after all entries * have bene taken off the queue. In this case, there is nothing @@ -211,12 +210,14 @@ static rsRetVal qDiskOpenFile(queue_t *pThis, queueFileDescription_t *pFile, int dbgprintf("Queue 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pFile->pszFileName, flags, pFile->fd); finalize_it: -dbgprintf("qDiskOpen iRet %d\n", iRet); return iRet; } -/* close a queue file */ +/* close a queue file + * Note that the bDeleteOnClose flag is honored. If it is set, the file will be + * deleted after close. This is in support for the qRead thread. + */ static rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile) { DEFiRet; @@ -228,6 +229,10 @@ static rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile) close(pFile->fd); // TODO: error check pFile->fd = -1; + if(pFile->bDeleteOnClose) { + unlink((char*) pThis->tVars.disk.fRead.pszFileName); // TODO: check returncode + } + if(pFile->pszFileName != NULL) { free(pFile->pszFileName); /* no longer needed in any case (just for open) */ pFile->pszFileName = NULL; @@ -375,17 +380,19 @@ static rsRetVal qConstructDisk(queue_t *pThis) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); pThis->tVars.disk.lenSpoolDir = strlen((char*)pThis->tVars.disk.pszSpoolDir); - pThis->tVars.disk.iMaxFileSize = 1024 * 3; // TODO: configurable! + pThis->tVars.disk.iMaxFileSize = 10240000; //1024 * 3; // TODO: configurable! pThis->tVars.disk.fWrite.iCurrFileNum = 1; pThis->tVars.disk.fWrite.iCurrOffs = 0; pThis->tVars.disk.fWrite.fd = -1; pThis->tVars.disk.fWrite.iUngetC = -1; + pThis->tVars.disk.fRead.bDeleteOnClose = 0; /* do *NOT* set this to 1! */ pThis->tVars.disk.fRead.iCurrFileNum = 1; pThis->tVars.disk.fRead.fd = -1; pThis->tVars.disk.fRead.iCurrOffs = 0; pThis->tVars.disk.fRead.iUngetC = -1; + pThis->tVars.disk.fRead.bDeleteOnClose = 1; finalize_it: return iRet; @@ -418,7 +425,7 @@ static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) assert(pThis != NULL); if(pThis->tVars.disk.fWrite.fd == -1) - CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fWrite, O_RDWR|O_CREAT, 0600)); // TODO: open modes! + CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes! CHKiRet((objSerialize(pUsr))(pUsr, &pCStr)); iWritten = write(pThis->tVars.disk.fWrite.fd, rsCStrGetBufBeg(pCStr), rsCStrLen(pCStr)); @@ -564,7 +571,7 @@ queueDel(queue_t *pThis, void *pUsr) * Please NOTE: * Having more than one worker requires considerable * additional code review in regard to thread-safety. - */ +*/ static void * queueWorker(void *arg) { @@ -774,16 +781,24 @@ rsRetVal queueEnqObj(queue_t *pThis, void *pUsr) { DEFiRet; + int iCancelStateSave; int i; struct timespec t; assert(pThis != NULL); - if(pThis->qType != QUEUETYPE_DIRECT) + /* Please note that this function is not cancel-safe and consequently + * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE + * during its execution. If that is not done, race conditions occur if the + * thread is canceled (most important use case is input module termination). + * rgerhards, 2008-01-08 + */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + if(pThis->pWorkerThreads != NULL) pthread_mutex_lock(pThis->mut); while(pThis->iQueueSize >= pThis->iMaxQueueSize) { - dbgprintf("enqueueMsg: queue 0x%lx FULL.\n", (unsigned long) pThis); + dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", (unsigned long) pThis); clock_gettime (CLOCK_REALTIME, &t); t.tv_sec += 2; /* TODO: configurable! */ @@ -805,6 +820,8 @@ finalize_it: dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i); } + pthread_setcancelstate(iCancelStateSave, NULL); + return iRet; } /* @@ -40,6 +40,7 @@ typedef struct { int iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */ int iBufPtr; /* pointer into current buffer */ int iUngetC; /* char set via UngetChar() call or -1 if none set */ + int bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */ } queueFileDescription_t; #define qFILE_IOBUF_SIZE 4096 /* size of the IO buffer */ @@ -2592,7 +2592,7 @@ die(int sig) char buf[256]; if (sig) { - dbgprintf(" exiting on signal %d\n", sig); + dbgprintf("exiting on signal %d\n", sig); (void) snprintf(buf, sizeof(buf) / sizeof(char), " [origin software=\"rsyslogd\" " "swVersion=\"" VERSION \ "\" x-pid=\"%d\"]" " exiting on signal %d.", @@ -2602,15 +2602,19 @@ die(int sig) } /* close the inputs */ + dbgprintf("Terminating input threads...\n"); thrdTerminateAll(); /* TODO: inputs only, please */ /* drain queue and stop worker thread */ + dbgprintf("Terminating main queue...\n"); queueDestruct(pMsgQueue); pMsgQueue = NULL; /* Free ressources and close connections */ + dbgprintf("Terminating outputs...\n"); freeSelectors(); + dbgprintf("all primary multi-thread sources have been terminated - now doing aux cleanp\n"); /* rger 2005-02-22 * now clean up the in-memory structures. OK, the OS * would also take care of that, but if we do it @@ -3361,6 +3365,7 @@ init(void) Initialized = 1; bHaveMainQueue = (MainMsgQueType == QUEUETYPE_DIRECT) ? 0 : 1; + dbgprintf("Main processing queue is initialized and running\n"); /* the output part and the queue is now ready to run. So it is a good time * to start the inputs. Please note that the net code above should be @@ -4398,7 +4403,7 @@ mainloop(void) if(restart) { dbgprintf("\nReceived SIGHUP, reloading rsyslogd.\n"); - /* worker thread is stopped as part of init() */ + /* main queue is stopped as part of init() */ init(); restart = 0; continue; @@ -4824,9 +4829,9 @@ int main(int argc, char **argv) dbgprintf("Compatibility Mode: %d\n", iCompatibilityMode); /* tuck my process id away */ - if ( !Debug ) + if (1) // !Debug ) { - dbgprintf("Writing pidfile.\n"); + dbgprintf("Writing pidfile %s.\n", PidFile); if (!check_pid(PidFile)) { if (!write_pid(PidFile)) |