summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-09-29 10:09:51 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-09-29 10:09:51 +0200
commit8d8d1f01e1cd6ae372088a3ebddc27983e9a0185 (patch)
tree8aa389a25e899e4ebe26554d3228307020056c75 /runtime
parentb81c4252e808de51c022bbfda96a91ddc697e86c (diff)
parent8bab264ba168b5fee36a7b45020e5e2172c74224 (diff)
downloadrsyslog-8d8d1f01e1cd6ae372088a3ebddc27983e9a0185.tar.gz
rsyslog-8d8d1f01e1cd6ae372088a3ebddc27983e9a0185.tar.xz
rsyslog-8d8d1f01e1cd6ae372088a3ebddc27983e9a0185.zip
Merge branch 'v4-beta' into beta
Conflicts: ChangeLog configure.ac doc/manual.html
Diffstat (limited to 'runtime')
-rw-r--r--runtime/obj.c22
-rw-r--r--runtime/stream.c67
2 files changed, 60 insertions, 29 deletions
diff --git a/runtime/obj.c b/runtime/obj.c
index 6ca05cc4..aebea332 100644
--- a/runtime/obj.c
+++ b/runtime/obj.c
@@ -75,6 +75,7 @@
#include <string.h>
#include <ctype.h>
#include <assert.h>
+#include <pthread.h>
/* how many objects are supported by rsyslogd? */
#define OBJ_NUM_IDS 100 /* TODO change to a linked list? info: 16 were currently in use 2008-02-29 */
@@ -97,6 +98,7 @@ DEFobjCurrIf(module)
DEFobjCurrIf(errmsg)
DEFobjCurrIf(strm)
static objInfo_t *arrObjInfo[OBJ_NUM_IDS]; /* array with object information pointers */
+static pthread_mutex_t mutObjGlobalOp; /* mutex to guard global operations of the object system */
/* cookies for serialized lines */
@@ -1127,6 +1129,7 @@ UseObj(char *srcFile, uchar *pObjName, uchar *pObjFile, interface_t *pIf)
/* DEV debug only: dbgprintf("source file %s requests object '%s', ifIsLoaded %d\n", srcFile, pObjName, pIf->ifIsLoaded); */
+ d_pthread_mutex_lock(&mutObjGlobalOp);
if(pIf->ifIsLoaded == 1) {
ABORT_FINALIZE(RS_RET_OK); /* we are already set */
@@ -1167,6 +1170,8 @@ UseObj(char *srcFile, uchar *pObjName, uchar *pObjFile, interface_t *pIf)
pIf->ifIsLoaded = 1; /* we are happy */
finalize_it:
+ d_pthread_mutex_unlock(&mutObjGlobalOp);
+
if(pStr != NULL)
rsCStrDestruct(&pStr);
@@ -1188,15 +1193,16 @@ ReleaseObj(char *srcFile, uchar *pObjName, uchar *pObjFile, interface_t *pIf)
/* dev debug only dbgprintf("source file %s releasing object '%s', ifIsLoaded %d\n", srcFile, pObjName, pIf->ifIsLoaded); */
+ d_pthread_mutex_lock(&mutObjGlobalOp);
if(pObjFile == NULL)
FINALIZE; /* if it is not a lodable module, we do not need to do anything... */
if(pIf->ifIsLoaded == 0) {
- ABORT_FINALIZE(RS_RET_OK); /* we are not loaded - this is perfectly OK... */
+ FINALIZE; /* we are not loaded - this is perfectly OK... */
} else if(pIf->ifIsLoaded == 2) {
pIf->ifIsLoaded = 0; /* clean up */
- ABORT_FINALIZE(RS_RET_OK); /* we had a load error and can not continue */
+ FINALIZE; /* we had a load error and can not/must not continue */
}
CHKiRet(rsCStrConstructFromszStr(&pStr, pObjName));
@@ -1208,6 +1214,8 @@ ReleaseObj(char *srcFile, uchar *pObjName, uchar *pObjFile, interface_t *pIf)
pIf->ifIsLoaded = 0; /* indicated "no longer valid" */
finalize_it:
+ d_pthread_mutex_unlock(&mutObjGlobalOp);
+
if(pStr != NULL)
rsCStrDestruct(&pStr);
@@ -1300,8 +1308,9 @@ objClassExit(void)
rsRetVal
objClassInit(modInfo_t *pModInfo)
{
- DEFiRet;
+ pthread_mutexattr_t mutAttr;
int i;
+ DEFiRet;
/* first, initialize the object system itself. This must be done
* before any other object is created.
@@ -1310,6 +1319,13 @@ objClassInit(modInfo_t *pModInfo)
arrObjInfo[i] = NULL;
}
+ /* the mutex must be recursive, because objects may call into other
+ * object identifieres recursively.
+ */
+ pthread_mutexattr_init(&mutAttr);
+ pthread_mutexattr_settype(&mutAttr, PTHREAD_MUTEX_RECURSIVE);
+ pthread_mutex_init(&mutObjGlobalOp, &mutAttr);
+
/* request objects we use */
CHKiRet(objGetObjInterface(&obj)); /* get ourselves ;) */
diff --git a/runtime/stream.c b/runtime/stream.c
index 2bc2fba2..58f16cce 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -7,6 +7,14 @@
*
* File begun on 2008-01-09 by RGerhards
* Large modifications in 2009-06 to support using it with omfile, including zip writer.
+ * Note that this file obtains the zlib wrapper object is needed, but it never frees it
+ * again. While this sounds like a leak (and one may argue it actually is), there is no
+ * harm associated with that. The reason is that strm is a core object, so it is terminated
+ * only when rsyslogd exists. As we could only release on termination (or else bear more
+ * overhead for keeping track of how many users we have), not releasing zlibw is OK, because
+ * it will be released when rsyslogd terminates. We may want to revisit this decision if
+ * it turns out to be problematic. Then, we need to quasi-refcount the number of accesses
+ * to the object.
*
* Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH.
*
@@ -598,9 +606,10 @@ static rsRetVal strmConstructFinalize(strm_t *pThis)
"without zip\n", localRet);
} else {
/* we use the same size as the original buf, as we would like
- * to make sure we can write out everyting with a SINGLE api call!
+ * to make sure we can write out everything with a SINGLE api call!
+ * We add another 128 bytes to take care of the gzip header and "all eventualities".
*/
- CHKmalloc(pThis->pZipBuf = (Bytef*) malloc(sizeof(uchar) * pThis->sIOBufSize));
+ CHKmalloc(pThis->pZipBuf = (Bytef*) malloc(sizeof(uchar) * pThis->sIOBufSize + 128));
}
}
@@ -671,20 +680,6 @@ CODESTARTobjDestruct(strm)
if(pThis->tOperationsMode != STREAMMODE_READ)
strmFlush(pThis);
-dbgprintf("XXX: destruct stream %p\n", pThis);
- /* ... then free resources */
- if(pThis->fd != -1)
- strmCloseFile(pThis);
-
- if(pThis->iZipLevel) { /* do we need a zip buf? */
- objRelease(zlibw, LM_ZLIBW_FILENAME);
- }
-
- free(pThis->pszDir);
- free(pThis->pZipBuf);
- free(pThis->pszCurrFName);
- free(pThis->pszFName);
-
if(pThis->bAsyncWrite) {
stopWriter(pThis);
pthread_mutex_destroy(&pThis->mut);
@@ -697,6 +692,19 @@ dbgprintf("XXX: destruct stream %p\n", pThis);
} else {
free(pThis->pIOBuf);
}
+
+ /* Finally, we can free the resources.
+ * IMPORTANT: we MUST free this only AFTER the ansyncWriter has been stopped, else
+ * we get random errors...
+ */
+ if(pThis->fd != -1)
+ strmCloseFile(pThis);
+
+ free(pThis->pszDir);
+ free(pThis->pZipBuf);
+ free(pThis->pszCurrFName);
+ free(pThis->pszFName);
+
ENDobjDestruct(strm)
@@ -849,7 +857,6 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf)
if(++pThis->iCnt == 1)
pthread_cond_signal(&pThis->notEmpty);
-finalize_it:
RETiRet;
}
@@ -1043,12 +1050,17 @@ finalize_it:
* add a config switch so that the user can decide the risk he is ready
* to take, but so far this is not yet implemented (not even requested ;)).
* rgerhards, 2009-06-04
+ * For the time being, we take a very conservative approach and do not run this
+ * method multithreaded. This is done in an effort to solve a segfault condition
+ * that seems to be related to the zip code. -- rgerhards, 2009-09-22
+ * TODO: make multithreaded again!
*/
static rsRetVal
doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
{
z_stream zstrm;
int zRet; /* zlib return state */
+ bool bzInitDone = FALSE;
DEFiRet;
assert(pThis != NULL);
assert(pBuf != NULL);
@@ -1057,18 +1069,19 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
zstrm.zalloc = Z_NULL;
zstrm.zfree = Z_NULL;
zstrm.opaque = Z_NULL;
+ zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */
/* see note in file header for the params we use with deflateInit2() */
zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
if(zRet != Z_OK) {
DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet);
ABORT_FINALIZE(RS_RET_ZLIB_ERR);
}
+ bzInitDone = TRUE;
/* now doing the compression */
+ zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */
zstrm.avail_in = lenBuf;
- zstrm.next_in = (Bytef*) pBuf;
- /* run deflate() on input until output buffer not full, finish
- compression if all of source has been read in */
+ /* run deflate() on buffer until everything has been compressed */
do {
DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in);
zstrm.avail_out = pThis->sIOBufSize;
@@ -1076,18 +1089,20 @@ doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */
DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out);
assert(zRet != Z_STREAM_ERROR); /* state not clobbered */
+ if(zstrm.avail_out == pThis->sIOBufSize)
+ break; /* this is valid, indicates end of compression --> see zlib howto */
CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out));
} while (zstrm.avail_out == 0);
assert(zstrm.avail_in == 0); /* all input will be used */
-
- zRet = zlibw.DeflateEnd(&zstrm);
- if(zRet != Z_OK) {
- DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet);
- ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+finalize_it:
+ if(bzInitDone) {
+ zRet = zlibw.DeflateEnd(&zstrm);
+ if(zRet != Z_OK) {
+ DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet);
+ }
}
-finalize_it:
RETiRet;
}