From 090627b79c1e801bc11c706f62628c7517dd3596 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 25 Oct 2005 13:34:14 +0000 Subject: fixed some issues with TCP sender --- syslogd.c | 56 ++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 10 deletions(-) (limited to 'syslogd.c') diff --git a/syslogd.c b/syslogd.c index 7e5929f2..418acd69 100644 --- a/syslogd.c +++ b/syslogd.c @@ -1339,8 +1339,22 @@ static int TCPSendCreateSocket(struct filed *f) return fd; } + /* We can not allow the TCP sender to block syslogd, at least + * not in a single-threaded design. That would cause rsyslogd to + * loose input messages - which obviously also would affect + * other selector lines, too. So we do set it to non-blocking and + * handle the situation ourselfs (by discarding messages). IF we run + * dual-threaded, however, the situation is different: in this case, + * the receivers and the selector line processing is only loosely + * coupled via a memory buffer. Now, I think, we can afford the extra + * wait time. Thus, we enable blocking mode for TCP if we compile with + * pthreads. + * rgerhards, 2005-10-25 + */ +# ifndef USE_PTHREADS /* set to nonblocking - rgerhards 2005-07-20 */ fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK); +# endif if(connect(fd, (struct sockaddr*) &(f->f_un.f_forw.f_addr), addrlen) < 0) { if(errno == EINPROGRESS) { @@ -1391,8 +1405,9 @@ static int TCPSend(struct filed *f, char *msg) do { /* try to send message */ if(f->f_file <= 0) { /* we need to open the socket first */ - if((f->f_file = TCPSendCreateSocket(f)) <= 0) + if((f->f_file = TCPSendCreateSocket(f)) <= 0) { return -1; + } } eState = TCPSendGetStatus(f); /* cache info */ @@ -1405,13 +1420,13 @@ static int TCPSend(struct filed *f, char *msg) * if there is not yet a saved message present. * rgerhards 2005-07-20 */ - if(f->f_un.f_forw.savedMsg == NULL) { + if(f->f_un.f_forw.savedMsg == NULL) { f->f_un.f_forw.savedMsg = malloc((len + 1) * sizeof(char)); if(f->f_un.f_forw.savedMsg == NULL) return 0; /* nothing we can do... */ memcpy(f->f_un.f_forw.savedMsg, msg, len + 1); - return 0; - } + } + return 0; } else if(eState != TCP_SEND_READY) /* This here is debatable. For the time being, we * accept the loss of a single message (e.g. during @@ -1482,18 +1497,41 @@ static int TCPSend(struct filed *f, char *msg) free(buf); } return 0; + } else if(lenSend != -1) { + /* no real error, could "just" not send everything... + * For the time being, we ignore this... + * rgerhards, 2005-10-25 + */ + dprintf("message not completely (tcp)send, ignoring %d\n", lenSend); +# if USE_PTHREADS + usleep(1000); /* experimental - might be benefitial in this situation */ +# endif + if(buf != NULL) + free(buf); + return 0; } switch(errno) { case EMSGSIZE: dprintf("message not (tcp)send, too large\n"); + /* This is not a real error, so it is not flagged as one */ + if(buf != NULL) + free(buf); + return 0; break; case EINPROGRESS: case EAGAIN: dprintf("message not (tcp)send, would block\n"); +# if USE_PTHREADS + usleep(1000); /* experimental - might be benefitial in this situation */ +# endif /* we loose this message, but that's better than loosing * all ;) */ + /* This is not a real error, so it is not flagged as one */ + if(buf != NULL) + free(buf); + return 0; break; default: f_type = f->f_type; @@ -3221,19 +3259,18 @@ int main(int argc, char **argv) && (TCPSendGetStatus(f) == TCP_SEND_CONNECTING) && (FD_ISSET(f->f_file, &writefds))) { dprintf("tcp send socket %d ready for writing.\n", f->f_file); - /* TODO: multithreading note: at least in theory, this - * must be guarded by a mutex! rgerhards, 2005-10-24 - */ TCPSendSetStatus(f, TCP_SEND_READY); /* Send stored message (if any) */ - if(f->f_un.f_forw.savedMsg != NULL) + if(f->f_un.f_forw.savedMsg != NULL) { if(TCPSend(f, f->f_un.f_forw.savedMsg) != 0) { /* error! */ f->f_type = F_FORW_SUSP; errno = 0; logerror("error forwarding via tcp, suspending..."); } - f->f_un.f_forw.savedMsg = NULL; + free(f->f_un.f_forw.savedMsg); + f->f_un.f_forw.savedMsg = NULL; + } } } } @@ -4171,7 +4208,6 @@ static void enqueueMsg(struct msg *pMsg) dprintf ("enqueueMsg: queue FULL.\n"); pthread_cond_wait (fifo->notFull, fifo->mut); } - dprintf("enqueue and add ref\n"); queueAdd(fifo, MsgAddRef(pMsg)); /* now activate the worker thread */ pthread_mutex_unlock(fifo->mut); -- cgit