summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2007-08-02 15:43:55 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2007-08-02 15:43:55 +0000
commit2d29b5270c77a902be406c2591a64a509b2d842a (patch)
tree2229a6443c83a03771fd281f2871056bab27812f
parent16aef4a3f37c30e6a068a8a07f84adb0540fb4c2 (diff)
downloadrsyslog-2d29b5270c77a902be406c2591a64a509b2d842a.tar.gz
rsyslog-2d29b5270c77a902be406c2591a64a509b2d842a.tar.xz
rsyslog-2d29b5270c77a902be406c2591a64a509b2d842a.zip
got a working version of suspension/resumption logic including omfwd.c
-rw-r--r--omfwd.c135
-rw-r--r--syslogd.c100
2 files changed, 153 insertions, 82 deletions
diff --git a/omfwd.c b/omfwd.c
index ce29e874..5dee002c 100644
--- a/omfwd.c
+++ b/omfwd.c
@@ -57,7 +57,8 @@
#include "module-template.h"
#ifdef SYSLOG_INET
-#define INET_SUSPEND_TIME 60 /* equal to 1 minute
+//#define INET_SUSPEND_TIME 60 /* equal to 1 minute
+#define INET_SUSPEND_TIME 2 /* equal to 2 seconds (previously 60, i.e. 1 minute)
* rgerhards, 2005-07-26: This was 3 minutes. As the
* same timer is used for tcp based syslog, we have
* reduced it. However, it might actually be worth
@@ -542,83 +543,76 @@ static char *getFwdSyslogPt(instanceData *pData)
}
+/* Try to bring the forwarding destination back into a usable state.
+ *
+ * What happens depends on pData->eDestState:
+ *  - eDestFORW_SUSP: the state is switched back to eDestFORW without any
+ *    check and RS_RET_OK is returned. Whether the peer can really be
+ *    reached is only found out by the next doAction() call.
+ *  - eDestFORW_UNKN: the remote host name is resolved via getaddrinfo().
+ *    On success the result is stored in pData->f_addr, the retry counter
+ *    is reset and the state becomes eDestFORW. On failure the state is
+ *    left untouched and RS_RET_SUSPENDED is returned.
+ *  - any other state: nothing is done; iRet keeps the value DEFiRet gave
+ *    it (presumably RS_RET_OK - confirm against the DEFiRet definition).
+ *
+ * rgerhards, 2007-08-02
+ */
+static rsRetVal doTryResume(instanceData *pData)
+{
+	DEFiRet;
+	struct addrinfo *res;
+	struct addrinfo hints;
+
+	switch (pData->eDestState) {
+	case eDestFORW_SUSP:
+		iRet = RS_RET_OK; /* the actual check happens during doAction() only */
+		pData->eDestState = eDestFORW;
+		break;
+
+	case eDestFORW_UNKN:
+		/* The remote address is not yet known and needs to be obtained */
+		dprintf(" %s\n", pData->f_hname);
+		memset(&hints, 0, sizeof(hints));
+		/* port must be numeric, because config file syntax requests this */
+		/* TODO: this code is a duplicate from cfline() - we should later create
+		 * a common function.
+		 */
+		hints.ai_flags = AI_NUMERICSERV;
+		hints.ai_family = family;
+		hints.ai_socktype = pData->protocol == FORW_UDP ? SOCK_DGRAM : SOCK_STREAM;
+		/* getaddrinfo() returns 0 on success; the concrete error code is
+		 * not needed here, so it is not kept in a local variable.
+		 */
+		if(getaddrinfo(pData->f_hname, getFwdSyslogPt(pData), &hints, &res) == 0) {
+			dprintf("%s found, resuming.\n", pData->f_hname);
+			pData->f_addr = res;
+			pData->iRtryCnt = 0;
+			pData->eDestState = eDestFORW;
+		} else {
+			iRet = RS_RET_SUSPENDED;
+		}
+		break;
+
+	default:
+		/* not in a suspended/unknown state - nothing to resume */
+		break;
+	}
+
+	return iRet;
+}
+
+
BEGINtryResume
CODESTARTtryResume
-dprintf("###################### tryResume called\n");
+ iRet = doTryResume(pData);
+dprintf("tryResume returns %d\n", iRet);
ENDtryResume
BEGINdoAction
char *psz; /* temporary buffering */
register unsigned l;
+ struct addrinfo *r;
int i;
- unsigned e, lsent = 0;
+ unsigned lsent = 0;
int bSendSuccess;
- time_t fwd_suspend;
- struct addrinfo *res, *r;
- struct addrinfo hints;
CODESTARTdoAction
switch (pData->eDestState) {
case eDestFORW_SUSP:
- fwd_suspend = time(NULL) - pData->ttSuspend;
- if ( fwd_suspend >= INET_SUSPEND_TIME ) {
- dprintf("\nForwarding suspension over, retrying FORW ");
- pData->eDestState = eDestFORW;
- goto f_forw;
- }
- else {
- dprintf(" %s\n", pData->f_hname);
- dprintf("Forwarding suspension not over, time left: %d.\n",
- INET_SUSPEND_TIME - fwd_suspend);
- }
+ dprintf("internal error in omfwd.c, eDestFORW_SUSP in doAction()!\n");
+ iRet = RS_RET_SUSPENDED;
break;
- /* The trick is to wait some time, then retry to get the
- * address. If that fails retry x times and then give up.
- *
- * You'll run into this problem mostly if the name server you
- * need for resolving the address is on the same machine, but
- * is started after syslogd.
- */
case eDestFORW_UNKN:
- /* The remote address is not yet known and needs to be obtained */
- dprintf(" %s\n", pData->f_hname);
- fwd_suspend = time(NULL) - pData->ttSuspend;
- if(fwd_suspend >= INET_SUSPEND_TIME) {
- dprintf("Forwarding suspension to unknown over, retrying\n");
- memset(&hints, 0, sizeof(hints));
- /* port must be numeric, because config file syntax requests this */
- /* TODO: this code is a duplicate from cfline() - we should later create
- * a common function.
- */
- hints.ai_flags = AI_NUMERICSERV;
- hints.ai_family = family;
- hints.ai_socktype = pData->protocol == FORW_UDP ? SOCK_DGRAM : SOCK_STREAM;
- if((e = getaddrinfo(pData->f_hname,
- getFwdSyslogPt(pData), &hints, &res)) != 0) {
- dprintf("Failure: %s\n", sys_h_errlist[h_errno]);
- dprintf("Retries: %d\n", pData->iRtryCnt);
- if ( --pData->iRtryCnt < 0 ) {
- dprintf("Giving up.\n");
- iRet = RS_RET_DISABLE_ACTION;
- }
- else
- dprintf("Left retries: %d\n", pData->iRtryCnt);
- }
- else {
- dprintf("%s found, resuming.\n", pData->f_hname);
- pData->f_addr = res;
- pData->iRtryCnt = 0;
- pData->eDestState = eDestFORW;
- goto f_forw;
- }
- }
- else
- dprintf("Forwarding suspension not over, time " \
- "left: %d\n", INET_SUSPEND_TIME - fwd_suspend);
+ dprintf("doAction eDestFORW_UNKN\n");
+ iRet = doTryResume(pData);
break;
case eDestFORW:
- f_forw:
dprintf(" %s:%s/%s\n", pData->f_hname, getFwdSyslogPt(pData),
pData->protocol == FORW_UDP ? "udp" : "tcp");
if ( 0) // TODO: think about this strcmp(getHOSTNAME(f->f_pMsg), LocalHostName) && NoHops )
@@ -705,33 +699,22 @@ CODESTARTdoAction
}
/* finished looping */
if (bSendSuccess == FALSE) {
- pData->eDestState = eDestFORW_SUSP;
- errno = 0;
- logerror("error forwarding via udp, suspending");
+ dprintf("error forwarding via udp, suspending\n");
+ iRet = RS_RET_SUSPENDED;
}
}
} else {
/* forward via TCP */
if(TCPSend(pData, psz, l) != 0) {
/* error! */
- pData->eDestState = eDestFORW_SUSP;
- errno = 0;
- logerror("error forwarding via tcp, suspending...");
+ dprintf("error forwarding via tcp, suspending\n");
+ iRet = RS_RET_SUSPENDED;
}
}
}
break;
}
-
- if(pData->eDestState != eDestFORW) {
- /* TODO: think somewhat more about this code at the end of modularization. I think
- * it is clean right now, but we could build a better interface for suspension. I
- * think we will naturally re-visit this when we implement global suspension and
- * queueing - I anticipate that the whole FORW_SUSP/FORW_UNKN goes away by then.
- * rgerhards, 2007-07-26
- */
- iRet = RS_RET_SUSPENDED;
- }
+dprintf("doAction returns %d\n", iRet);
ENDdoAction
diff --git a/syslogd.c b/syslogd.c
index c3d4fbd8..b495f385 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -558,6 +558,8 @@ struct action_s {
time_t f_time; /* time this was last written */
short bEnabled; /* is the related action enabled (1) or disabled (0)? */
short bSuspended; /* is the related action temporarily suspended? */
+ time_t ttResumeRtry; /* when is it time to retry the resume? */
+ int iNbrResRtry; /* number of retries since last suspend */
struct moduleInfo *pMod;/* pointer to output module handling this selector */
void *pModData; /* pointer to module data - contents is module-specific */
int f_ReduceRepeated;/* reduce repeated lines 0 - no, 1 - yes */
@@ -1889,6 +1891,90 @@ finalize_it:
}
+/* Put an action back into active state by clearing its suspended flag.
+ * The return value is whatever DEFiRet initialised iRet with; nothing in
+ * this function changes it.
+ * rgerhards, 2007-08-02
+ */
+static rsRetVal actionResume(action_t *pThis)
+{
+	DEFiRet;
+
+	dprintf("actionResume\n");
+	assert(pThis != NULL);
+
+	pThis->bSuspended = 0;
+	return iRet;
+}
+
+
+#define ACTION_RESUME_INTERVAL 5 /* TODO: make this dynamic from conf file */
+/* Suspend an action -- rgerhards, 2007-08-02
+ * Sets the suspended flag, schedules the first resume attempt for
+ * ACTION_RESUME_INTERVAL after the current time() value and resets the
+ * counter of resume retries.
+ */
+static rsRetVal actionSuspend(action_t *pThis)
+{
+	DEFiRet;
+	time_t ttNow;
+
+	dprintf("actionSuspend\n");
+	assert(pThis != NULL);
+
+	pThis->bSuspended = 1;
+	ttNow = time(NULL);
+	pThis->ttResumeRtry = ttNow + ACTION_RESUME_INTERVAL;
+	/* no resume attempt has been made yet for this suspension */
+	pThis->iNbrResRtry = 0;
+
+	return iRet;
+}
+
+#if 0	/* debug variant: same check, but traces the result via dprintf() */
+static int actionIsSuspended(action_t *pThis)
+{
+	int bIsSusp;
+	bIsSusp = (pThis->bSuspended == 1);
+	dprintf("in IsSuspend(), returns %d\n", bIsSusp);
+	return bIsSusp;
+}
+#else	/* regular variant: plain macro, non-zero if the action is suspended */
+#define actionIsSuspended(pThis) ((pThis)->bSuspended == 1)
+#endif
+
+/* try to resume an action -- rgerhards, 2007-08-02
+ * Returns RS_RET_OK if resumption worked (the action is then set back to
+ * active state via actionResume()). Returns RS_RET_SUSPENDED if the action
+ * is still suspended, either because the retry time has not yet been
+ * reached or because the module's tryResume() entry point said so. Any
+ * other return code of the module's tryResume() is handed to the caller
+ * unchanged.
+ */
+static rsRetVal actionTryResume(action_t *pThis)
+{
+	DEFiRet;
+	time_t ttNow;
+
+	assert(pThis != NULL);
+
+	ttNow = time(NULL); /* do the system call just once */
+
+	/* first check if it is time for a re-try */
+	if(ttNow > pThis->ttResumeRtry) {
+		iRet = pThis->pMod->tryResume(pThis->pModData);
+		if(iRet == RS_RET_SUSPENDED) {
+			/* set new tryResume time */
+			++pThis->iNbrResRtry;
+			/* The retry interval grows by one ACTION_RESUME_INTERVAL for
+			 * every 10 failed retries. If something is really stalled, it
+			 * will get re-tried only very, very seldom - but that saves
+			 * CPU time. TODO: maybe a config option for that?
+			 * rgerhards, 2007-08-02
+			 */
+			pThis->ttResumeRtry = ttNow + ACTION_RESUME_INTERVAL * (pThis->iNbrResRtry / 10 + 1);
+		}
+	} else {
+		/* it's too early, we are still suspended --> indicate this */
+		iRet = RS_RET_SUSPENDED;
+	}
+
+	if(iRet == RS_RET_OK)
+		actionResume(pThis);
+
+	/* both time_t values are cast so that they match the %u conversions */
+	dprintf("actionTryResume: iRet: %d, next retry (if applicable): %u [now %u]\n",
+		iRet, (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
+
+	return iRet;
+}
+
+
/* debug-print the contents of an action object
* rgerhards, 2007-08-02
*/
@@ -2486,11 +2572,11 @@ static rsRetVal callAction(msg_t *pMsg, action_t *pAction)
ABORT_FINALIZE(RS_RET_OK);
}
- if(pAction->bSuspended == 1) {
- CHKiRet(pAction->pMod->tryResume(pAction->pModData));
- /* if we reach this point, we have resumed */
- pAction->bSuspended = 0;
+dprintf("callAction, vor IsSusp()\n");
+ if(actionIsSuspended(pAction)) {
+ CHKiRet(actionTryResume(pAction));
}
+dprintf("callAction, after IsSusp()\n");
/* don't output marks to recently written files */
if ((pMsg->msgFlags & MARK) && (now - pAction->f_time) < MarkInterval / 2) {
@@ -3354,7 +3440,7 @@ rsRetVal fprintlog(action_t *pAction)
if(iRet == RS_RET_SUSPENDED) {
dprintf("Action requested to be suspended, done that.\n");
- pAction->bSuspended = 1; /* message process, so we start a new cycle */
+ actionSuspend(pAction);
}
if(iRet == RS_RET_OK)
@@ -4900,9 +4986,11 @@ rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStr
dprintf("module is incompatible with RepeatedMsgReduction - turned off\n");
pAction->f_ReduceRepeated = 0;
}
- pAction->bSuspended = bSuspended;
pAction->bEnabled = 1; /* action is enabled */
+ if(bSuspended)
+ actionSuspend(pAction);
+
*ppAction = pAction; /* finally store the action pointer */
finalize_it: