From bf8125f4e96a011ec28cc58b225bb815f72fc53c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 20 Jul 2009 12:18:20 +0200 Subject: bugfix: minor static memory leak while reading configuration This did NOT leak based on message volume. Also, did some cleanup during the commit. --- ChangeLog | 10 ++++++---- runtime/queue.c | 1 - runtime/wti.c | 7 ++----- runtime/wtp.c | 25 ++++++++++++------------- runtime/wtp.h | 1 + template.c | 5 ++++- 6 files changed, 25 insertions(+), 24 deletions(-) diff --git a/ChangeLog b/ChangeLog index 887aba00..a3316ddc 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,14 +1,16 @@ --------------------------------------------------------------------------- Version 5.1.3 [DEVEL] (rgerhards), 2009-07-?? +- architecture change: queue now always has at least one worker thread + if not running in direct mode. Previous versions could run without + any active workers. This simplifies the code at a very small expense. + See v5 compatibility note document for more in-depth discussion. - enhance: UDP spoofing supported via new output module omudpspoof See the omudpspoof documentation for details and samples - bugfix: message could be truncated after TAG, often when forwarding This was a result of an internal processing error if maximum field sizes had been specified in the property replacer. -- architecture change: queue now always has at least one worker thread - if not running in direct mode. Previous versions could run without - any active workers. This simplifies the code at a very small expense. - See v5 compatibility note document for more in-depth discussion. +- bugfix: minor static memory leak while reading configuration + did NOT leak based on message volume --------------------------------------------------------------------------- Version 5.1.2 [DEVEL] (rgerhards), 2009-07-08 - bugfix: properties inputname, fromhost, fromhost-ip, msg were lost when diff --git a/runtime/queue.c b/runtime/queue.c index 78859e8d..7590af18 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1956,7 +1956,6 @@ ChkStopWrkrReg(qqueue_t *pThis) { DEFiRet; if(pThis->bEnqOnly) { -dbgprintf("XXX: terminate_NOW queue:Reg worker: enqOnly! queue size %d\n", getPhysicalQueueSize(pThis)); iRet = RS_RET_TERMINATE_NOW; } else if(pThis->pqParent != NULL) { iRet = RS_RET_TERMINATE_WHEN_IDLE; diff --git a/runtime/wti.c b/runtime/wti.c index 91c63ffe..900e1cba 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -192,7 +192,7 @@ wtiWorkerCancelCleanup(void *arg) DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis)); - /* call user supplied handler (that one e.g. requeues the element) */ + /* call user supplied handler */ pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp); ENDfunc @@ -215,7 +215,6 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); d_pthread_mutex_lock(pWtp->pmutUsr); -RUNLOG_VAR("%d", pThis->bAlwaysRunning); if(pThis->bAlwaysRunning) { /* never shut down any started worker */ d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); @@ -321,7 +320,6 @@ wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg) if(pThis->pszDbgHdr != NULL) { free(pThis->pszDbgHdr); - pThis->pszDbgHdr = NULL; } if((pThis->pszDbgHdr = malloc(sizeof(uchar) * lenMsg + 1)) == NULL) @@ -355,6 +353,5 @@ BEGINObjClassInit(wti, 1, OBJ_IS_CORE_MODULE) /* one is the object version (most CHKiRet(objUse(glbl, CORE_COMPONENT)); ENDObjClassInit(wti) -/* - * vi:set ai: +/* vi:set ai: */ diff --git a/runtime/wtp.c b/runtime/wtp.c index 23726802..af4c7621 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -89,6 +89,8 @@ static rsRetVal NotImplementedDummy() { return RS_RET_NOT_IMPLEMENTED; } BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */ pthread_mutex_init(&pThis->mutWtp, NULL); pthread_cond_init(&pThis->condThrdTrm, NULL); + pthread_attr_init(&pThis->attrThrd); + pthread_attr_setdetachstate(&pThis->attrThrd, PTHREAD_CREATE_DETACHED); /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; pThis->pfGetDeqBatchSize = NotImplementedDummy; @@ -152,6 +154,7 @@ CODESTARTobjDestruct(wtp) /* actual destruction */ pthread_cond_destroy(&pThis->condThrdTrm); pthread_mutex_destroy(&pThis->mutWtp); + pthread_attr_destroy(&pThis->attrThrd); free(pThis->pszDbgHdr); ENDobjDestruct(wtp) @@ -284,9 +287,9 @@ wtpCancelAll(wtp_t *pThis) } -/* cancellation cleanup handler for executing worker - * decrements the worker counter - * rgerhards, 2008-01-20 +/* cancellation cleanup handler for executing worker decrements the worker counter. + * This is also called when the the worker is normally shut down. + * rgerhards, 2009-07-20 */ static void wtpWrkrExecCancelCleanup(void *arg) @@ -299,6 +302,7 @@ wtpWrkrExecCancelCleanup(void *arg) pThis = pWti->pWtp; ISOBJ_TYPE_assert(pThis, wtp); + /* the order of the next two statements is important! */ wtiSetState(pWti, WRKTHRD_STOPPED); ATOMIC_DEC(pThis->iCurNumWrkThrd); pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ @@ -352,18 +356,16 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in /* start a new worker */ static rsRetVal -wtpStartWrkr(wtp_t *pThis, int bLockMutex) +wtpStartWrkr(wtp_t *pThis) { - DEFVARS_mutexProtection; wti_t *pWti; int i; int iState; - pthread_attr_t attr; DEFiRet; ISOBJ_TYPE_assert(pThis, wtp); - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mutWtp, bLockMutex); + d_pthread_mutex_lock(&pThis->mutWtp); /* find free spot in thread table. */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { @@ -381,17 +383,14 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) pWti = pThis->pWrkr[i]; wtiSetState(pWti, WRKTHRD_RUNNING); - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - iState = pthread_create(&(pWti->thrdID), &attr, wtpWorker, (void*) pWti); - pthread_attr_destroy(&attr); /* TODO: we could globally reuse such an attribute 2009-07-08 */ + iState = pthread_create(&(pWti->thrdID), &pThis->attrThrd, wtpWorker, (void*) pWti); ATOMIC_INC(pThis->iCurNumWrkThrd); /* we got one more! */ DBGPRINTF("%s: started with state %d, num workers now %d\n", wtpGetDbgHdr(pThis), iState, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd)); finalize_it: - END_MTX_PROTECTED_OPERATIONS(&pThis->mutWtp); + d_pthread_mutex_unlock(&pThis->mutWtp); RETiRet; } @@ -425,7 +424,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); /* start the rqtd nbr of workers */ for(i = 0 ; i < nMissing ; ++i) { - CHKiRet(wtpStartWrkr(pThis, LOCK_MUTEX)); + CHKiRet(wtpStartWrkr(pThis)); } } else { pthread_cond_signal(pThis->pcondBusy); diff --git a/runtime/wtp.h b/runtime/wtp.h index 358ced3a..0505b91c 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -55,6 +55,7 @@ struct wtp_s { /* end sync variables */ /* user objects */ void *pUsr; /* pointer to user object (in this case, the queue the wtp belongs to) */ + pthread_attr_t attrThrd;/* attribute for new threads (created just once and cached here) */ pthread_mutex_t *pmutUsr; pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */ rsRetVal (*pfChkStopWrkr)(void *pUsr, int); diff --git a/template.c b/template.c index 0116e782..f3a8e057 100644 --- a/template.c +++ b/template.c @@ -566,8 +566,11 @@ static int do_Parameter(unsigned char **pp, struct template *pTpl) /* got the name */ cstrFinalize(pStrB); - if(propNameToID(pStrB, &pTpe->data.field.propid) != RS_RET_OK) + if(propNameToID(pStrB, &pTpe->data.field.propid) != RS_RET_OK) { + cstrDestruct(&pStrB); return 1; + } + cstrDestruct(&pStrB); /* Check frompos, if it has an R, then topos should be a regex */ if(*p == ':') { -- cgit