diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2007-07-26 12:07:23 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2007-07-26 12:07:23 +0000 |
commit | 54669873b0469aa69a4c9f88bcf88470218082f8 (patch) | |
tree | 1042b7096d3332dc92abbb8c4ee9fdc0af1f7e55 /omfwd.c | |
parent | 1904ce3f5aa11f190c881ccda650c1f464fe9271 (diff) | |
download | rsyslog-54669873b0469aa69a4c9f88bcf88470218082f8.tar.gz rsyslog-54669873b0469aa69a4c9f88bcf88470218082f8.tar.xz rsyslog-54669873b0469aa69a4c9f88bcf88470218082f8.zip |
- implemented needUDPSocket() interface
- replaced (mis) use of f_prevcount in omfwd.c -> now data element in
instance data is used for retry counting
- removed f->f_type from syslogd.c, omfwd.c
- removed f->f_file from omfwd.c, omfile.c
- f->f_flags is gone away
Diffstat (limited to 'omfwd.c')
-rw-r--r-- | omfwd.c | 82 |
1 files changed, 52 insertions, 30 deletions
@@ -71,9 +71,15 @@ static const char *sys_h_errlist[] = { /* internal structures */ - typedef struct _instanceData { char f_hname[MAXHOSTNAMELEN+1]; + short sock; /* file descriptor */ + enum { /* TODO: we shoud revisit these definitions */ + eDestFORW, + eDestFORW_SUSP, + eDestFORW_UNKN + } eDestState; + int iRtryCnt; struct addrinfo *f_addr; int compressionLevel; /* 0 - no compression, else level for zlib */ char *port; @@ -110,12 +116,12 @@ ENDisCompatibleWithFeature BEGINfreeInstance CODESTARTfreeInstance - switch (f->f_type) { - case F_FORW: - case F_FORW_SUSP: + switch (pData->eDestState) { + case eDestFORW: + case eDestFORW_SUSP: freeaddrinfo(pData->f_addr); /* fall through */ - case F_FORW_UNKN: + case eDestFORW_UNKN: if(pData->port != NULL) free(pData->port); break; @@ -296,9 +302,9 @@ static int TCPSend(selector_t *f, instanceData *pData, char *msg, size_t len) framingToUse = bIsCompressed ? TCP_FRAMING_OCTET_COUNTING : pData->tcp_framing; do { /* try to send message */ - if(f->f_file <= 0) { + if(pData->sock <= 0) { /* we need to open the socket first */ - if((f->f_file = TCPSendCreateSocket(pData, pData->f_addr)) <= 0) { + if((pData->sock = TCPSendCreateSocket(pData, pData->f_addr)) <= 0) { return -1; } } @@ -432,7 +438,7 @@ static int TCPSend(selector_t *f, instanceData *pData, char *msg, size_t len) /* frame building complete, on to actual sending */ - lenSend = send(f->f_file, msg, len, 0); + lenSend = send(pData->sock, msg, len, 0); dprintf("TCP sent %d bytes, requested %d, msg: '%s'\n", lenSend, len, bIsCompressed ? "***compressed***" : msg); if((unsigned)lenSend == len) { @@ -485,9 +491,9 @@ static int TCPSend(selector_t *f, instanceData *pData, char *msg, size_t len) if(retry == 0) { ++retry; /* try to recover */ - close(f->f_file); + close(pData->sock); TCPSendSetStatus(pData, TCP_SEND_NOTCONNECTED); - f->f_file = -1; + pData->sock = -1; } else { if(buf != NULL) free(buf); @@ -527,12 +533,12 @@ BEGINdoAction struct addrinfo *res, *r; struct addrinfo hints; CODESTARTdoAction - switch (f->f_type) { - case F_FORW_SUSP: + switch (pData->eDestState) { + case eDestFORW_SUSP: fwd_suspend = time(NULL) - pData->ttSuspend; if ( fwd_suspend >= INET_SUSPEND_TIME ) { dprintf("\nForwarding suspension over, retrying FORW "); - f->f_type = F_FORW; + pData->eDestState = eDestFORW; goto f_forw; } else { @@ -549,7 +555,7 @@ CODESTARTdoAction * need for resolving the address is on the same machine, but * is started after syslogd. */ - case F_FORW_UNKN: + case eDestFORW_UNKN: /* The remote address is not yet known and needs to be obtained */ dprintf(" %s\n", pData->f_hname); fwd_suspend = time(NULL) - pData->ttSuspend; @@ -566,19 +572,19 @@ CODESTARTdoAction if((e = getaddrinfo(pData->f_hname, getFwdSyslogPt(pData), &hints, &res)) != 0) { dprintf("Failure: %s\n", sys_h_errlist[h_errno]); - dprintf("Retries: %d\n", f->f_prevcount); - if ( --f->f_prevcount < 0 ) { + dprintf("Retries: %d\n", pData->iRtryCnt); + if ( --pData->iRtryCnt < 0 ) { dprintf("Giving up.\n"); iRet = RS_RET_DISABLE_ACTION; } else - dprintf("Left retries: %d\n", f->f_prevcount); + dprintf("Left retries: %d\n", pData->iRtryCnt); } else { dprintf("%s found, resuming.\n", pData->f_hname); pData->f_addr = res; - f->f_prevcount = 0; - f->f_type = F_FORW; + pData->iRtryCnt = 0; + pData->eDestState = eDestFORW; goto f_forw; } } @@ -587,7 +593,7 @@ CODESTARTdoAction "left: %d\n", INET_SUSPEND_TIME - fwd_suspend); break; - case F_FORW: + case eDestFORW: f_forw: dprintf(" %s:%s/%s\n", pData->f_hname, getFwdSyslogPt(pData), pData->protocol == FORW_UDP ? "udp" : "tcp"); @@ -668,7 +674,7 @@ CODESTARTdoAction } /* finished looping */ if (bSendSuccess == FALSE) { - f->f_type = F_FORW_SUSP; + pData->eDestState = eDestFORW_SUSP; errno = 0; logerror("error forwarding via udp, suspending"); } @@ -677,7 +683,7 @@ CODESTARTdoAction /* forward via TCP */ if(TCPSend(f, pData, psz, l) != 0) { /* error! */ - f->f_type = F_FORW_SUSP; + pData->eDestState = eDestFORW_SUSP; errno = 0; logerror("error forwarding via tcp, suspending..."); } @@ -685,6 +691,16 @@ CODESTARTdoAction } break; } + + if(pData->eDestState != eDestFORW) { + /* TODO: think somewhat more about this code at the end of modularization. I think + * it is clean right now, but we could build a better interface for suspension. I + * think we will naturally re-visit this when we implement global suspension and + * queueing - I anticipate that the whole FORW_SUSP/FORW_UNKN goes away by then. + * rgerhards, 2007-07-26 + */ + iRet = RS_RET_SUSPENDED; + } ENDdoAction @@ -825,7 +841,7 @@ CODESTARTparseSelectorAct strcpy(szTemplateName, " StdFwdFmt"); } - /* first set the f->f_type */ + /* first set the pData->eDestState */ strcpy(pData->f_hname, (char*) q); memset(&hints, 0, sizeof(hints)); /* port must be numeric, because config file syntax requests this */ @@ -833,11 +849,11 @@ CODESTARTparseSelectorAct hints.ai_family = family; hints.ai_socktype = pData->protocol == FORW_UDP ? SOCK_DGRAM : SOCK_STREAM; if( (error = getaddrinfo(pData->f_hname, getFwdSyslogPt(pData), &hints, &res)) != 0) { - f->f_type = F_FORW_UNKN; - f->f_prevcount = INET_RETRY_MAX; + pData->eDestState = eDestFORW_UNKN; + pData->iRtryCnt = INET_RETRY_MAX; pData->ttSuspend = time(NULL); } else { - f->f_type = F_FORW; + pData->eDestState = eDestFORW; pData->f_addr = res; } @@ -863,16 +879,22 @@ CODESTARTparseSelectorAct ENDparseSelectorAct +BEGINneedUDPSocket +CODESTARTneedUDPSocket + iRet = RS_RET_TRUE; +ENDneedUDPSocket + + BEGINonSelectReadyWrite CODESTARTonSelectReadyWrite - dprintf("tcp send socket %d ready for writing.\n", f->f_file); + dprintf("tcp send socket %d ready for writing.\n", pData->sock); TCPSendSetStatus(pData, TCP_SEND_READY); /* Send stored message (if any) */ if(pData->savedMsg != NULL) { if(TCPSend(f, pData, pData->savedMsg, pData->savedMsgLen) != 0) { /* error! */ - f->f_type = F_FORW_SUSP; + pData->eDestState = eDestFORW_SUSP; errno = 0; logerror("error forwarding via tcp, suspending..."); } @@ -884,10 +906,10 @@ ENDonSelectReadyWrite BEGINgetWriteFDForSelect CODESTARTgetWriteFDForSelect - if( (f->f_type == F_FORW) + if( (pData->eDestState == eDestFORW) && (pData->protocol == FORW_TCP) && TCPSendGetStatus(pData) == TCP_SEND_CONNECTING) { - *fd = f->f_file; + *fd = pData->sock; iRet = RS_RET_OK; } ENDgetWriteFDForSelect |