From 16ecb90c3ac88bb2261c31c990d88f97f1a1b32f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Jun 2009 13:44:51 +0200 Subject: omfile buffers are now synchronized after inactivity This is the first shot at this functionality. Currently, we run off a fixed counter in the rsyslogd mainloop, which needs to be restructured. But this code works, so it is a good time for a commit. --- runtime/apc.c | 400 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 400 insertions(+) create mode 100644 runtime/apc.c (limited to 'runtime/apc.c') diff --git a/runtime/apc.c b/runtime/apc.c new file mode 100644 index 00000000..b0b5f298 --- /dev/null +++ b/runtime/apc.c @@ -0,0 +1,400 @@ +/* apc.c - asynchronous procedure call support + * + * An asynchronous procedure call (APC) is a procedure call (guess what) that is potentially run + * asynchronously to its main thread. It can be scheduled to occur at a caller-provided time. + * As long as the procedure has not been called, the APC entry may be modified by the caller + * or deleted. It is the caller's purpose to make sure proper synchronization is in place. + * The APC object only case about APC's own control structures (which *are* properly + * guarded by synchronization primitives). + * + * Module begun 2009-06-15 by Rainer Gerhards + * + * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * The rsyslog runtime library is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The rsyslog runtime library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with the rsyslog runtime library. If not, see . + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. + */ + +#include "config.h" +#include +#include +#include +#include + +#include "rsyslog.h" +#include "obj.h" +#include "apc.h" +#include "srUtils.h" + +/* static data */ +DEFobjStaticHelpers + +/* following is a used to implement a monotonically increasing id for the apcs. That + * ID can be used to cancel an apc request. Note that the ID is generated with modulo + * arithmetic, so at some point, it will wrap. Howerver, this happens at 2^32-1 at + * earliest, so this is not considered a problem. + */ +apc_id_t apcID = 0; + +/* private data structures */ + +/* the apc list and its entries + * This is a doubly-linked list as we need to be able to do inserts + * and deletes right in the middle of the list. It is inspired by the + * Unix callout mechanism. + * Note that we support two generic caller-provided parameters as + * experience shows that at most two are often used. This causes very + * little overhead, but simplifies caller code in cases where exactly + * two parameters are needed. We hope this is a useful optimizaton. + * rgerhards, 2009-06-15 + */ +typedef struct apc_list_s { + struct apc_list_s *pNext; + struct apc_list_s *pPrev; + apc_id_t id; + apc_t *pApc; /* pointer to the APC object to be scheduled */ +} apc_list_t; + +apc_list_t *apcListRoot = NULL; +apc_list_t *apcListTail = NULL; +pthread_mutex_t listMutex; /* needs to be locked for all list operations */ + + +/* destructor for the apc object */ +BEGINobjDestruct(apc) /* be sure to specify the object type also in END and CODESTART macros! */ +CODESTARTobjDestruct(apc) +ENDobjDestruct(apc) + + +/* ------------------------------ APC list handling functions ------------------------------ */ + +/* Function that handles changes to the list root. Most importantly, this function + * needs to schedule a new timer. It is OK to call this function with an empty list. + */ +static rsRetVal +listRootChanged(void) +{ + DEFiRet; + + if(apcListRoot == NULL) + FINALIZE; + + // TODO: implement! + +finalize_it: + RETiRet; +} + + +/* insert an apc entry into the APC list. The same entry MUST NOT already be present! + */ +static rsRetVal +insertApc(apc_t *pThis, apc_id_t *pID) +{ + apc_list_t *pCurr; + apc_list_t *pNew; + DEFiRet; + + CHKmalloc(pNew = (apc_list_t*) calloc(1, sizeof(apc_list_t))); + pNew->pApc = pThis; + pNew->id = *pID = apcID++; +dbgprintf("insert apc %p, id %ld\n", pThis, pNew->id); + + /* find right list location */ + if(apcListRoot == NULL) { + /* no need to search, list is empty */ + apcListRoot = pNew; + apcListTail = pNew; + CHKiRet(listRootChanged()); + } else { + for(pCurr = apcListRoot ; pCurr != NULL ; pCurr = pCurr->pNext) { + if(pCurr->pApc->ttExec > pThis->ttExec) + break; + } + + if(pCurr == NULL) { + /* insert at tail */ + pNew->pPrev = apcListTail; + apcListTail->pNext = pNew; + apcListTail = pNew; + } else { + if(pCurr == apcListRoot) { + /* new first entry */ + pCurr->pPrev = pNew; + pNew->pNext = pCurr; + apcListRoot = pNew; + CHKiRet(listRootChanged()); + } else { + /* in the middle of the list */ + pCurr->pPrev = pNew; + pNew->pNext = pCurr; + } + } + } + + +finalize_it: + RETiRet; +} + + +/* Delete an apc entry from the APC list. It is OK if the entry is not found, + * in this case we assume it already has been processed. + */ +static rsRetVal +deleteApc(apc_id_t id) +{ + apc_list_t *pCurr; + DEFiRet; + +dbgprintf("trying to delete apc %ld\n", id); + for(pCurr = apcListRoot ; pCurr != NULL ; pCurr = pCurr->pNext) { + if(pCurr->id == id) { +RUNLOG_STR("apc id found, now deleting!\n"); + if(pCurr == apcListRoot) { + apcListRoot = pCurr->pNext; + CHKiRet(listRootChanged()); + } else { + pCurr->pPrev->pNext = pCurr->pNext; + } + if(pCurr->pNext == NULL) { + apcListTail = pCurr->pPrev; + } else { + pCurr->pNext->pPrev = pCurr->pPrev; + } + free(pCurr); + pCurr = NULL; + break; + } + } + +finalize_it: + RETiRet; +} + + +/* unlist all elements up to the current timestamp. Return this as a seperate list + * to the caller. Returns an empty (NULL ptr) list if there are no such elements. + * The caller must handle that gracefully. The list is returned in the parameter. + */ +static rsRetVal +unlistCurrent(apc_list_t **ppList) +{ + apc_list_t *pCurr; + time_t tCurr; + DEFiRet; + assert(ppList != NULL); + + time(&tCurr); + + if(apcListRoot == NULL || apcListRoot->pApc->ttExec > tCurr) { + *ppList = NULL; + FINALIZE; + } + + *ppList = apcListRoot; + /* now search up to which entry we need to execute */ + for(pCurr = apcListRoot ; pCurr != NULL && pCurr->pApc->ttExec <= tCurr ; pCurr = pCurr->pNext) { + /*JUST SKIP TO LAST ELEMENT*/; + } + + if(pCurr == NULL) { + /* all elements can be unlisted */ + apcListRoot = NULL; + apcListTail = NULL; + } else { + /* need to set a new root */ + pCurr->pPrev->pNext = NULL; /* terminate newly unlisted list */ + pCurr->pPrev = NULL; /* we are the new root */ + apcListRoot = pCurr; + } + +finalize_it: + RETiRet; +} + + +/* ------------------------------ END APC list handling functions ------------------------------ */ + + +/* execute all list elements that are currently scheduled for execution. We do this in two phases. + * In the first phase, we look the list mutex and move everything from the head of the queue to + * the current timestamp to a new to-be-executed list. Then we unlock the mutex and do the actual + * exec (which may take some time). + * Note that the caller is responsible for proper + * caller-level synchronization. The caller may schedule another Apc, this module must + * ensure that (and it does so by not locking the list mutex while we call the Apc). + * Note: this function "consumes" the apc_t, so it is no longer existing after this + * function returns. + */ +// TODO make static and associated with our own pthread-based timer +rsRetVal +execScheduled(void) +{ + apc_list_t *pExecList; + apc_list_t *pCurr; + apc_list_t *pNext; + DEFVARS_mutexProtection_uncond; + DEFiRet; + + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); + iRet = unlistCurrent(&pExecList); + END_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); + CHKiRet(iRet); + + DBGPRINTF("running apc scheduler - we have %s to execute\n", + pExecList == NULL ? "nothing" : "something"); + for(pCurr = pExecList ; pCurr != NULL ; pCurr = pNext) { +dbgprintf("executing apc list entry %p, apc %p\n", pCurr, pCurr->pApc); + pNext = pCurr->pNext; + pCurr->pApc->pProc(pCurr->pApc->param1, pCurr->pApc->param2); + apcDestruct(&pCurr->pApc); + free(pCurr); + } + +finalize_it: + RETiRet; +} + + +/* Standard-Constructor + */ +BEGINobjConstruct(apc) /* be sure to specify the object type also in END macro! */ +ENDobjConstruct(apc) + + +/* ConstructionFinalizer + * Note that we use a non-standard calling interface: pID returns the current APC + * id. This is the only way to handle the situation without the need for extra + * locking. + * rgerhards, 2008-01-09 + */ +static rsRetVal +apcConstructFinalize(apc_t *pThis, apc_id_t *pID) +{ + DEFVARS_mutexProtection_uncond; + DEFiRet; + ISOBJ_TYPE_assert(pThis, apc); + assert(pID != NULL); + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); + insertApc(pThis, pID); + END_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); +RUNLOG_STR("apcConstructFinalize post mutex unlock\n"); + RETiRet; +} + + +/* some set methods */ +static rsRetVal +SetProcedure(apc_t *pThis, void (*pProc)(void*, void*)) +{ + ISOBJ_TYPE_assert(pThis, apc); + pThis->pProc = pProc; + return RS_RET_OK; +} +static rsRetVal +SetParam1(apc_t *pThis, void *param1) +{ + ISOBJ_TYPE_assert(pThis, apc); + pThis->param1 = param1; + return RS_RET_OK; +} +static rsRetVal +SetParam2(apc_t *pThis, void *param2) +{ + ISOBJ_TYPE_assert(pThis, apc); + pThis->param1 = param2; + return RS_RET_OK; +} + + +/* cancel an Apc request, ID is provided. It is OK if the ID can not be found, this may + * happen if the Apc was executed in the mean time. So it is safe to call CancelApc() at + * any time. + */ +static rsRetVal +CancelApc(apc_id_t id) +{ + DEFVARS_mutexProtection_uncond; + + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); + deleteApc(id); + END_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); + return RS_RET_OK; +} + + +/* debugprint for the apc object */ +BEGINobjDebugPrint(apc) /* be sure to specify the object type also in END and CODESTART macros! */ +CODESTARTobjDebugPrint(apc) + dbgoprint((obj_t*) pThis, "APC module, currently no state info available\n"); +ENDobjDebugPrint(apc) + + +/* queryInterface function + */ +BEGINobjQueryInterface(apc) +CODESTARTobjQueryInterface(apc) + if(pIf->ifVersion != apcCURR_IF_VERSION) { /* check for current version, increment on each change */ + ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED); + } + + /* ok, we have the right interface, so let's fill it + * Please note that we may also do some backwards-compatibility + * work here (if we can support an older interface version - that, + * of course, also affects the "if" above). + */ + pIf->Construct = apcConstruct; + pIf->ConstructFinalize = apcConstructFinalize; + pIf->Destruct = apcDestruct; + pIf->DebugPrint = apcDebugPrint; + pIf->CancelApc = CancelApc; + pIf->SetProcedure = SetProcedure; + pIf->SetParam1 = SetParam1; + pIf->SetParam2 = SetParam2; +finalize_it: +ENDobjQueryInterface(apc) + + +/* Exit the apc class. + * rgerhards, 2009-04-06 + */ +BEGINObjClassExit(apc, OBJ_IS_CORE_MODULE) /* class, version */ + //objRelease(apcstk, CORE_COMPONENT); + pthread_mutex_destroy(&listMutex); +ENDObjClassExit(apc) + + +/* Initialize the apc class. Must be called as the very first method + * before anything else is called inside this class. + * rgerhards, 2008-02-19 + */ +BEGINObjClassInit(apc, 1, OBJ_IS_CORE_MODULE) /* class, version */ + /* request objects we use */ + //CHKiRet(objUse(apcstk, CORE_COMPONENT)); + + /* set our own handlers */ + OBJSetMethodHandler(objMethod_DEBUGPRINT, apcDebugPrint); + OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, apcConstructFinalize); + + /* do other initializations */ + pthread_mutex_init(&listMutex, NULL); +ENDObjClassInit(apc) + +/* vi:set ai: + */ -- cgit