From 8a77bc82acfb5960e8c4054094f7eb80a158ec1f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 20 Dec 2007 14:34:40 +0000 Subject: removed single-threading support for sending TCP messages; caused simplyfication of output module interface as well as core syslog processing. --- omfwd.c | 155 +++------------------------------------------------------------- 1 file changed, 5 insertions(+), 150 deletions(-) (limited to 'omfwd.c') diff --git a/omfwd.c b/omfwd.c index 13ec3b73..e8f8584c 100644 --- a/omfwd.c +++ b/omfwd.c @@ -47,11 +47,7 @@ #ifdef USE_NETZIP #include #endif -#ifdef USE_PTHREADS #include -#else -#include -#endif #ifdef USE_GSSAPI #include #endif @@ -109,17 +105,8 @@ typedef struct _instanceData { # define FORW_UDP 0 # define FORW_TCP 1 /* following fields for TCP-based delivery */ - enum TCPSendStatus { - TCP_SEND_NOTCONNECTED = 0, - TCP_SEND_CONNECTING = 1, - TCP_SEND_READY = 2 - } status; - char *savedMsg; - int savedMsgLen; /* length of savedMsg in octets */ time_t ttSuspend; /* time selector was suspended */ -# ifdef USE_PTHREADS pthread_mutex_t mtxTCPSend; -# endif # ifdef USE_GSSAPI gss_ctx_id_t gss_context; OM_uint32 gss_flags; @@ -160,12 +147,10 @@ CODESTARTfreeInstance free(pData->port); break; } -# ifdef USE_PTHREADS /* delete any mutex objects, if present */ if(pData->protocol == FORW_TCP) { pthread_mutex_destroy(&pData->mtxTCPSend); } -# endif # ifdef USE_GSSAPI if (gss_mode != GSSMODE_NONE) { OM_uint32 maj_stat, min_stat; @@ -194,50 +179,6 @@ ENDdbgPrintInstInfo /* CODE FOR SENDING TCP MESSAGES */ -/* get send status - * rgerhards, 2005-10-24 - */ -static void TCPSendSetStatus(instanceData *pData, enum TCPSendStatus iNewState) -{ - assert(pData != NULL); - assert(pData->protocol == FORW_TCP); - assert( (iNewState == TCP_SEND_NOTCONNECTED) - || (iNewState == TCP_SEND_CONNECTING) - || (iNewState == TCP_SEND_READY)); - - /* there can potentially be a race condition, so guard by mutex */ -# ifdef USE_PTHREADS - pthread_mutex_lock(&pData->mtxTCPSend); -# endif - pData->status = iNewState; -# ifdef USE_PTHREADS - pthread_mutex_unlock(&pData->mtxTCPSend); -# endif -} - - -/* set send status - * rgerhards, 2005-10-24 - */ -static enum TCPSendStatus TCPSendGetStatus(instanceData *pData) -{ - enum TCPSendStatus eState; - assert(pData != NULL); - assert(pData->protocol == FORW_TCP); - - /* there can potentially be a race condition, so guard by mutex */ -# ifdef USE_PTHREADS - pthread_mutex_lock(&pData->mtxTCPSend); -# endif - eState = pData->status; -# ifdef USE_PTHREADS - pthread_mutex_unlock(&pData->mtxTCPSend); -# endif - - return eState; -} - - /* Initialize TCP sockets (for sender) * This is done once per selector line, if not yet initialized. */ @@ -262,17 +203,12 @@ static int TCPSendCreateSocket(instanceData *pData, struct addrinfo *addrDest) * the receivers and the selector line processing are only loosely * coupled via a memory buffer. Now, I think, we can afford the extra * wait time. Thus, we enable blocking mode for TCP if we compile with - * pthreads. - * rgerhards, 2005-10-25 + * pthreads. -- rgerhards, 2005-10-25 + * And now, we always run on multiple threads... -- rgerhards, 2007-12-20 */ -# ifndef USE_PTHREADS - /* set to nonblocking - rgerhards 2005-07-20 */ - fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK); -# endif if (connect (fd, r->ai_addr, r->ai_addrlen) != 0) { if(errno == EINPROGRESS) { - /* this is normal - will complete during select */ - TCPSendSetStatus(pData, TCP_SEND_CONNECTING); + /* this is normal - will complete later select */ return fd; } else { char errStr[1024]; @@ -282,7 +218,6 @@ static int TCPSendCreateSocket(instanceData *pData, struct addrinfo *addrDest) } else { - TCPSendSetStatus(pData, TCP_SEND_READY); return fd; } close(fd); @@ -437,7 +372,6 @@ static int TCPSendGSSSend(instanceData *pData, char *msg, size_t len) fail: close(s); pData->sock = -1; - TCPSendSetStatus(pData, TCP_SEND_NOTCONNECTED); gss_delete_sec_context(&min_stat, context, GSS_C_NO_BUFFER); *context = GSS_C_NO_CONTEXT; gss_release_buffer(&min_stat, &out_buf); @@ -461,7 +395,7 @@ static int TCPSendGSSSend(instanceData *pData, char *msg, size_t len) * rgerhards 2005-07-06 * This function is now expected to stay. Libloging won't be used for * that purpose. I have added the param "len", because it is known by the - * caller and so safes us some time. Also, it MUST be given because there + * caller and so saves us some time. Also, it MUST be given because there * may be NULs inside msg so that we can not rely on strlen(). Please note * that the restrictions outlined above do not existin in multi-threaded * mode, which we assume will now be most often used. So there is no @@ -487,7 +421,6 @@ static int TCPSend(instanceData *pData, char *msg, size_t len) int bIsCompressed; int lenSend; char *buf = NULL; /* if this is non-NULL, it MUST be freed before return! */ - enum TCPSendStatus eState; TCPFRAMINGMODE framingToUse; assert(pData != NULL); @@ -514,41 +447,12 @@ static int TCPSend(instanceData *pData, char *msg, size_t len) return -1; } - eState = TCPSendGetStatus(pData); /* cache info */ - - if(eState == TCP_SEND_CONNECTING) { - /* In this case, we save the buffer. If we have a - * system with few messages, that hopefully prevents - * message loss at all. However, we make no further attempts, - * just the first message is saved. So we only try this - * if there is not yet a saved message present. - * rgerhards 2005-07-20 - */ - if(pData->savedMsg == NULL) { - pData->savedMsg = malloc(len * sizeof(char)); - if(pData->savedMsg == NULL) - return 0; /* nothing we can do... */ - memcpy(pData->savedMsg, msg, len); - pData->savedMsgLen = len; - } - return 0; - } else if(eState != TCP_SEND_READY) - /* This here is debatable. For the time being, we - * accept the loss of a single message (e.g. during - * connection setup in favour of not messing with - * wait time and timeouts. The reason is that such - * things might otherwise cost us considerable message - * loss on the receiving side (even at a timeout set - * to just 1 second). - rgerhards 2005-07-20 - */ - return 0; - /* now check if we need to add a line terminator. We need to * copy the string in memory in this case, this is probably * quicker than using writev and definitely quicker than doing * two socket calls. * rgerhards 2005-07-22 - *//* + * * Some messages already contain a \n character at the end * of the message. We append one only if we there is not * already one. This seems the best fit, though this also @@ -678,9 +582,7 @@ static int TCPSend(instanceData *pData, char *msg, size_t len) * rgerhards, 2005-10-25 */ dbgprintf("message not completely (tcp)send, ignoring %d\n", lenSend); -# if USE_PTHREADS usleep(1000); /* experimental - might be benefitial in this situation */ -# endif if(buf != NULL) free(buf); return 0; @@ -694,20 +596,6 @@ static int TCPSend(instanceData *pData, char *msg, size_t len) free(buf); return 0; break; - case EINPROGRESS: - case EAGAIN: - dbgprintf("message not (tcp)send, would block\n"); -# if USE_PTHREADS - usleep(1000); /* experimental - might be benefitial in this situation */ -# endif - /* we loose this message, but that's better than loosing - * all ;) - */ - /* This is not a real error, so it is not flagged as one */ - if(buf != NULL) - free(buf); - return 0; - break; default: dbgprintf("message not (tcp)send"); break; @@ -717,7 +605,6 @@ static int TCPSend(instanceData *pData, char *msg, size_t len) ++retry; /* try to recover */ close(pData->sock); - TCPSendSetStatus(pData, TCP_SEND_NOTCONNECTED); pData->sock = -1; } else { if(buf != NULL) @@ -947,9 +834,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) pData->protocol = FORW_TCP; ++p; /* eat this '@', too */ /* in this case, we also need a mutex... */ -# ifdef USE_PTHREADS pthread_mutex_init(&pData->mtxTCPSend, 0); -# endif } else { pData->protocol = FORW_UDP; } @@ -1104,36 +989,6 @@ CODESTARTneedUDPSocket ENDneedUDPSocket -BEGINonSelectReadyWrite -CODESTARTonSelectReadyWrite - dbgprintf("tcp send socket %d ready for writing.\n", pData->sock); - TCPSendSetStatus(pData, TCP_SEND_READY); - /* Send stored message (if any) */ - if(pData->savedMsg != NULL) { - if(TCPSend(pData, pData->savedMsg, - pData->savedMsgLen) != 0) { - /* error! */ - pData->eDestState = eDestFORW_SUSP; - errno = 0; - logerror("error forwarding via tcp, suspending..."); - } - free(pData->savedMsg); - pData->savedMsg = NULL; - } -ENDonSelectReadyWrite - - -BEGINgetWriteFDForSelect -CODESTARTgetWriteFDForSelect - if( (pData->eDestState == eDestFORW) - && (pData->protocol == FORW_TCP) - && TCPSendGetStatus(pData) == TCP_SEND_CONNECTING) { - *fd = pData->sock; - iRet = RS_RET_OK; - } -ENDgetWriteFDForSelect - - BEGINmodExit CODESTARTmodExit ENDmodExit -- cgit