summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog2
-rw-r--r--action.c32
-rw-r--r--configure.ac4
-rw-r--r--doc/queues.html7
-rw-r--r--plugins/imudp/imudp.c56
-rw-r--r--runtime/debug.c5
-rw-r--r--runtime/msg.c4
-rw-r--r--runtime/srutils.c4
-rw-r--r--runtime/wtp.c2
-rw-r--r--threads.c2
-rw-r--r--tools/Makefile.am3
-rw-r--r--tools/msggen.c38
-rw-r--r--tools/syslogd.c8
13 files changed, 127 insertions, 40 deletions
diff --git a/ChangeLog b/ChangeLog
index 7e9580d9..ed6f8723 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,7 @@
---------------------------------------------------------------------------
Version 3.21.6 [DEVEL] (rgerhards), 2008-10-??
+- doc bugfix: queue doc had wrong parameter name for setting controlling
+ worker thread shutdown period
- consolidated time calls during msg object creation, improves performance
and consistency
- bugfix: subsecond time properties generated by imfile, imklog and
diff --git a/action.c b/action.c
index 5c5bdbe9..a25eca23 100644
--- a/action.c
+++ b/action.c
@@ -625,14 +625,44 @@ actionWriteToAction(action_t *pAction)
dbgprintf("action not yet ready again to be executed, onceInterval %d, tCurr %d, tNext %d\n",
(int) pAction->iSecsExecOnceInterval, (int) getActNow(pAction),
(int) (pAction->iSecsExecOnceInterval + pAction->tLastExec));
+ /* TODO: the time call below may use reception time, not dequeue time - under consideration. -- rgerhards, 2008-09-17 */
+ pAction->tLastExec = getActNow(pAction); /* re-init time flags */
FINALIZE;
}
- pAction->f_time = pAction->tLastExec = getActNow(pAction); /* re-init time flags */
+
+
+ /* TODO: move this to msg object or some other object. This is just for quick testing!
+ * ALSO, THIS DOES NOT YET WORK PROPERLY!
+ * The reason is that we do not know the DST status, which is major pain. I need to
+ * think about obtaining this information (or the actual Unix timestamp) when I
+ * create the reception timestamp, but that also means I need to preserve that information
+ * while in the on-disk queue. Also need to think about a few other implications.
+ * rgerhards, 2008-09-17
+ */
+ {
+ struct tm tTm;
+ tTm.tm_sec = pAction->f_pMsg->tRcvdAt.second;
+ tTm.tm_min = pAction->f_pMsg->tRcvdAt.minute;
+ tTm.tm_hour = pAction->f_pMsg->tRcvdAt.hour;
+ tTm.tm_mday = pAction->f_pMsg->tRcvdAt.day;
+ tTm.tm_mon = pAction->f_pMsg->tRcvdAt.month - 1;
+ tTm.tm_year = pAction->f_pMsg->tRcvdAt.year - 1900;
+ /********************************************************************************/
+ tTm.tm_isdst = 1; /* TODO THIS IS JUST VALID FOR THE NEXT FEW DAYS ;) TODO */
+ /********************************************************************************/
+ pAction->f_time = mktime(&tTm);
+/* note: mktime seems to do a stat(/etc/localtime), so this is also sub-optimal! */
+dbgprintf("XXXX create our own timestamp: %ld, system time is %ld\n", pAction->f_time, time(NULL));
+ }
+
+ //pAction->f_time = getActNow(pAction); /* re-init time flags */
/* Note: tLastExec could be set in the if block above, but f_time causes us a hard time
* so far, I do not see a solution to getting rid of it. -- rgerhards, 2008-09-16
*/
+
+
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
diff --git a/configure.ac b/configure.ac
index fea7c063..6b6a699d 100644
--- a/configure.ac
+++ b/configure.ac
@@ -53,7 +53,7 @@ AC_SUBST(dl_libs)
AC_HEADER_RESOLV
AC_HEADER_STDC
AC_HEADER_SYS_WAIT
-AC_CHECK_HEADERS([arpa/inet.h libgen.h fcntl.h locale.h netdb.h netinet/in.h paths.h stddef.h stdlib.h string.h sys/file.h sys/ioctl.h sys/param.h sys/socket.h sys/time.h sys/stat.h syslog.h unistd.h utmp.h])
+AC_CHECK_HEADERS([arpa/inet.h libgen.h fcntl.h locale.h netdb.h netinet/in.h paths.h stddef.h stdlib.h string.h sys/file.h sys/ioctl.h sys/param.h sys/socket.h sys/time.h sys/stat.h syslog.h unistd.h utmp.h sys/epoll.h])
# Checks for typedefs, structures, and compiler characteristics.
AC_C_CONST
@@ -88,7 +88,7 @@ AC_TYPE_SIGNAL
AC_FUNC_STAT
AC_FUNC_STRERROR_R
AC_FUNC_VPRINTF
-AC_CHECK_FUNCS([flock basename alarm clock_gettime gethostbyname gethostname gettimeofday localtime_r memset mkdir regcomp select setid socket strcasecmp strchr strdup strerror strndup strnlen strrchr strstr strtol strtoul uname ttyname_r])
+AC_CHECK_FUNCS([flock basename alarm clock_gettime gethostbyname gethostname gettimeofday localtime_r memset mkdir regcomp select setid socket strcasecmp strchr strdup strerror strndup strnlen strrchr strstr strtol strtoul uname ttyname_r epoll_wait])
# Check for MAXHOSTNAMELEN
AC_MSG_CHECKING(for MAXHOSTNAMELEN)
diff --git a/doc/queues.html b/doc/queues.html
index a2074d36..7461121b 100644
--- a/doc/queues.html
+++ b/doc/queues.html
@@ -219,11 +219,12 @@ parall. Thus, the upper limit ca be set via "<i>$&lt;object&gt;QueueWorkerThread
If it, for example, is set to four, no more than four workers will ever be
started, no matter how many elements are enqueued. </p>
<p>Worker threads that have been started are kept running until an inactivity
-timeout happens. The timeout can be set via "<i>$&lt;object&gt;QueueWorkerTimeoutShutdown</i>"
+timeout happens. The timeout can be set via "<i>$&lt;object&gt;QueueWorkerTimeoutThreadShutdown</i>"
and is specified in milliseconds. If you do not like to keep the workers
running, simply set it to 0, which means immediate timeout and thus immediate
shutdown. But consider that creating threads involves some overhead, and this is
-why we keep them running.</p>
+why we keep them running. If you would like to never shutdown any worker
+threads, specify -1 for this parameter.</p>
<h2>Discarding Messages</h2>
<p>If the queue reaches the so called "discard watermark" (a number of queued
elements), less important messages can automatically be discarded. This is in an
@@ -357,4 +358,4 @@ parameters, because not all are applicable. For example, in current output
module design, actions do not support multi-threading. Consequently, the number
of worker threads is fixed to one for action queues and can not be changed.</p>
-</body></html> \ No newline at end of file
+</body></html>
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index 5792a999..f4830a47 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -127,6 +127,8 @@ finalize_it:
/* This function is called to gather input.
+ * Note that udpLstnSocks must be non-NULL because otherwise we would not have
+ * indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02
*/
BEGINrunInput
int maxfds;
@@ -154,17 +156,14 @@ CODESTARTrunInput
maxfds = 0;
FD_ZERO (&readfds);
- /* Add the UDP listen sockets to the list of read descriptors.
- */
- if(udpLstnSocks != NULL) {
- for (i = 0; i < *udpLstnSocks; i++) {
- if (udpLstnSocks[i+1] != -1) {
- if(Debug)
- net.debugListenInfo(udpLstnSocks[i+1], "UDP");
- FD_SET(udpLstnSocks[i+1], &readfds);
- if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1];
- }
- }
+ /* Add the UDP listen sockets to the list of read descriptors. */
+ for (i = 0; i < *udpLstnSocks; i++) {
+ if (udpLstnSocks[i+1] != -1) {
+ if(Debug)
+ net.debugListenInfo(udpLstnSocks[i+1], "UDP");
+ FD_SET(udpLstnSocks[i+1], &readfds);
+ if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1];
+ }
}
if(Debug) {
dbgprintf("--------imUDP calling select, active file descriptors (max %d): ", maxfds);
@@ -177,13 +176,22 @@ CODESTARTrunInput
/* wait for io to become ready */
nfds = select(maxfds+1, (fd_set *) &readfds, NULL, NULL, NULL);
- if(udpLstnSocks != NULL) {
- for (i = 0; nfds && i < *udpLstnSocks; i++) {
- if (FD_ISSET(udpLstnSocks[i+1], &readfds)) {
- socklen = sizeof(frominet);
- l = recvfrom(udpLstnSocks[i+1], (char*) pRcvBuf, iMaxLine, 0,
+ for (i = 0; nfds && i < *udpLstnSocks; i++) {
+ if (FD_ISSET(udpLstnSocks[i+1], &readfds)) {
+ socklen = sizeof(frominet);
+ do {
+ /* we now try to read from the file descriptor until there
+ * is no more data. This is done in the hope to get better performance
+ * out of the system. However, this also means that a descriptor
+ * monopolizes processing while it contains data. This can lead to
+ * data loss in other descriptors. However, if the system is incapable of
+ * handling the workload, we will loss data in any case. So it doesn't really
+ * matter where the actual loss occurs - it is always random, because we depend
+ * on scheduling order. -- rgerhards, 2008-10-02
+ */
+ l = recvfrom(udpLstnSocks[i+1], (char*) pRcvBuf, iMaxLine, 0,
(struct sockaddr *)&frominet, &socklen);
- if (l > 0) {
+ if(l > 0) {
if(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP) == RS_RET_OK) {
dbgprintf("Message from inetd socket: #%d, host: %s\n",
udpLstnSocks[i+1], fromHost);
@@ -205,18 +213,20 @@ CODESTARTrunInput
}
}
}
- } else if (l < 0 && errno != EINTR && errno != EAGAIN) {
+ } else if(l < 0 && errno != EINTR && errno != EAGAIN) {
char errStr[1024];
rs_strerror_r(errno, errStr, sizeof(errStr));
dbgprintf("INET socket error: %d = %s.\n", errno, errStr);
errmsg.LogError(errno, NO_ERRCODE, "recvfrom inet");
/* should be harmless */
sleep(1);
- }
- --nfds; /* indicate we have processed one */
- }
- }
- }
+ }
+ } while(l > 0); /* Warning: do ... while()! */
+
+ --nfds; /* indicate we have processed one descriptor */
+ }
+ }
+ /* end of a run, back to loop for next recv() */
}
return iRet;
diff --git a/runtime/debug.c b/runtime/debug.c
index b0bf76ea..102d5d0f 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -1,3 +1,4 @@
+#include <sys/syscall.h>
/* debug.c
*
* This file proides debug and run time error analysis support. Some of the
@@ -817,7 +818,9 @@ dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg)
if(stddbg != -1) write(stddbg, pszWriteBuf, lenWriteBuf);
if(altdbg != -1) write(altdbg, pszWriteBuf, lenWriteBuf);
}
- lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), "%s: ", pszThrdName);
+
+ // old, reenable TODO lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), "%s: ", pszThrdName);
+ lenWriteBuf = snprintf(pszWriteBuf, sizeof(pszWriteBuf), "{%ld}%s: ", (long) syscall(SYS_gettid), pszThrdName);
if(stddbg != -1) write(stddbg, pszWriteBuf, lenWriteBuf);
if(altdbg != -1) write(altdbg, pszWriteBuf, lenWriteBuf);
/* print object name header if we have an object */
diff --git a/runtime/msg.c b/runtime/msg.c
index 9c2e3f17..69296710 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -140,8 +140,8 @@ void (*funcMsgPrepareEnqueue)(msg_t *pMsg);
#define MsgLock(pMsg) funcLock(pMsg)
#define MsgUnlock(pMsg) funcUnlock(pMsg)
#else
-#define MsgLock(pMsg) {dbgprintf("line %d\n - ", __LINE__); funcLock(pMsg);; }
-#define MsgUnlock(pMsg) {dbgprintf("line %d - ", __LINE__); funcUnlock(pMsg); }
+#define MsgLock(pMsg) {dbgprintf("MsgLock line %d\n - ", __LINE__); funcLock(pMsg);; }
+#define MsgUnlock(pMsg) {dbgprintf("MsgUnlock line %d - ", __LINE__); funcUnlock(pMsg); }
#endif
/* the next function is a dummy to be used by the looking functions
diff --git a/runtime/srutils.c b/runtime/srutils.c
index 97cc3252..1280e40d 100644
--- a/runtime/srutils.c
+++ b/runtime/srutils.c
@@ -371,6 +371,7 @@ int getNumberDigits(long lNum)
rsRetVal
timeoutComp(struct timespec *pt, long iTimeout)
{
+ BEGINfunc
assert(pt != NULL);
/* compute timeout */
clock_gettime(CLOCK_REALTIME, pt);
@@ -379,6 +380,7 @@ timeoutComp(struct timespec *pt, long iTimeout)
pt->tv_nsec -= 1000000000;
}
pt->tv_sec += iTimeout / 1000;
+ ENDfunc
return RS_RET_OK; /* so far, this is static... */
}
@@ -393,6 +395,7 @@ timeoutVal(struct timespec *pt)
{
struct timespec t;
long iTimeout;
+ BEGINfunc
assert(pt != NULL);
/* compute timeout */
@@ -403,6 +406,7 @@ timeoutVal(struct timespec *pt)
if(iTimeout < 0)
iTimeout = 0;
+ ENDfunc
return iTimeout;
}
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 734c8d57..45034fa7 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -456,7 +456,7 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
ISOBJ_TYPE_assert(pThis, wtp);
- wtpProcessThrdChanges(pThis);
+ wtpProcessThrdChanges(pThis); // TODO: Performance: this causes a lot of FUTEX calls
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
diff --git a/threads.c b/threads.c
index 61ea8f29..13222694 100644
--- a/threads.c
+++ b/threads.c
@@ -127,7 +127,7 @@ static void* thrdStarter(void *arg)
assert(pThis != NULL);
assert(pThis->pUsrThrdMain != NULL);
- /* block all signalsi */
+ /* block all signals */
sigset_t sigSet;
sigfillset(&sigSet);
pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
diff --git a/tools/Makefile.am b/tools/Makefile.am
index a265af9c..b66b8e0c 100644
--- a/tools/Makefile.am
+++ b/tools/Makefile.am
@@ -26,8 +26,9 @@ rsyslogd_LDADD = $(zlib_libs) $(pthreads_libs) $(rsrt_libs)
rsyslogd_LDFLAGS = -export-dynamic
if ENABLE_DIAGTOOLS
-sbin_PROGRAMS += rsyslog_diag_hostname
+sbin_PROGRAMS += rsyslog_diag_hostname msggen
rsyslog_diag_hostname_SOURCES = gethostn.c
+msggen_SOURCES = msggen.c
endif
EXTRA_DIST = $(man_MANS)
diff --git a/tools/msggen.c b/tools/msggen.c
new file mode 100644
index 00000000..7990a3c8
--- /dev/null
+++ b/tools/msggen.c
@@ -0,0 +1,38 @@
+/* msggen - a small diagnostic utility that does very quick
+ * syslog() calls.
+ *
+ * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Rsyslog is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Rsyslog is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
+
+#include <stdio.h>
+#include <syslog.h>
+
+int main(int argc, char *argv[])
+{
+ int i;
+
+ openlog("msggen", 0 , LOG_LOCAL0);
+
+ for(i = 0 ; i < 10 ; ++i)
+ syslog(LOG_NOTICE, "This is message number %d", i);
+
+ closelog();
+ return 0;
+}
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 7a99fc1d..b576bb6d 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -1931,7 +1931,7 @@ die(int sig)
/* close the inputs */
dbgprintf("Terminating input threads...\n");
- thrdTerminateAll(); /* TODO: inputs only, please */
+ thrdTerminateAll();
/* and THEN send the termination log message (see long comment above) */
if (sig) {
@@ -2171,8 +2171,8 @@ static void dbgPrintInitInfo(void)
cCCEscapeChar);
dbgprintf("Main queue size %d messages.\n", iMainMsgQueueSize);
- dbgprintf("Main queue worker threads: %d, Perists every %d updates.\n",
- iMainMsgQueueNumWorkers, iMainMsgQPersistUpdCnt);
+ dbgprintf("Main queue worker threads: %d, wThread shutdown: %d, Perists every %d updates.\n",
+ iMainMsgQueueNumWorkers, iMainMsgQtoWrkShutdown, iMainMsgQPersistUpdCnt);
dbgprintf("Main queue timeouts: shutdown: %d, action completion shutdown: %d, enq: %d\n",
iMainMsgQtoQShutdown, iMainMsgQtoActShutdown, iMainMsgQtoEnq);
dbgprintf("Main queue watermarks: high: %d, low: %d, discard: %d, discard-severity: %d\n",
@@ -2182,11 +2182,9 @@ static void dbgPrintInitInfo(void)
/* TODO: add
iActionRetryCount = 0;
iActionRetryInterval = 30000;
- static int iMainMsgQtoWrkShutdown = 60000;
static int iMainMsgQtoWrkMinMsgs = 100;
static int iMainMsgQbSaveOnShutdown = 1;
iMainMsgQueMaxDiskSpace = 0;
- setQPROP(queueSettoWrkShutdown, "$MainMsgQueueTimeoutWorkerThreadShutdown", 5000);
setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100);
setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1);
*/