From 2d29b5270c77a902be406c2591a64a509b2d842a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 2 Aug 2007 15:43:55 +0000 Subject: got a working version of suspension/resumption logic including omfwd.c --- omfwd.c | 135 +++++++++++++++++++++++++++----------------------------------- syslogd.c | 100 +++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 153 insertions(+), 82 deletions(-) diff --git a/omfwd.c b/omfwd.c index ce29e874..5dee002c 100644 --- a/omfwd.c +++ b/omfwd.c @@ -57,7 +57,8 @@ #include "module-template.h" #ifdef SYSLOG_INET -#define INET_SUSPEND_TIME 60 /* equal to 1 minute +//#define INET_SUSPEND_TIME 60 /* equal to 1 minute +#define INET_SUSPEND_TIME 2 /* equal to 1 minute * rgerhards, 2005-07-26: This was 3 minutes. As the * same timer is used for tcp based syslog, we have * reduced it. However, it might actually be worth @@ -542,83 +543,76 @@ static char *getFwdSyslogPt(instanceData *pData) } +/* try to resume connection if it is not ready + * rgerhards, 2007-08-02 + */ +static rsRetVal doTryResume(instanceData *pData) +{ + DEFiRet; + time_t fwd_suspend; + struct addrinfo *res; + struct addrinfo hints; + unsigned e; + + switch (pData->eDestState) { + case eDestFORW_SUSP: + iRet = RS_RET_OK; /* the actual check happens during doAction() only */ + pData->eDestState = eDestFORW; + break; + + case eDestFORW_UNKN: + /* The remote address is not yet known and needs to be obtained */ + dprintf(" %s\n", pData->f_hname); + memset(&hints, 0, sizeof(hints)); + /* port must be numeric, because config file syntax requests this */ + /* TODO: this code is a duplicate from cfline() - we should later create + * a common function. + */ + hints.ai_flags = AI_NUMERICSERV; + hints.ai_family = family; + hints.ai_socktype = pData->protocol == FORW_UDP ? SOCK_DGRAM : SOCK_STREAM; + if((e = getaddrinfo(pData->f_hname, + getFwdSyslogPt(pData), &hints, &res)) == 0) { + dprintf("%s found, resuming.\n", pData->f_hname); + pData->f_addr = res; + pData->iRtryCnt = 0; + pData->eDestState = eDestFORW; + } else { + iRet = RS_RET_SUSPENDED; + } + break; + } + + return iRet; +} + + BEGINtryResume CODESTARTtryResume -dprintf("###################### tryResume called\n"); + iRet = doTryResume(pData); +dprintf("tryResume returns %d\n", iRet); ENDtryResume BEGINdoAction char *psz; /* temporary buffering */ register unsigned l; + struct addrinfo *r; int i; - unsigned e, lsent = 0; + unsigned lsent = 0; int bSendSuccess; - time_t fwd_suspend; - struct addrinfo *res, *r; - struct addrinfo hints; CODESTARTdoAction switch (pData->eDestState) { case eDestFORW_SUSP: - fwd_suspend = time(NULL) - pData->ttSuspend; - if ( fwd_suspend >= INET_SUSPEND_TIME ) { - dprintf("\nForwarding suspension over, retrying FORW "); - pData->eDestState = eDestFORW; - goto f_forw; - } - else { - dprintf(" %s\n", pData->f_hname); - dprintf("Forwarding suspension not over, time left: %d.\n", - INET_SUSPEND_TIME - fwd_suspend); - } + dprintf("internal error in omfwd.c, eDestFORW_SUSP in doAction()!\n"); + iRet = RS_RET_SUSPENDED; break; - /* The trick is to wait some time, then retry to get the - * address. If that fails retry x times and then give up. - * - * You'll run into this problem mostly if the name server you - * need for resolving the address is on the same machine, but - * is started after syslogd. - */ case eDestFORW_UNKN: - /* The remote address is not yet known and needs to be obtained */ - dprintf(" %s\n", pData->f_hname); - fwd_suspend = time(NULL) - pData->ttSuspend; - if(fwd_suspend >= INET_SUSPEND_TIME) { - dprintf("Forwarding suspension to unknown over, retrying\n"); - memset(&hints, 0, sizeof(hints)); - /* port must be numeric, because config file syntax requests this */ - /* TODO: this code is a duplicate from cfline() - we should later create - * a common function. - */ - hints.ai_flags = AI_NUMERICSERV; - hints.ai_family = family; - hints.ai_socktype = pData->protocol == FORW_UDP ? SOCK_DGRAM : SOCK_STREAM; - if((e = getaddrinfo(pData->f_hname, - getFwdSyslogPt(pData), &hints, &res)) != 0) { - dprintf("Failure: %s\n", sys_h_errlist[h_errno]); - dprintf("Retries: %d\n", pData->iRtryCnt); - if ( --pData->iRtryCnt < 0 ) { - dprintf("Giving up.\n"); - iRet = RS_RET_DISABLE_ACTION; - } - else - dprintf("Left retries: %d\n", pData->iRtryCnt); - } - else { - dprintf("%s found, resuming.\n", pData->f_hname); - pData->f_addr = res; - pData->iRtryCnt = 0; - pData->eDestState = eDestFORW; - goto f_forw; - } - } - else - dprintf("Forwarding suspension not over, time " \ - "left: %d\n", INET_SUSPEND_TIME - fwd_suspend); + dprintf("doAction eDestFORW_UNKN\n"); + iRet = doTryResume(pData); break; case eDestFORW: - f_forw: dprintf(" %s:%s/%s\n", pData->f_hname, getFwdSyslogPt(pData), pData->protocol == FORW_UDP ? "udp" : "tcp"); if ( 0) // TODO: think about this strcmp(getHOSTNAME(f->f_pMsg), LocalHostName) && NoHops ) @@ -705,33 +699,22 @@ CODESTARTdoAction } /* finished looping */ if (bSendSuccess == FALSE) { - pData->eDestState = eDestFORW_SUSP; - errno = 0; - logerror("error forwarding via udp, suspending"); + dprintf("error forwarding via udp, suspending\n"); + iRet = RS_RET_SUSPENDED; } } } else { /* forward via TCP */ if(TCPSend(pData, psz, l) != 0) { /* error! */ - pData->eDestState = eDestFORW_SUSP; - errno = 0; - logerror("error forwarding via tcp, suspending..."); + dprintf("error forwarding via tcp, suspending\n"); + iRet = RS_RET_SUSPENDED; } } } break; } - - if(pData->eDestState != eDestFORW) { - /* TODO: think somewhat more about this code at the end of modularization. I think - * it is clean right now, but we could build a better interface for suspension. I - * think we will naturally re-visit this when we implement global suspension and - * queueing - I anticipate that the whole FORW_SUSP/FORW_UNKN goes away by then. - * rgerhards, 2007-07-26 - */ - iRet = RS_RET_SUSPENDED; - } +dprintf("doAction returns %d\n", iRet); ENDdoAction diff --git a/syslogd.c b/syslogd.c index c3d4fbd8..b495f385 100644 --- a/syslogd.c +++ b/syslogd.c @@ -558,6 +558,8 @@ struct action_s { time_t f_time; /* time this was last written */ short bEnabled; /* is the related action enabled (1) or disabled (0)? */ short bSuspended; /* is the related action temporarily suspended? */ + time_t ttResumeRtry; /* when is it time to retry the resume? */ + int iNbrResRtry; /* number of retries since last suspend */ struct moduleInfo *pMod;/* pointer to output module handling this selector */ void *pModData; /* pointer to module data - contents is module-specific */ int f_ReduceRepeated;/* reduce repeated lines 0 - no, 1 - yes */ @@ -1889,6 +1891,90 @@ finalize_it: } +/* set an action back to active state -- rgerhards, 2007-08-02 + */ +static rsRetVal actionResume(action_t *pThis) +{ + DEFiRet; + +dprintf("actionResume\n"); + assert(pThis != NULL); + pThis->bSuspended = 0; + + return iRet; +} + + +#define ACTION_RESUME_INTERVAL 5 /* TODO: make this dynamic from conf file */ +/* suspend an action -- rgerhards, 2007-08-02 + */ +static rsRetVal actionSuspend(action_t *pThis) +{ + DEFiRet; + +dprintf("actionSuspend\n"); + assert(pThis != NULL); + pThis->bSuspended = 1; + pThis->ttResumeRtry = time(NULL) + ACTION_RESUME_INTERVAL; + pThis->iNbrResRtry = 0; /* tell that we did not yet retry to resume */ + + return iRet; +} + +#if 1 +#define actionIsSuspended(pThis) ((pThis)->bSuspended == 1) +#else +static int actionIsSuspended(action_t *pThis) +{ + int i; + i = pThis->bSuspended == 1; + dprintf("in IsSuspend(), returns %d\n", i); + return i; +} +#endif + +/* try to resume an action -- rgerhards, 2007-08-02 + * returns RS_RET_OK if resumption worked, RS_RET_SUSPEND if the + * action is still suspended. + */ +static rsRetVal actionTryResume(action_t *pThis) +{ + DEFiRet; + time_t ttNow; + + assert(pThis != NULL); + + ttNow = time(NULL); /* do the system call just once */ + + /* first check if it is time for a re-try */ + if(ttNow > pThis->ttResumeRtry) { + iRet = pThis->pMod->tryResume(pThis->pModData); + if(iRet == RS_RET_SUSPENDED) { + /* set new tryResume time */ + ++pThis->iNbrResRtry; + /* if we have more than 10 retries, we prolong the + * retry interval. If something is really stalled, it will + * get re-tried only very, very seldom - but that saves + * CPU time. TODO: maybe a config option for that? + * rgerhards, 2007-08-02 + */ + pThis->ttResumeRtry = ttNow + ACTION_RESUME_INTERVAL * (pThis->iNbrResRtry / 10 + 1); + } + } else { + /* it's too early, we are still suspended --> indicate this */ + iRet = RS_RET_SUSPENDED; + } + + if(iRet == RS_RET_OK) + actionResume(pThis); + + dprintf("actionTryResume: iRet: %d, next retry (if applicable): %u [now %u]\n", + iRet, pThis->ttResumeRtry, (unsigned) ttNow); + + return iRet; +} + + /* debug-print the contents of an action object * rgerhards, 2007-08-02 */ @@ -2486,11 +2572,11 @@ static rsRetVal callAction(msg_t *pMsg, action_t *pAction) ABORT_FINALIZE(RS_RET_OK); } - if(pAction->bSuspended == 1) { - CHKiRet(pAction->pMod->tryResume(pAction->pModData)); - /* if we reach this point, we have resumed */ - pAction->bSuspended = 0; +dprintf("callAction, vor IsSusp()\n"); + if(actionIsSuspended(pAction)) { + CHKiRet(actionTryResume(pAction)); } +dprintf("callAction, after IsSusp()\n"); /* don't output marks to recently written files */ if ((pMsg->msgFlags & MARK) && (now - pAction->f_time) < MarkInterval / 2) { @@ -3354,7 +3440,7 @@ rsRetVal fprintlog(action_t *pAction) if(iRet == RS_RET_SUSPENDED) { dprintf("Action requested to be suspended, done that.\n"); - pAction->bSuspended = 1; /* message process, so we start a new cycle */ + actionSuspend(pAction); } if(iRet == RS_RET_OK) @@ -4900,9 +4986,11 @@ rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStr dprintf("module is incompatible with RepeatedMsgReduction - turned off\n"); pAction->f_ReduceRepeated = 0; } - pAction->bSuspended = bSuspended; pAction->bEnabled = 1; /* action is enabled */ + if(bSuspended) + actionSuspend(pAction); + *ppAction = pAction; /* finally store the action pointer */ finalize_it: -- cgit