From 16ecb90c3ac88bb2261c31c990d88f97f1a1b32f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 15 Jun 2009 13:44:51 +0200 Subject: omfile buffers are now synchronized after inactivity This is the first shot at this functionality. Currently, we run off a fixed counter in the rsyslogd mainloop, which needs to be restructured. But this code works, so it is a good time for a commit. --- runtime/Makefile.am | 2 + runtime/apc.c | 400 ++++++++++++++++++++++++++++++++++++++++++++++++++++ runtime/apc.h | 56 ++++++++ runtime/obj.c | 2 + runtime/srUtils.h | 13 ++ runtime/stream.c | 68 +++++++++ runtime/stream.h | 10 +- tools/omfile.c | 7 + tools/syslogd.c | 44 +----- 9 files changed, 559 insertions(+), 43 deletions(-) create mode 100644 runtime/apc.c create mode 100644 runtime/apc.h diff --git a/runtime/Makefile.am b/runtime/Makefile.am index de0daac4..c2ef7cfa 100644 --- a/runtime/Makefile.am +++ b/runtime/Makefile.am @@ -39,6 +39,8 @@ librsyslog_la_SOURCES = \ obj.h \ modules.c \ modules.h \ + apc.c \ + apc.h \ sync.c \ sync.h \ expr.c \ diff --git a/runtime/apc.c b/runtime/apc.c new file mode 100644 index 00000000..b0b5f298 --- /dev/null +++ b/runtime/apc.c @@ -0,0 +1,400 @@ +/* apc.c - asynchronous procedure call support + * + * An asynchronous procedure call (APC) is a procedure call (guess what) that is potentially run + * asynchronously to its main thread. It can be scheduled to occur at a caller-provided time. + * As long as the procedure has not been called, the APC entry may be modified by the caller + * or deleted. It is the caller's purpose to make sure proper synchronization is in place. + * The APC object only case about APC's own control structures (which *are* properly + * guarded by synchronization primitives). + * + * Module begun 2009-06-15 by Rainer Gerhards + * + * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * The rsyslog runtime library is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The rsyslog runtime library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with the rsyslog runtime library. If not, see . + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. + */ + +#include "config.h" +#include +#include +#include +#include + +#include "rsyslog.h" +#include "obj.h" +#include "apc.h" +#include "srUtils.h" + +/* static data */ +DEFobjStaticHelpers + +/* following is a used to implement a monotonically increasing id for the apcs. That + * ID can be used to cancel an apc request. Note that the ID is generated with modulo + * arithmetic, so at some point, it will wrap. Howerver, this happens at 2^32-1 at + * earliest, so this is not considered a problem. + */ +apc_id_t apcID = 0; + +/* private data structures */ + +/* the apc list and its entries + * This is a doubly-linked list as we need to be able to do inserts + * and deletes right in the middle of the list. It is inspired by the + * Unix callout mechanism. + * Note that we support two generic caller-provided parameters as + * experience shows that at most two are often used. This causes very + * little overhead, but simplifies caller code in cases where exactly + * two parameters are needed. We hope this is a useful optimizaton. + * rgerhards, 2009-06-15 + */ +typedef struct apc_list_s { + struct apc_list_s *pNext; + struct apc_list_s *pPrev; + apc_id_t id; + apc_t *pApc; /* pointer to the APC object to be scheduled */ +} apc_list_t; + +apc_list_t *apcListRoot = NULL; +apc_list_t *apcListTail = NULL; +pthread_mutex_t listMutex; /* needs to be locked for all list operations */ + + +/* destructor for the apc object */ +BEGINobjDestruct(apc) /* be sure to specify the object type also in END and CODESTART macros! */ +CODESTARTobjDestruct(apc) +ENDobjDestruct(apc) + + +/* ------------------------------ APC list handling functions ------------------------------ */ + +/* Function that handles changes to the list root. Most importantly, this function + * needs to schedule a new timer. It is OK to call this function with an empty list. + */ +static rsRetVal +listRootChanged(void) +{ + DEFiRet; + + if(apcListRoot == NULL) + FINALIZE; + + // TODO: implement! + +finalize_it: + RETiRet; +} + + +/* insert an apc entry into the APC list. The same entry MUST NOT already be present! + */ +static rsRetVal +insertApc(apc_t *pThis, apc_id_t *pID) +{ + apc_list_t *pCurr; + apc_list_t *pNew; + DEFiRet; + + CHKmalloc(pNew = (apc_list_t*) calloc(1, sizeof(apc_list_t))); + pNew->pApc = pThis; + pNew->id = *pID = apcID++; +dbgprintf("insert apc %p, id %ld\n", pThis, pNew->id); + + /* find right list location */ + if(apcListRoot == NULL) { + /* no need to search, list is empty */ + apcListRoot = pNew; + apcListTail = pNew; + CHKiRet(listRootChanged()); + } else { + for(pCurr = apcListRoot ; pCurr != NULL ; pCurr = pCurr->pNext) { + if(pCurr->pApc->ttExec > pThis->ttExec) + break; + } + + if(pCurr == NULL) { + /* insert at tail */ + pNew->pPrev = apcListTail; + apcListTail->pNext = pNew; + apcListTail = pNew; + } else { + if(pCurr == apcListRoot) { + /* new first entry */ + pCurr->pPrev = pNew; + pNew->pNext = pCurr; + apcListRoot = pNew; + CHKiRet(listRootChanged()); + } else { + /* in the middle of the list */ + pCurr->pPrev = pNew; + pNew->pNext = pCurr; + } + } + } + + +finalize_it: + RETiRet; +} + + +/* Delete an apc entry from the APC list. It is OK if the entry is not found, + * in this case we assume it already has been processed. + */ +static rsRetVal +deleteApc(apc_id_t id) +{ + apc_list_t *pCurr; + DEFiRet; + +dbgprintf("trying to delete apc %ld\n", id); + for(pCurr = apcListRoot ; pCurr != NULL ; pCurr = pCurr->pNext) { + if(pCurr->id == id) { +RUNLOG_STR("apc id found, now deleting!\n"); + if(pCurr == apcListRoot) { + apcListRoot = pCurr->pNext; + CHKiRet(listRootChanged()); + } else { + pCurr->pPrev->pNext = pCurr->pNext; + } + if(pCurr->pNext == NULL) { + apcListTail = pCurr->pPrev; + } else { + pCurr->pNext->pPrev = pCurr->pPrev; + } + free(pCurr); + pCurr = NULL; + break; + } + } + +finalize_it: + RETiRet; +} + + +/* unlist all elements up to the current timestamp. Return this as a seperate list + * to the caller. Returns an empty (NULL ptr) list if there are no such elements. + * The caller must handle that gracefully. The list is returned in the parameter. + */ +static rsRetVal +unlistCurrent(apc_list_t **ppList) +{ + apc_list_t *pCurr; + time_t tCurr; + DEFiRet; + assert(ppList != NULL); + + time(&tCurr); + + if(apcListRoot == NULL || apcListRoot->pApc->ttExec > tCurr) { + *ppList = NULL; + FINALIZE; + } + + *ppList = apcListRoot; + /* now search up to which entry we need to execute */ + for(pCurr = apcListRoot ; pCurr != NULL && pCurr->pApc->ttExec <= tCurr ; pCurr = pCurr->pNext) { + /*JUST SKIP TO LAST ELEMENT*/; + } + + if(pCurr == NULL) { + /* all elements can be unlisted */ + apcListRoot = NULL; + apcListTail = NULL; + } else { + /* need to set a new root */ + pCurr->pPrev->pNext = NULL; /* terminate newly unlisted list */ + pCurr->pPrev = NULL; /* we are the new root */ + apcListRoot = pCurr; + } + +finalize_it: + RETiRet; +} + + +/* ------------------------------ END APC list handling functions ------------------------------ */ + + +/* execute all list elements that are currently scheduled for execution. We do this in two phases. + * In the first phase, we look the list mutex and move everything from the head of the queue to + * the current timestamp to a new to-be-executed list. Then we unlock the mutex and do the actual + * exec (which may take some time). + * Note that the caller is responsible for proper + * caller-level synchronization. The caller may schedule another Apc, this module must + * ensure that (and it does so by not locking the list mutex while we call the Apc). + * Note: this function "consumes" the apc_t, so it is no longer existing after this + * function returns. + */ +// TODO make static and associated with our own pthread-based timer +rsRetVal +execScheduled(void) +{ + apc_list_t *pExecList; + apc_list_t *pCurr; + apc_list_t *pNext; + DEFVARS_mutexProtection_uncond; + DEFiRet; + + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); + iRet = unlistCurrent(&pExecList); + END_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); + CHKiRet(iRet); + + DBGPRINTF("running apc scheduler - we have %s to execute\n", + pExecList == NULL ? "nothing" : "something"); + for(pCurr = pExecList ; pCurr != NULL ; pCurr = pNext) { +dbgprintf("executing apc list entry %p, apc %p\n", pCurr, pCurr->pApc); + pNext = pCurr->pNext; + pCurr->pApc->pProc(pCurr->pApc->param1, pCurr->pApc->param2); + apcDestruct(&pCurr->pApc); + free(pCurr); + } + +finalize_it: + RETiRet; +} + + +/* Standard-Constructor + */ +BEGINobjConstruct(apc) /* be sure to specify the object type also in END macro! */ +ENDobjConstruct(apc) + + +/* ConstructionFinalizer + * Note that we use a non-standard calling interface: pID returns the current APC + * id. This is the only way to handle the situation without the need for extra + * locking. + * rgerhards, 2008-01-09 + */ +static rsRetVal +apcConstructFinalize(apc_t *pThis, apc_id_t *pID) +{ + DEFVARS_mutexProtection_uncond; + DEFiRet; + ISOBJ_TYPE_assert(pThis, apc); + assert(pID != NULL); + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); + insertApc(pThis, pID); + END_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); +RUNLOG_STR("apcConstructFinalize post mutex unlock\n"); + RETiRet; +} + + +/* some set methods */ +static rsRetVal +SetProcedure(apc_t *pThis, void (*pProc)(void*, void*)) +{ + ISOBJ_TYPE_assert(pThis, apc); + pThis->pProc = pProc; + return RS_RET_OK; +} +static rsRetVal +SetParam1(apc_t *pThis, void *param1) +{ + ISOBJ_TYPE_assert(pThis, apc); + pThis->param1 = param1; + return RS_RET_OK; +} +static rsRetVal +SetParam2(apc_t *pThis, void *param2) +{ + ISOBJ_TYPE_assert(pThis, apc); + pThis->param1 = param2; + return RS_RET_OK; +} + + +/* cancel an Apc request, ID is provided. It is OK if the ID can not be found, this may + * happen if the Apc was executed in the mean time. So it is safe to call CancelApc() at + * any time. + */ +static rsRetVal +CancelApc(apc_id_t id) +{ + DEFVARS_mutexProtection_uncond; + + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); + deleteApc(id); + END_MTX_PROTECTED_OPERATIONS_UNCOND(&listMutex); + return RS_RET_OK; +} + + +/* debugprint for the apc object */ +BEGINobjDebugPrint(apc) /* be sure to specify the object type also in END and CODESTART macros! */ +CODESTARTobjDebugPrint(apc) + dbgoprint((obj_t*) pThis, "APC module, currently no state info available\n"); +ENDobjDebugPrint(apc) + + +/* queryInterface function + */ +BEGINobjQueryInterface(apc) +CODESTARTobjQueryInterface(apc) + if(pIf->ifVersion != apcCURR_IF_VERSION) { /* check for current version, increment on each change */ + ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED); + } + + /* ok, we have the right interface, so let's fill it + * Please note that we may also do some backwards-compatibility + * work here (if we can support an older interface version - that, + * of course, also affects the "if" above). + */ + pIf->Construct = apcConstruct; + pIf->ConstructFinalize = apcConstructFinalize; + pIf->Destruct = apcDestruct; + pIf->DebugPrint = apcDebugPrint; + pIf->CancelApc = CancelApc; + pIf->SetProcedure = SetProcedure; + pIf->SetParam1 = SetParam1; + pIf->SetParam2 = SetParam2; +finalize_it: +ENDobjQueryInterface(apc) + + +/* Exit the apc class. + * rgerhards, 2009-04-06 + */ +BEGINObjClassExit(apc, OBJ_IS_CORE_MODULE) /* class, version */ + //objRelease(apcstk, CORE_COMPONENT); + pthread_mutex_destroy(&listMutex); +ENDObjClassExit(apc) + + +/* Initialize the apc class. Must be called as the very first method + * before anything else is called inside this class. + * rgerhards, 2008-02-19 + */ +BEGINObjClassInit(apc, 1, OBJ_IS_CORE_MODULE) /* class, version */ + /* request objects we use */ + //CHKiRet(objUse(apcstk, CORE_COMPONENT)); + + /* set our own handlers */ + OBJSetMethodHandler(objMethod_DEBUGPRINT, apcDebugPrint); + OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, apcConstructFinalize); + + /* do other initializations */ + pthread_mutex_init(&listMutex, NULL); +ENDObjClassInit(apc) + +/* vi:set ai: + */ diff --git a/runtime/apc.h b/runtime/apc.h new file mode 100644 index 00000000..7c679b97 --- /dev/null +++ b/runtime/apc.h @@ -0,0 +1,56 @@ +/* The apc object. + * + * See apc.c for more information. + * + * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * The rsyslog runtime library is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The rsyslog runtime library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with the rsyslog runtime library. If not, see . + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. + */ +#ifndef INCLUDED_APC_H +#define INCLUDED_APC_H + +/* the apc object */ +typedef struct apc_s { + BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ + time_t ttExec; /* when to call procedure (so far seconds...) */ + void (*pProc)(void*, void*); /* which procedure to call */ + void *param1; /* user-supplied parameters */ + void *param2; /* user-supplied parameters */ +} apc_t; + +typedef unsigned long apc_id_t; /* monotonically incrementing apc ID */ + +/* interfaces */ +BEGINinterface(apc) /* name must also be changed in ENDinterface macro! */ + INTERFACEObjDebugPrint(apc); + rsRetVal (*Construct)(apc_t **ppThis); + rsRetVal (*ConstructFinalize)(apc_t *pThis, apc_id_t *); + rsRetVal (*Destruct)(apc_t **ppThis); + rsRetVal (*SetProcedure)(apc_t *pThis, void (*pProc)(void*, void*)); + rsRetVal (*SetParam1)(apc_t *pThis, void *); + rsRetVal (*SetParam2)(apc_t *pThis, void *); + rsRetVal (*CancelApc)(apc_id_t); +ENDinterface(apc) +#define apcCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ + + +/* prototypes */ +PROTOTYPEObj(apc); + +#endif /* #ifndef INCLUDED_APC_H */ diff --git a/runtime/obj.c b/runtime/obj.c index f38b1d7f..41991853 100644 --- a/runtime/obj.c +++ b/runtime/obj.c @@ -88,6 +88,7 @@ #include "errmsg.h" #include "cfsysline.h" #include "unicode-helper.h" +#include "apc.h" /* static data */ DEFobjCurrIf(obj) /* we define our own interface, as this is expected by some macros! */ @@ -1313,6 +1314,7 @@ objClassInit(modInfo_t *pModInfo) CHKiRet(objGetObjInterface(&obj)); /* get ourselves ;) */ /* init classes we use (limit to as few as possible!) */ + CHKiRet(apcClassInit(pModInfo)); CHKiRet(errmsgClassInit(pModInfo)); CHKiRet(cfsyslineInit()); CHKiRet(varClassInit(pModInfo)); diff --git a/runtime/srUtils.h b/runtime/srUtils.h index 699f8527..b37559cf 100644 --- a/runtime/srUtils.h +++ b/runtime/srUtils.h @@ -125,4 +125,17 @@ rsRetVal getFileSize(uchar *pszName, off_t *pSize); d_pthread_mutex_unlock(mut); \ pthread_setcancelstate(iCancelStateSave, NULL); \ } + +/* The unconditional versions of the macro always lock the mutex. They are preferred in + * complex scenarios, where the simple ones might get mixed up by multiple calls. + */ +#define DEFVARS_mutexProtection_uncond\ + int iCancelStateSave +#define BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(mut) \ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); \ + d_pthread_mutex_lock(mut); +#define END_MTX_PROTECTED_OPERATIONS_UNCOND(mut) \ + d_pthread_mutex_unlock(mut); \ + pthread_setcancelstate(iCancelStateSave, NULL); + #endif diff --git a/runtime/stream.c b/runtime/stream.c index c8672aa2..773da319 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -39,6 +39,7 @@ #include #include /* required for HP UX */ #include +#include #include "rsyslog.h" #include "stringbuf.h" @@ -47,10 +48,12 @@ #include "stream.h" #include "unicode-helper.h" #include "module-template.h" +#include "apc.h" /* static data */ DEFobjStaticHelpers DEFobjCurrIf(zlibw) +DEFobjCurrIf(apc) /* forward definitions */ static rsRetVal strmFlush(strm_t *pThis); @@ -60,6 +63,20 @@ static rsRetVal strmCloseFile(strm_t *pThis); /* methods */ +/* async flush apc handler + */ +static void +flushApc(void *param1, void __attribute__((unused)) *param2) +{ + DEFVARS_mutexProtection_uncond; + strm_t *pThis = (strm_t*) param1; + ISOBJ_TYPE_assert(pThis, strm); + + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); + strmFlush(pThis); + END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); +} + /* Try to resolve a size limit situation. This is used to support custom-file size handlers * for omfile. It first runs the command, and then checks if we are still above the size @@ -583,6 +600,11 @@ static rsRetVal strmConstructFinalize(strm_t *pThis) } } + /* if we should call flush apc's, we need a mutex */ + if(pThis->iFlushInterval != 0) { + pthread_mutex_init(&pThis->mut, 0); + } + finalize_it: RETiRet; } @@ -602,6 +624,11 @@ CODESTARTobjDestruct(strm) objRelease(zlibw, LM_ZLIBW_FILENAME); } + if(pThis->iFlushInterval != 0) { + // TODO: check if there is an apc and remove it! + pthread_mutex_destroy(&pThis->mut); + } + free(pThis->pszDir); free(pThis->pIOBuf); free(pThis->pZipBuf); @@ -965,11 +992,33 @@ finalize_it: } +/* schedule an Apc flush request. + * rgerhards, 2009-06-15 + */ +static inline rsRetVal +scheduleFlushRequest(strm_t *pThis) +{ + apc_t *pApc; + DEFiRet; + + CHKiRet(apc.CancelApc(pThis->apcID)); +dbgprintf("XXX: requesting to add apc!\n"); + CHKiRet(apc.Construct(&pApc)); + CHKiRet(apc.SetProcedure(pApc, (void (*)(void*, void*))flushApc)); + CHKiRet(apc.SetParam1(pApc, pThis)); + CHKiRet(apc.ConstructFinalize(pApc, &pThis->apcID)); + +finalize_it: + RETiRet; +} + + /* write memory buffer to a stream object */ static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) { + DEFVARS_mutexProtection_uncond; DEFiRet; size_t iPartial; @@ -980,6 +1029,11 @@ dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n if(pThis->bDisabled) ABORT_FINALIZE(RS_RET_STREAM_DISABLED); +RUNLOG_VAR("%d", pThis->iFlushInterval); + if(pThis->iFlushInterval != 0) { + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); + } + /* check if the to-be-written data is larger than our buffer size */ if(lenBuf >= pThis->sIOBufSize) { /* it is - so we do a direct write, that is most efficient. @@ -1008,7 +1062,17 @@ dbgprintf("strmWrite(%p, '%65.65s', %ld);, disabled %d, sizelim %ld, size %lld\n } } + /* we ignore the outcome of scheduleFlushRequest(), as we will write the data always at + * termination. For Zip mode, it could be fatal if we write after each record. + */ + if(pThis->iFlushInterval != 0) + scheduleFlushRequest(pThis); + finalize_it: + if(pThis->iFlushInterval != 0) { + END_MTX_PROTECTED_OPERATIONS_UNCOND(&pThis->mut); + } + RETiRet; } @@ -1025,6 +1089,7 @@ DEFpropSetMeth(strm, iZipLevel, int) DEFpropSetMeth(strm, bSync, int) DEFpropSetMeth(strm, sIOBufSize, size_t) DEFpropSetMeth(strm, iSizeLimit, off_t) +DEFpropSetMeth(strm, iFlushInterval, int) DEFpropSetMeth(strm, pszSizeLimitCmd, uchar*) static rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) @@ -1308,6 +1373,7 @@ CODESTARTobjQueryInterface(strm) pIf->SetbSync = strmSetbSync; pIf->SetsIOBufSize = strmSetsIOBufSize; pIf->SetiSizeLimit = strmSetiSizeLimit; + pIf->SetiFlushInterval = strmSetiFlushInterval; pIf->SetpszSizeLimitCmd = strmSetpszSizeLimitCmd; finalize_it: ENDobjQueryInterface(strm) @@ -1319,6 +1385,8 @@ ENDobjQueryInterface(strm) */ BEGINObjClassInit(strm, 1, OBJ_IS_CORE_MODULE) /* request objects we use */ + CHKiRet(objUse(apc, CORE_COMPONENT)); + OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize); OBJSetMethodHandler(objMethod_SETPROPERTY, strmSetProperty); OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize); diff --git a/runtime/stream.h b/runtime/stream.h index 021c4792..e3ad32b1 100644 --- a/runtime/stream.h +++ b/runtime/stream.h @@ -70,6 +70,7 @@ #include "glbl.h" #include "stream.h" #include "zlibw.h" +#include "apc.h" /* stream types */ typedef enum { @@ -118,6 +119,10 @@ typedef struct strm_s { bool bInRecord; /* if 1, indicates that we are currently writing a not-yet complete record */ int iZipLevel; /* zip level (0..9). If 0, zip is completely disabled */ Bytef *pZipBuf; + /* support for async flush procesing */ + int iFlushInterval; /* flush in which interval - 0, no flushing */ + apc_id_t apcID; /* id of current Apc request (used for cancelling) */ + pthread_mutex_t mut;/* mutex for flush in async mode */ /* support for omfile size-limiting commands, special counters, NOT persisted! */ off_t iSizeLimit; /* file size limit, 0 = no limit */ uchar *pszSizeLimitCmd; /* command to carry out when size limit is reached */ @@ -127,7 +132,7 @@ typedef struct strm_s { /* interfaces */ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */ rsRetVal (*Construct)(strm_t **ppThis); - rsRetVal (*ConstructFinalize)(strm_t __attribute__((unused)) *pThis); + rsRetVal (*ConstructFinalize)(strm_t *pThis); rsRetVal (*Destruct)(strm_t **ppThis); rsRetVal (*SetMaxFileSize)(strm_t *pThis, int64 iMaxFileSize); rsRetVal (*SetFileName)(strm_t *pThis, uchar *pszName, size_t iLenName); @@ -157,9 +162,10 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */ INTERFACEpropSetMeth(strm, bSync, int); INTERFACEpropSetMeth(strm, sIOBufSize, size_t); INTERFACEpropSetMeth(strm, iSizeLimit, off_t); + INTERFACEpropSetMeth(strm, iFlushInterval, int); INTERFACEpropSetMeth(strm, pszSizeLimitCmd, uchar*); ENDinterface(strm) -#define strmCURR_IF_VERSION 1 /* increment whenever you change the interface structure! */ +#define strmCURR_IF_VERSION 2 /* increment whenever you change the interface structure! */ /* prototypes */ diff --git a/tools/omfile.c b/tools/omfile.c index 6d9eb096..675d313e 100644 --- a/tools/omfile.c +++ b/tools/omfile.c @@ -88,6 +88,7 @@ typedef struct s_dynaFileCacheEntry dynaFileCacheEntry; #define IOBUF_DFLT_SIZE 1024 /* default size for io buffers */ +#define FLUSH_INTRVL_DFLT 1 /* default buffer flush interval (in seconds) */ /* globals for default values */ static int iDynaFileCacheSize = 10; /* max cache for dynamic files */ @@ -103,6 +104,7 @@ static int bEnableSync = 0;/* enable syncing of files (no dash in front of pathn static int iZipLevel = 0; /* zip compression mode (0..9 as usual) */ static bool bFlushOnTXEnd = 0;/* flush write buffers when transaction has ended? */ static int iIOBufSize = IOBUF_DFLT_SIZE; /* size of an io buffer */ +static int iFlushInterval = FLUSH_INTRVL_DFLT; /* how often flush the output buffer on inactivity? */ static uchar *pszTplName = NULL; /* name of the default template to use */ /* end globals for default values */ @@ -132,6 +134,7 @@ typedef struct _instanceData { uchar *pszSizeLimitCmd; /* command to carry out when size limit is reached */ int iZipLevel; /* zip mode to use for this selector */ int iIOBufSize; /* size of associated io buffer */ + int iFlushInterval; /* how fast flush buffer on inactivity? */ bool bFlushOnTXEnd; /* flush write buffers when transaction has ended? */ } instanceData; @@ -400,6 +403,7 @@ prepareFile(instanceData *pData, uchar *newFileName) CHKiRet(strm.SetDir(pData->pStrm, szDirName, ustrlen(szDirName))); CHKiRet(strm.SetiZipLevel(pData->pStrm, pData->iZipLevel)); CHKiRet(strm.SetsIOBufSize(pData->pStrm, (size_t) pData->iIOBufSize)); + CHKiRet(strm.SetiFlushInterval(pData->pStrm, pData->iFlushInterval)); CHKiRet(strm.SettOperationsMode(pData->pStrm, STREAMMODE_WRITE_APPEND)); CHKiRet(strm.SettOpenMode(pData->pStrm, fCreateMode)); CHKiRet(strm.SetbSync(pData->pStrm, pData->bSyncFile)); @@ -678,6 +682,7 @@ CODESTARTparseSelectorAct pData->iZipLevel = iZipLevel; pData->bFlushOnTXEnd = bFlushOnTXEnd; pData->iIOBufSize = iIOBufSize; + pData->iFlushInterval = iFlushInterval; if(pData->bDynamicName == 0) { /* try open and emit error message if not possible. At this stage, we ignore the @@ -712,6 +717,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a iZipLevel = 0; bFlushOnTXEnd = 0; iIOBufSize = IOBUF_DFLT_SIZE; + iFlushInterval = FLUSH_INTRVL_DFLT; if(pszTplName != NULL) { free(pszTplName); pszTplName = NULL; @@ -760,6 +766,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(strm, CORE_COMPONENT)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"dynafilecachesize", 0, eCmdHdlrInt, (void*) setDynaFileCacheSize, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"omfileziplevel", 0, eCmdHdlrInt, NULL, &iZipLevel, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"omfileflushinterval", 0, eCmdHdlrInt, NULL, &iFlushInterval, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"omfileflushontxend", 0, eCmdHdlrBinary, NULL, &bFlushOnTXEnd, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"omfileiobuffersize", 0, eCmdHdlrSize, NULL, &iIOBufSize, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"dirowner", 0, eCmdHdlrUID, NULL, &dirUID, STD_LOADABLE_MODULE_ID)); diff --git a/tools/syslogd.c b/tools/syslogd.c index 99bf281d..2b29cd7c 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -2575,7 +2575,8 @@ mainloop(void) * powertop, for example). In that case, we primarily wait for a signal, * but a once-a-day wakeup should be quite acceptable. -- rgerhards, 2008-06-09 */ - tvSelectTimeout.tv_sec = (bReduceRepeatMsgs == 1) ? TIMERINTVL : 86400 /*1 day*/; + //tvSelectTimeout.tv_sec = (bReduceRepeatMsgs == 1) ? TIMERINTVL : 86400 /*1 day*/; +tvSelectTimeout.tv_sec = 5; // TESTING ONLY!!! TODO: change back!!! tvSelectTimeout.tv_usec = 0; select(1, NULL, NULL, NULL, &tvSelectTimeout); if(bFinished) @@ -2610,49 +2611,11 @@ mainloop(void) bHadHUP = 0; continue; } + execScheduled(); /* handle Apc calls (if any) */ } ENDfunc } -/* If user is not root, prints warnings or even exits - * TODO: check all dynafiles for write permission - * ... but it is probably better to wait here until we have - * a module interface - rgerhards, 2007-07-23 - */ -static void checkPermissions() -{ -#if 0 - /* TODO: this function must either be redone or removed - now with the input modules, - * there is no such simple check we can do. What we can check, however, is if there is - * any input module active and terminate, if not. -- rgerhards, 2007-12-26 - */ - /* we are not root */ - if (geteuid() != 0) - { - fputs("WARNING: Local messages will not be logged! If you want to log them, run rsyslog as root.\n",stderr); -#ifdef SYSLOG_INET - /* udp enabled and port number less than or equal to 1024 */ - if ( AcceptRemote && (atoi(LogPort) <= 1024) ) - fprintf(stderr, "WARNING: Will not listen on UDP port %s. Use port number higher than 1024 or run rsyslog as root!\n", LogPort); - - /* tcp enabled and port number less or equal to 1024 */ - if( bEnableTCP && (atoi(TCPLstnPort) <= 1024) ) - fprintf(stderr, "WARNING: Will not listen on TCP port %s. Use port number higher than 1024 or run rsyslog as root!\n", TCPLstnPort); - - /* Neither explicit high UDP port nor explicit high TCP port. - * It is useless to run anymore */ - if( !(AcceptRemote && (atoi(LogPort) > 1024)) && !( bEnableTCP && (atoi(TCPLstnPort) > 1024)) ) - { -#endif - fprintf(stderr, "ERROR: Nothing to log, no reason to run. Please run rsyslog as root.\n"); - exit(EXIT_FAILURE); -#ifdef SYSLOG_INET - } -#endif - } -#endif -} - /* load build-in modules * very first version begun on 2007-07-23 by rgerhards @@ -3053,7 +3016,6 @@ doGlblProcessInit(void) int i; DEFiRet; - checkPermissions(); thrdInit(); if( !(Debug || NoFork) ) -- cgit