summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/batch.h8
-rw-r--r--runtime/datetime.h1
-rw-r--r--runtime/errmsg.h1
-rw-r--r--runtime/expr.h1
-rw-r--r--runtime/glbl.c37
-rw-r--r--runtime/modules.h1
-rw-r--r--runtime/msg.c10
-rw-r--r--runtime/msg.h3
-rw-r--r--runtime/queue.c22
-rw-r--r--runtime/queue.h1
-rw-r--r--runtime/rsyslog.c18
-rw-r--r--runtime/rsyslog.h8
-rw-r--r--runtime/ruleset.c23
-rw-r--r--runtime/stream.c142
-rw-r--r--runtime/stream.h5
-rw-r--r--runtime/stringbuf.c2
-rw-r--r--runtime/wtp.c6
17 files changed, 232 insertions, 57 deletions
diff --git a/runtime/batch.h b/runtime/batch.h
index d0504f2b..944889bd 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -136,11 +136,16 @@ batchIsValidElem(batch_t *pBatch, int i) {
/* copy one batch element to another.
* This creates a complete duplicate in those cases where
* it is needed. Use duplication only when absolutely necessary!
+ * Note that all working fields are reset to zeros. If that were
+ * not done, we would have potential problems with invalid
+ * or double pointer frees.
* rgerhards, 2010-06-10
*/
static inline void
batchCopyElem(batch_obj_t *pDest, batch_obj_t *pSrc) {
- memcpy(pDest, pSrc, sizeof(batch_obj_t));
+ memset(pDest, 0, sizeof(batch_obj_t));
+ pDest->pUsrp = pSrc->pUsrp;
+ pDest->state = pSrc->state;
}
@@ -171,6 +176,7 @@ batchFree(batch_t *pBatch) {
static inline rsRetVal
batchInit(batch_t *pBatch, int maxElem) {
DEFiRet;
+ pBatch->iDoneUpTo = 0;
pBatch->maxElem = maxElem;
CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t)));
// TODO: replace calloc by inidividual writes?
diff --git a/runtime/datetime.h b/runtime/datetime.h
index 82bd415b..70bbf416 100644
--- a/runtime/datetime.h
+++ b/runtime/datetime.h
@@ -28,6 +28,7 @@
/* the datetime object */
typedef struct datetime_s {
+ char dummy;
} datetime_t;
diff --git a/runtime/errmsg.h b/runtime/errmsg.h
index 799954fb..ac7018b3 100644
--- a/runtime/errmsg.h
+++ b/runtime/errmsg.h
@@ -30,6 +30,7 @@
/* the errmsg object */
typedef struct errmsg_s {
+ char dummy;
} errmsg_t;
diff --git a/runtime/expr.h b/runtime/expr.h
index 974b71ec..1afe1a1f 100644
--- a/runtime/expr.h
+++ b/runtime/expr.h
@@ -30,6 +30,7 @@
/* a node inside an expression tree */
typedef struct exprNode_s {
+ char dummy;
} exprNode_t;
diff --git a/runtime/glbl.c b/runtime/glbl.c
index 7dc17df4..0114b1ac 100644
--- a/runtime/glbl.c
+++ b/runtime/glbl.c
@@ -31,6 +31,9 @@
#include "config.h"
#include <stdlib.h>
#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
#include <assert.h>
#include "rsyslog.h"
@@ -40,6 +43,7 @@
#include "glbl.h"
#include "prop.h"
#include "atomic.h"
+#include "errmsg.h"
/* some defaults */
#ifndef DFLT_NETSTRM_DRVR
@@ -49,6 +53,7 @@
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(prop)
+DEFobjCurrIf(errmsg)
/* static data
* For this object, these variables are obviously what makes the "meat" of the
@@ -147,6 +152,35 @@ static void SetGlobalInputTermination(void)
}
+/* This function is used to set the global work directory name.
+ * It verifies that the provided directory actually exists and
+ * emits an error message if not.
+ * rgerhards, 2011-02-16
+ */
+static rsRetVal setWorkDir(void __attribute__((unused)) *pVal, uchar *pNewVal)
+{
+ DEFiRet;
+ struct stat sb;
+
+ if(stat((char*) pNewVal, &sb) != 0) {
+ errmsg.LogError(0, RS_RET_ERR_WRKDIR, "$WorkDirectory: %s can not be "
+ "accessed, probably does not exist - directive ignored", pNewVal);
+ ABORT_FINALIZE(RS_RET_ERR_WRKDIR);
+ }
+
+ if(!S_ISDIR(sb.st_mode)) {
+ errmsg.LogError(0, RS_RET_ERR_WRKDIR, "$WorkDirectory: %s not a directory - directive ignored",
+ pNewVal);
+ ABORT_FINALIZE(RS_RET_ERR_WRKDIR);
+ }
+
+ free(pszWorkDir);
+ pszWorkDir = pNewVal;
+
+finalize_it:
+ RETiRet;
+}
+
/* return our local hostname. if it is not set, "[localhost]" is returned
*/
static uchar*
@@ -354,9 +388,10 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
BEGINAbstractObjClassInit(glbl, 1, OBJ_IS_CORE_MODULE) /* class, version */
/* request objects we use */
CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
/* register config handlers (TODO: we need to implement a way to unregister them) */
- CHKiRet(regCfSysLineHdlr((uchar *)"workdirectory", 0, eCmdHdlrGetWord, NULL, &pszWorkDir, NULL, eConfObjGlobal));
+ CHKiRet(regCfSysLineHdlr((uchar *)"workdirectory", 0, eCmdHdlrGetWord, setWorkDir, NULL, NULL, eConfObjGlobal));
CHKiRet(regCfSysLineHdlr((uchar *)"dropmsgswithmaliciousdnsptrrecords", 0, eCmdHdlrBinary, NULL, &bDropMalPTRMsgs, NULL, eConfObjGlobal));
CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdriver", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvr, NULL, eConfObjGlobal));
CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdrivercafile", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvrCAF, NULL, eConfObjGlobal));
diff --git a/runtime/modules.h b/runtime/modules.h
index 9569512b..a78cd6f1 100644
--- a/runtime/modules.h
+++ b/runtime/modules.h
@@ -123,6 +123,7 @@ struct modInfo_s {
rsRetVal (*restoreScope)(void);
} om;
struct { /* data for library modules */
+ char dummy;
} lm;
struct { /* data for parser modules */
rsRetVal (*parse)(msg_t*);
diff --git a/runtime/msg.c b/runtime/msg.c
index ad045dec..70b20749 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -686,6 +686,7 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis)
/* initialize members in ORDER they appear in structure (think "cache line"!) */
pM->flowCtlType = 0;
pM->bDoLock = 0;
+ pM->bAlreadyFreed = 0;
pM->iRefCount = 1;
pM->iSeverity = -1;
pM->iFacility = -1;
@@ -813,6 +814,15 @@ CODESTARTobjDestruct(msg)
if(currRefCount == 0)
{
/* DEV Debugging Only! dbgprintf("msgDestruct\t0x%lx, RefCount now 0, doing DESTROY\n", (unsigned long)pThis); */
+ /* The if below is included to try to nail down a well-hidden bug causing
+ * segfaults. I hope that do to the test code the problem is sooner detected and
+ * thus we get better data for debugging and resolving it. -- rgerhards, 2011-02-23.
+ * TODO: remove when no longer needed.
+ */
+ if(pThis->bAlreadyFreed)
+ abort();
+ pThis->bAlreadyFreed = 1;
+ /* end debug code */
if(pThis->pszRawMsg != pThis->szRawMsg)
free(pThis->pszRawMsg);
freeTAG(pThis);
diff --git a/runtime/msg.h b/runtime/msg.h
index 1fd95994..01a1e059 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -63,7 +63,8 @@ struct msg {
once data has entered the queue, this property is no longer needed. */
pthread_mutex_t mut;
int iRefCount; /* reference counter (0 = unused) */
- sbool bDoLock; /* use the mutex? */
+ sbool bDoLock; /* use the mutex? */
+ sbool bAlreadyFreed; /* aid to help detect a well-hidden bad bug -- TODO: remove when no longer needed */
short iSeverity; /* the severity 0..7 */
short iFacility; /* Facility code 0 .. 23*/
short offAfterPRI; /* offset, at which raw message WITHOUT PRI part starts in pszRawMsg */
diff --git a/runtime/queue.c b/runtime/queue.c
index e4922f37..ef6e843b 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -246,6 +246,7 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis)
if(!pThis->bEnqOnly) {
if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) {
+ DBGOPRINT((obj_t*) pThis, "(re)activating DA worker\n");
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
} else {
if(getLogicalQueueSize(pThis) == 0) {
@@ -841,6 +842,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
batch_t singleBatch;
batch_obj_t batchObj;
+ int i;
DEFiRet;
//TODO: init batchObj (states _OK and new fields -- CHECK)
@@ -862,6 +864,10 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate);
+ /* delete the batch string params: TODO: create its own "class" for this */
+ for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
+ free(batchObj.staticActStrings[i]);
+ }
objDestruct(pUsr);
RETiRet;
@@ -1211,7 +1217,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
/* set some water marks so that we have useful defaults if none are set specifically */
pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */
pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */
-
pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir);
pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
pThis->iQueueSize = 0;
@@ -1497,7 +1502,7 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
* now that we dequeue batches of pointers, this is much less an issue...
* rgerhards, 2009-04-22
*/
- if(iQueueSize < pThis->iFullDlyMrk / 2) {
+ if(iQueueSize < pThis->iFullDlyMrk / 2 || glbl.GetGlobalInputTermState() == 1) {
pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk);
}
@@ -1819,6 +1824,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
{
DEFiRet;
uchar pszBuf[64];
+ int wrk;
uchar *qName;
size_t lenBuf;
@@ -1841,7 +1847,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
}
pthread_mutex_init(&pThis->mutThrdMgmt, NULL);
- pthread_cond_init (&pThis->condDAReady, NULL);
pthread_cond_init (&pThis->notFull, NULL);
pthread_cond_init (&pThis->notEmpty, NULL);
pthread_cond_init (&pThis->belowFullDlyWtrMrk, NULL);
@@ -1850,6 +1855,16 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
+ /* re-adjust some params if required */
+ if(pThis->bIsDA) {
+ /* if we are in DA mode, we must make sure full delayable messages do not
+ * initiate going to disk!
+ */
+ wrk = pThis->iHighWtrMrk - (pThis->iHighWtrMrk / 100) * 50; /* 50% of high water mark */
+ if(wrk < pThis->iFullDlyMrk)
+ pThis->iFullDlyMrk = wrk;
+ }
+
DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, lqsize %d, pqsize %d, child %d, "
"full delay %d, light delay %d, deq batch size %d starting\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
@@ -2129,7 +2144,6 @@ CODESTARTobjDestruct(qqueue)
free(pThis->mut);
}
pthread_mutex_destroy(&pThis->mutThrdMgmt);
- pthread_cond_destroy(&pThis->condDAReady);
pthread_cond_destroy(&pThis->notFull);
pthread_cond_destroy(&pThis->notEmpty);
pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);
diff --git a/runtime/queue.h b/runtime/queue.h
index 38e248cd..97057180 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -124,7 +124,6 @@ struct queue_s {
pthread_cond_t notFull, notEmpty;
pthread_cond_t belowFullDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */
pthread_cond_t belowLightDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */
- pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */
int bThrdStateChanged; /* at least one thread state has changed if 1 */
/* end sync variables */
/* the following variables are always present, because they
diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c
index 8baa2b59..bdb1c9ff 100644
--- a/runtime/rsyslog.c
+++ b/runtime/rsyslog.c
@@ -85,6 +85,12 @@
#include "statsobj.h"
#include "atomic.h"
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+struct sched_param default_sched_param;
+pthread_attr_t default_thread_attr;
+int default_thr_sched_policy;
+#endif
+
/* forward definitions */
static rsRetVal dfltErrLogger(int, uchar *errMsg);
@@ -139,6 +145,18 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF)
if(iRefCount == 0) {
/* init runtime only if not yet done */
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+ CHKiRet(pthread_getschedparam(pthread_self(),
+ &default_thr_sched_policy,
+ &default_sched_param));
+ CHKiRet(pthread_attr_init(&default_thread_attr));
+ CHKiRet(pthread_attr_setschedpolicy(&default_thread_attr,
+ default_thr_sched_policy));
+ CHKiRet(pthread_attr_setschedparam(&default_thread_attr,
+ &default_sched_param));
+ CHKiRet(pthread_attr_setinheritsched(&default_thread_attr,
+ PTHREAD_EXPLICIT_SCHED));
+#endif
if(ppErrObj != NULL) *ppErrObj = "obj";
CHKiRet(objClassInit(NULL)); /* *THIS* *MUST* always be the first class initilizer being called! */
CHKiRet(objGetObjInterface(pObjIF)); /* this provides the root pointer for all other queries */
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 17b20de2..9d3d0289 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -25,6 +25,7 @@
*/
#ifndef INCLUDED_RSYSLOG_H
#define INCLUDED_RSYSLOG_H
+#include <pthread.h>
#include "typedefs.h"
/* ############################################################# *
@@ -346,6 +347,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_ERR_HDFS_WRITE = -2178, /**< error writing to HDFS */
RS_RET_ERR_HDFS_OPEN = -2179, /**< error during hdfsOpen (e.g. file does not exist) */
RS_RET_FILE_NOT_SPECIFIED = -2180, /**< file name not configured where this was required */
+ RS_RET_ERR_WRKDIR = -2181, /**< problems with the rsyslog working directory */
RS_RET_INVLD_CONF_OBJ= -2200, /**< invalid config object (e.g. $Begin conf statement) */
RS_RET_ERR_LIBEE_INIT = -2201, /**< cannot obtain libee ctx */
@@ -427,6 +429,12 @@ typedef enum rsObjectID rsObjID;
#define RSFREEOBJ(x) {(x)->OID = OIDrsFreed; free(x);}
#endif
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+extern struct sched_param default_sched_param;
+extern pthread_attr_t default_thread_attr;
+extern int default_thr_sched_policy;
+#endif
+
/* for the time being, we do our own portability handling here. It
* looks like autotools either does not yet support checks for it, or
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 8162c752..f48c2c2d 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -171,35 +171,40 @@ processBatchMultiRuleset(batch_t *pBatch)
int i;
int iStart; /* start index of partial batch */
int iNew; /* index for new (temporary) batch */
+ int bHaveUnprocessed; /* do we (still) have unprocessed entries? (loop term predicate) */
DEFiRet;
- CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem));
- snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate;
-
- while(1) { /* loop broken inside */
+ do {
+ bHaveUnprocessed = 0;
/* search for first unprocessed element */
for(iStart = 0 ; iStart < pBatch->nElem && pBatch->pElem[iStart].state == BATCH_STATE_DISC ; ++iStart)
/* just search, no action */;
-
if(iStart == pBatch->nElem)
- FINALIZE; /* everything processed */
+ break; /* everything processed */
/* prepare temporary batch */
+ CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem));
+ snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate;
currRuleset = batchElemGetRuleset(pBatch, iStart);
iNew = 0;
for(i = iStart ; i < pBatch->nElem ; ++i) {
if(batchElemGetRuleset(pBatch, i) == currRuleset) {
- batchCopyElem(&(snglRuleBatch.pElem[iNew++]), &(pBatch->pElem[i]));
+ /* for performance reasons, we copy only those members that we actually need */
+ snglRuleBatch.pElem[iNew].pUsrp = pBatch->pElem[i].pUsrp;
+ snglRuleBatch.pElem[iNew].state = pBatch->pElem[i].state;
+ ++iNew;
/* We indicate the element also as done, so it will not be processed again */
pBatch->pElem[i].state = BATCH_STATE_DISC;
+ } else {
+ bHaveUnprocessed = 1;
}
}
snglRuleBatch.nElem = iNew; /* was left just right by the for loop */
batchSetSingleRuleset(&snglRuleBatch, 1);
/* process temp batch */
processBatch(&snglRuleBatch);
- }
- batchFree(&snglRuleBatch);
+ batchFree(&snglRuleBatch);
+ } while(bHaveUnprocessed == 1);
finalize_it:
RETiRet;
diff --git a/runtime/stream.c b/runtime/stream.c
index 260b59ef..24dbcc09 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -401,6 +401,12 @@ finalize_it:
* If we are monitoring a file, someone may have rotated it. In this case, we
* also need to close it and reopen it under the same name.
* rgerhards, 2008-02-13
+ * The previous code also did a check for file truncation, in which case the
+ * file was considered rewritten. However, this potential border case turned
+ * out to be a big trouble spot on busy systems. It caused massive message
+ * duplication (I guess stat() can return a too-low number under some
+ * circumstances). So starting as of now, we only check the inode number and
+ * a file change is detected only if the inode changes. -- rgerhards, 2011-01-10
*/
static rsRetVal
strmHandleEOFMonitor(strm_t *pThis)
@@ -410,23 +416,18 @@ strmHandleEOFMonitor(strm_t *pThis)
struct stat statName;
ISOBJ_TYPE_assert(pThis, strm);
- /* find inodes of both current descriptor as well as file now in file
- * system. If they are different, the file has been rotated (or
- * otherwise rewritten). We also check the size, because the inode
- * does not change if the file is truncated (this, BTW, is also a case
- * where we actually loose log lines, because we can not do anything
- * against truncation...). We do NOT rely on the time of last
- * modificaton because that may not be available under all
- * circumstances. -- rgerhards, 2008-02-13
- */
if(fstat(pThis->fd, &statOpen) == -1)
ABORT_FINALIZE(RS_RET_IO_ERROR);
if(stat((char*) pThis->pszCurrFName, &statName) == -1)
ABORT_FINALIZE(RS_RET_IO_ERROR);
- if(statOpen.st_ino == statName.st_ino && pThis->iCurrOffs == statName.st_size) {
+ DBGPRINTF("stream checking for file change on '%s', inode %u/%u",
+ pThis->pszCurrFName, (unsigned) statOpen.st_ino,
+ (unsigned) statName.st_ino);
+ if(statOpen.st_ino == statName.st_ino) {
ABORT_FINALIZE(RS_RET_EOF);
} else {
/* we had a file change! */
+ DBGPRINTF("we had a file change on '%s'\n", pThis->pszCurrFName);
CHKiRet(strmCloseFile(pThis));
CHKiRet(strmOpenFile(pThis));
}
@@ -561,39 +562,100 @@ static rsRetVal strmUnreadChar(strm_t *pThis, uchar c)
return RS_RET_OK;
}
-
-/* read a line from a strm file. A line is terminated by LF. The LF is read, but it
- * is not returned in the buffer (it is discared). The caller is responsible for
- * destruction of the returned CStr object! -- rgerhards, 2008-01-07
- * rgerhards, 2008-03-27: I now use the ppCStr directly, without any interim
- * string pointer. The reason is that this function my be called by inputs, which
- * are pthread_killed() upon termination. So if we use their native pointer, they
- * can cleanup (but only then).
+/* read a 'paragraph' from a strm file.
+ * A paragraph may be terminated by a LF, by a LFLF, or by LF<not whitespace> depending on the option set.
+ * The termination LF characters are read, but are
+ * not returned in the buffer (it is discared). The caller is responsible for
+ * destruction of the returned CStr object! -- dlang 2010-12-13
*/
static rsRetVal
-strmReadLine(strm_t *pThis, cstr_t **ppCStr)
+strmReadLine(strm_t *pThis, cstr_t **ppCStr, int mode)
{
- DEFiRet;
- uchar c;
-
- ASSERT(pThis != NULL);
- ASSERT(ppCStr != NULL);
-
- CHKiRet(cstrConstruct(ppCStr));
-
- /* now read the line */
- CHKiRet(strmReadChar(pThis, &c));
- while(c != '\n') {
- CHKiRet(cstrAppendChar(*ppCStr, c));
- CHKiRet(strmReadChar(pThis, &c));
+ /* mode = 0 single line mode (equivalent to ReadLine)
+ * mode = 1 LFLF mode (paragraph, blank line between entries)
+ * mode = 2 LF <not whitespace> mode, a log line starts at the beginning of a line, but following lines that are indented are part of the same log entry
+ * This modal interface is not nearly as flexible as being able to define a regex for when a new record starts, but it's also not nearly as hard (or as slow) to implement
+ */
+ DEFiRet;
+ uchar c;
+ uchar finished;
+
+ ASSERT(pThis != NULL);
+ ASSERT(ppCStr != NULL);
+
+ CHKiRet(cstrConstruct(ppCStr));
+
+ /* now read the line */
+ CHKiRet(strmReadChar(pThis, &c));
+ if (mode == 0){
+ while(c != '\n') {
+ CHKiRet(cstrAppendChar(*ppCStr, c));
+ CHKiRet(strmReadChar(pThis, &c));
+ }
+ CHKiRet(cstrFinalize(*ppCStr));
+ }
+ if (mode == 1){
+ finished=0;
+ while(finished == 0){
+ if(c != '\n') {
+ CHKiRet(cstrAppendChar(*ppCStr, c));
+ CHKiRet(strmReadChar(pThis, &c));
+ } else {
+ if ((((*ppCStr)->iStrLen) > 0) ){
+ if ((*ppCStr)->pBuf[(*ppCStr)->iStrLen -1 ] == '\n'){
+ rsCStrTruncate(*ppCStr,1); /* remove the prior newline */
+ finished=1;
+ } else {
+ CHKiRet(cstrAppendChar(*ppCStr, c));
+ CHKiRet(strmReadChar(pThis, &c));
+ }
+ } else {
+ finished=1; /* this is a blank line, a \n with nothing since the last complete record */
+ }
+ }
+ }
+ CHKiRet(cstrFinalize(*ppCStr));
+ }
+ if (mode == 2){
+ /* indented follow-up lines */
+ finished=0;
+ while(finished == 0){
+ if ((*ppCStr)->iStrLen == 0){
+ if(c != '\n') {
+ /* nothing in the buffer, and it's not a newline, add it to the buffer */
+ CHKiRet(cstrAppendChar(*ppCStr, c));
+ CHKiRet(strmReadChar(pThis, &c));
+ } else {
+ finished=1; /* this is a blank line, a \n with nothing since the last complete record */
+ }
+ } else {
+ if ((*ppCStr)->pBuf[(*ppCStr)->iStrLen -1 ] != '\n'){
+ /* not the first character after a newline, add it to the buffer */
+ CHKiRet(cstrAppendChar(*ppCStr, c));
+ CHKiRet(strmReadChar(pThis, &c));
+ } else {
+ if ((c == ' ') || (c == '\t')){
+ CHKiRet(cstrAppendChar(*ppCStr, c));
+ CHKiRet(strmReadChar(pThis, &c));
+ } else {
+ /* clean things up by putting the character we just read back into
+ * the input buffer and removing the LF character that is currently at the
+ * end of the output string */
+ CHKiRet(strmUnreadChar(pThis, c));
+ rsCStrTruncate(*ppCStr,1);
+ finished=1;
+ }
+ }
+ }
+ }
+ CHKiRet(cstrFinalize(*ppCStr));
}
- CHKiRet(cstrFinalize(*ppCStr));
finalize_it:
- if(iRet != RS_RET_OK && *ppCStr != NULL)
- cstrDestruct(ppCStr);
+ if(iRet != RS_RET_OK && *ppCStr != NULL)
+ cstrDestruct(ppCStr);
- RETiRet;
+ RETiRet;
}
@@ -669,7 +731,13 @@ static rsRetVal strmConstructFinalize(strm_t *pThis)
}
pThis->pIOBuf = pThis->asyncBuf[0].pBuf;
pThis->bStopWriter = 0;
- if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0)
+ if(pthread_create(&pThis->writerThreadID,
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+ &default_thread_attr,
+#else
+ NULL,
+#endif
+ asyncWriterThread, pThis) != 0)
DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis);
} else {
/* we work synchronously, so we need to alloc a fixed pIOBuf */
diff --git a/runtime/stream.h b/runtime/stream.h
index 37e9d570..60c68cb2 100644
--- a/runtime/stream.h
+++ b/runtime/stream.h
@@ -156,7 +156,6 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
rsRetVal (*SetFileName)(strm_t *pThis, uchar *pszName, size_t iLenName);
rsRetVal (*ReadChar)(strm_t *pThis, uchar *pC);
rsRetVal (*UnreadChar)(strm_t *pThis, uchar c);
- rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr);
rsRetVal (*SeekCurrOffs)(strm_t *pThis);
rsRetVal (*Write)(strm_t *pThis, uchar *pBuf, size_t lenBuf);
rsRetVal (*WriteChar)(strm_t *pThis, uchar c);
@@ -183,8 +182,10 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
INTERFACEpropSetMeth(strm, iSizeLimit, off_t);
INTERFACEpropSetMeth(strm, iFlushInterval, int);
INTERFACEpropSetMeth(strm, pszSizeLimitCmd, uchar*);
+ /* v6 added */
+ rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr, int mode);
ENDinterface(strm)
-#define strmCURR_IF_VERSION 5 /* increment whenever you change the interface structure! */
+#define strmCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */
/* prototypes */
diff --git a/runtime/stringbuf.c b/runtime/stringbuf.c
index f4a9caae..d8c5923b 100644
--- a/runtime/stringbuf.c
+++ b/runtime/stringbuf.c
@@ -185,7 +185,7 @@ rsRetVal
rsCStrExtendBuf(cstr_t *pThis, size_t iMinNeeded)
{
uchar *pNewBuf;
- unsigned short iNewSize;
+ size_t iNewSize;
DEFiRet;
/* first compute the new size needed */
diff --git a/runtime/wtp.c b/runtime/wtp.c
index ece80911..e615fb19 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -90,6 +90,12 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pthread_mutex_init(&pThis->mutWtp, NULL);
pthread_cond_init(&pThis->condThrdTrm, NULL);
pthread_attr_init(&pThis->attrThrd);
+ /* Set thread scheduling policy to default */
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+ pthread_attr_setschedpolicy(&pThis->attrThrd, default_thr_sched_policy);
+ pthread_attr_setschedparam(&pThis->attrThrd, &default_sched_param);
+ pthread_attr_setinheritsched(&pThis->attrThrd, PTHREAD_EXPLICIT_SCHED);
+#endif
pthread_attr_setdetachstate(&pThis->attrThrd, PTHREAD_CREATE_DETACHED);
/* set all function pointers to "not implemented" dummy so that we can safely call them */
pThis->pfChkStopWrkr = NotImplementedDummy;