summaryrefslogtreecommitdiffstats
path: root/omfwd.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2007-12-20 14:34:40 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2007-12-20 14:34:40 +0000
commit8a77bc82acfb5960e8c4054094f7eb80a158ec1f (patch)
treea447af04d7419c2049442d089ed3c6565d687bb1 /omfwd.c
parenta5ac2e420e6f58f2cad5123ea674b0da5c016aea (diff)
downloadrsyslog-8a77bc82acfb5960e8c4054094f7eb80a158ec1f.tar.gz
rsyslog-8a77bc82acfb5960e8c4054094f7eb80a158ec1f.tar.xz
rsyslog-8a77bc82acfb5960e8c4054094f7eb80a158ec1f.zip
removed single-threading support for sending TCP messages; caused
simplyfication of output module interface as well as core syslog processing.
Diffstat (limited to 'omfwd.c')
-rw-r--r--omfwd.c155
1 files changed, 5 insertions, 150 deletions
diff --git a/omfwd.c b/omfwd.c
index 13ec3b73..e8f8584c 100644
--- a/omfwd.c
+++ b/omfwd.c
@@ -47,11 +47,7 @@
#ifdef USE_NETZIP
#include <zlib.h>
#endif
-#ifdef USE_PTHREADS
#include <pthread.h>
-#else
-#include <fcntl.h>
-#endif
#ifdef USE_GSSAPI
#include <gssapi.h>
#endif
@@ -109,17 +105,8 @@ typedef struct _instanceData {
# define FORW_UDP 0
# define FORW_TCP 1
/* following fields for TCP-based delivery */
- enum TCPSendStatus {
- TCP_SEND_NOTCONNECTED = 0,
- TCP_SEND_CONNECTING = 1,
- TCP_SEND_READY = 2
- } status;
- char *savedMsg;
- int savedMsgLen; /* length of savedMsg in octets */
time_t ttSuspend; /* time selector was suspended */
-# ifdef USE_PTHREADS
pthread_mutex_t mtxTCPSend;
-# endif
# ifdef USE_GSSAPI
gss_ctx_id_t gss_context;
OM_uint32 gss_flags;
@@ -160,12 +147,10 @@ CODESTARTfreeInstance
free(pData->port);
break;
}
-# ifdef USE_PTHREADS
/* delete any mutex objects, if present */
if(pData->protocol == FORW_TCP) {
pthread_mutex_destroy(&pData->mtxTCPSend);
}
-# endif
# ifdef USE_GSSAPI
if (gss_mode != GSSMODE_NONE) {
OM_uint32 maj_stat, min_stat;
@@ -194,50 +179,6 @@ ENDdbgPrintInstInfo
/* CODE FOR SENDING TCP MESSAGES */
-/* get send status
- * rgerhards, 2005-10-24
- */
-static void TCPSendSetStatus(instanceData *pData, enum TCPSendStatus iNewState)
-{
- assert(pData != NULL);
- assert(pData->protocol == FORW_TCP);
- assert( (iNewState == TCP_SEND_NOTCONNECTED)
- || (iNewState == TCP_SEND_CONNECTING)
- || (iNewState == TCP_SEND_READY));
-
- /* there can potentially be a race condition, so guard by mutex */
-# ifdef USE_PTHREADS
- pthread_mutex_lock(&pData->mtxTCPSend);
-# endif
- pData->status = iNewState;
-# ifdef USE_PTHREADS
- pthread_mutex_unlock(&pData->mtxTCPSend);
-# endif
-}
-
-
-/* set send status
- * rgerhards, 2005-10-24
- */
-static enum TCPSendStatus TCPSendGetStatus(instanceData *pData)
-{
- enum TCPSendStatus eState;
- assert(pData != NULL);
- assert(pData->protocol == FORW_TCP);
-
- /* there can potentially be a race condition, so guard by mutex */
-# ifdef USE_PTHREADS
- pthread_mutex_lock(&pData->mtxTCPSend);
-# endif
- eState = pData->status;
-# ifdef USE_PTHREADS
- pthread_mutex_unlock(&pData->mtxTCPSend);
-# endif
-
- return eState;
-}
-
-
/* Initialize TCP sockets (for sender)
* This is done once per selector line, if not yet initialized.
*/
@@ -262,17 +203,12 @@ static int TCPSendCreateSocket(instanceData *pData, struct addrinfo *addrDest)
* the receivers and the selector line processing are only loosely
* coupled via a memory buffer. Now, I think, we can afford the extra
* wait time. Thus, we enable blocking mode for TCP if we compile with
- * pthreads.
- * rgerhards, 2005-10-25
+ * pthreads. -- rgerhards, 2005-10-25
+ * And now, we always run on multiple threads... -- rgerhards, 2007-12-20
*/
-# ifndef USE_PTHREADS
- /* set to nonblocking - rgerhards 2005-07-20 */
- fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
-# endif
if (connect (fd, r->ai_addr, r->ai_addrlen) != 0) {
if(errno == EINPROGRESS) {
- /* this is normal - will complete during select */
- TCPSendSetStatus(pData, TCP_SEND_CONNECTING);
+ /* this is normal - will complete later select */
return fd;
} else {
char errStr[1024];
@@ -282,7 +218,6 @@ static int TCPSendCreateSocket(instanceData *pData, struct addrinfo *addrDest)
}
else {
- TCPSendSetStatus(pData, TCP_SEND_READY);
return fd;
}
close(fd);
@@ -437,7 +372,6 @@ static int TCPSendGSSSend(instanceData *pData, char *msg, size_t len)
fail:
close(s);
pData->sock = -1;
- TCPSendSetStatus(pData, TCP_SEND_NOTCONNECTED);
gss_delete_sec_context(&min_stat, context, GSS_C_NO_BUFFER);
*context = GSS_C_NO_CONTEXT;
gss_release_buffer(&min_stat, &out_buf);
@@ -461,7 +395,7 @@ static int TCPSendGSSSend(instanceData *pData, char *msg, size_t len)
* rgerhards 2005-07-06
* This function is now expected to stay. Libloging won't be used for
* that purpose. I have added the param "len", because it is known by the
- * caller and so safes us some time. Also, it MUST be given because there
+ * caller and so saves us some time. Also, it MUST be given because there
* may be NULs inside msg so that we can not rely on strlen(). Please note
* that the restrictions outlined above do not existin in multi-threaded
* mode, which we assume will now be most often used. So there is no
@@ -487,7 +421,6 @@ static int TCPSend(instanceData *pData, char *msg, size_t len)
int bIsCompressed;
int lenSend;
char *buf = NULL; /* if this is non-NULL, it MUST be freed before return! */
- enum TCPSendStatus eState;
TCPFRAMINGMODE framingToUse;
assert(pData != NULL);
@@ -514,41 +447,12 @@ static int TCPSend(instanceData *pData, char *msg, size_t len)
return -1;
}
- eState = TCPSendGetStatus(pData); /* cache info */
-
- if(eState == TCP_SEND_CONNECTING) {
- /* In this case, we save the buffer. If we have a
- * system with few messages, that hopefully prevents
- * message loss at all. However, we make no further attempts,
- * just the first message is saved. So we only try this
- * if there is not yet a saved message present.
- * rgerhards 2005-07-20
- */
- if(pData->savedMsg == NULL) {
- pData->savedMsg = malloc(len * sizeof(char));
- if(pData->savedMsg == NULL)
- return 0; /* nothing we can do... */
- memcpy(pData->savedMsg, msg, len);
- pData->savedMsgLen = len;
- }
- return 0;
- } else if(eState != TCP_SEND_READY)
- /* This here is debatable. For the time being, we
- * accept the loss of a single message (e.g. during
- * connection setup in favour of not messing with
- * wait time and timeouts. The reason is that such
- * things might otherwise cost us considerable message
- * loss on the receiving side (even at a timeout set
- * to just 1 second). - rgerhards 2005-07-20
- */
- return 0;
-
/* now check if we need to add a line terminator. We need to
* copy the string in memory in this case, this is probably
* quicker than using writev and definitely quicker than doing
* two socket calls.
* rgerhards 2005-07-22
- *//*
+ *
* Some messages already contain a \n character at the end
* of the message. We append one only if we there is not
* already one. This seems the best fit, though this also
@@ -678,9 +582,7 @@ static int TCPSend(instanceData *pData, char *msg, size_t len)
* rgerhards, 2005-10-25
*/
dbgprintf("message not completely (tcp)send, ignoring %d\n", lenSend);
-# if USE_PTHREADS
usleep(1000); /* experimental - might be benefitial in this situation */
-# endif
if(buf != NULL)
free(buf);
return 0;
@@ -694,20 +596,6 @@ static int TCPSend(instanceData *pData, char *msg, size_t len)
free(buf);
return 0;
break;
- case EINPROGRESS:
- case EAGAIN:
- dbgprintf("message not (tcp)send, would block\n");
-# if USE_PTHREADS
- usleep(1000); /* experimental - might be benefitial in this situation */
-# endif
- /* we loose this message, but that's better than loosing
- * all ;)
- */
- /* This is not a real error, so it is not flagged as one */
- if(buf != NULL)
- free(buf);
- return 0;
- break;
default:
dbgprintf("message not (tcp)send");
break;
@@ -717,7 +605,6 @@ static int TCPSend(instanceData *pData, char *msg, size_t len)
++retry;
/* try to recover */
close(pData->sock);
- TCPSendSetStatus(pData, TCP_SEND_NOTCONNECTED);
pData->sock = -1;
} else {
if(buf != NULL)
@@ -947,9 +834,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
pData->protocol = FORW_TCP;
++p; /* eat this '@', too */
/* in this case, we also need a mutex... */
-# ifdef USE_PTHREADS
pthread_mutex_init(&pData->mtxTCPSend, 0);
-# endif
} else {
pData->protocol = FORW_UDP;
}
@@ -1104,36 +989,6 @@ CODESTARTneedUDPSocket
ENDneedUDPSocket
-BEGINonSelectReadyWrite
-CODESTARTonSelectReadyWrite
- dbgprintf("tcp send socket %d ready for writing.\n", pData->sock);
- TCPSendSetStatus(pData, TCP_SEND_READY);
- /* Send stored message (if any) */
- if(pData->savedMsg != NULL) {
- if(TCPSend(pData, pData->savedMsg,
- pData->savedMsgLen) != 0) {
- /* error! */
- pData->eDestState = eDestFORW_SUSP;
- errno = 0;
- logerror("error forwarding via tcp, suspending...");
- }
- free(pData->savedMsg);
- pData->savedMsg = NULL;
- }
-ENDonSelectReadyWrite
-
-
-BEGINgetWriteFDForSelect
-CODESTARTgetWriteFDForSelect
- if( (pData->eDestState == eDestFORW)
- && (pData->protocol == FORW_TCP)
- && TCPSendGetStatus(pData) == TCP_SEND_CONNECTING) {
- *fd = pData->sock;
- iRet = RS_RET_OK;
- }
-ENDgetWriteFDForSelect
-
-
BEGINmodExit
CODESTARTmodExit
ENDmodExit