summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-08 13:37:19 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-08 13:37:19 +0000
commit8d0a174a86d29dbec6412cb1bd38f87b3b3c059b (patch)
treebcaf1c0cdbdd015eb3366d9af5ec4f9a2ddf9e4a
parent47ccbe9c67c0b3ca518449d80be387ca09904026 (diff)
downloadrsyslog-8d0a174a86d29dbec6412cb1bd38f87b3b3c059b.tar.gz
rsyslog-8d0a174a86d29dbec6412cb1bd38f87b3b3c059b.tar.xz
rsyslog-8d0a174a86d29dbec6412cb1bd38f87b3b3c059b.zip
- first implementation of "disk" queue mode finished. It still needs some
work, and the deserializer also needs to be expanded, but the queue at least performs well now.
- fixed a race condition that could occur when input modules were terminated
-rw-r--r--ChangeLog2
-rw-r--r--msg.c13
-rw-r--r--obj.c82
-rw-r--r--obj.h5
-rw-r--r--queue.c37
-rw-r--r--queue.h1
-rw-r--r--syslogd.c13
7 files changed, 128 insertions, 25 deletions
diff --git a/ChangeLog b/ChangeLog
index 96733925..8bf5ec32 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -2,6 +2,8 @@
Version 3.10.1 (rgerhards), 2008-01-??
- performance-optimized string class, should bring an overall improvement
- fixed a memory leak in imudp -- thanks to varmojfekoj for the patch
+- fixed a race condition that could lead to an rsyslogd hang during
+ HUP or termination
---------------------------------------------------------------------------
Version 3.10.0 (rgerhards), 2008-01-07
- implemented input module interface and initial input modules
diff --git a/msg.c b/msg.c
index 3e34ac1c..ca9678b2 100644
--- a/msg.c
+++ b/msg.c
@@ -2122,6 +2122,18 @@ rsRetVal MsgSetProperty(msg_t *pThis, property_t *pProp)
#undef isProp
+/* This is a construction finalizer that must be called after all properties
+ * have been set. It does some final work on the message object. After this
+ * is done, the object is considered ready for full processing.
+ *
+ * pThis is the message object to finalize. The only work done here is a
+ * call to MsgPrepareEnqueue() on that object; nothing that call may report
+ * is inspected, and the function unconditionally returns RS_RET_OK.
+ * rgerhards, 2008-01-08
+ */
+static rsRetVal MsgConstructFinalizer(msg_t *pThis)
+{
+ MsgPrepareEnqueue(pThis);
+ return RS_RET_OK;
+}
+
+
/* Initialize the message class. Must be called as the very first method
* before anything else is called inside this class.
* rgerhards, 2008-01-04
@@ -2129,6 +2141,7 @@ rsRetVal MsgSetProperty(msg_t *pThis, property_t *pProp)
BEGINObjClassInit(Msg, 1)
OBJSetMethodHandler(objMethod_SERIALIZE, MsgSerialize);
OBJSetMethodHandler(objMethod_SETPROPERTY, MsgSetProperty);
+ OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, MsgConstructFinalizer);
/* initially, we have no need to lock message objects */
funcLock = MsgLockingDummy;
funcUnlock = MsgLockingDummy;
diff --git a/obj.c b/obj.c
index ca4481aa..b836c691 100644
--- a/obj.c
+++ b/obj.c
@@ -44,9 +44,10 @@ static objInfo_t *arrObjInfo[OBJ_NUM_IDS]; /* array with object information poin
/* some defines */
/* cookies for serialized lines */
-#define COOKIE_OBJLINE '<'
-#define COOKIE_PROPLINE '+'
-#define COOKIE_ENDLINE '>'
+#define COOKIE_OBJLINE '<'
+#define COOKIE_PROPLINE '+'
+#define COOKIE_ENDLINE '>'
+#define COOKIE_BLANKLINE '.'
/* methods */
@@ -60,6 +61,11 @@ static rsRetVal objInfoNotImplementedDummy(void __attribute__((unused)) *pThis)
return RS_RET_NOT_IMPLEMENTED;
}
+/* and now the macro to check if something is not implemented
+ * must be provided an objInfo_t pointer.
+ */
+#define objInfoIsImplemented(pThis, method) \
+ (pThis->objMethods[method] != objInfoNotImplementedDummy)
/* construct an object Info object. Each class shall do this on init. The
* resulting object shall be cached during the lifetime of the class and each
@@ -182,6 +188,7 @@ rsRetVal objSerializeProp(rsCStrObj *pCStr, uchar *pszPropName, propertyType_t p
lenBuf = rsCStrLen((rsCStrObj*) pUsr);
break;
case PROPTYPE_SYSLOGTIME:
+ //lenBuf = snprintf((char*) szBuf, sizeof(szBuf), "%d:%d:%d:%d:%d:%d:%d:%d:%d:%c:%d:%d",
lenBuf = snprintf((char*) szBuf, sizeof(szBuf), "%d %d %d %d %d %d %d %d %d %c %d %d",
((struct syslogTime*)pUsr)->timeType,
((struct syslogTime*)pUsr)->year,
@@ -279,7 +286,9 @@ rsRetVal objEndSerialize(rsCStrObj **ppCStr, obj_t *pObj)
CHKiRet(rsCStrAppendStrWithLen(pCStr, rsCStrGetBufBeg(*ppCStr), rsCStrLen(*ppCStr)));
CHKiRet(rsCStrAppendChar(pCStr, COOKIE_ENDLINE));
- CHKiRet(rsCStrAppendStr(pCStr, (uchar*) "EndObj\n\n"));
+ CHKiRet(rsCStrAppendStr(pCStr, (uchar*) "EndObj\n"));
+ CHKiRet(rsCStrAppendChar(pCStr, COOKIE_BLANKLINE));
+ CHKiRet(rsCStrAppendChar(pCStr, '\n'));
CHKiRet(rsCStrFinish(pCStr));
rsCStrDestruct(*ppCStr);
@@ -485,14 +494,54 @@ static rsRetVal objDeserializeTrailer(serialStore_t *pSerStore)
NEXTC; if(c != 'b') ABORT_FINALIZE(RS_RET_INVALID_TRAILER);
NEXTC; if(c != 'j') ABORT_FINALIZE(RS_RET_INVALID_TRAILER);
NEXTC; if(c != '\n') ABORT_FINALIZE(RS_RET_INVALID_TRAILER);
+ NEXTC; if(c != COOKIE_BLANKLINE) ABORT_FINALIZE(RS_RET_INVALID_TRAILER);
NEXTC; if(c != '\n') ABORT_FINALIZE(RS_RET_INVALID_TRAILER);
-dbgprintf("obj trailer OK\n");
finalize_it:
return iRet;
}
+
+/* This method tries to recover a serial store if it got out of sync.
+ * To do so, it scans the line beginning cookies and waits for the object
+ * cookie. If that is found, control is returned. If the store is exhausted,
+ * we will receive an RS_RET_EOF error as part of NEXTC, which will also
+ * terminate this function. So we may either return with something that
+ * looks like a valid object or end of store.
+ *
+ * pSerStore is the serial store to scan; it must not be NULL.
+ * The object cookie is only accepted when the character read immediately
+ * before it was a newline. Once found, the cookie is handed back to the
+ * store via serialStoreUngetChar(), and the result of that call is what
+ * is returned on the success path.
+ * rgerhards, 2008-01-07
+ */
+static rsRetVal objDeserializeTryRecover(serialStore_t *pSerStore)
+{
+ DEFiRet;
+ uchar c;
+ int bWasNL;
+ int bRun;
+
+ assert(pSerStore != NULL);
+ bRun = 1;
+ bWasNL = 0; /* no newline seen yet, so a cookie that is the very first char read is not accepted */
+
+ while(bRun) {
+ NEXTC; /* macro defined outside this hunk; per the description above it fetches the next char into c and leaves via finalize_it on error/EOF -- NOTE(review): confirm against the macro */
+ if(c == '\n')
+ bWasNL = 1;
+ else {
+ if(bWasNL == 1 && c == COOKIE_OBJLINE)
+ bRun = 0; /* we found it! */
+ else
+ bWasNL = 0; /* ordinary char: we are no longer at a line start */
+ }
+ }
+
+ CHKiRet(serialStoreUngetChar(pSerStore, c)); /* give the object cookie back to the store (unget) */
+
+finalize_it:
+ dbgprintf("deserializer has possibly been able to re-sync and recover, state %d\n", iRet); /* sits after the label, so it is also printed when recovery failed; iRet tells which */
+ return iRet;
+}
+
+
/* De-Serialize an object.
* Params: Pointer to object Pointer (pObj) (like a obj_t**, but can not do that due to compiler warning)
* expected object ID (to check against)
@@ -504,6 +553,7 @@ finalize_it:
rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, serialStore_t *pSerStore)
{
DEFiRet;
+ rsRetVal iRetLocal;
obj_t *pObj = NULL;
property_t propBuf;
objID_t oID = 0; /* this assignment is just to supress a compiler warning - this saddens me */
@@ -513,7 +563,20 @@ rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, serialStore_t *pSe
assert(objTypeExpected > 0 && objTypeExpected < OBJ_NUM_IDS);
assert(pSerStore != NULL);
- CHKiRet(objDeserializeHeader(&oID, &oVers, pSerStore));
+ /* we de-serialize the header. if all goes well, we are happy. However, if
+ * we experience a problem, we try to recover. We do this by skipping to
+ * the next object header. This is defined via the line-start cookies. In
+ * the worst case, we exhaust the queue, but then we receive an EOF return state
+ * from objDeserializeTryRecover(), which will cause us to ultimately give up.
+ * rgerhards, 2008-01-08
+ */
+ do {
+ iRetLocal = objDeserializeHeader(&oID, &oVers, pSerStore);
+ if(iRetLocal != RS_RET_OK) {
+ dbgprintf("objDeserialize error %d during header processing - trying to recover\n", iRetLocal);
+ CHKiRet(objDeserializeTryRecover(pSerStore));
+ }
+ } while(iRetLocal != RS_RET_OK);
if(oID != objTypeExpected)
ABORT_FINALIZE(RS_RET_INVALID_OID);
@@ -529,12 +592,13 @@ rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, serialStore_t *pSe
if(iRet != RS_RET_NO_PROPLINE)
FINALIZE;
-dbgprintf("good propline loop exit\n");
CHKiRet(objDeserializeTrailer(pSerStore)); /* do trailer checks */
-// TODO: call constuction finalizer!
-//
+ /* we have a valid object, let's finalize our work and return */
+ if(objInfoIsImplemented(arrObjInfo[oID], objMethod_CONSTRUCTION_FINALIZER))
+ CHKiRet(arrObjInfo[oID]->objMethods[objMethod_CONSTRUCTION_FINALIZER](pObj));
+
*((obj_t**) ppObj) = pObj;
finalize_it:
diff --git a/obj.h b/obj.h
index 74fe5b7c..bf9beb2b 100644
--- a/obj.h
+++ b/obj.h
@@ -66,9 +66,10 @@ typedef enum { /* IDs of base methods supported by all objects - used for jump t
objMethod_SERIALIZE = 2,
objMethod_DESERIALIZE = 3,
objMethod_SETPROPERTY = 4,
- objMethod_DEBUGPRINT = 5
+ objMethod_CONSTRUCTION_FINALIZER = 5,
+ objMethod_DEBUGPRINT = 6
} objMethod_t;
-#define OBJ_NUM_METHODS 6 /* must be updated to contain the max number of methods supported */
+#define OBJ_NUM_METHODS 7 /* must be updated to contain the max number of methods supported */
typedef struct objInfo_s {
objID_t objID;
diff --git a/queue.c b/queue.c
index 1c371cd9..7bd64d41 100644
--- a/queue.c
+++ b/queue.c
@@ -1,3 +1,4 @@
+#include <stdio.h>
// TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in
// call consumer state. Facilitates retaining messages in queue until action could
// be called!
@@ -127,12 +128,10 @@ static rsRetVal qConstructLinkedList(queue_t *pThis)
}
-static rsRetVal qDestructLinkedList(queue_t *pThis)
+static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis)
{
DEFiRet;
- assert(pThis != NULL);
-
/* with the linked list type, there is nothing to do here. The
* reason is that the Destructor is only called after all entries
* have bene taken off the queue. In this case, there is nothing
@@ -211,12 +210,14 @@ static rsRetVal qDiskOpenFile(queue_t *pThis, queueFileDescription_t *pFile, int
dbgprintf("Queue 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pFile->pszFileName, flags, pFile->fd);
finalize_it:
-dbgprintf("qDiskOpen iRet %d\n", iRet);
return iRet;
}
-/* close a queue file */
+/* close a queue file
+ * Note that the bDeleteOnClose flag is honored. If it is set, the file will be
+ * deleted after close. This is in support for the qRead thread.
+ */
static rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile)
{
DEFiRet;
@@ -228,6 +229,10 @@ static rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile)
close(pFile->fd); // TODO: error check
pFile->fd = -1;
+ if(pFile->bDeleteOnClose) {
+ unlink((char*) pThis->tVars.disk.fRead.pszFileName); // TODO: check returncode
+ }
+
if(pFile->pszFileName != NULL) {
free(pFile->pszFileName); /* no longer needed in any case (just for open) */
pFile->pszFileName = NULL;
@@ -375,17 +380,19 @@ static rsRetVal qConstructDisk(queue_t *pThis)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
pThis->tVars.disk.lenSpoolDir = strlen((char*)pThis->tVars.disk.pszSpoolDir);
- pThis->tVars.disk.iMaxFileSize = 1024 * 3; // TODO: configurable!
+ pThis->tVars.disk.iMaxFileSize = 10240000; //1024 * 3; // TODO: configurable!
pThis->tVars.disk.fWrite.iCurrFileNum = 1;
pThis->tVars.disk.fWrite.iCurrOffs = 0;
pThis->tVars.disk.fWrite.fd = -1;
pThis->tVars.disk.fWrite.iUngetC = -1;
+ pThis->tVars.disk.fRead.bDeleteOnClose = 0; /* do *NOT* set this to 1! */
pThis->tVars.disk.fRead.iCurrFileNum = 1;
pThis->tVars.disk.fRead.fd = -1;
pThis->tVars.disk.fRead.iCurrOffs = 0;
pThis->tVars.disk.fRead.iUngetC = -1;
+ pThis->tVars.disk.fRead.bDeleteOnClose = 1;
finalize_it:
return iRet;
@@ -418,7 +425,7 @@ static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
assert(pThis != NULL);
if(pThis->tVars.disk.fWrite.fd == -1)
- CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fWrite, O_RDWR|O_CREAT, 0600)); // TODO: open modes!
+ CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fWrite, O_RDWR|O_CREAT|O_TRUNC, 0600)); // TODO: open modes!
CHKiRet((objSerialize(pUsr))(pUsr, &pCStr));
iWritten = write(pThis->tVars.disk.fWrite.fd, rsCStrGetBufBeg(pCStr), rsCStrLen(pCStr));
@@ -564,7 +571,7 @@ queueDel(queue_t *pThis, void *pUsr)
* Please NOTE:
* Having more than one worker requires considerable
* additional code review in regard to thread-safety.
- */
+*/
static void *
queueWorker(void *arg)
{
@@ -774,16 +781,24 @@ rsRetVal
queueEnqObj(queue_t *pThis, void *pUsr)
{
DEFiRet;
+ int iCancelStateSave;
int i;
struct timespec t;
assert(pThis != NULL);
- if(pThis->qType != QUEUETYPE_DIRECT)
+ /* Please note that this function is not cancel-safe and consequently
+ * sets the calling thread's cancelability state to PTHREAD_CANCEL_DISABLE
+ * during its execution. If that is not done, race conditions occur if the
+ * thread is canceled (most important use case is input module termination).
+ * rgerhards, 2008-01-08
+ */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ if(pThis->pWorkerThreads != NULL)
pthread_mutex_lock(pThis->mut);
while(pThis->iQueueSize >= pThis->iMaxQueueSize) {
- dbgprintf("enqueueMsg: queue 0x%lx FULL.\n", (unsigned long) pThis);
+ dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", (unsigned long) pThis);
clock_gettime (CLOCK_REALTIME, &t);
t.tv_sec += 2; /* TODO: configurable! */
@@ -805,6 +820,8 @@ finalize_it:
dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i);
}
+ pthread_setcancelstate(iCancelStateSave, NULL);
+
return iRet;
}
/*
diff --git a/queue.h b/queue.h
index f7be9215..eb8ef8f6 100644
--- a/queue.h
+++ b/queue.h
@@ -40,6 +40,7 @@ typedef struct {
int iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */
int iBufPtr; /* pointer into current buffer */
int iUngetC; /* char set via UngetChar() call or -1 if none set */
+ int bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */
} queueFileDescription_t;
#define qFILE_IOBUF_SIZE 4096 /* size of the IO buffer */
diff --git a/syslogd.c b/syslogd.c
index bb3866f4..83b63fc4 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -2592,7 +2592,7 @@ die(int sig)
char buf[256];
if (sig) {
- dbgprintf(" exiting on signal %d\n", sig);
+ dbgprintf("exiting on signal %d\n", sig);
(void) snprintf(buf, sizeof(buf) / sizeof(char),
" [origin software=\"rsyslogd\" " "swVersion=\"" VERSION \
"\" x-pid=\"%d\"]" " exiting on signal %d.",
@@ -2602,15 +2602,19 @@ die(int sig)
}
/* close the inputs */
+ dbgprintf("Terminating input threads...\n");
thrdTerminateAll(); /* TODO: inputs only, please */
/* drain queue and stop worker thread */
+ dbgprintf("Terminating main queue...\n");
queueDestruct(pMsgQueue);
pMsgQueue = NULL;
/* Free ressources and close connections */
+ dbgprintf("Terminating outputs...\n");
freeSelectors();
+ dbgprintf("all primary multi-thread sources have been terminated - now doing aux cleanp\n");
/* rger 2005-02-22
* now clean up the in-memory structures. OK, the OS
* would also take care of that, but if we do it
@@ -3361,6 +3365,7 @@ init(void)
Initialized = 1;
bHaveMainQueue = (MainMsgQueType == QUEUETYPE_DIRECT) ? 0 : 1;
+ dbgprintf("Main processing queue is initialized and running\n");
/* the output part and the queue is now ready to run. So it is a good time
* to start the inputs. Please note that the net code above should be
@@ -4398,7 +4403,7 @@ mainloop(void)
if(restart) {
dbgprintf("\nReceived SIGHUP, reloading rsyslogd.\n");
- /* worker thread is stopped as part of init() */
+ /* main queue is stopped as part of init() */
init();
restart = 0;
continue;
@@ -4824,9 +4829,9 @@ int main(int argc, char **argv)
dbgprintf("Compatibility Mode: %d\n", iCompatibilityMode);
/* tuck my process id away */
- if ( !Debug )
+ if (1) // !Debug )
{
- dbgprintf("Writing pidfile.\n");
+ dbgprintf("Writing pidfile %s.\n", PidFile);
if (!check_pid(PidFile))
{
if (!write_pid(PidFile))