diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/batch.h | 8 | ||||
-rw-r--r-- | runtime/datetime.h | 1 | ||||
-rw-r--r-- | runtime/errmsg.h | 1 | ||||
-rw-r--r-- | runtime/expr.h | 1 | ||||
-rw-r--r-- | runtime/glbl.c | 37 | ||||
-rw-r--r-- | runtime/modules.h | 1 | ||||
-rw-r--r-- | runtime/msg.c | 10 | ||||
-rw-r--r-- | runtime/msg.h | 3 | ||||
-rw-r--r-- | runtime/queue.c | 22 | ||||
-rw-r--r-- | runtime/queue.h | 1 | ||||
-rw-r--r-- | runtime/rsyslog.c | 18 | ||||
-rw-r--r-- | runtime/rsyslog.h | 8 | ||||
-rw-r--r-- | runtime/ruleset.c | 23 | ||||
-rw-r--r-- | runtime/stream.c | 142 | ||||
-rw-r--r-- | runtime/stream.h | 5 | ||||
-rw-r--r-- | runtime/stringbuf.c | 2 | ||||
-rw-r--r-- | runtime/wtp.c | 6 |
17 files changed, 232 insertions, 57 deletions
diff --git a/runtime/batch.h b/runtime/batch.h index d0504f2b..944889bd 100644 --- a/runtime/batch.h +++ b/runtime/batch.h @@ -136,11 +136,16 @@ batchIsValidElem(batch_t *pBatch, int i) { /* copy one batch element to another. * This creates a complete duplicate in those cases where * it is needed. Use duplication only when absolutely necessary! + * Note that all working fields are reset to zeros. If that were + * not done, we would have potential problems with invalid + * or double pointer frees. * rgerhards, 2010-06-10 */ static inline void batchCopyElem(batch_obj_t *pDest, batch_obj_t *pSrc) { - memcpy(pDest, pSrc, sizeof(batch_obj_t)); + memset(pDest, 0, sizeof(batch_obj_t)); + pDest->pUsrp = pSrc->pUsrp; + pDest->state = pSrc->state; } @@ -171,6 +176,7 @@ batchFree(batch_t *pBatch) { static inline rsRetVal batchInit(batch_t *pBatch, int maxElem) { DEFiRet; + pBatch->iDoneUpTo = 0; pBatch->maxElem = maxElem; CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t))); // TODO: replace calloc by inidividual writes? diff --git a/runtime/datetime.h b/runtime/datetime.h index 82bd415b..70bbf416 100644 --- a/runtime/datetime.h +++ b/runtime/datetime.h @@ -28,6 +28,7 @@ /* the datetime object */ typedef struct datetime_s { + char dummy; } datetime_t; diff --git a/runtime/errmsg.h b/runtime/errmsg.h index 799954fb..ac7018b3 100644 --- a/runtime/errmsg.h +++ b/runtime/errmsg.h @@ -30,6 +30,7 @@ /* the errmsg object */ typedef struct errmsg_s { + char dummy; } errmsg_t; diff --git a/runtime/expr.h b/runtime/expr.h index 974b71ec..1afe1a1f 100644 --- a/runtime/expr.h +++ b/runtime/expr.h @@ -30,6 +30,7 @@ /* a node inside an expression tree */ typedef struct exprNode_s { + char dummy; } exprNode_t; diff --git a/runtime/glbl.c b/runtime/glbl.c index 7dc17df4..0114b1ac 100644 --- a/runtime/glbl.c +++ b/runtime/glbl.c @@ -31,6 +31,9 @@ #include "config.h" #include <stdlib.h> #include <sys/socket.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> #include <assert.h> #include "rsyslog.h" @@ -40,6 +43,7 @@ #include "glbl.h" #include "prop.h" #include "atomic.h" +#include "errmsg.h" /* some defaults */ #ifndef DFLT_NETSTRM_DRVR @@ -49,6 +53,7 @@ /* static data */ DEFobjStaticHelpers DEFobjCurrIf(prop) +DEFobjCurrIf(errmsg) /* static data * For this object, these variables are obviously what makes the "meat" of the @@ -147,6 +152,35 @@ static void SetGlobalInputTermination(void) } +/* This function is used to set the global work directory name. + * It verifies that the provided directory actually exists and + * emits an error message if not. + * rgerhards, 2011-02-16 + */ +static rsRetVal setWorkDir(void __attribute__((unused)) *pVal, uchar *pNewVal) +{ + DEFiRet; + struct stat sb; + + if(stat((char*) pNewVal, &sb) != 0) { + errmsg.LogError(0, RS_RET_ERR_WRKDIR, "$WorkDirectory: %s can not be " + "accessed, probably does not exist - directive ignored", pNewVal); + ABORT_FINALIZE(RS_RET_ERR_WRKDIR); + } + + if(!S_ISDIR(sb.st_mode)) { + errmsg.LogError(0, RS_RET_ERR_WRKDIR, "$WorkDirectory: %s not a directory - directive ignored", + pNewVal); + ABORT_FINALIZE(RS_RET_ERR_WRKDIR); + } + + free(pszWorkDir); + pszWorkDir = pNewVal; + +finalize_it: + RETiRet; +} + /* return our local hostname. if it is not set, "[localhost]" is returned */ static uchar* @@ -354,9 +388,10 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a BEGINAbstractObjClassInit(glbl, 1, OBJ_IS_CORE_MODULE) /* class, version */ /* request objects we use */ CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(errmsg, CORE_COMPONENT)); /* register config handlers (TODO: we need to implement a way to unregister them) */ - CHKiRet(regCfSysLineHdlr((uchar *)"workdirectory", 0, eCmdHdlrGetWord, NULL, &pszWorkDir, NULL, eConfObjGlobal)); + CHKiRet(regCfSysLineHdlr((uchar *)"workdirectory", 0, eCmdHdlrGetWord, setWorkDir, NULL, NULL, eConfObjGlobal)); CHKiRet(regCfSysLineHdlr((uchar *)"dropmsgswithmaliciousdnsptrrecords", 0, eCmdHdlrBinary, NULL, &bDropMalPTRMsgs, NULL, eConfObjGlobal)); CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdriver", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvr, NULL, eConfObjGlobal)); CHKiRet(regCfSysLineHdlr((uchar *)"defaultnetstreamdrivercafile", 0, eCmdHdlrGetWord, NULL, &pszDfltNetstrmDrvrCAF, NULL, eConfObjGlobal)); diff --git a/runtime/modules.h b/runtime/modules.h index 9569512b..a78cd6f1 100644 --- a/runtime/modules.h +++ b/runtime/modules.h @@ -123,6 +123,7 @@ struct modInfo_s { rsRetVal (*restoreScope)(void); } om; struct { /* data for library modules */ + char dummy; } lm; struct { /* data for parser modules */ rsRetVal (*parse)(msg_t*); diff --git a/runtime/msg.c b/runtime/msg.c index ad045dec..70b20749 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -686,6 +686,7 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis) /* initialize members in ORDER they appear in structure (think "cache line"!) */ pM->flowCtlType = 0; pM->bDoLock = 0; + pM->bAlreadyFreed = 0; pM->iRefCount = 1; pM->iSeverity = -1; pM->iFacility = -1; @@ -813,6 +814,15 @@ CODESTARTobjDestruct(msg) if(currRefCount == 0) { /* DEV Debugging Only! dbgprintf("msgDestruct\t0x%lx, RefCount now 0, doing DESTROY\n", (unsigned long)pThis); */ + /* The if below is included to try to nail down a well-hidden bug causing + * segfaults. I hope that do to the test code the problem is sooner detected and + * thus we get better data for debugging and resolving it. -- rgerhards, 2011-02-23. + * TODO: remove when no longer needed. + */ + if(pThis->bAlreadyFreed) + abort(); + pThis->bAlreadyFreed = 1; + /* end debug code */ if(pThis->pszRawMsg != pThis->szRawMsg) free(pThis->pszRawMsg); freeTAG(pThis); diff --git a/runtime/msg.h b/runtime/msg.h index 1fd95994..01a1e059 100644 --- a/runtime/msg.h +++ b/runtime/msg.h @@ -63,7 +63,8 @@ struct msg { once data has entered the queue, this property is no longer needed. */ pthread_mutex_t mut; int iRefCount; /* reference counter (0 = unused) */ - sbool bDoLock; /* use the mutex? */ + sbool bDoLock; /* use the mutex? */ + sbool bAlreadyFreed; /* aid to help detect a well-hidden bad bug -- TODO: remove when no longer needed */ short iSeverity; /* the severity 0..7 */ short iFacility; /* Facility code 0 .. 23*/ short offAfterPRI; /* offset, at which raw message WITHOUT PRI part starts in pszRawMsg */ diff --git a/runtime/queue.c b/runtime/queue.c index e4922f37..ef6e843b 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -246,6 +246,7 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis) if(!pThis->bEnqOnly) { if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) { + DBGOPRINT((obj_t*) pThis, "(re)activating DA worker\n"); wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ } else { if(getLogicalQueueSize(pThis) == 0) { @@ -841,6 +842,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) { batch_t singleBatch; batch_obj_t batchObj; + int i; DEFiRet; //TODO: init batchObj (states _OK and new fields -- CHECK) @@ -862,6 +864,10 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate); + /* delete the batch string params: TODO: create its own "class" for this */ + for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { + free(batchObj.staticActStrings[i]); + } objDestruct(pUsr); RETiRet; @@ -1211,7 +1217,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread /* set some water marks so that we have useful defaults if none are set specifically */ pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */ pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */ - pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir); pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */ pThis->iQueueSize = 0; @@ -1497,7 +1502,7 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti) * now that we dequeue batches of pointers, this is much less an issue... * rgerhards, 2009-04-22 */ - if(iQueueSize < pThis->iFullDlyMrk / 2) { + if(iQueueSize < pThis->iFullDlyMrk / 2 || glbl.GetGlobalInputTermState() == 1) { pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk); } @@ -1819,6 +1824,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; uchar pszBuf[64]; + int wrk; uchar *qName; size_t lenBuf; @@ -1841,7 +1847,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ } pthread_mutex_init(&pThis->mutThrdMgmt, NULL); - pthread_cond_init (&pThis->condDAReady, NULL); pthread_cond_init (&pThis->notFull, NULL); pthread_cond_init (&pThis->notEmpty, NULL); pthread_cond_init (&pThis->belowFullDlyWtrMrk, NULL); @@ -1850,6 +1855,16 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ /* call type-specific constructor */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ + /* re-adjust some params if required */ + if(pThis->bIsDA) { + /* if we are in DA mode, we must make sure full delayable messages do not + * initiate going to disk! + */ + wrk = pThis->iHighWtrMrk - (pThis->iHighWtrMrk / 100) * 50; /* 50% of high water mark */ + if(wrk < pThis->iFullDlyMrk) + pThis->iFullDlyMrk = wrk; + } + DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, lqsize %d, pqsize %d, child %d, " "full delay %d, light delay %d, deq batch size %d starting\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, @@ -2129,7 +2144,6 @@ CODESTARTobjDestruct(qqueue) free(pThis->mut); } pthread_mutex_destroy(&pThis->mutThrdMgmt); - pthread_cond_destroy(&pThis->condDAReady); pthread_cond_destroy(&pThis->notFull); pthread_cond_destroy(&pThis->notEmpty); pthread_cond_destroy(&pThis->belowFullDlyWtrMrk); diff --git a/runtime/queue.h b/runtime/queue.h index 38e248cd..97057180 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -124,7 +124,6 @@ struct queue_s { pthread_cond_t notFull, notEmpty; pthread_cond_t belowFullDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */ pthread_cond_t belowLightDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */ - pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */ int bThrdStateChanged; /* at least one thread state has changed if 1 */ /* end sync variables */ /* the following variables are always present, because they diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c index 8baa2b59..bdb1c9ff 100644 --- a/runtime/rsyslog.c +++ b/runtime/rsyslog.c @@ -85,6 +85,12 @@ #include "statsobj.h" #include "atomic.h" +#ifdef HAVE_PTHREAD_SETSCHEDPARAM +struct sched_param default_sched_param; +pthread_attr_t default_thread_attr; +int default_thr_sched_policy; +#endif + /* forward definitions */ static rsRetVal dfltErrLogger(int, uchar *errMsg); @@ -139,6 +145,18 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF) if(iRefCount == 0) { /* init runtime only if not yet done */ +#ifdef HAVE_PTHREAD_SETSCHEDPARAM + CHKiRet(pthread_getschedparam(pthread_self(), + &default_thr_sched_policy, + &default_sched_param)); + CHKiRet(pthread_attr_init(&default_thread_attr)); + CHKiRet(pthread_attr_setschedpolicy(&default_thread_attr, + default_thr_sched_policy)); + CHKiRet(pthread_attr_setschedparam(&default_thread_attr, + &default_sched_param)); + CHKiRet(pthread_attr_setinheritsched(&default_thread_attr, + PTHREAD_EXPLICIT_SCHED)); +#endif if(ppErrObj != NULL) *ppErrObj = "obj"; CHKiRet(objClassInit(NULL)); /* *THIS* *MUST* always be the first class initilizer being called! */ CHKiRet(objGetObjInterface(pObjIF)); /* this provides the root pointer for all other queries */ diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 17b20de2..9d3d0289 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -25,6 +25,7 @@ */ #ifndef INCLUDED_RSYSLOG_H #define INCLUDED_RSYSLOG_H +#include <pthread.h> #include "typedefs.h" /* ############################################################# * @@ -346,6 +347,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_ERR_HDFS_WRITE = -2178, /**< error writing to HDFS */ RS_RET_ERR_HDFS_OPEN = -2179, /**< error during hdfsOpen (e.g. file does not exist) */ RS_RET_FILE_NOT_SPECIFIED = -2180, /**< file name not configured where this was required */ + RS_RET_ERR_WRKDIR = -2181, /**< problems with the rsyslog working directory */ RS_RET_INVLD_CONF_OBJ= -2200, /**< invalid config object (e.g. $Begin conf statement) */ RS_RET_ERR_LIBEE_INIT = -2201, /**< cannot obtain libee ctx */ @@ -427,6 +429,12 @@ typedef enum rsObjectID rsObjID; #define RSFREEOBJ(x) {(x)->OID = OIDrsFreed; free(x);} #endif +#ifdef HAVE_PTHREAD_SETSCHEDPARAM +extern struct sched_param default_sched_param; +extern pthread_attr_t default_thread_attr; +extern int default_thr_sched_policy; +#endif + /* for the time being, we do our own portability handling here. It * looks like autotools either does not yet support checks for it, or diff --git a/runtime/ruleset.c b/runtime/ruleset.c index 8162c752..f48c2c2d 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -171,35 +171,40 @@ processBatchMultiRuleset(batch_t *pBatch) int i; int iStart; /* start index of partial batch */ int iNew; /* index for new (temporary) batch */ + int bHaveUnprocessed; /* do we (still) have unprocessed entries? (loop term predicate) */ DEFiRet; - CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem)); - snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate; - - while(1) { /* loop broken inside */ + do { + bHaveUnprocessed = 0; /* search for first unprocessed element */ for(iStart = 0 ; iStart < pBatch->nElem && pBatch->pElem[iStart].state == BATCH_STATE_DISC ; ++iStart) /* just search, no action */; - if(iStart == pBatch->nElem) - FINALIZE; /* everything processed */ + break; /* everything processed */ /* prepare temporary batch */ + CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem)); + snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate; currRuleset = batchElemGetRuleset(pBatch, iStart); iNew = 0; for(i = iStart ; i < pBatch->nElem ; ++i) { if(batchElemGetRuleset(pBatch, i) == currRuleset) { - batchCopyElem(&(snglRuleBatch.pElem[iNew++]), &(pBatch->pElem[i])); + /* for performance reasons, we copy only those members that we actually need */ + snglRuleBatch.pElem[iNew].pUsrp = pBatch->pElem[i].pUsrp; + snglRuleBatch.pElem[iNew].state = pBatch->pElem[i].state; + ++iNew; /* We indicate the element also as done, so it will not be processed again */ pBatch->pElem[i].state = BATCH_STATE_DISC; + } else { + bHaveUnprocessed = 1; } } snglRuleBatch.nElem = iNew; /* was left just right by the for loop */ batchSetSingleRuleset(&snglRuleBatch, 1); /* process temp batch */ processBatch(&snglRuleBatch); - } - batchFree(&snglRuleBatch); + batchFree(&snglRuleBatch); + } while(bHaveUnprocessed == 1); finalize_it: RETiRet; diff --git a/runtime/stream.c b/runtime/stream.c index 260b59ef..24dbcc09 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -401,6 +401,12 @@ finalize_it: * If we are monitoring a file, someone may have rotated it. In this case, we * also need to close it and reopen it under the same name. * rgerhards, 2008-02-13 + * The previous code also did a check for file truncation, in which case the + * file was considered rewritten. However, this potential border case turned + * out to be a big trouble spot on busy systems. It caused massive message + * duplication (I guess stat() can return a too-low number under some + * circumstances). So starting as of now, we only check the inode number and + * a file change is detected only if the inode changes. -- rgerhards, 2011-01-10 */ static rsRetVal strmHandleEOFMonitor(strm_t *pThis) @@ -410,23 +416,18 @@ strmHandleEOFMonitor(strm_t *pThis) struct stat statName; ISOBJ_TYPE_assert(pThis, strm); - /* find inodes of both current descriptor as well as file now in file - * system. If they are different, the file has been rotated (or - * otherwise rewritten). We also check the size, because the inode - * does not change if the file is truncated (this, BTW, is also a case - * where we actually loose log lines, because we can not do anything - * against truncation...). We do NOT rely on the time of last - * modificaton because that may not be available under all - * circumstances. -- rgerhards, 2008-02-13 - */ if(fstat(pThis->fd, &statOpen) == -1) ABORT_FINALIZE(RS_RET_IO_ERROR); if(stat((char*) pThis->pszCurrFName, &statName) == -1) ABORT_FINALIZE(RS_RET_IO_ERROR); - if(statOpen.st_ino == statName.st_ino && pThis->iCurrOffs == statName.st_size) { + DBGPRINTF("stream checking for file change on '%s', inode %u/%u", + pThis->pszCurrFName, (unsigned) statOpen.st_ino, + (unsigned) statName.st_ino); + if(statOpen.st_ino == statName.st_ino) { ABORT_FINALIZE(RS_RET_EOF); } else { /* we had a file change! */ + DBGPRINTF("we had a file change on '%s'\n", pThis->pszCurrFName); CHKiRet(strmCloseFile(pThis)); CHKiRet(strmOpenFile(pThis)); } @@ -561,39 +562,100 @@ static rsRetVal strmUnreadChar(strm_t *pThis, uchar c) return RS_RET_OK; } - -/* read a line from a strm file. A line is terminated by LF. The LF is read, but it - * is not returned in the buffer (it is discared). The caller is responsible for - * destruction of the returned CStr object! -- rgerhards, 2008-01-07 - * rgerhards, 2008-03-27: I now use the ppCStr directly, without any interim - * string pointer. The reason is that this function my be called by inputs, which - * are pthread_killed() upon termination. So if we use their native pointer, they - * can cleanup (but only then). +/* read a 'paragraph' from a strm file. + * A paragraph may be terminated by a LF, by a LFLF, or by LF<not whitespace> depending on the option set. + * The termination LF characters are read, but are + * not returned in the buffer (it is discared). The caller is responsible for + * destruction of the returned CStr object! -- dlang 2010-12-13 */ static rsRetVal -strmReadLine(strm_t *pThis, cstr_t **ppCStr) +strmReadLine(strm_t *pThis, cstr_t **ppCStr, int mode) { - DEFiRet; - uchar c; - - ASSERT(pThis != NULL); - ASSERT(ppCStr != NULL); - - CHKiRet(cstrConstruct(ppCStr)); - - /* now read the line */ - CHKiRet(strmReadChar(pThis, &c)); - while(c != '\n') { - CHKiRet(cstrAppendChar(*ppCStr, c)); - CHKiRet(strmReadChar(pThis, &c)); + /* mode = 0 single line mode (equivalent to ReadLine) + * mode = 1 LFLF mode (paragraph, blank line between entries) + * mode = 2 LF <not whitespace> mode, a log line starts at the beginning of a line, but following lines that are indented are part of the same log entry + * This modal interface is not nearly as flexible as being able to define a regex for when a new record starts, but it's also not nearly as hard (or as slow) to implement + */ + DEFiRet; + uchar c; + uchar finished; + + ASSERT(pThis != NULL); + ASSERT(ppCStr != NULL); + + CHKiRet(cstrConstruct(ppCStr)); + + /* now read the line */ + CHKiRet(strmReadChar(pThis, &c)); + if (mode == 0){ + while(c != '\n') { + CHKiRet(cstrAppendChar(*ppCStr, c)); + CHKiRet(strmReadChar(pThis, &c)); + } + CHKiRet(cstrFinalize(*ppCStr)); + } + if (mode == 1){ + finished=0; + while(finished == 0){ + if(c != '\n') { + CHKiRet(cstrAppendChar(*ppCStr, c)); + CHKiRet(strmReadChar(pThis, &c)); + } else { + if ((((*ppCStr)->iStrLen) > 0) ){ + if ((*ppCStr)->pBuf[(*ppCStr)->iStrLen -1 ] == '\n'){ + rsCStrTruncate(*ppCStr,1); /* remove the prior newline */ + finished=1; + } else { + CHKiRet(cstrAppendChar(*ppCStr, c)); + CHKiRet(strmReadChar(pThis, &c)); + } + } else { + finished=1; /* this is a blank line, a \n with nothing since the last complete record */ + } + } + } + CHKiRet(cstrFinalize(*ppCStr)); + } + if (mode == 2){ + /* indented follow-up lines */ + finished=0; + while(finished == 0){ + if ((*ppCStr)->iStrLen == 0){ + if(c != '\n') { + /* nothing in the buffer, and it's not a newline, add it to the buffer */ + CHKiRet(cstrAppendChar(*ppCStr, c)); + CHKiRet(strmReadChar(pThis, &c)); + } else { + finished=1; /* this is a blank line, a \n with nothing since the last complete record */ + } + } else { + if ((*ppCStr)->pBuf[(*ppCStr)->iStrLen -1 ] != '\n'){ + /* not the first character after a newline, add it to the buffer */ + CHKiRet(cstrAppendChar(*ppCStr, c)); + CHKiRet(strmReadChar(pThis, &c)); + } else { + if ((c == ' ') || (c == '\t')){ + CHKiRet(cstrAppendChar(*ppCStr, c)); + CHKiRet(strmReadChar(pThis, &c)); + } else { + /* clean things up by putting the character we just read back into + * the input buffer and removing the LF character that is currently at the + * end of the output string */ + CHKiRet(strmUnreadChar(pThis, c)); + rsCStrTruncate(*ppCStr,1); + finished=1; + } + } + } + } + CHKiRet(cstrFinalize(*ppCStr)); } - CHKiRet(cstrFinalize(*ppCStr)); finalize_it: - if(iRet != RS_RET_OK && *ppCStr != NULL) - cstrDestruct(ppCStr); + if(iRet != RS_RET_OK && *ppCStr != NULL) + cstrDestruct(ppCStr); - RETiRet; + RETiRet; } @@ -669,7 +731,13 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } pThis->pIOBuf = pThis->asyncBuf[0].pBuf; pThis->bStopWriter = 0; - if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0) + if(pthread_create(&pThis->writerThreadID, +#ifdef HAVE_PTHREAD_SETSCHEDPARAM + &default_thread_attr, +#else + NULL, +#endif + asyncWriterThread, pThis) != 0) DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis); } else { /* we work synchronously, so we need to alloc a fixed pIOBuf */ diff --git a/runtime/stream.h b/runtime/stream.h index 37e9d570..60c68cb2 100644 --- a/runtime/stream.h +++ b/runtime/stream.h @@ -156,7 +156,6 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */ rsRetVal (*SetFileName)(strm_t *pThis, uchar *pszName, size_t iLenName); rsRetVal (*ReadChar)(strm_t *pThis, uchar *pC); rsRetVal (*UnreadChar)(strm_t *pThis, uchar c); - rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr); rsRetVal (*SeekCurrOffs)(strm_t *pThis); rsRetVal (*Write)(strm_t *pThis, uchar *pBuf, size_t lenBuf); rsRetVal (*WriteChar)(strm_t *pThis, uchar c); @@ -183,8 +182,10 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */ INTERFACEpropSetMeth(strm, iSizeLimit, off_t); INTERFACEpropSetMeth(strm, iFlushInterval, int); INTERFACEpropSetMeth(strm, pszSizeLimitCmd, uchar*); + /* v6 added */ + rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr, int mode); ENDinterface(strm) -#define strmCURR_IF_VERSION 5 /* increment whenever you change the interface structure! */ +#define strmCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */ /* prototypes */ diff --git a/runtime/stringbuf.c b/runtime/stringbuf.c index f4a9caae..d8c5923b 100644 --- a/runtime/stringbuf.c +++ b/runtime/stringbuf.c @@ -185,7 +185,7 @@ rsRetVal rsCStrExtendBuf(cstr_t *pThis, size_t iMinNeeded) { uchar *pNewBuf; - unsigned short iNewSize; + size_t iNewSize; DEFiRet; /* first compute the new size needed */ diff --git a/runtime/wtp.c b/runtime/wtp.c index ece80911..e615fb19 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -90,6 +90,12 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pthread_mutex_init(&pThis->mutWtp, NULL); pthread_cond_init(&pThis->condThrdTrm, NULL); pthread_attr_init(&pThis->attrThrd); + /* Set thread scheduling policy to default */ +#ifdef HAVE_PTHREAD_SETSCHEDPARAM + pthread_attr_setschedpolicy(&pThis->attrThrd, default_thr_sched_policy); + pthread_attr_setschedparam(&pThis->attrThrd, &default_sched_param); + pthread_attr_setinheritsched(&pThis->attrThrd, PTHREAD_EXPLICIT_SCHED); +#endif pthread_attr_setdetachstate(&pThis->attrThrd, PTHREAD_CREATE_DETACHED); /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; |