diff options
Diffstat (limited to 'tools/omfwd.c')
-rw-r--r-- | tools/omfwd.c | 202 |
1 files changed, 133 insertions, 69 deletions
diff --git a/tools/omfwd.c b/tools/omfwd.c index 10cce0e2..c39d1d67 100644 --- a/tools/omfwd.c +++ b/tools/omfwd.c @@ -97,18 +97,36 @@ typedef struct _instanceData { # define FORW_TCP 1 /* following fields for TCP-based delivery */ tcpclt_t *pTCPClt; /* our tcpclt object */ + uchar sndBuf[16*1024]; /* this is intensionally fixed -- see no good reason to make configurable */ + unsigned offsSndBuf; /* next free spot in send buffer */ } instanceData; /* config data */ -static uchar *pszTplName = NULL; /* name of the default template to use */ -static uchar *pszStrmDrvr = NULL; /* name of the stream driver to use */ -static int iStrmDrvrMode = 0; /* mode for stream driver, driver-dependent (0 mostly means plain tcp) */ -static int bResendLastOnRecon = 0; /* should the last message be re-sent on a successful reconnect? */ -static uchar *pszStrmDrvrAuthMode = NULL; /* authentication mode to use */ -static int iUDPRebindInterval = 0; /* support for automatic re-binding (load balancers!). 0 - no rebind */ -static int iTCPRebindInterval = 0; /* support for automatic re-binding (load balancers!). 0 - no rebind */ +typedef struct configSettings_s { + uchar *pszTplName; /* name of the default template to use */ + uchar *pszStrmDrvr; /* name of the stream driver to use */ + int iStrmDrvrMode; /* mode for stream driver, driver-dependent (0 mostly means plain tcp) */ + int bResendLastOnRecon; /* should the last message be re-sent on a successful reconnect? */ + uchar *pszStrmDrvrAuthMode; /* authentication mode to use */ + int iUDPRebindInterval; /* support for automatic re-binding (load balancers!). 0 - no rebind */ + int iTCPRebindInterval; /* support for automatic re-binding (load balancers!). 0 - no rebind */ + permittedPeers_t *pPermPeers; +} configSettings_t; + +SCOPING_SUPPORT; /* must be set AFTER configSettings_t is defined */ + +BEGINinitConfVars /* (re)set config variables to default values */ +CODESTARTinitConfVars + cs.pszTplName = NULL; /* name of the default template to use */ + cs.pszStrmDrvr = NULL; /* name of the stream driver to use */ + cs.iStrmDrvrMode = 0; /* mode for stream driver, driver-dependent (0 mostly means plain tcp) */ + cs.bResendLastOnRecon = 0; /* should the last message be re-sent on a successful reconnect? */ + cs.pszStrmDrvrAuthMode = NULL; /* authentication mode to use */ + cs.iUDPRebindInterval = 0; /* support for automatic re-binding (load balancers!). 0 - no rebind */ + cs.iTCPRebindInterval = 0; /* support for automatic re-binding (load balancers!). 0 - no rebind */ + cs.pPermPeers = NULL; +ENDinitConfVars -static permittedPeers_t *pPermPeers = NULL; static rsRetVal doTryResume(instanceData *pData); @@ -151,6 +169,10 @@ static char *getFwdPt(instanceData *pData) * This, for example, is needed after something went wrong. * This function is void because it "can not" fail. * rgerhards, 2008-06-04 + * Note that we DO NOT discard the current buffer contents + * (if any). This permits us to save data between sessions. In + * the wort case, some duplication occurs, but we do not + * loose data. */ static inline void DestructTCPInstanceData(instanceData *pData) @@ -164,6 +186,7 @@ DestructTCPInstanceData(instanceData *pData) BEGINcreateInstance CODESTARTcreateInstance + pData->offsSndBuf = 0; ENDcreateInstance @@ -262,7 +285,7 @@ static rsRetVal setPermittedPeer(void __attribute__((unused)) *pVal, uchar *pszID) { DEFiRet; - CHKiRet(net.AddPermittedPeer(&pPermPeers, pszID)); + CHKiRet(net.AddPermittedPeer(&cs.pPermPeers, pszID)); free(pszID); /* no longer needed, but we must free it as of interface def */ finalize_it: RETiRet; @@ -273,30 +296,60 @@ finalize_it: /* CODE FOR SENDING TCP MESSAGES */ -/* Send a frame via plain TCP protocol - * rgerhards, 2007-12-28 +/* Send a buffer via TCP. Usually, this is used to send the current + * send buffer, but if a message is larger than the buffer, we need to + * have the capability to send the message buffer directly. + * rgerhards, 2011-04-04 */ -static rsRetVal TCPSendFrame(void *pvData, char *msg, size_t len) +static rsRetVal +TCPSendBuf(instanceData *pData, uchar *buf, unsigned len) { DEFiRet; + unsigned alreadySent; ssize_t lenSend; - instanceData *pData = (instanceData *) pvData; - lenSend = len; + alreadySent = 0; +dbgprintf("omfwd: XXXX: pData %p, pNetStrm %p\n", pData, pData->pNetstrm); netstrm.CheckConnection(pData->pNetstrm); /* hack for plain tcp syslog - see ptcp driver for details */ - CHKiRet(netstrm.Send(pData->pNetstrm, (uchar*)msg, &lenSend)); - dbgprintf("TCP sent %ld bytes, requested %ld\n", (long) lenSend, (long) len); + while(alreadySent != len) { + lenSend = len - alreadySent; + CHKiRet(netstrm.Send(pData->pNetstrm, buf+alreadySent, &lenSend)); + DBGPRINTF("omfwd: TCP sent %ld bytes, requested %u\n", (long) lenSend, len - alreadySent); + alreadySent += lenSend; + } - if(lenSend != (ssize_t) len) { - /* no real error, could "just" not send everything... - * For the time being, we ignore this... - * rgerhards, 2005-10-25 - */ - dbgprintf("message not completely (tcp)send, ignoring %ld\n", (long) lenSend); - usleep(1000); /* experimental - might be benefitial in this situation */ - /* TODO: we need to revisit this code -- rgerhards, 2007-12-28 */ +finalize_it: + RETiRet; +} + + +/* Add frame to send buffer (or send, if requried) + */ +static rsRetVal TCPSendFrame(void *pvData, char *msg, size_t len) +{ + DEFiRet; + instanceData *pData = (instanceData *) pvData; + + DBGPRINTF("omfwd: add %u bytes to send buffer (curr offs %u)\n", + (unsigned) len, pData->offsSndBuf); + if(pData->offsSndBuf != 0 && pData->offsSndBuf + len >= sizeof(pData->sndBuf)) { + /* no buffer space left, need to commit previous records */ + CHKiRet(TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf)); + pData->offsSndBuf = 0; + iRet = RS_RET_PREVIOUS_COMMITTED; } + /* check if the message is too large to fit into buffer */ + if(len > sizeof(pData->sndBuf)) { + CHKiRet(TCPSendBuf(pData, (uchar*)msg, len)); + ABORT_FINALIZE(RS_RET_OK); /* committed everything so far */ + } + + /* we now know the buffer has enough free space */ + memcpy(pData->sndBuf + pData->offsSndBuf, msg, len); + pData->offsSndBuf += len; + iRet = RS_RET_DEFER_COMMIT; + finalize_it: RETiRet; } @@ -329,7 +382,7 @@ static rsRetVal TCPSendInit(void *pvData) if(pData->pNetstrm == NULL) { CHKiRet(netstrms.Construct(&pData->pNS)); /* the stream driver must be set before the object is finalized! */ - CHKiRet(netstrms.SetDrvrName(pData->pNS, pszStrmDrvr)); + CHKiRet(netstrms.SetDrvrName(pData->pNS, cs.pszStrmDrvr)); CHKiRet(netstrms.ConstructFinalize(pData->pNS)); /* now create the actual stream and connect to the server */ @@ -411,6 +464,13 @@ CODESTARTtryResume iRet = doTryResume(pData); ENDtryResume + +BEGINbeginTransaction +CODESTARTbeginTransaction +dbgprintf("omfwd: beginTransaction\n"); +ENDbeginTransaction + + BEGINdoAction char *psz = NULL; /* temporary buffering */ register unsigned l; @@ -477,9 +537,8 @@ CODESTARTdoAction CHKiRet(UDPSend(pData, psz, l)); } else { /* forward via TCP */ - rsRetVal ret; - ret = tcpclt.Send(pData->pTCPClt, pData, psz, l); - if(ret != RS_RET_OK) { + iRet = tcpclt.Send(pData->pTCPClt, pData, psz, l); + if(iRet != RS_RET_OK && iRet != RS_RET_DEFER_COMMIT && iRet != RS_RET_PREVIOUS_COMMITTED) { /* error! */ dbgprintf("error forwarding via tcp, suspending\n"); DestructTCPInstanceData(pData); @@ -496,6 +555,17 @@ finalize_it: ENDdoAction +BEGINendTransaction +CODESTARTendTransaction +dbgprintf("omfwd: endTransaction, offsSndBuf %u\n", pData->offsSndBuf); + if(pData->offsSndBuf != 0) { + iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); + pData->offsSndBuf = 0; + } +ENDendTransaction + + + /* This function loads TCP support, if not already loaded. It will be called * during config processing. To server ressources, TCP support will only * be loaded if it actually is used. -- rgerhard, 2008-04-17 @@ -512,7 +582,6 @@ finalize_it: RETiRet; } - BEGINparseSelectorAct uchar *q; int i; @@ -652,32 +721,32 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) } /* copy over config data as needed */ - pData->iUDPRebindInterval = iUDPRebindInterval; - pData->iTCPRebindInterval = iTCPRebindInterval; + pData->iUDPRebindInterval = cs.iUDPRebindInterval; + pData->iTCPRebindInterval = cs.iTCPRebindInterval; /* process template */ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, - (pszTplName == NULL) ? (uchar*)"RSYSLOG_TraditionalForwardFormat" : pszTplName)); + (cs.pszTplName == NULL) ? (uchar*)"RSYSLOG_TraditionalForwardFormat" : cs.pszTplName)); if(pData->protocol == FORW_TCP) { /* create our tcpclt */ CHKiRet(tcpclt.Construct(&pData->pTCPClt)); - CHKiRet(tcpclt.SetResendLastOnRecon(pData->pTCPClt, bResendLastOnRecon)); + CHKiRet(tcpclt.SetResendLastOnRecon(pData->pTCPClt, cs.bResendLastOnRecon)); /* and set callbacks */ CHKiRet(tcpclt.SetSendInit(pData->pTCPClt, TCPSendInit)); CHKiRet(tcpclt.SetSendFrame(pData->pTCPClt, TCPSendFrame)); CHKiRet(tcpclt.SetSendPrepRetry(pData->pTCPClt, TCPSendPrepRetry)); CHKiRet(tcpclt.SetFraming(pData->pTCPClt, tcp_framing)); CHKiRet(tcpclt.SetRebindInterval(pData->pTCPClt, pData->iTCPRebindInterval)); - pData->iStrmDrvrMode = iStrmDrvrMode; - if(pszStrmDrvr != NULL) - CHKmalloc(pData->pszStrmDrvr = (uchar*)strdup((char*)pszStrmDrvr)); - if(pszStrmDrvrAuthMode != NULL) + pData->iStrmDrvrMode = cs.iStrmDrvrMode; + if(cs.pszStrmDrvr != NULL) + CHKmalloc(pData->pszStrmDrvr = (uchar*)strdup((char*)cs.pszStrmDrvr)); + if(cs.pszStrmDrvrAuthMode != NULL) CHKmalloc(pData->pszStrmDrvrAuthMode = - (uchar*)strdup((char*)pszStrmDrvrAuthMode)); - if(pPermPeers != NULL) { - pData->pPermPeers = pPermPeers; - pPermPeers = NULL; + (uchar*)strdup((char*)cs.pszStrmDrvrAuthMode)); + if(cs.pPermPeers != NULL) { + pData->pPermPeers = cs.pPermPeers; + cs.pPermPeers = NULL; } } @@ -691,21 +760,14 @@ ENDparseSelectorAct static void freeConfigVars(void) { - if(pszTplName != NULL) { - free(pszTplName); - pszTplName = NULL; - } - if(pszStrmDrvr != NULL) { - free(pszStrmDrvr); - pszStrmDrvr = NULL; - } - if(pszStrmDrvrAuthMode != NULL) { - free(pszStrmDrvrAuthMode); - pszStrmDrvrAuthMode = NULL; - } - if(pPermPeers != NULL) { - free(pPermPeers); - } + free(cs.pszTplName); + cs.pszTplName = NULL; + free(cs.pszStrmDrvr); + cs.pszStrmDrvr = NULL; + free(cs.pszStrmDrvrAuthMode); + cs.pszStrmDrvrAuthMode = NULL; + free(cs.pPermPeers); + cs.pPermPeers = NULL; /* TODO: fix in older builds! */ } @@ -726,6 +788,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ ENDqueryEtryPt @@ -737,10 +800,10 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a freeConfigVars(); /* we now must reset all non-string values */ - iStrmDrvrMode = 0; - bResendLastOnRecon = 0; - iUDPRebindInterval = 0; - iTCPRebindInterval = 0; + cs.iStrmDrvrMode = 0; + cs.bResendLastOnRecon = 0; + cs.iUDPRebindInterval = 0; + cs.iTCPRebindInterval = 0; return RS_RET_OK; } @@ -748,21 +811,22 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a BEGINmodInit(Fwd) CODESTARTmodInit +SCOPINGmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(net,LM_NET_FILENAME)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionforwarddefaulttemplate", 0, eCmdHdlrGetWord, NULL, &pszTplName, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionsendtcprebindinterval", 0, eCmdHdlrInt, NULL, &iTCPRebindInterval, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionsendudprebindinterval", 0, eCmdHdlrInt, NULL, &iUDPRebindInterval, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionsendstreamdriver", 0, eCmdHdlrGetWord, NULL, &pszStrmDrvr, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionsendstreamdrivermode", 0, eCmdHdlrInt, NULL, &iStrmDrvrMode, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionsendstreamdriverauthmode", 0, eCmdHdlrGetWord, NULL, &pszStrmDrvrAuthMode, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionsendstreamdriverpermittedpeer", 0, eCmdHdlrGetWord, setPermittedPeer, NULL, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionsendresendlastmsgonreconnect", 0, eCmdHdlrBinary, NULL, &bResendLastOnRecon, NULL)); - CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionforwarddefaulttemplate", 0, eCmdHdlrGetWord, NULL, &cs.pszTplName, NULL, eConfObjAction)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionsendtcprebindinterval", 0, eCmdHdlrInt, NULL, &cs.iTCPRebindInterval, NULL, eConfObjAction)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionsendudprebindinterval", 0, eCmdHdlrInt, NULL, &cs.iUDPRebindInterval, NULL, eConfObjAction)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionsendstreamdriver", 0, eCmdHdlrGetWord, NULL, &cs.pszStrmDrvr, NULL, eConfObjAction)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionsendstreamdrivermode", 0, eCmdHdlrInt, NULL, &cs.iStrmDrvrMode, NULL, eConfObjAction)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionsendstreamdriverauthmode", 0, eCmdHdlrGetWord, NULL, &cs.pszStrmDrvrAuthMode, NULL, eConfObjAction)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionsendstreamdriverpermittedpeer", 0, eCmdHdlrGetWord, setPermittedPeer, NULL, NULL, eConfObjAction)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionsendresendlastmsgonreconnect", 0, eCmdHdlrBinary, NULL, &cs.bResendLastOnRecon, NULL, eConfObjAction)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID, eConfObjAction)); ENDmodInit /* vim:set ai: |