From bb79e96dc300fa5a2182e7c047afb3b15c5dc870 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 12 May 2009 15:27:40 +0200 Subject: moving to a cleaner implementation of batches ... now that we know what we need from a theoretical POV. --- runtime/Makefile.am | 1 + runtime/batch.h | 63 +++++++++++++++++++++++++++++++++++++++++++++++++++++ runtime/debug.c | 14 +++++++----- runtime/queue.c | 49 +++++++++++++++++++++++++---------------- runtime/queue.h | 5 +++-- runtime/rsyslog.h | 2 ++ runtime/wti.c | 10 ++++----- runtime/wti.h | 14 ++---------- 8 files changed, 114 insertions(+), 44 deletions(-) create mode 100644 runtime/batch.h (limited to 'runtime') diff --git a/runtime/Makefile.am b/runtime/Makefile.am index 2f0a1aa0..f0fb1cdd 100644 --- a/runtime/Makefile.am +++ b/runtime/Makefile.am @@ -8,6 +8,7 @@ librsyslog_la_SOURCES = \ rsyslog.c \ rsyslog.h \ atomic.h \ + batch.h \ syslogd-types.h \ module-template.h \ obj-types.h \ diff --git a/runtime/batch.h b/runtime/batch.h new file mode 100644 index 00000000..cb40cf42 --- /dev/null +++ b/runtime/batch.h @@ -0,0 +1,63 @@ +/* Definition of the batch_t data structure. + * I am not sure yet if this will become a full-blown object. For now, this header just + * includes the object definition and is not accompanied by code. + * + * Copyright 2009 by Rainer Gerhards and Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * The rsyslog runtime library is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The rsyslog runtime library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with the rsyslog runtime library. If not, see . + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. + */ + +#ifndef BATCH_H_INCLUDED +#define BATCH_H_INCLUDED + +/* enum for batch states. Actually, we violate a layer here, in that we assume that a batch is used + * for action processing. So far, this seems acceptable, the status is simply ignored inside the + * main message queue. But over time, it could potentially be useful to split the two. + * rgerhad, 2009-05-12 + */ +typedef enum { + BATCH_STATE_RDY = 0, /* object ready for processing */ + BATCH_STATE_BAD = 1, /* unrecoverable failure while processing, do NOT resubmit to same action */ + BATCH_STATE_SUB = 2, /* message submitted for processing, outcome yet unkonwn */ + BATCH_STATE_DISC = 3, /* discarded - processed OK, but do not submit to any other action */ +} batch_state_t; + + +/* an object inside a batch, including any information (state!) needed for it to "life". + */ +struct batch_obj_s { + obj_t *pUsrp; /* pointer to user object (most often message) */ + batch_state_t state; /* associated state */ +}; + +/* the batch + * This object is used to dequeue multiple user pointers which are than handed over + * to processing. The size of elements is fixed after queue creation, but may be + * modified by config variables (better said: queue properties). + * Note that a "user pointer" in rsyslog context so far always is a message + * object. We stick to the more generic term because queues may potentially hold + * other types of objects, too. + * rgerhards, 2009-05-12 + */ +struct batch_s { + int nElem; /* actual number of element in this entry */ + batch_obj_t *pElem; /* batch elements */ +}; + +#endif /* #ifndef BATCH_H_INCLUDED */ diff --git a/runtime/debug.c b/runtime/debug.c index 4ee90226..4f45a1e3 100644 --- a/runtime/debug.c +++ b/runtime/debug.c @@ -1050,7 +1050,9 @@ int dbgEntrFunc(dbgFuncDB_t **ppFuncDB, const char *file, const char *func, int /* when we reach this point, we have a fully-initialized FuncDB! */ ATOMIC_INC(pFuncDB->nTimesCalled); if(bLogFuncFlow && dbgPrintNameIsInList((const uchar*)pFuncDB->file, printNameFileRoot)) - dbgprintf("%s:%d: %s: enter\n", pFuncDB->file, pFuncDB->line, pFuncDB->func); + if(strcmp(pFuncDB->file, "stringbuf.c")) { /* TODO: make configurable */ + dbgprintf("%s:%d: %s: enter\n", pFuncDB->file, pFuncDB->line, pFuncDB->func); + } if(pThrd->stackPtr >= (int) (sizeof(pThrd->callStack) / sizeof(dbgFuncDB_t*))) { dbgprintf("%s:%d: %s: debug module: call stack for this thread full, suspending call tracking\n", pFuncDB->file, pFuncDB->line, pFuncDB->func); @@ -1080,10 +1082,12 @@ void dbgExitFunc(dbgFuncDB_t *pFuncDB, int iStackPtrRestore, int iRet) dbgFuncDBPrintActiveMutexes(pFuncDB, "WARNING: mutex still owned by us as we exit function, mutex: ", pthread_self()); if(bLogFuncFlow && dbgPrintNameIsInList((const uchar*)pFuncDB->file, printNameFileRoot)) { - if(iRet == RS_RET_NO_IRET) - dbgprintf("%s:%d: %s: exit: (no iRet)\n", pFuncDB->file, pFuncDB->line, pFuncDB->func); - else - dbgprintf("%s:%d: %s: exit: %d\n", pFuncDB->file, pFuncDB->line, pFuncDB->func, iRet); + if(strcmp(pFuncDB->file, "stringbuf.c")) { /* TODO: make configurable */ + if(iRet == RS_RET_NO_IRET) + dbgprintf("%s:%d: %s: exit: (no iRet)\n", pFuncDB->file, pFuncDB->line, pFuncDB->func); + else + dbgprintf("%s:%d: %s: exit: %d\n", pFuncDB->file, pFuncDB->line, pFuncDB->func, iRet); + } } pThrd->stackPtr = iStackPtrRestore; if(pThrd->stackPtr < 0) { diff --git a/runtime/queue.c b/runtime/queue.c index c2df928b..c3a8e9d4 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -65,7 +65,7 @@ DEFobjStaticHelpers DEFobjCurrIf(glbl) /* forward-definitions */ -rsRetVal qqueueChkPersist(qqueue_t *pThis); +static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); @@ -896,8 +896,8 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) { - aUsrp_t aUsrp; - obj_t *pMsgp; + batch_t singleBatch; + batch_obj_t batchObj; DEFiRet; ASSERT(pThis != NULL); @@ -907,17 +907,19 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) * mode the consumer probably has a lot to convey (which get's lost in the other modes * because they are asynchronous. But direct mode is deliberately synchronous. * rgerhards, 2008-02-12 - * We use our knowledge about the aUsrp_t structure below, but without that, we + * We use our knowledge about the batch_t structure below, but without that, we * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ - pMsgp = (obj_t*) pUsr; - aUsrp.nElem = 1; /* there always is only one in direct mode */ - aUsrp.pUsrp = &pMsgp; - iRet = pThis->pConsumer(pThis->pUsr, &aUsrp); + batchObj.state = BATCH_STATE_RDY; + batchObj.pUsrp = (obj_t*) pUsr; + singleBatch.nElem = 1; /* there always is only one in direct mode */ + singleBatch.pElem = &batchObj; + iRet = pThis->pConsumer(pThis->pUsr, &singleBatch); RETiRet; } + static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) { return RS_RET_OK; @@ -1247,7 +1249,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*)) { DEFiRet; qqueue_t *pThis; @@ -1402,11 +1404,12 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize rsRetVal localRet; DEFiRet; + /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */ + nDequeued = 0; do { dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); CHKiRet(qqueueDel(pThis, &pUsr)); - qqueueChkPersist(pThis); /* is is questionable if we should really need to call this every time... */ iQueueSize = qqueueGetOverallQueueSize(pThis); /* check if we should discard this element */ @@ -1417,11 +1420,15 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); ABORT_FINALIZE(localRet); /* all well, use this element */ - pWti->paUsrp->pUsrp[nDequeued++] = pUsr; + pWti->batch.pElem[nDequeued].pUsrp = pUsr; + pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; + ++nDequeued; } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize); + qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */ + //bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ - pWti->paUsrp->nElem = nDequeued; + pWti->batch.nElem = nDequeued; *iRemainingQueueSize = iQueueSize; finalize_it: @@ -1582,7 +1589,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); - CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp)); + CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -1619,8 +1626,8 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); /* iterate over returned results and enqueue them in DA queue */ - for(i = 0 ; i < pWti->paUsrp->nElem ; i++) - CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i])); + for(i = 0 ; i < pWti->batch.nElem ; i++) + CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->batch.pElem[i].pUsrp)); finalize_it: dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); @@ -1974,17 +1981,21 @@ finalize_it: /* check if we need to persist the current queue info. If an - * error occurs, thus should be ignored by caller (but we still + * error occurs, this should be ignored by caller (but we still * abide to our regular call interface)... * rgerhards, 2008-01-13 + * nUpdates is the number of updates since the last call to this function. + * It may be > 1 due to batches. -- rgerhards, 2009-05-12 */ -rsRetVal qqueueChkPersist(qqueue_t *pThis) +static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); + assert(nUpdates > 0); - if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) { + pThis->iUpdsSincePersist += nUpdates; + if(pThis->iPersistUpdCnt && pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) { qqueuePersist(pThis, QUEUE_CHECKPOINT); pThis->iUpdsSincePersist = 0; } @@ -2199,7 +2210,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* and finally enqueue the message */ CHKiRet(qqueueAdd(pThis, pUsr)); - qqueueChkPersist(pThis); + qqueueChkPersist(pThis, 1); finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { diff --git a/runtime/queue.h b/runtime/queue.h index 8a60254b..4a5f16a1 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -27,6 +27,7 @@ #include #include "obj.h" #include "wtp.h" +#include "batch.h" #include "stream.h" /* queue types */ @@ -100,7 +101,7 @@ typedef struct queue_s { * the user really wanted...). -- rgerhards, 2008-04-02 */ /* end dequeue time window */ - rsRetVal (*pConsumer)(void *,aUsrp_t*); /* user-supplied consumer function for dequeued messages */ + rsRetVal (*pConsumer)(void *,batch_t*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 * is pointer to an array of message message pointers) @@ -184,7 +185,7 @@ rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*)); + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*)); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int); diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index ee941b2b..53a510b3 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -95,6 +95,8 @@ typedef struct permittedPeers_s permittedPeers_t; /* this should go away in the typedef struct permittedPeerWildcard_s permittedPeerWildcard_t; /* this should go away in the long term -- rgerhards, 2008-05-19 */ typedef struct tcpsrv_s tcpsrv_t; typedef struct vmstk_s vmstk_t; +typedef struct batch_obj_s batch_obj_t; +typedef struct batch_s batch_t; typedef rsRetVal (*prsf_t)(struct vmstk_s*, int); /* pointer to a RainerScript function */ /* some universal 64 bit define... */ diff --git a/runtime/wti.c b/runtime/wti.c index 346ef7aa..c3fa127e 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -201,8 +201,7 @@ CODESTARTobjDestruct(wti) pthread_cond_destroy(&pThis->condExitDone); pthread_mutex_destroy(&pThis->mut); - free(pThis->paUsrp->pUsrp); - free(pThis->paUsrp); + free(pThis->batch.pElem); free(pThis->pszDbgHdr); ENDobjDestruct(wti) @@ -234,8 +233,7 @@ wtiConstructFinalize(wti_t *pThis) /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); - CHKmalloc(pThis->paUsrp = calloc(1, sizeof(aUsrp_t))); - CHKmalloc(pThis->paUsrp->pUsrp = calloc((size_t)iDeqBatchSize, sizeof(void*))); + CHKmalloc(pThis->batch.pElem = calloc((size_t)iDeqBatchSize, sizeof(batch_obj_t*))); finalize_it: RETiRet; @@ -322,7 +320,7 @@ wtiWorkerCancelCleanup(void *arg) /* call user supplied handler (that one e.g. requeues the element) */ // MULTIQUEUE: need to change here! - pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->paUsrp->pUsrp[0]); + pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pWtp->mut); @@ -374,7 +372,7 @@ wtiWorker(wti_t *pThis) ISOBJ_TYPE_assert(pWtp, wtp); dbgSetThrdName(pThis->pszDbgHdr); - pThis->paUsrp->nElem = 0; /* flag no elements present */ // MULTIQUEUE: do we really need this any longer (cnacel handeler)? + pThis->batch.nElem = 0; /* flag no elements present */ // MULTIQUEUE: do we really need this any longer (cnacel handeler)? pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX); diff --git a/runtime/wti.h b/runtime/wti.h index 85c98fe6..0990941e 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -27,17 +27,7 @@ #include #include "wtp.h" #include "obj.h" - -/* the user pointer array object - * This object is used to dequeue multiple user pointers which are than handed over - * to processing. The size of elements is fixed after queue creation, but may be - * modified by config variables (better said: queue properties). - * rgerhards, 2009-04-22 - */ -struct aUsrp_s { - int nElem; /* actual number of element in this entry */ - obj_t **pUsrp; /* actual elements (array!) */ -}; +#include "batch.h" /* the worker thread instance class */ @@ -46,11 +36,11 @@ typedef struct wti_s { int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */ pthread_t thrdID; /* thread ID */ qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */ - aUsrp_t *paUsrp; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ pthread_cond_t condExitDone; /* signaled when the thread exit is done (once per thread existance) */ pthread_mutex_t mut; int bShutdownRqtd; /* shutdown for this thread requested? 0 - no , 1 - yes */ + batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ uchar *pszDbgHdr; /* header string for debug messages */ } wti_t; -- cgit