diff options
-rw-r--r-- | ChangeLog | 8 | ||||
-rw-r--r-- | Makefile.am | 8 | ||||
-rw-r--r-- | action.c | 44 | ||||
-rw-r--r-- | action.h | 2 | ||||
-rw-r--r-- | configure.ac | 41 | ||||
-rw-r--r-- | doc/rscript_abnf.html | 2 | ||||
-rw-r--r-- | doc/rsyslog-vers.dot | 82 | ||||
-rw-r--r-- | plugins/omtemplate/Makefile.am | 8 | ||||
-rw-r--r-- | plugins/omtemplate/omtemplate.c | 220 | ||||
-rw-r--r-- | runtime/conf.c | 11 | ||||
-rw-r--r-- | runtime/ctok.c | 17 | ||||
-rw-r--r-- | runtime/expr.c | 63 | ||||
-rw-r--r-- | runtime/modules.c | 4 | ||||
-rw-r--r-- | runtime/net.c | 5 | ||||
-rw-r--r-- | runtime/queue.c | 425 | ||||
-rw-r--r-- | runtime/queue.h | 50 | ||||
-rw-r--r-- | runtime/rsyslog.c | 2 | ||||
-rw-r--r-- | runtime/rsyslog.h | 4 | ||||
-rw-r--r-- | runtime/srutils.c | 2 | ||||
-rw-r--r-- | runtime/vm.c | 29 | ||||
-rw-r--r-- | runtime/vmop.c | 60 | ||||
-rw-r--r-- | runtime/vmop.h | 33 | ||||
-rw-r--r-- | runtime/vmprg.c | 2 | ||||
-rw-r--r-- | runtime/wti.c | 5 | ||||
-rw-r--r-- | runtime/wtp.c | 5 | ||||
-rw-r--r-- | tests/1.rstest | 34 | ||||
-rw-r--r-- | tests/2.rstest | 4 | ||||
-rw-r--r-- | tests/3.rstest | 21 | ||||
-rw-r--r-- | tests/Makefile.am | 2 | ||||
-rw-r--r-- | tests/rscript.c | 5 | ||||
-rw-r--r-- | tools/Makefile.am | 2 | ||||
-rw-r--r-- | tools/omfile.c | 4 | ||||
-rw-r--r-- | tools/syslogd.c | 63 |
33 files changed, 908 insertions, 359 deletions
@@ -1,5 +1,13 @@ --------------------------------------------------------------------------- Version 4.1.6 [DEVEL] (rgerhards), 2009-03-?? +- implemented function support in RainerScript. That means the engine + parses and compile functions, as well as executes a few build-in + ones. Dynamic loading and registration of functions is not yet + supported - but we now have a good foundation to do that later on. + NOTE: nested function calls are not yet supported due to a design + issue with the function call VM instruction set design. +- implemented the strlen() RainerScript function +- added a template output module - fixed a bug that caused action retries not to work correctly situation was only cleared by a restart - bugfix: closed dynafile was potentially never written until another diff --git a/Makefile.am b/Makefile.am index aaca570d..87e378ee 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,3 +1,5 @@ +AUTOMAKE_OPTIONS=dejagnu + sbin_PROGRAMS = pkglib_LTLIBRARIES = @@ -88,6 +90,10 @@ if ENABLE_IMTEMPLATE SUBDIRS += plugins/imtemplate endif +if ENABLE_OMTEMPLATE +SUBDIRS += plugins/omtemplate +endif + if ENABLE_IMFILE SUBDIRS += plugins/imfile endif @@ -114,5 +120,5 @@ SUBDIRS += tests # temporarily be removed below. The intent behind forcing everthing to compile # in a make distcheck is so that we detect code that accidently was not updated # when some global update happened. -DISTCHECK_CONFIGURE_FLAGS=--enable-gssapi_krb5 --enable-imfile --enable-snmp --enable-pgsql --enable-libdbi --enable-mysql --enable-imtemplate --enable-relp --enable-rsyslogd --enable-mail --enable-klog --enable-diagtools --enable-gnutls +DISTCHECK_CONFIGURE_FLAGS=--enable-gssapi_krb5 --enable-imfile --enable-snmp --enable-pgsql --enable-libdbi --enable-mysql --enable-omtemplate --enable-imtemplate --enable-relp --enable-rsyslogd --enable-mail --enable-klog --enable-diagtools --enable-gnutls ACLOCAL_AMFLAGS = -I m4 @@ -180,7 +180,7 @@ rsRetVal actionDestruct(action_t *pThis) ASSERT(pThis != NULL); if(pThis->pQueue != NULL) { - queueDestruct(&pThis->pQueue); + qqueueDestruct(&pThis->pQueue); } if(pThis->pMod != NULL) @@ -255,7 +255,7 @@ actionConstructFinalize(action_t *pThis) * to be run on multiple threads. So far, this is forbidden by the interface * spec. -- rgerhards, 2008-01-30 */ - CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction)); + CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction)); obj.SetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -268,24 +268,24 @@ actionConstructFinalize(action_t *pThis) errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ } - queueSetpUsr(pThis->pQueue, pThis); - setQPROP(queueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace); - setQPROP(queueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); - setQPROPstr(queueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); - setQPROP(queueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); - setQPROP(queueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown ); - setQPROP(queueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown); - setQPROP(queueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown); - setQPROP(queueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq); - setQPROP(queueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark); - setQPROP(queueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark); - setQPROP(queueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark); - setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity); - setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs); - setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown); - setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown); - setQPROP(queueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr); - setQPROP(queueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr); + qqueueSetpUsr(pThis->pQueue, pThis); + setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace); + setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); + setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); + setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); + setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown ); + setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown); + setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown); + setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq); + setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark); + setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark); + setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark); + setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity); + setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs); + setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown); + setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown); + setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr); + setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr); # undef setQPROP # undef setQPROPstr @@ -294,7 +294,7 @@ actionConstructFinalize(action_t *pThis) bActionQSaveOnShutdown, iActionQueMaxDiskSpace); - CHKiRet(queueStart(pThis->pQueue)); + CHKiRet(qqueueStart(pThis->pQueue)); dbgprintf("Action %p: queue %p created\n", pThis, pThis->pQueue); /* and now reset the queue params (see comment in its function header!) */ @@ -681,7 +681,7 @@ actionWriteToAction(action_t *pAction) /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = queueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg)); + iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg)); if(iRet == RS_RET_OK) pAction->f_prevcount = 0; /* message processed, so we start a new cycle */ @@ -68,7 +68,7 @@ struct action_s { * content later). This is preserved after the message has been * processed - it is also used to detect duplicates. */ - queue_t *pQueue; /* action queue */ + qqueue_t *pQueue; /* action queue */ SYNC_OBJ_TOOL; /* required for mutex support */ pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */ }; diff --git a/configure.ac b/configure.ac index 00b65410..0c9483ca 100644 --- a/configure.ac +++ b/configure.ac @@ -35,6 +35,13 @@ case "${host}" in # do not DEFINE OS_BSD os_type="bsd" ;; + *-*-solaris*) + os_type="solaris" + AC_DEFINE([OS_SOLARIS], [1], [Indicator for a Solaris OS]) + AC_DEFINE([_POSIX_PTHREAD_SEMANTICS], [1], [Use POSIX pthread semantics]) + SOL_LIBS="-lsocket -lnsl" + AC_SUBST(SOL_LIBS) + ;; esac AC_DEFINE_UNQUOTED([HOSTENV], "$host", [the host environment, can be queried via a system variable]) @@ -233,7 +240,10 @@ if test "x$enable_pthreads" != "xno"; then [ AC_DEFINE([USE_PTHREADS], [1], [Multithreading support enabled.]) PTHREADS_LIBS="-lpthread" - PTHREADS_CFLAGS="-pthread" + case "${os_type}" in + solaris) PTHREADS_CFLAGS="-pthreads" ;; + *) PTHREADS_CFLAGS="-pthread" ;; + esac AC_SUBST(PTHREADS_LIBS) AC_SUBST(PTHREADS_CFLAGS) ], @@ -621,9 +631,7 @@ AC_SUBST(LIBLOGGING_CFLAGS) AC_SUBST(LIBLOGGING_LIBS) -# settings for the template input module; copy and modify this code -# if you intend to add your own module. Be sure to replace imtemplate -# by the actual name of your module. +# settings for the file input module; AC_ARG_ENABLE(imfile, [AS_HELP_STRING([--enable-imfile],[file input module enabled @<:@default=no@:>@])], [case "${enableval}" in @@ -639,8 +647,7 @@ AC_ARG_ENABLE(imfile, # AM_CONDITIONAL(ENABLE_IMFILE, test x$enable_imfile = xyes) -AM_CONDITIONAL(ENABLE_IMTEMPLATE, test x$enable_imtemplate = xyes) -# end of copy template - be sure to serach for imtemplate to find everything! + # settings for the template input module; copy and modify this code # if you intend to add your own module. Be sure to replace imtemplate # by the actual name of your module. @@ -661,6 +668,26 @@ AM_CONDITIONAL(ENABLE_IMTEMPLATE, test x$enable_imtemplate = xyes) # end of copy template - be sure to serach for imtemplate to find everything! +# settings for the template output module; copy and modify this code +# if you intend to add your own module. Be sure to replace omtemplate +# by the actual name of your module. +AC_ARG_ENABLE(omtemplate, + [AS_HELP_STRING([--enable-omtemplate],[Compiles omtemplate template module @<:@default=no@:>@])], + [case "${enableval}" in + yes) enable_omtemplate="yes" ;; + no) enable_omtemplate="no" ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-omtemplate) ;; + esac], + [enable_omtemplate=no] +) +# +# you may want to do some library checks here - see snmp, mysql, pgsql modules +# for samples +# +AM_CONDITIONAL(ENABLE_OMTEMPLATE, test x$enable_omtemplate = xyes) +# end of copy template - be sure to serach for omtemplate to find everything! + + AC_CONFIG_FILES([Makefile \ runtime/Makefile \ tools/Makefile \ @@ -673,6 +700,7 @@ AC_CONFIG_FILES([Makefile \ plugins/immark/Makefile \ plugins/imklog/Makefile \ plugins/imtemplate/Makefile \ + plugins/omtemplate/Makefile \ plugins/imfile/Makefile \ plugins/imrelp/Makefile \ plugins/imdiag/Makefile \ @@ -703,6 +731,7 @@ echo "RELP support enabled: $enable_relp" echo "imdiag enabled: $enable_imdiag" echo "file input module enabled: $enable_imfile" echo "input template module will be compiled: $enable_imtemplate" +echo "output template module will be compiled: $enable_omtemplate" echo "Large file support enabled: $enable_largefile" echo "Networking support enabled: $enable_inet" echo "GnuTLS network stream driver enabled: $enable_gnutls" diff --git a/doc/rscript_abnf.html b/doc/rscript_abnf.html index 278fb59c..d60edb5c 100644 --- a/doc/rscript_abnf.html +++ b/doc/rscript_abnf.html @@ -30,6 +30,8 @@ table... values('&$facility&','&$severity&...?]<br> endact table... values('&$facility&','&$severity&...?]<br> )<br><br>... or ...</p><p>define action writeMySQL(<br> type='ommysql.so', queue.mode='disk', queue.highwatermark = 300,<br> action.dbname='events', action.dbuser='uid',<br> [?action.template='templatename'?] or [?action.sql='insert into table... values('&$facility&','&$severity&...?]<br> )</p><p>if $message contains "error" then action writeMySQL(action.dbname='differentDB')</p><p></p><p>[<a href="rsyslog_conf.html">rsyslog.conf overview</a>] +<h2>Implementation</h2> +RainerScript will be implemented via a hand-crafted LL(1) parser. I was tempted to use yacc, but it turned out the resulting code was not thread-safe and as such did not fit within the context of rsyslog. Also, limited error handling is not a real problem for us: if there is a problem in parsing the configuration file, we stop processing. Guessing what was meant and trying to recover would IMHO not be good choices for something like a syslogd. [<a href="manual.html">manual index</a>] [<a href="http://www.rsyslog.com/">rsyslog site</a>]</p> <p><font size="2">This documentation is part of the <a href="http://www.rsyslog.com/">rsyslog</a> diff --git a/doc/rsyslog-vers.dot b/doc/rsyslog-vers.dot new file mode 100644 index 00000000..a5563f94 --- /dev/null +++ b/doc/rsyslog-vers.dot @@ -0,0 +1,82 @@ +// This file is part of rsyslog. +// +// rsyslog "family tree" compressed version +// +// see http://www.graphviz.org for how to obtain the graphviz processor +// which is used to build the actual graph. +// +// generate the graph with +// $ dot rsyslog-vers.dot -Tpng >rsyslog-vers.png + +digraph G { + label="\n\nrsyslog \"family tree\"\nhttp://www.rsyslog.com"; + fontsize=20; + + v1stable [label="v1-stable", shape=box, style=dotted]; + v2stable [label="v2-stable", shape=box, style=filled]; + v3stable [label="v3-stable", shape=box, style=filled]; + beta [label="beta", shape=box, style=filled]; + devel [label="devel", shape=box, style=filled]; + "1.0.5" [style=dotted]; + perf [style=dashed]; + "imudp-select" [style=dashed label="imudp-\nselect"]; + msgnolock [style=dashed]; + "file-errHdlr" [style=dashed]; + solaris [style=dashed]; + tests [style=dashed]; + "0.x.y" [group=master]; + "1.10.0" [group=master]; + "1.21.2" [group=master]; + "3.10.0" [group=master]; + "3.15.1" [group=master]; + "3.17.0" [group=master]; + "3.19.x" [group=master]; + "3.21.x" [group=master]; + "4.1.0" [group=master]; + "4.1.4" [group=master]; + "4.1.5" [group=master]; + "4.1.6" [group=master]; + + sysklogd -> "0.x.y" [color=red]; + "0.x.y" -> "1.0.0"; + "0.x.y" -> "1.10.0" [color=red]; + "1.0.0" -> "1.0.5" [style=dashed]; + "1.10.0" -> "1.21.2" [color=red style=dashed]; + "1.21.2" -> "2.0.0" [color=blue]; + "2.0.0" -> "2.0.5" [style=dashed, color=blue]; + "1.21.2" -> "3.10.0" [color=red]; + "3.10.0" -> "3.15.1" [color=red style=dashed]; + "3.15.1" -> "tests"; + "3.15.1" -> "3.17.0" [color=red style=dashed]; + "3.15.1" -> "3.16.x"; + "3.16.x" -> "3.18.x" [color=blue, style=dashed]; + "3.17.0" -> "3.18.x"; + "3.17.0" -> "3.19.x" [color=red, style=dashed]; + "3.19.x" -> "3.20.x"; + "3.19.x" -> "3.21.x" [color=red]; + "3.18.x" -> debian_lenny; + "3.18.x" -> "3.20.x" [color=blue, style=dashed]; + "3.21.x" -> "3.21.11"; + "3.21.x" -> "4.1.0" [color=red]; + "3.21.x" -> "perf"; + "perf" -> "4.1.0"; + "4.1.0" -> "4.1.4" [color=red, style=dashed]; + "4.1.4" -> "file-errHdlr"; + "4.1.4" -> "4.1.5" [color=red]; + "4.1.5" -> "4.1.6" [color=red]; + "3.21.x" -> msgnolock + "3.21.x" -> "imudp-select"; + "4.1.5" -> solaris; + "file-errHdlr" -> "4.1.6"; + "tests" -> "4.1.6"; + + "1.0.5" -> v1stable [dir=none, style=dotted]; + "2.0.5" -> v2stable [dir=none, style=dotted]; + "3.20.x" -> v3stable [dir=none, style=dotted]; + "3.21.11" -> beta [dir=none, style=dotted]; + "4.1.6" -> devel [dir=none, style=dotted]; + + {rank=same; "4.1.5" "solaris"} + {rank=same; "3.18.x" "debian_lenny"} + {rank=same; "1.0.5" "2.0.5" "3.20.x" "3.21.11" "4.1.6"} +} diff --git a/plugins/omtemplate/Makefile.am b/plugins/omtemplate/Makefile.am new file mode 100644 index 00000000..e816c7c6 --- /dev/null +++ b/plugins/omtemplate/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = omtemplate.la + +omtemplate_la_SOURCES = omtemplate.c +omtemplate_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) +omtemplate_la_LDFLAGS = -module -avoid-version +omtemplate_la_LIBADD = + +EXTRA_DIST = diff --git a/plugins/omtemplate/omtemplate.c b/plugins/omtemplate/omtemplate.c new file mode 100644 index 00000000..e35968ad --- /dev/null +++ b/plugins/omtemplate/omtemplate.c @@ -0,0 +1,220 @@ +/* omtemplate.c + * This is a template for an output module. It implements a very + * simple single-threaded output, just as thought of by the output + * plugin interface. + * + * NOTE: read comments in module-template.h for more specifics! + * + * File begun on 2009-03-16 by RGerhards + * + * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Rsyslog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Rsyslog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <signal.h> +#include <errno.h> +#include <time.h> +#include "dirty.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "cfsysline.h" + +MODULE_TYPE_OUTPUT + +/* internal structures + */ +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) + +typedef struct _instanceData { + /* here you need to define all action-specific data. A record of type + * instanceData will be handed over to each instance of the action. Keep + * in mind that there may be several invocations of the same type of action + * inside rsyslog.conf, and this is what keeps them apart. Do NOT use + * static data for this! + */ + unsigned iSrvPort; /* sample: server port */ +} instanceData; + +/* config variables + * For the configuration interface, we need to keep track of some settings. This + * is done in global variables. It works as follows: when configuration statements + * are entered, the config file handler (or custom function) sets the global + * variable here. When the action then actually is instantiated, this handler + * copies over to instanceData whatever configuration settings (from the global + * variables) apply. The global variables are NEVER used inside an action + * instance (at least this is how it is supposed to work ;) + */ +static int iSrvPort = 0; /* sample: server port */ + + +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + /* use this to specify if select features are supported by this + * plugin. If not, the framework will handle that. Currently, only + * RepeatedMsgReduction ("last message repeated n times") is optional. + */ + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + +BEGINfreeInstance +CODESTARTfreeInstance + /* this is a cleanup callback. All dynamically-allocated resources + * in instance data must be cleaned up here. Prime examples are + * malloc()ed memory, file & database handles and the like. + */ +ENDfreeInstance + + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo + /* permits to spit out some debug info */ +ENDdbgPrintInstInfo + + +BEGINtryResume +CODESTARTtryResume + /* this is called when an action has been suspended and the + * rsyslog core tries to resume it. The action must then + * retry (if possible) and report RS_RET_OK if it succeeded + * or RS_RET_SUSPENDED otherwise. + * Note that no data can be written in this callback, as it is + * not present. Prime examples of what can be retried are + * reconnects to remote hosts, reconnects to database, + * opening of files and the like. + * If there is no retry-type of operation, the action may + * return RS_RET_OK, so that it will get called on its doAction + * entry point (where it receives data), retries there, and + * immediately returns RS_RET_SUSPENDED if that does not work + * out. This disables some optimizations in the core's retry logic, + * but is a valid and expected behaviour. Note that it is also OK + * for the retry entry point to return OK but the immediately following + * doAction call to fail. In real life, for example, a buggy com line + * may cause such behaviour. + * Note that there is no guarantee that the core will very quickly + * call doAction after the retry succeeded. Today, it does, but that may + * not always be the case. + */ +ENDtryResume + +BEGINdoAction +CODESTARTdoAction + /* this is where you receive the message and need to carry out the + * action. Data is provided in ppString[i] where 0 <= i <= num of strings + * requested. + * Return RS_RET_OK if all goes well, RS_RET_SUSPENDED if the action can + * currently not complete, or an error code or RS_RET_DISABLED. The later + * two should only be returned if there is no hope that the action can be + * restored unless an rsyslog restart (prime example is an invalid config). + * Error code or RS_RET_DISABLED permanently disables the action, up to + * the next restart. + */ +ENDdoAction + + +BEGINparseSelectorAct +CODESTARTparseSelectorAct +CODE_STD_STRING_REQUESTparseSelectorAct(1) + /* first check if this config line is actually for us + * This is a clumpsy interface. We receive the action-part of the selector line + * and need to look at the first characters. If they match our signature + * ":omtemplate:", then we need to instantiate an action. It is recommended that + * newer actions just watch for the template and all other parameters are passed in + * via $-config-lines, this will hopefully be compatbile with future config syntaxes. + * If we do not detect our signature, we must return with RS_RET_CONFLINE_UNPROCESSED + * and NOT do anything else. + */ + if(strncmp((char*) p, ":omtemplate:", sizeof(":omtemplate:") - 1)) { + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); + } + + /* ok, if we reach this point, we have something for us */ + p += sizeof(":omtemplate:") - 1; /* eat indicator sequence (-1 because of '\0'!) */ + CHKiRet(createInstance(&pData)); + + /* check if a non-standard template is to be applied */ + if(*(p-1) == ';') + --p; + /* if we have, call rsyslog runtime to get us template. Note that StdFmt below is + * the standard name. Currently, we may need to patch tools/syslogd.c if we need + * to add a new standard template. + */ + CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_RQD_TPL_OPT_SQL, (uchar*) " StdFmt")); + + /* if we reach this point, all went well, and we can copy over to instanceData + * those configuration elements that we need. + */ + pData->iSrvPort = (unsigned) iSrvPort; /* set configured port */ + +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + + +BEGINmodExit +CODESTARTmodExit +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +ENDqueryEtryPt + + +/* Reset config variables for this module to default values. + */ +static rsRetVal +resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + DEFiRet; + iSrvPort = 0; /* zero is the default port */ + RETiRet; +} + + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + /* register our config handlers */ + /* confguration parameters MUST always be specified in lower case! */ + CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionomtemplteserverport", 0, eCmdHdlrInt, NULL, &iSrvPort, STD_LOADABLE_MODULE_ID)); + /* "resetconfigvariables" should be provided. Notat that it is a chained directive */ + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); +ENDmodInit + +/* vi:set ai: + */ diff --git a/runtime/conf.c b/runtime/conf.c index c5208d86..ede15cc7 100644 --- a/runtime/conf.c +++ b/runtime/conf.c @@ -46,7 +46,9 @@ #include <glob.h> #include <sys/types.h> #ifdef HAVE_LIBGEN_H -# include <libgen.h> +# ifndef OS_SOLARIS +# include <libgen.h> +# endif #endif #include "rsyslog.h" @@ -68,6 +70,9 @@ #include "ctok.h" #include "ctok_token.h" +#ifdef OS_SOLARIS +# define NAME_MAX MAXNAMELEN +#endif /* forward definitions */ static rsRetVal cfline(uchar *line, selector_t **pfCurr); @@ -789,6 +794,10 @@ dbgprintf("calling expression parser, pp %p ('%s')\n", *pline, *pline); CHKiRet(ctok.Getpp(tok, pline)); CHKiRet(ctok.Destruct(&tok)); + /* debug support - print vmprg after construction (uncomment to use) */ + /* vmprgDebugPrint(f->f_filterData.f_expr->pVmprg); */ + vmprgDebugPrint(f->f_filterData.f_expr->pVmprg); + /* we now need to skip whitespace to the action part, else we confuse * the legacy rsyslog conf parser. -- rgerhards, 2008-02-25 */ diff --git a/runtime/ctok.c b/runtime/ctok.c index ceab15bd..d2cd8bbd 100644 --- a/runtime/ctok.c +++ b/runtime/ctok.c @@ -259,12 +259,17 @@ ctokGetVar(ctok_t *pThis, ctok_token_t *pToken) } CHKiRet(rsCStrConstruct(&pstrVal)); - /* this loop is quite simple, a variable name is terminated by whitespace. */ - while(!isspace(c)) { + /* this loop is quite simple, a variable name is terminated when a non-supported + * character is detected. Note that we currently permit a numerical digit as the + * first char, which is not permitted by ABNF. -- rgerhards, 2009-03-10 + */ + while(isalpha(c) || isdigit(c) || (c == '_') || (c == '-')) { CHKiRet(rsCStrAppendChar(pstrVal, tolower(c))); CHKiRet(ctokGetCharFromStream(pThis, &c)); } - CHKiRet(rsCStrFinish(pStrB)); + CHKiRet(ctokUngetCharFromStream(pThis, c)); /* put not processed char back */ + + CHKiRet(rsCStrFinish(pstrVal)); CHKiRet(var.SetString(pToken->pVar, pstrVal)); pstrVal = NULL; @@ -389,6 +394,7 @@ ctokGetToken(ctok_t *pThis, ctok_token_t **ppToken) uchar c; uchar szWord[128]; int bRetry = 0; /* retry parse? Only needed for inline comments... */ + cstr_t *pstrVal; ISOBJ_TYPE_assert(pThis, ctok); ASSERT(ppToken != NULL); @@ -512,7 +518,10 @@ ctokGetToken(ctok_t *pThis, ctok_token_t **ppToken) /* push c back, higher level parser needs it */ CHKiRet(ctokUngetCharFromStream(pThis, c)); pToken->tok = ctok_FUNCTION; - /* TODO: fill function name */ + /* fill function name */ + CHKiRet(rsCStrConstruct(&pstrVal)); + CHKiRet(rsCStrSetSzStr(pstrVal, szWord)); + CHKiRet(var.SetString(pToken->pVar, pstrVal)); } else { /* give up... */ dbgprintf("parser has an invalid word (token) '%s'\n", szWord); pToken->tok = ctok_INVALID; diff --git a/runtime/expr.c b/runtime/expr.c index ee5b9e2c..38ed1c68 100644 --- a/runtime/expr.c +++ b/runtime/expr.c @@ -55,11 +55,63 @@ DEFobjCurrIf(ctok) * rgerhards, 2008-02-19 */ -/* forward definiton - thanks to recursive ABNF, we can not avoid at least one ;) */ +/* forward definition - thanks to recursive ABNF, we can not avoid at least one ;) */ static rsRetVal expr(expr_t *pThis, ctok_t *tok); static rsRetVal +function(expr_t *pThis, ctok_t *tok) +{ + DEFiRet; + ctok_token_t *pToken = NULL; + int iNumArgs = 0; + var_t *pVar; + + ISOBJ_TYPE_assert(pThis, expr); + ISOBJ_TYPE_assert(tok, ctok); + + CHKiRet(ctok.GetToken(tok, &pToken)); + /* note: pToken is destructed in finalize_it */ + + if(pToken->tok == ctok_LPAREN) { + CHKiRet(ctok_token.Destruct(&pToken)); /* token processed, "eat" it */ + CHKiRet(ctok.GetToken(tok, &pToken)); /* get next one */ + } else + ABORT_FINALIZE(RS_RET_FUNC_NO_LPAREN); + + /* we first push all the params on the stack. Then we call the function */ + while(pToken->tok != ctok_RPAREN) { + ++iNumArgs; + CHKiRet(ctok.UngetToken(tok, pToken)); /* not for us, so let others process it */ + CHKiRet(expr(pThis, tok)); + CHKiRet(ctok.GetToken(tok, &pToken)); /* get next one, needed for while() check */ + if(pToken->tok == ctok_COMMA) { + CHKiRet(ctok_token.Destruct(&pToken)); /* token processed, "eat" it */ + CHKiRet(ctok.GetToken(tok, &pToken)); /* get next one */ + if(pToken->tok == ctok_RPAREN) { + ABORT_FINALIZE(RS_RET_FUNC_MISSING_EXPR); + } + } + } + + + /* now push number of arguments - this must be on top of the stack */ + CHKiRet(var.Construct(&pVar)); + CHKiRet(var.ConstructFinalize(pVar)); + CHKiRet(var.SetNumber(pVar, iNumArgs)); + CHKiRet(vmprg.AddVarOperation(pThis->pVmprg, opcode_PUSHCONSTANT, pVar)); /* add to program */ + + +finalize_it: + if(pToken != NULL) { + ctok_token.Destruct(&pToken); /* "eat" processed token */ + } + + RETiRet; +} + + +static rsRetVal terminal(expr_t *pThis, ctok_t *tok) { DEFiRet; @@ -85,8 +137,12 @@ terminal(expr_t *pThis, ctok_t *tok) break; case ctok_FUNCTION: dbgoprint((obj_t*) pThis, "function\n"); - /* TODO: vm: call - well, need to implement that first */ - ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); + CHKiRet(function(pThis, tok)); /* this creates the stack call frame */ + /* ... but we place the call instruction onto the stack ourselfs (because + * we have all relevant information) + */ + CHKiRet(ctok_token.UnlinkVar(pToken, &pVar)); + CHKiRet(vmprg.AddVarOperation(pThis->pVmprg, opcode_FUNC_CALL, pVar)); /* add to program */ break; case ctok_MSGVAR: dbgoprint((obj_t*) pThis, "MSGVAR\n"); @@ -406,6 +462,7 @@ ENDobjQueryInterface(expr) */ BEGINObjClassInit(expr, 1, OBJ_IS_CORE_MODULE) /* class, version */ /* request objects we use */ + CHKiRet(objUse(var, CORE_COMPONENT)); CHKiRet(objUse(vmprg, CORE_COMPONENT)); CHKiRet(objUse(var, CORE_COMPONENT)); CHKiRet(objUse(ctok_token, CORE_COMPONENT)); diff --git a/runtime/modules.c b/runtime/modules.c index 169d234b..d548a949 100644 --- a/runtime/modules.c +++ b/runtime/modules.c @@ -49,6 +49,10 @@ #include <unistd.h> #include <sys/file.h> +#ifdef OS_SOLARIS +# define PATH_MAX MAXPATHLEN +#endif + #include "cfsysline.h" #include "modules.h" #include "errmsg.h" diff --git a/runtime/net.c b/runtime/net.c index 4e6d54a1..db2d7e37 100644 --- a/runtime/net.c +++ b/runtime/net.c @@ -63,6 +63,11 @@ #include "errmsg.h" #include "net.h" +#ifdef OS_SOLARIS +# define s6_addr32 _S6_un._S6_u32 + typedef unsigned int u_int32_t; +#endif + MODULE_TYPE_LIB /* static data */ diff --git a/runtime/queue.c b/runtime/queue.c index a3e29a96..c4a0fad2 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -51,19 +51,24 @@ #include "wti.h" #include "atomic.h" +#ifdef OS_SOLARIS +# include <sched.h> +# define pthread_yield() sched_yield() +#endif + /* static data */ DEFobjStaticHelpers DEFobjCurrIf(glbl) /* forward-definitions */ -rsRetVal queueChkPersist(queue_t *pThis); -static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex); -static rsRetVal queueRateLimiter(queue_t *pThis); -static int queueChkStopWrkrDA(queue_t *pThis); -static int queueIsIdleDA(queue_t *pThis); -static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave); -static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2); -static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex); +rsRetVal qqueueChkPersist(qqueue_t *pThis); +static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); +static rsRetVal qqueueRateLimiter(qqueue_t *pThis); +static int qqueueChkStopWrkrDA(qqueue_t *pThis); +static int qqueueIsIdleDA(qqueue_t *pThis); +static rsRetVal qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); +static rsRetVal qqueueConsumerCancelCleanup(void *arg1, void *arg2); +static rsRetVal qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -77,7 +82,7 @@ static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex); * rgerhards, 2008-01-29 */ static inline int -queueGetOverallQueueSize(queue_t *pThis) +qqueueGetOverallQueueSize(qqueue_t *pThis) { #if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */ BEGINfunc @@ -96,7 +101,7 @@ ENDfunc * This function returns void, as it makes no sense to communicate an error back, even if * it happens. */ -static inline void queueDrain(queue_t *pThis) +static inline void queueDrain(qqueue_t *pThis) { void *pUsr; @@ -119,26 +124,26 @@ static inline void queueDrain(queue_t *pThis) * this point in time. The mutex must be locked when * ths function is called. -- rgerhards, 2008-01-25 */ -static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis) +static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) { DEFiRet; int iMaxWorkers; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(!pThis->bEnqOnly) { if(pThis->bRunsDA) { /* if we have not yet reached the high water mark, there is no need to start a * worker. -- rgerhards, 2008-01-26 */ - if(queueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { + if(qqueueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ } } else { if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { iMaxWorkers = 1; } else { - iMaxWorkers = queueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; + iMaxWorkers = qqueueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; } wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ } @@ -153,11 +158,11 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis) * rgerhards, 2008-02-27 */ static rsRetVal -queueWaitDAModeInitialized(queue_t *pThis) +qqueueWaitDAModeInitialized(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->bRunsDA); while(pThis->bRunsDA != 2) { @@ -179,17 +184,17 @@ queueWaitDAModeInitialized(queue_t *pThis) * rgerhards, 2008-01-15 */ static rsRetVal -queueTurnOffDAMode(queue_t *pThis) +qqueueTurnOffDAMode(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->bRunsDA); /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need * to wait for its startup... -- rgerhards, 2008-01-25 */ - queueWaitDAModeInitialized(pThis); + qqueueWaitDAModeInitialized(pThis); /* if we need to pull any data that we still need from the (child) disk queue, * now would be the time to do so. At present, we do not need this, but I'd like to @@ -208,15 +213,15 @@ queueTurnOffDAMode(queue_t *pThis) /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, * this will be quick. */ - queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ + qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n", iRet); /* now we need to check if the regular queue has some messages. This may be the case * when it is waiting that the high water mark is reached again. If so, we need to start up * a regular worker. -- rgerhards, 2008-01-26 */ - if(queueGetOverallQueueSize(pThis) > 0) { - queueAdviseMaxWorkers(pThis); + if(qqueueGetOverallQueueSize(pThis) > 0) { + qqueueAdviseMaxWorkers(pThis); } } @@ -232,11 +237,11 @@ queueTurnOffDAMode(queue_t *pThis) * rgerhards, 2008-01-14 */ static rsRetVal -queueChkIsDA(queue_t *pThis) +qqueueChkIsDA(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->pszFilePrefix != NULL) { pThis->bIsDA = 1; dbgoprint((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n"); @@ -260,18 +265,18 @@ queueChkIsDA(queue_t *pThis) * rgerhards, 2008-01-15 */ static rsRetVal -queueStartDA(queue_t *pThis) +qqueueStartDA(qqueue_t *pThis) { DEFiRet; uchar pszDAQName[128]; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */ FINALIZE; /* ... then we are already done! */ /* create message queue */ - CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer)); + CHKiRet(qqueueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer)); /* give it a name */ snprintf((char*) pszDAQName, sizeof(pszDAQName)/sizeof(uchar), "%s[DA]", obj.GetName((obj_t*) pThis)); @@ -282,30 +287,30 @@ queueStartDA(queue_t *pThis) */ pThis->pqDA->pqParent = pThis; - CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr)); - CHKiRet(queueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax)); - CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown)); - CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); - CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); - CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); - CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); - CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq)); - CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); - CHKiRet(queueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr)); - CHKiRet(queueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); - CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0)); - CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0)); + CHKiRet(qqueueSetpUsr(pThis->pqDA, pThis->pUsr)); + CHKiRet(qqueueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax)); + CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown)); + CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); + CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); + CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); + CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq)); + CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); + CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr)); + CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); + CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0)); + CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0)); if(pThis->toQShutdown == 0) { - CHKiRet(queueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */ + CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */ } else { /* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND * have an obviously large backlog, we can't finish it in any case. So there is no point * in holding shutdown longer than necessary. -- rgerhards, 2008-01-15 */ - CHKiRet(queueSettoQShutdown(pThis->pqDA, 1)); + CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 1)); } - iRet = queueStart(pThis->pqDA); + iRet = qqueueStart(pThis->pqDA); /* file not found is expected, that means it is no previous QIF available */ if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) FINALIZE; /* something is wrong */ @@ -323,12 +328,12 @@ queueStartDA(queue_t *pThis) pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n", - queueGetID(pThis->pqDA)); + qqueueGetID(pThis->pqDA)); finalize_it: if(iRet != RS_RET_OK) { if(pThis->pqDA != NULL) { - queueDestruct(&pThis->pqDA); + qqueueDestruct(&pThis->pqDA); } dbgoprint((obj_t*) pThis, "error %d creating disk queue - giving up.\n", iRet); pThis->bIsDA = 0; @@ -345,7 +350,7 @@ finalize_it: * rgerhards, 2008-01-16 */ static inline rsRetVal -queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) +qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; @@ -363,12 +368,12 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); - CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) queueConsumerCancelCleanup)); - CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueStartDA)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueTurnOffDAMode)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); + CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerDA)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) qqueueConsumerCancelCleanup)); + CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA)); + CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); @@ -401,14 +406,14 @@ finalize_it: * rgerhards, 2008-01-14 */ static inline rsRetVal -queueChkStrtDA(queue_t *pThis) +qqueueChkStrtDA(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); /* if we do not hit the high water mark, we have nothing to do */ - if(queueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk) + if(qqueueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk) ABORT_FINALIZE(RS_RET_OK); if(pThis->bRunsDA) { @@ -422,15 +427,15 @@ queueChkStrtDA(queue_t *pThis) * we need at least one). */ dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n", - queueGetOverallQueueSize(pThis)); - queueAdviseMaxWorkers(pThis); + qqueueGetOverallQueueSize(pThis)); + qqueueAdviseMaxWorkers(pThis); } else { /* this is the case when we are currently not running in DA mode. So it is time * to turn it back on. */ dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n", - queueGetOverallQueueSize(pThis)); - queueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ + qqueueGetOverallQueueSize(pThis)); + qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ } finalize_it: @@ -448,7 +453,7 @@ finalize_it: */ /* -------------------- fixed array -------------------- */ -static rsRetVal qConstructFixedArray(queue_t *pThis) +static rsRetVal qConstructFixedArray(qqueue_t *pThis) { DEFiRet; @@ -464,14 +469,14 @@ static rsRetVal qConstructFixedArray(queue_t *pThis) pThis->tVars.farray.head = 0; pThis->tVars.farray.tail = 0; - queueChkIsDA(pThis); + qqueueChkIsDA(pThis); finalize_it: RETiRet; } -static rsRetVal qDestructFixedArray(queue_t *pThis) +static rsRetVal qDestructFixedArray(qqueue_t *pThis) { DEFiRet; @@ -486,7 +491,7 @@ static rsRetVal qDestructFixedArray(queue_t *pThis) } -static rsRetVal qAddFixedArray(queue_t *pThis, void* in) +static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in) { DEFiRet; @@ -499,7 +504,7 @@ static rsRetVal qAddFixedArray(queue_t *pThis, void* in) RETiRet; } -static rsRetVal qDelFixedArray(queue_t *pThis, void **out) +static rsRetVal qDelFixedArray(qqueue_t *pThis, void **out) { DEFiRet; @@ -518,7 +523,7 @@ static rsRetVal qDelFixedArray(queue_t *pThis, void **out) /* first some generic functions which are also used for the unget linked list */ -static inline rsRetVal queueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr) +static inline rsRetVal qqueueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr) { DEFiRet; qLinkedList_t *pEntry; @@ -544,7 +549,7 @@ finalize_it: RETiRet; } -static inline rsRetVal queueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr) +static inline rsRetVal qqueueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr) { DEFiRet; qLinkedList_t *pEntry; @@ -571,7 +576,7 @@ static inline rsRetVal queueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t /* end generic functions which are also used for the unget linked list */ -static rsRetVal qConstructLinkedList(queue_t *pThis) +static rsRetVal qConstructLinkedList(qqueue_t *pThis) { DEFiRet; @@ -580,13 +585,13 @@ static rsRetVal qConstructLinkedList(queue_t *pThis) pThis->tVars.linklist.pRoot = 0; pThis->tVars.linklist.pLast = 0; - queueChkIsDA(pThis); + qqueueChkIsDA(pThis); RETiRet; } -static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis) +static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis) { DEFiRet; @@ -599,11 +604,11 @@ static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis) RETiRet; } -static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr) +static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) { DEFiRet; - iRet = queueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr); + iRet = qqueueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr); #if 0 qLinkedList_t *pEntry; @@ -627,10 +632,10 @@ finalize_it: RETiRet; } -static rsRetVal qDelLinkedList(queue_t *pThis, obj_t **ppUsr) +static rsRetVal qDelLinkedList(qqueue_t *pThis, obj_t **ppUsr) { DEFiRet; - iRet = queueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr); + iRet = qqueueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr); #if 0 qLinkedList_t *pEntry; @@ -657,11 +662,11 @@ static rsRetVal qDelLinkedList(queue_t *pThis, obj_t **ppUsr) static rsRetVal -queueLoadPersStrmInfoFixup(strm_t *pStrm, queue_t __attribute__((unused)) *pThis) +qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pThis) { DEFiRet; ISOBJ_TYPE_assert(pStrm, strm); - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); CHKiRet(strmSetDir(pStrm, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); finalize_it: RETiRet; @@ -673,14 +678,14 @@ finalize_it: * rgerhards, 2008-01-15 */ static rsRetVal -queueHaveQIF(queue_t *pThis) +qqueueHaveQIF(qqueue_t *pThis) { DEFiRet; uchar pszQIFNam[MAXFNAME]; size_t lenQIFNam; struct stat stat_buf; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->pszFilePrefix == NULL) ABORT_FINALIZE(RS_RET_NO_FILEPREFIX); @@ -710,7 +715,7 @@ finalize_it: * rgerhards, 2008-01-11 */ static rsRetVal -queueTryLoadPersistedInfo(queue_t *pThis) +qqueueTryLoadPersistedInfo(qqueue_t *pThis) { DEFiRet; strm_t *psQIF = NULL; @@ -720,7 +725,7 @@ queueTryLoadPersistedInfo(queue_t *pThis) int iUngottenObjs; obj_t *pUsr; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); /* Construct file name */ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", @@ -755,15 +760,15 @@ queueTryLoadPersistedInfo(queue_t *pThis) while(iUngottenObjs > 0) { /* fill the queue from disk */ CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL)); - queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); + qqueueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); --iUngottenObjs; /* one less */ } /* and now the stream objects (some order as when persisted!) */ CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF, - (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis)); + (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); CHKiRet(obj.Deserialize(&pThis->tVars.disk.pRead, (uchar*) "strm", psQIF, - (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis)); + (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite)); CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead)); @@ -793,7 +798,7 @@ finalize_it: * allowed file size at this point - that should be a config setting... * rgerhards, 2008-01-10 */ -static rsRetVal qConstructDisk(queue_t *pThis) +static rsRetVal qConstructDisk(qqueue_t *pThis) { DEFiRet; int bRestarted = 0; @@ -801,7 +806,7 @@ static rsRetVal qConstructDisk(queue_t *pThis) ASSERT(pThis != NULL); /* and now check if there is some persistent information that needs to be read in */ - iRet = queueTryLoadPersistedInfo(pThis); + iRet = qqueueTryLoadPersistedInfo(pThis); if(iRet == RS_RET_OK) bRestarted = 1; else if(iRet != RS_RET_FILE_NOT_FOUND) @@ -843,7 +848,7 @@ finalize_it: } -static rsRetVal qDestructDisk(queue_t *pThis) +static rsRetVal qDestructDisk(qqueue_t *pThis) { DEFiRet; @@ -855,7 +860,7 @@ static rsRetVal qDestructDisk(queue_t *pThis) RETiRet; } -static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) +static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr) { DEFiRet; number_t nWriteCount; @@ -882,7 +887,7 @@ finalize_it: RETiRet; } -static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) +static rsRetVal qDelDisk(qqueue_t *pThis, void **ppUsr) { DEFiRet; @@ -913,18 +918,18 @@ finalize_it: } /* -------------------- direct (no queueing) -------------------- */ -static rsRetVal qConstructDirect(queue_t __attribute__((unused)) *pThis) +static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis) { return RS_RET_OK; } -static rsRetVal qDestructDirect(queue_t __attribute__((unused)) *pThis) +static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) { return RS_RET_OK; } -static rsRetVal qAddDirect(queue_t *pThis, void* pUsr) +static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) { DEFiRet; @@ -941,7 +946,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr) RETiRet; } -static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) +static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) { return RS_RET_OK; } @@ -956,12 +961,12 @@ static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__ * rgerhards, 2008-01-20 */ static rsRetVal -queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex) +qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_assert(pUsr); /* TODO: we aborted right at this place at least 3 times -- race? 2008-02-28, -03-10, -03-15 The second time I noticed it the queue was in destruction with NO worker threads running. The pUsr ptr was totally off and provided no clue what it may be pointing @@ -970,7 +975,7 @@ queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex) dbgoprint((obj_t*) pThis, "ungetting user object %s\n", obj.GetName(pUsr)); BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex); - iRet = queueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr); + iRet = qqueueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr); ++pThis->iUngottenObjs; /* indicate one more */ END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -986,14 +991,14 @@ queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex) * rgerhards, 2008-01-29 */ static rsRetVal -queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr) +qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(ppUsr != NULL); - iRet = queueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr); + iRet = qqueueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr); --pThis->iUngottenObjs; /* indicate one less */ dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", obj.GetName(*ppUsr)); @@ -1007,7 +1012,7 @@ queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr) * things truely different. -- rgerhards, 2008-02-12 */ static rsRetVal -queueAdd(queue_t *pThis, void *pUsr) +qqueueAdd(qqueue_t *pThis, void *pUsr) { DEFiRet; @@ -1030,7 +1035,7 @@ finalize_it: * ungotten list and, if so, dequeue it first. */ static rsRetVal -queueDel(queue_t *pThis, void *pUsr) +qqueueDel(qqueue_t *pThis, void *pUsr) { DEFiRet; @@ -1042,7 +1047,7 @@ queueDel(queue_t *pThis, void *pUsr) * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ if(pThis->iUngottenObjs > 0) { - iRet = queueGetUngottenObj(pThis, (obj_t**) pUsr); + iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr); } else { iRet = pThis->qDel(pThis, pUsr); ATOMIC_DEC(pThis->iQueueSize); @@ -1066,14 +1071,14 @@ queueDel(queue_t *pThis, void *pUsr) * complex) if each would have its own shutdown. The function does not self check * this condition - the caller must make sure it is not called with a parent. */ -static rsRetVal queueShutdownWorkers(queue_t *pThis) +static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) { DEFiRet; DEFVARS_mutexProtection; struct timespec tTimeout; rsRetVal iRetLocal; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n"); @@ -1087,7 +1092,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) /* first try to shutdown the queue within the regular shutdown period */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(queueGetOverallQueueSize(pThis) > 0) { + if(qqueueGetOverallQueueSize(pThis) > 0) { if(pThis->bRunsDA) { /* We may have waited on the low water mark. As it may have changed, we * see if we reactivate the worker. @@ -1125,7 +1130,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) if(pThis->bRunsDA) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n", - queueGetID(pThis->pqDA)); + qqueueGetID(pThis->pqDA)); /* we use the same absolute timeout as above, so we do not use more than the configured * timeout interval! */ @@ -1154,19 +1159,19 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ if(pThis->bRunsDA) - queueWaitDAModeInitialized(pThis); + qqueueWaitDAModeInitialized(pThis); BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ /* optimize parameters for shutdown of DA-enabled queues */ - if(pThis->bIsDA && queueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { + if(pThis->bIsDA && qqueueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { /* switch to enqueue-only mode so that no more actions happen */ if(pThis->bRunsDA == 0) { - queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ + qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ } else { /* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1) * but is not yet running (state 2). In this case, pThis->pqDA is NULL! rgerhards, 2008-02-27 */ - queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */ + qqueueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */ } END_MTX_PROTECTED_OPERATIONS(pThis->mut); /* make sure we do not timeout before we are done */ @@ -1188,7 +1193,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) * they will automatically terminate as there no longer is any message left to process. */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(queueGetOverallQueueSize(pThis) > 0) { + if(qqueueGetOverallQueueSize(pThis) > 0) { timeoutComp(&tTimeout, pThis->toActShutdown); if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -1257,7 +1262,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) * Well, more precisely, they *are in termination*. Some cancel cleanup handlers * may still be running. */ - dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", queueGetOverallQueueSize(pThis)); + dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", qqueueGetOverallQueueSize(pThis)); RETiRet; } @@ -1269,17 +1274,17 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) * is done by queueStart(). The reason is that we want to give the caller a chance * to modify some parameters before the queue is actually started. */ -rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, +rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)) { DEFiRet; - queue_t *pThis; + qqueue_t *pThis; ASSERT(ppThis != NULL); ASSERT(pConsumer != NULL); ASSERT(iWorkerThreads >= 0); - if((pThis = (queue_t *)calloc(1, sizeof(queue_t))) == NULL) { + if((pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))) == NULL) { ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } @@ -1316,7 +1321,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, pThis->qConstruct = qConstructLinkedList; pThis->qDestruct = qDestructLinkedList; pThis->qAdd = qAddLinkedList; - pThis->qDel = (rsRetVal (*)(queue_t*,void**)) qDelLinkedList; + pThis->qDel = (rsRetVal (*)(qqueue_t*,void**)) qDelLinkedList; break; case QUEUETYPE_DISK: pThis->qConstruct = qConstructDisk; @@ -1343,25 +1348,25 @@ finalize_it: /* cancellation cleanup handler for queueWorker () * Updates admin structure and frees ressources. * Params: - * arg1 - user pointer (in this case a queue_t) + * arg1 - user pointer (in this case a qqueue_t) * arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!]) * Note that arg2 may be NULL, in which case no dequeued but unprocessed pUsr exists! * rgerhards, 2008-01-16 */ static rsRetVal -queueConsumerCancelCleanup(void *arg1, void *arg2) +qqueueConsumerCancelCleanup(void *arg1, void *arg2) { DEFiRet; - queue_t *pThis = (queue_t*) arg1; + qqueue_t *pThis = (qqueue_t*) arg1; obj_t *pUsr = (obj_t*) arg2; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pUsr != NULL) { /* make sure the data element is not lost */ dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n"); - CHKiRet(queueUngetObj(pThis, pUsr, LOCK_MUTEX)); + CHKiRet(qqueueUngetObj(pThis, pUsr, LOCK_MUTEX)); } finalize_it: @@ -1383,13 +1388,13 @@ finalize_it: * the return state! * rgerhards, 2008-01-24 */ -static int queueChkDiscardMsg(queue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr) +static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr) { DEFiRet; rsRetVal iRetLocal; int iSeverity; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_assert(pUsr); if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) { @@ -1414,7 +1419,7 @@ finalize_it: * rgerhards, 2008-10-21 */ static rsRetVal -queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave) +qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { DEFiRet; void *pUsr; @@ -1422,9 +1427,9 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave) int bRunsDA; /* cache for early mutex release */ /* dequeue element (still protected from mutex) */ - iRet = queueDel(pThis, &pUsr); - queueChkPersist(pThis); - iQueueSize = queueGetOverallQueueSize(pThis); /* cache this for after mutex release */ + iRet = qqueueDel(pThis, &pUsr); + qqueueChkPersist(pThis); + iQueueSize = qqueueGetOverallQueueSize(pThis); /* cache this for after mutex release */ bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ /* We now need to save the user pointer for the cancel cleanup handler, BUT ONLY @@ -1475,7 +1480,7 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave) * provide real-time creation of spool files. * Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong. */ - CHKiRet(queueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr)); + CHKiRet(qqueueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr)); finalize_it: if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { @@ -1524,7 +1529,7 @@ finalize_it: * but you get the idea from the code above. */ static rsRetVal -queueRateLimiter(queue_t *pThis) +qqueueRateLimiter(qqueue_t *pThis) { DEFiRet; int iDelay; @@ -1532,7 +1537,7 @@ queueRateLimiter(queue_t *pThis) time_t tCurr; struct tm m; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); iDelay = 0; if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */ @@ -1587,14 +1592,14 @@ queueRateLimiter(queue_t *pThis) * rgerhards, 2008-01-21 */ static rsRetVal -queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave) +qqueueConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave)); + CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave)); CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp)); /* we now need to check if we should deliberately delay processing a bit @@ -1621,15 +1626,15 @@ finalize_it: * rgerhards, 2008-01-14 */ static rsRetVal -queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave) +qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave)); - CHKiRet(queueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp)); + CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave)); + CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp)); finalize_it: dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); @@ -1645,7 +1650,7 @@ finalize_it: * the DA queue */ static int -queueChkStopWrkrDA(queue_t *pThis) +qqueueChkStopWrkrDA(qqueue_t *pThis) { /* if our queue is in destruction, we drain to the DA queue and so we shall not terminate * until we have done so. @@ -1664,7 +1669,7 @@ queueChkStopWrkrDA(queue_t *pThis) && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { /* this queue can never grow, so we can give up... */ bStopWrkr = 1; - } else if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { + } else if(qqueueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { bStopWrkr = 1; } else { bStopWrkr = 0; @@ -1687,9 +1692,9 @@ queueChkStopWrkrDA(queue_t *pThis) * the DA queue */ static int -queueChkStopWrkrReg(queue_t *pThis) +qqueueChkStopWrkrReg(qqueue_t *pThis) { - return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && queueGetOverallQueueSize(pThis) == 0); + return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0); } @@ -1697,26 +1702,26 @@ queueChkStopWrkrReg(queue_t *pThis) * are not stable! DA queue version */ static int -queueIsIdleDA(queue_t *pThis) +qqueueIsIdleDA(qqueue_t *pThis) { /* remember: iQueueSize is the DA queue size, not the main queue! */ /* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */ - return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); + return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); } /* must only be called when the queue mutex is locked, else results * are not stable! Regular queue version */ static int -queueIsIdleReg(queue_t *pThis) +qqueueIsIdleReg(qqueue_t *pThis) { #if 0 /* enable for performance testing */ int ret; - ret = queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk); + ret = qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk); if(ret) fprintf(stderr, "queue is idle\n"); return ret; #else /* regular code! */ - return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); + return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); #endif } @@ -1735,11 +1740,11 @@ queueIsIdleReg(queue_t *pThis) * I am telling this, because I, too, always get confused by those... */ static rsRetVal -queueRegOnWrkrShutdown(queue_t *pThis) +qqueueRegOnWrkrShutdown(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->pqParent != NULL) { pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ @@ -1756,11 +1761,11 @@ queueRegOnWrkrShutdown(queue_t *pThis) * hook to indicate in the parent queue (if we are a child) that we are not done yet. */ static rsRetVal -queueRegOnWrkrStartup(queue_t *pThis) +qqueueRegOnWrkrStartup(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->pqParent != NULL) { pThis->pqParent->bChildIsDone = 0; @@ -1773,7 +1778,7 @@ queueRegOnWrkrStartup(queue_t *pThis) /* start up the queue - it must have been constructed and parameters defined * before. */ -rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ +rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; rsRetVal iRetLocal; @@ -1811,7 +1816,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d starting\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, - queueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1); + qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ @@ -1822,13 +1827,13 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpReg)); CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); - CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRateLimiter)); - CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))queueConsumerCancelCleanup)); - CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrStartup)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrShutdown)); + CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRateLimiter)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrReg)); + CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleReg)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerReg)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))qqueueConsumerCancelCleanup)); + CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrStartup)); + CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); @@ -1841,10 +1846,10 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ /* If we are disk-assisted, we need to check if there is a QIF file * which we need to load. -- rgerhards, 2008-01-15 */ - iRetLocal = queueHaveQIF(pThis); + iRetLocal = qqueueHaveQIF(pThis); if(iRetLocal == RS_RET_OK) { dbgoprint((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n"); - queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ + qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ bInitialized = 1; /* we are done */ } else { /* TODO: use logerror? -- rgerhards, 2008-01-16 */ @@ -1861,7 +1866,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ /* if the queue already contains data, we need to start the correct number of worker threads. This can be * the case when a disk queue has been loaded. If we did not start it here, it would never start. */ - queueAdviseMaxWorkers(pThis); + qqueueAdviseMaxWorkers(pThis); pThis->bQueueStarted = 1; finalize_it: @@ -1876,7 +1881,7 @@ finalize_it: * and 0 otherwise. * rgerhards, 2008-01-10 */ -static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint) +static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) { DEFiRet; strm_t *psQIF = NULL; /* Queue Info File */ @@ -1887,7 +1892,7 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint) ASSERT(pThis != NULL); if(pThis->qType != QUEUETYPE_DISK) { - if(queueGetOverallQueueSize(pThis) > 0) { + if(qqueueGetOverallQueueSize(pThis) > 0) { /* This error code is OK, but we will probably not implement this any time * The reason is that persistence happens via DA queues. But I would like to * leave the code as is, as we so have a hook in case we need one. @@ -1898,13 +1903,13 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint) FINALIZE; /* if the queue is empty, we are happy and done... */ } - dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", queueGetOverallQueueSize(pThis)); + dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", qqueueGetOverallQueueSize(pThis)); /* Construct file name */ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); - if((bIsCheckpoint != QUEUE_CHECKPOINT) && (queueGetOverallQueueSize(pThis) == 0)) { + if((bIsCheckpoint != QUEUE_CHECKPOINT) && (qqueueGetOverallQueueSize(pThis) == 0)) { if(pThis->bNeedDelQIF) { unlink((char*)pszQIFNam); pThis->bNeedDelQIF = 0; @@ -1938,7 +1943,7 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint) * to the regular files. -- rgerhards, 2008-01-29 */ while(pThis->iUngottenObjs > 0) { - CHKiRet(queueGetUngottenObj(pThis, &pUsr)); + CHKiRet(qqueueGetUngottenObj(pThis, &pUsr)); CHKiRet((objSerialize(pUsr))(pUsr, psQIF)); objDestruct(pUsr); } @@ -1972,14 +1977,14 @@ finalize_it: * abide to our regular call interface)... * rgerhards, 2008-01-13 */ -rsRetVal queueChkPersist(queue_t *pThis) +rsRetVal qqueueChkPersist(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) { - queuePersist(pThis, QUEUE_CHECKPOINT); + qqueuePersist(pThis, QUEUE_CHECKPOINT); pThis->iUpdsSincePersist = 0; } @@ -1988,8 +1993,8 @@ rsRetVal queueChkPersist(queue_t *pThis) /* destructor for the queue object */ -BEGINobjDestruct(queue) /* be sure to specify the object type also in END and CODESTART macros! */ -CODESTARTobjDestruct(queue) +BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */ +CODESTARTobjDestruct(qqueue) pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ /* shut down all workers (handles *all* of the persistence logic) @@ -1999,7 +2004,7 @@ CODESTARTobjDestruct(queue) * with a child! -- rgerhards, 2008-01-28 */ if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL) - queueShutdownWorkers(pThis); + qqueueShutdownWorkers(pThis); /* finally destruct our (regular) worker thread pool * Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen, @@ -2024,7 +2029,7 @@ CODESTARTobjDestruct(queue) wtpDestruct(&pThis->pWtpDA); } if(pThis->pqDA != NULL) { - queueDestruct(&pThis->pqDA); + qqueueDestruct(&pThis->pqDA); } /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) @@ -2034,7 +2039,7 @@ CODESTARTobjDestruct(queue) * disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here * if need arises (what I doubt...) -- rgerhards, 2008-01-25 */ - CHKiRet_Hdlr(queuePersist(pThis, QUEUE_NO_CHECKPOINT)) { + CHKiRet_Hdlr(qqueuePersist(pThis, QUEUE_NO_CHECKPOINT)) { dbgoprint((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet); } @@ -2059,7 +2064,7 @@ CODESTARTobjDestruct(queue) if(pThis->pszSpoolDir != NULL) free(pThis->pszSpoolDir); -ENDobjDestruct(queue) +ENDobjDestruct(qqueue) /* set the queue's file prefix @@ -2068,7 +2073,7 @@ ENDobjDestruct(queue) * rgerhards, 2008-01-09 */ rsRetVal -queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) +qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) { DEFiRet; @@ -2091,11 +2096,11 @@ finalize_it: * rgerhards, 2008-01-09 */ rsRetVal -queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize) +qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(iMaxFileSize < 1024) { ABORT_FINALIZE(RS_RET_VALUE_TOO_LOW); @@ -2112,13 +2117,13 @@ finalize_it: * Enqueues the new element and awakes worker thread. */ rsRetVal -queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) +qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) { DEFiRet; int iCancelStateSave; struct timespec t; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); /* first check if we need to discard this message (which will cause CHKiRet() to exit) * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The iQueueSize @@ -2127,7 +2132,7 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) * threading tools may point this access to be an error, but this is done * intentional. I do not see this causes problems to us. */ - CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); + CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE @@ -2142,7 +2147,7 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* then check if we need to add an assistance disk queue */ if(pThis->bIsDA) - CHKiRet(queueChkStrtDA(pThis)); + CHKiRet(qqueueChkStrtDA(pThis)); /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. @@ -2195,13 +2200,13 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) } /* and finally enqueue the message */ - CHKiRet(queueAdd(pThis, pUsr)); - queueChkPersist(pThis); + CHKiRet(qqueueAdd(pThis, pUsr)); + qqueueChkPersist(pThis); finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { /* make sure at least one worker is running. */ - queueAdviseMaxWorkers(pThis); + qqueueAdviseMaxWorkers(pThis); /* and release the mutex */ d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); @@ -2228,12 +2233,12 @@ finalize_it: * rgerhards, 2008-01-16 */ static rsRetVal -queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex) +qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); /* for simplicity, we do one big mutex lock. This method is extremely seldom * called, so that doesn't matter... -- rgerhards, 2008-01-16 @@ -2272,24 +2277,24 @@ finalize_it: /* some simple object access methods */ -DEFpropSetMeth(queue, iPersistUpdCnt, int) -DEFpropSetMeth(queue, iDeqtWinFromHr, int) -DEFpropSetMeth(queue, iDeqtWinToHr, int) -DEFpropSetMeth(queue, toQShutdown, long) -DEFpropSetMeth(queue, toActShutdown, long) -DEFpropSetMeth(queue, toWrkShutdown, long) -DEFpropSetMeth(queue, toEnq, long) -DEFpropSetMeth(queue, iHighWtrMrk, int) -DEFpropSetMeth(queue, iLowWtrMrk, int) -DEFpropSetMeth(queue, iDiscardMrk, int) -DEFpropSetMeth(queue, iFullDlyMrk, int) -DEFpropSetMeth(queue, iDiscardSeverity, int) -DEFpropSetMeth(queue, bIsDA, int) -DEFpropSetMeth(queue, iMinMsgsPerWrkr, int) -DEFpropSetMeth(queue, bSaveOnShutdown, int) -DEFpropSetMeth(queue, pUsr, void*) -DEFpropSetMeth(queue, iDeqSlowdown, int) -DEFpropSetMeth(queue, sizeOnDiskMax, int64) +DEFpropSetMeth(qqueue, iPersistUpdCnt, int) +DEFpropSetMeth(qqueue, iDeqtWinFromHr, int) +DEFpropSetMeth(qqueue, iDeqtWinToHr, int) +DEFpropSetMeth(qqueue, toQShutdown, long) +DEFpropSetMeth(qqueue, toActShutdown, long) +DEFpropSetMeth(qqueue, toWrkShutdown, long) +DEFpropSetMeth(qqueue, toEnq, long) +DEFpropSetMeth(qqueue, iHighWtrMrk, int) +DEFpropSetMeth(qqueue, iLowWtrMrk, int) +DEFpropSetMeth(qqueue, iDiscardMrk, int) +DEFpropSetMeth(qqueue, iFullDlyMrk, int) +DEFpropSetMeth(qqueue, iDiscardSeverity, int) +DEFpropSetMeth(qqueue, bIsDA, int) +DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int) +DEFpropSetMeth(qqueue, bSaveOnShutdown, int) +DEFpropSetMeth(qqueue, pUsr, void*) +DEFpropSetMeth(qqueue, iDeqSlowdown, int) +DEFpropSetMeth(qqueue, sizeOnDiskMax, int64) /* This function can be used as a generic way to set properties. Only the subset @@ -2298,11 +2303,11 @@ DEFpropSetMeth(queue, sizeOnDiskMax, int64) * rgerhards, 2008-01-11 */ #define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, (uchar*) name, sizeof(name) - 1) -static rsRetVal queueSetProperty(queue_t *pThis, var_t *pProp) +static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pProp != NULL); if(isProp("iQueueSize")) { @@ -2324,19 +2329,19 @@ finalize_it: #undef isProp /* dummy */ -rsRetVal queueQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; } +rsRetVal qqueueQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; } /* Initialize the stream class. Must be called as the very first method * before anything else is called inside this class. * rgerhards, 2008-01-09 */ -BEGINObjClassInit(queue, 1, OBJ_IS_CORE_MODULE) +BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE) /* request objects we use */ CHKiRet(objUse(glbl, CORE_COMPONENT)); /* now set our own handlers */ - OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty); -ENDObjClassInit(queue) + OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty); +ENDObjClassInit(qqueue) /* vi:set ai: */ diff --git a/runtime/queue.h b/runtime/queue.h index a2dd594f..a267862d 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -160,7 +160,7 @@ typedef struct queue_s { strm_t *pRead; /* current file to be read */ } disk; } tVars; -} queue_t; +} qqueue_t; /* some symbolic constants for easier reference */ #define QUEUE_MODE_ENQDEQ 0 @@ -177,30 +177,30 @@ typedef struct queue_s { #define QUEUE_TIMEOUT_ETERNAL 24 * 60 * 60 * 1000 /* prototypes */ -rsRetVal queueDestruct(queue_t **ppThis); -rsRetVal queueEnqObj(queue_t *pThis, flowControl_t flwCtlType, void *pUsr); -rsRetVal queueStart(queue_t *pThis); -rsRetVal queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize); -rsRetVal queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); -rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, +rsRetVal qqueueDestruct(qqueue_t **ppThis); +rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr); +rsRetVal qqueueStart(qqueue_t *pThis); +rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); +rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); +rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)); -PROTOTYPEObjClassInit(queue); -PROTOTYPEpropSetMeth(queue, iPersistUpdCnt, int); -PROTOTYPEpropSetMeth(queue, iDeqtWinFromHr, int); -PROTOTYPEpropSetMeth(queue, iDeqtWinToHr, int); -PROTOTYPEpropSetMeth(queue, toQShutdown, long); -PROTOTYPEpropSetMeth(queue, toActShutdown, long); -PROTOTYPEpropSetMeth(queue, toWrkShutdown, long); -PROTOTYPEpropSetMeth(queue, toEnq, long); -PROTOTYPEpropSetMeth(queue, iHighWtrMrk, int); -PROTOTYPEpropSetMeth(queue, iLowWtrMrk, int); -PROTOTYPEpropSetMeth(queue, iDiscardMrk, int); -PROTOTYPEpropSetMeth(queue, iDiscardSeverity, int); -PROTOTYPEpropSetMeth(queue, iMinMsgsPerWrkr, int); -PROTOTYPEpropSetMeth(queue, bSaveOnShutdown, int); -PROTOTYPEpropSetMeth(queue, pUsr, void*); -PROTOTYPEpropSetMeth(queue, iDeqSlowdown, int); -PROTOTYPEpropSetMeth(queue, sizeOnDiskMax, int64); -#define queueGetID(pThis) ((unsigned long) pThis) +PROTOTYPEObjClassInit(qqueue); +PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); +PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int); +PROTOTYPEpropSetMeth(qqueue, iDeqtWinToHr, int); +PROTOTYPEpropSetMeth(qqueue, toQShutdown, long); +PROTOTYPEpropSetMeth(qqueue, toActShutdown, long); +PROTOTYPEpropSetMeth(qqueue, toWrkShutdown, long); +PROTOTYPEpropSetMeth(qqueue, toEnq, long); +PROTOTYPEpropSetMeth(qqueue, iHighWtrMrk, int); +PROTOTYPEpropSetMeth(qqueue, iLowWtrMrk, int); +PROTOTYPEpropSetMeth(qqueue, iDiscardMrk, int); +PROTOTYPEpropSetMeth(qqueue, iDiscardSeverity, int); +PROTOTYPEpropSetMeth(qqueue, iMinMsgsPerWrkr, int); +PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int); +PROTOTYPEpropSetMeth(qqueue, pUsr, void*); +PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int); +PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64); +#define qqueueGetID(pThis) ((unsigned long) pThis) #endif /* #ifndef QUEUE_H_INCLUDED */ diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c index 54db12c2..8df100a1 100644 --- a/runtime/rsyslog.c +++ b/runtime/rsyslog.c @@ -157,7 +157,7 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF) if(ppErrObj != NULL) *ppErrObj = "wtp"; CHKiRet(wtpClassInit(NULL)); if(ppErrObj != NULL) *ppErrObj = "queue"; - CHKiRet(queueClassInit(NULL)); + CHKiRet(qqueueClassInit(NULL)); if(ppErrObj != NULL) *ppErrObj = "vmstk"; CHKiRet(vmstkClassInit(NULL)); if(ppErrObj != NULL) *ppErrObj = "sysvar"; diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 00290ee5..899f5e13 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -254,6 +254,10 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_INVLD_TIME = -2107, /**< invalid timestamp (e.g. could not be parsed) */ RS_RET_NO_ZIP = -2108, /**< ZIP functionality is not present */ RS_RET_CODE_ERR = -2109, /**< program code (internal) error */ + RS_RET_FUNC_NO_LPAREN = -2110, /**< left parenthesis missing after function call (rainerscript) */ + RS_RET_FUNC_MISSING_EXPR = -2111, /**< no expression after comma in function call (rainerscript) */ + RS_RET_INVLD_NBR_ARGUMENTS = -2112, /**< invalid number of arguments for function call (rainerscript) */ + RS_RET_INVLD_FUNC = -2113, /**< invalid function name for function call (rainerscript) */ /* RainerScript error messages (range 1000.. 1999) */ RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */ diff --git a/runtime/srutils.c b/runtime/srutils.c index 1280e40d..d01ca20d 100644 --- a/runtime/srutils.c +++ b/runtime/srutils.c @@ -458,7 +458,7 @@ srSleep(int iSeconds, int iuSeconds) * Added 2008-01-30 */ char *rs_strerror_r(int errnum, char *buf, size_t buflen) { -#ifdef __hpux +#ifndef HAVE_STRERROR_R char *pszErr; pszErr = strerror(errnum); snprintf(buf, buflen, "%s", pszErr); diff --git a/runtime/vm.c b/runtime/vm.c index bc6c3dd2..a25476c2 100644 --- a/runtime/vm.c +++ b/runtime/vm.c @@ -331,6 +331,34 @@ finalize_it: ENDop(PUSHSYSVAR) +/* The function call operation is only very roughly implemented. While the plumbing + * to reach this instruction is fine, the instruction itself currently supports only + * functions with a single argument AND with a name that we know. + * TODO: later, we can add here the real logic, that involves looking up function + * names, loading them dynamically ... and all that... + * implementation begun 2009-03-10 by rgerhards + */ +BEGINop(FUNC_CALL) /* remember to set the instruction also in the ENDop macro! */ + var_t *numOperands; + var_t *operand1; + int iStrlen; +CODESTARTop(FUNC_CALL) + vmstk.PopNumber(pThis->pStk, &numOperands); + if(numOperands->val.num != 1) + ABORT_FINALIZE(RS_RET_INVLD_NBR_ARGUMENTS); + vmstk.PopString(pThis->pStk, &operand1); /* guess there's just one ;) */ + if(!rsCStrSzStrCmp(pOp->operand.pVar->val.pStr, (uchar*) "strlen", 6)) { /* only one supported so far ;) */ +RUNLOG_VAR("%s", rsCStrGetSzStr(operand1->val.pStr)); + iStrlen = strlen((char*) rsCStrGetSzStr(operand1->val.pStr)); +RUNLOG_VAR("%d", iStrlen); + } else + ABORT_FINALIZE(RS_RET_INVLD_FUNC); + PUSHRESULTop(operand1, iStrlen); // TODO: dummy, FIXME + var.Destruct(&numOperands); /* no longer needed */ +finalize_it: +ENDop(FUNC_CALL) + + /* ------------------------------ end instruction set implementation ------------------------------ */ @@ -412,6 +440,7 @@ execProg(vm_t *pThis, vmprg_t *pProg) doOP(DIV); doOP(MOD); doOP(UNARY_MINUS); + doOP(FUNC_CALL); default: ABORT_FINALIZE(RS_RET_INVALID_VMOP); dbgoprint((obj_t*) pThis, "invalid instruction %d in vmprg\n", pCurrOp->opcode); diff --git a/runtime/vmop.c b/runtime/vmop.c index fcacb15b..a343481e 100644 --- a/runtime/vmop.c +++ b/runtime/vmop.c @@ -61,24 +61,25 @@ rsRetVal vmopConstructFinalize(vmop_t __attribute__((unused)) *pThis) /* destructor for the vmop object */ BEGINobjDestruct(vmop) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(vmop) - if( pThis->opcode == opcode_PUSHSYSVAR - || pThis->opcode == opcode_PUSHMSGVAR - || pThis->opcode == opcode_PUSHCONSTANT) { - if(pThis->operand.pVar != NULL) - var.Destruct(&pThis->operand.pVar); - } + if(pThis->operand.pVar != NULL) + var.Destruct(&pThis->operand.pVar); ENDobjDestruct(vmop) /* DebugPrint support for the vmop object */ BEGINobjDebugPrint(vmop) /* be sure to specify the object type also in END and CODESTART macros! */ uchar *pOpcodeName; + cstr_t *pStrVar; CODESTARTobjDebugPrint(vmop) vmopOpcode2Str(pThis, &pOpcodeName); - dbgoprint((obj_t*) pThis, "opcode: %d\t(%s), next %p, var in next line\n", (int) pThis->opcode, pOpcodeName, - pThis->pNext); - if(pThis->operand.pVar != NULL) - var.DebugPrint(pThis->operand.pVar); + CHKiRet(rsCStrConstruct(&pStrVar)); + CHKiRet(rsCStrFinish(&pStrVar)); + if(pThis->operand.pVar != NULL) { + CHKiRet(var.Obj2Str(pThis->operand.pVar, pStrVar)); + } + dbgoprint((obj_t*) pThis, "%.12s\t%s\n", pOpcodeName, rsCStrGetSzStrNoNULL(pStrVar)); + rsCStrDestruct(&pStrVar); +finalize_it: ENDobjDebugPrint(vmop) @@ -158,37 +159,37 @@ vmopOpcode2Str(vmop_t *pThis, uchar **ppName) *ppName = (uchar*) "and"; break; case opcode_PLUS: - *ppName = (uchar*) "+"; + *ppName = (uchar*) "add"; break; case opcode_MINUS: - *ppName = (uchar*) "-"; + *ppName = (uchar*) "sub"; break; case opcode_TIMES: - *ppName = (uchar*) "*"; + *ppName = (uchar*) "mul"; break; case opcode_DIV: - *ppName = (uchar*) "/"; + *ppName = (uchar*) "div"; break; case opcode_MOD: - *ppName = (uchar*) "%"; + *ppName = (uchar*) "mod"; break; case opcode_NOT: *ppName = (uchar*) "not"; break; case opcode_CMP_EQ: - *ppName = (uchar*) "=="; + *ppName = (uchar*) "cmp_=="; break; case opcode_CMP_NEQ: - *ppName = (uchar*) "!="; + *ppName = (uchar*) "cmp_!="; break; case opcode_CMP_LT: - *ppName = (uchar*) "<"; + *ppName = (uchar*) "cmp_<"; break; case opcode_CMP_GT: - *ppName = (uchar*) ">"; + *ppName = (uchar*) "cmp_>"; break; case opcode_CMP_LTEQ: - *ppName = (uchar*) "<="; + *ppName = (uchar*) "cmp_<="; break; case opcode_CMP_CONTAINS: *ppName = (uchar*) "contains"; @@ -197,28 +198,31 @@ vmopOpcode2Str(vmop_t *pThis, uchar **ppName) *ppName = (uchar*) "startswith"; break; case opcode_CMP_GTEQ: - *ppName = (uchar*) ">="; + *ppName = (uchar*) "cmp_>="; break; case opcode_PUSHSYSVAR: - *ppName = (uchar*) "PUSHSYSVAR"; + *ppName = (uchar*) "push_sysvar"; break; case opcode_PUSHMSGVAR: - *ppName = (uchar*) "PUSHMSGVAR"; + *ppName = (uchar*) "push_msgvar"; break; case opcode_PUSHCONSTANT: - *ppName = (uchar*) "PUSHCONSTANT"; + *ppName = (uchar*) "push_const"; break; case opcode_POP: - *ppName = (uchar*) "POP"; + *ppName = (uchar*) "pop"; break; case opcode_UNARY_MINUS: - *ppName = (uchar*) "UNARY_MINUS"; + *ppName = (uchar*) "unary_minus"; break; case opcode_STRADD: - *ppName = (uchar*) "STRADD"; + *ppName = (uchar*) "strconcat"; + break; + case opcode_FUNC_CALL: + *ppName = (uchar*) "func_call"; break; default: - *ppName = (uchar*) "INVALID opcode"; + *ppName = (uchar*) "!invalid_opcode!"; break; } diff --git a/runtime/vmop.h b/runtime/vmop.h index c3d5d5f4..938b08fd 100644 --- a/runtime/vmop.h +++ b/runtime/vmop.h @@ -59,17 +59,44 @@ typedef enum { /* do NOT start at 0 to detect uninitialized types after calloc( opcode_PUSHMSGVAR = 1002, /* requires var operand */ opcode_PUSHCONSTANT = 1003, /* requires var operand */ opcode_UNARY_MINUS = 1010, - opcode_END_PROG = 1011 + opcode_FUNC_CALL = 1012, + opcode_END_PROG = 2000 } opcode_t; +/* Additional doc, operation specific + + FUNC_CALL + All parameter passing is via the stack. Parameters are placed onto the stack in reverse order, + that means the last parameter is on top of the stack, the first at the bottom location. + At the actual top of the stack is the number of parameters. This permits functions to be + called with variable number of arguments. The function itself is responsible for poping + the right number of parameters of the stack and complaining if the number is incorrect. + On exit, a single return value must be pushed onto the stack. The FUNC_CALL operation + is generic. Its pVar argument contains the function name string (TODO: very slow, make + faster in later releases). + + Sample Function call: sampleFunc(p1, p2, p3) ; returns number 4711 (sample) + Stacklayout on entry (order is top to bottom): + 3 + p3 + p2 + p1 + ... other vars ... + + Stack on exit + 4711 + ... other vars ... + + */ + + /* the vmop object */ typedef struct vmop_s { BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ opcode_t opcode; union { - var_t *pVar; - /* TODO: add function pointer */ + var_t *pVar; /* for function call, this is the name (string) of function to be called */ } operand; struct vmop_s *pNext; /* next operation or NULL, if end of program (logically this belongs to vmprg) */ } vmop_t; diff --git a/runtime/vmprg.c b/runtime/vmprg.c index 705e6948..75915025 100644 --- a/runtime/vmprg.c +++ b/runtime/vmprg.c @@ -74,7 +74,7 @@ ENDobjDestruct(vmprg) BEGINobjDebugPrint(vmprg) /* be sure to specify the object type also in END and CODESTART macros! */ vmop_t *pOp; CODESTARTobjDebugPrint(vmprg) - dbgoprint((obj_t*) pThis, "program contents:\n"); + dbgoprint((obj_t*) pThis, "VM Program:\n"); for(pOp = pThis->vmopRoot ; pOp != NULL ; pOp = pOp->pNext) { vmop.DebugPrint(pOp); } diff --git a/runtime/wti.c b/runtime/wti.c index e8a77474..9b3450e6 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -39,6 +39,11 @@ #include <pthread.h> #include <errno.h> +#ifdef OS_SOLARIS +# include <sched.h> +# define pthread_yield() sched_yield() +#endif + #include "rsyslog.h" #include "stringbuf.h" #include "srUtils.h" diff --git a/runtime/wtp.c b/runtime/wtp.c index 06173e04..41903576 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -40,6 +40,11 @@ #include <unistd.h> #include <errno.h> +#ifdef OS_SOLARIS +# include <sched.h> +# define pthread_yield() sched_yield() +#endif + #include "rsyslog.h" #include "stringbuf.h" #include "srUtils.h" diff --git a/tests/1.rstest b/tests/1.rstest index 5c152589..4716e8b3 100644 --- a/tests/1.rstest +++ b/tests/1.rstest @@ -4,23 +4,23 @@ in: 'test 1' <> $var or /* some comment */($SEVERITY == -4 +5 -(3 * - 2) and $fromhost == '127.0.0.1') then $$$ out: -00000000: PUSHCONSTANT test 1[cstr] -00000001: PUSHMSGVAR var[cstr] -00000002: != -00000003: PUSHMSGVAR severity[cstr] -00000004: PUSHCONSTANT 4[nbr] -00000005: UNARY_MINUS -00000006: PUSHCONSTANT 5[nbr] -00000007: + -00000008: PUSHCONSTANT 3[nbr] -00000009: PUSHCONSTANT 2[nbr] -00000010: UNARY_MINUS -00000011: * -00000012: - -00000013: == -00000014: PUSHMSGVAR fromhost[cstr] -00000015: PUSHCONSTANT 127.0.0.1[cstr] -00000016: == +00000000: push_const test 1[cstr] +00000001: push_msgvar var[cstr] +00000002: cmp_!= +00000003: push_msgvar severity[cstr] +00000004: push_const 4[nbr] +00000005: unary_minus +00000006: push_const 5[nbr] +00000007: add +00000008: push_const 3[nbr] +00000009: push_const 2[nbr] +00000010: unary_minus +00000011: mul +00000012: sub +00000013: cmp_== +00000014: push_msgvar fromhost[cstr] +00000015: push_const 127.0.0.1[cstr] +00000016: cmp_== 00000017: and 00000018: or $$$ diff --git a/tests/2.rstest b/tests/2.rstest index 7fb5b799..f0e8205b 100644 --- a/tests/2.rstest +++ b/tests/2.rstest @@ -4,7 +4,7 @@ in: $msg contains 'test' then $$$ out: -00000000: PUSHMSGVAR msg[cstr] -00000001: PUSHCONSTANT test[cstr] +00000000: push_msgvar msg[cstr] +00000001: push_const test[cstr] 00000002: contains $$$ diff --git a/tests/3.rstest b/tests/3.rstest new file mode 100644 index 00000000..93cb941a --- /dev/null +++ b/tests/3.rstest @@ -0,0 +1,21 @@ +# a simple RainerScript test +result: 0 +in: +strlen($msg & strlen('abc')) > 20 +30 + -40 then +$$$ +out: +00000000: push_msgvar msg[cstr] +00000001: push_const abc[cstr] +00000002: push_const 1[nbr] +00000003: func_call strlen[cstr] +00000004: strconcat +00000005: push_const 1[nbr] +00000006: func_call strlen[cstr] +00000007: push_const 20[nbr] +00000008: push_const 30[nbr] +00000009: add +00000010: push_const 40[nbr] +00000011: unary_minus +00000012: add +00000013: cmp_> +$$$ diff --git a/tests/Makefile.am b/tests/Makefile.am index 14e7c195..7a31be45 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -4,7 +4,7 @@ TESTS_ENVIRONMENT = RSYSLOG_MODDIR='$(abs_top_builddir)'/runtime/.libs/ #TESTS = $(check_PROGRAMS) test_files = testbench.h runtime-dummy.c -EXTRA_DIST=1.rstest 2.rstest err1.rstest \ +EXTRA_DIST=1.rstest 2.rstest 3.rstest err1.rstest \ cfg.sh \ cfg1.cfgtest \ cfg1.testin \ diff --git a/tests/rscript.c b/tests/rscript.c index d4e8caeb..3eec9c3c 100644 --- a/tests/rscript.c +++ b/tests/rscript.c @@ -101,9 +101,10 @@ PerformTest(cstr_t *pstrIn, rsRetVal iRetExpected, cstr_t *pstrOut) CHKiRet(vmprg.Obj2Str(pExpr->pVmprg, pstrPrg)); if(strcmp((char*)rsCStrGetSzStr(pstrPrg), (char*)rsCStrGetSzStr(pstrOut))) { + int iLen; printf("error: compiled program different from expected result!\n"); - printf("generated vmprg:\n%s\n", rsCStrGetSzStr(pstrPrg)); - printf("expected:\n%s\n", rsCStrGetSzStr(pstrOut)); + printf("generated vmprg (%d bytes):\n%s\n", strlen(rsCStrGetSzStr(pstrPrg)), rsCStrGetSzStr(pstrPrg)); + printf("expected (%d bytes):\n%s\n", strlen(rsCStrGetSzStr(pstrOut)), rsCStrGetSzStr(pstrOut)); ABORT_FINALIZE(RS_RET_ERR); } diff --git a/tools/Makefile.am b/tools/Makefile.am index 776279a5..e523b854 100644 --- a/tools/Makefile.am +++ b/tools/Makefile.am @@ -22,7 +22,7 @@ rsyslogd_SOURCES = \ \ ../dirty.h rsyslogd_CPPFLAGS = $(PTHREADS_CFLAGS) $(RSRT_CFLAGS) -rsyslogd_LDADD = $(ZLIB_LIBS) $(PTHREADS_LIBS) $(RSRT_LIBS) +rsyslogd_LDADD = $(ZLIB_LIBS) $(PTHREADS_LIBS) $(RSRT_LIBS) $(SOL_LIBS) rsyslogd_LDFLAGS = -export-dynamic if ENABLE_DIAGTOOLS diff --git a/tools/omfile.c b/tools/omfile.c index ff4c4618..65306846 100644 --- a/tools/omfile.c +++ b/tools/omfile.c @@ -44,6 +44,10 @@ #include <unistd.h> #include <sys/file.h> +#ifdef OS_SOLARIS +# include <fcntl.h> +#endif + #include "syslogd.h" #include "syslogd-types.h" #include "srUtils.h" diff --git a/tools/syslogd.c b/tools/syslogd.c index eb496521..235bc52e 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -72,13 +72,18 @@ #include <stdarg.h> #include <time.h> #include <assert.h> -#include <libgen.h> -#ifdef __sun +#ifdef OS_SOLARIS # include <errno.h> +# include <fcntl.h> +# include <stropts.h> +# include <sys/termios.h> +# include <sys/types.h> #else +# include <libgen.h> # include <sys/errno.h> #endif + #include <sys/ioctl.h> #include <sys/wait.h> #include <sys/file.h> @@ -279,7 +284,7 @@ static int gidDropPriv = 0; /* group-id to which priveleges should be dropped to extern int errno; /* main message queue and its configuration parameters */ -static queue_t *pMsgQueue = NULL; /* the main message queue */ +static qqueue_t *pMsgQueue = NULL; /* the main message queue */ static int iMainMsgQueueSize = 10000; /* size of the main message queue above */ static int iMainMsgQHighWtrMark = 8000; /* high water mark for disk-assisted queues */ static int iMainMsgQLowWtrMark = 2000; /* low water mark for disk-assisted queues */ @@ -1622,7 +1627,7 @@ submitMsg(msg_t *pMsg) ISOBJ_TYPE_assert(pMsg, msg); MsgPrepareEnqueue(pMsg); - queueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg); + qqueueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg); RETiRet; } @@ -1683,7 +1688,7 @@ logmsg(msg_t *pMsg, int flags) /* now submit the message to the main queue - then we are done */ pMsg->msgFlags = flags; MsgPrepareEnqueue(pMsg); - queueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg); + qqueueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg); ENDfunc } @@ -1981,7 +1986,7 @@ die(int sig) /* drain queue (if configured so) and stop main queue worker thread pool */ dbgprintf("Terminating main queue...\n"); - queueDestruct(&pMsgQueue); + qqueueDestruct(&pMsgQueue); pMsgQueue = NULL; /* Free ressources and close connections. This includes flushing any remaining @@ -2271,8 +2276,8 @@ static void dbgPrintInitInfo(void) static int iMainMsgQtoWrkMinMsgs = 100; static int iMainMsgQbSaveOnShutdown = 1; iMainMsgQueMaxDiskSpace = 0; - setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100); - setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1); + setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100); + setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1); */ dbgprintf("Work Directory: '%s'.\n", glbl.GetWorkDir()); } @@ -2334,7 +2339,7 @@ init(void) /* delete the message queue, which also flushes all messages left over */ if(pMsgQueue != NULL) { dbgprintf("deleting main message queue\n"); - queueDestruct(&pMsgQueue); /* delete pThis here! */ + qqueueDestruct(&pMsgQueue); /* delete pThis here! */ pMsgQueue = NULL; } @@ -2446,7 +2451,7 @@ init(void) } /* create message queue */ - CHKiRet_Hdlr(queueConstruct(&pMsgQueue, MainMsgQueType, iMainMsgQueueNumWorkers, iMainMsgQueueSize, msgConsumer)) { + CHKiRet_Hdlr(qqueueConstruct(&pMsgQueue, MainMsgQueType, iMainMsgQueueNumWorkers, iMainMsgQueueSize, msgConsumer)) { /* no queue is fatal, we need to give up in that case... */ fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet); exit(1); @@ -2464,29 +2469,29 @@ init(void) errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ } - setQPROP(queueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize); - setQPROP(queueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace); - setQPROPstr(queueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName); - setQPROP(queueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt); - setQPROP(queueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown ); - setQPROP(queueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown); - setQPROP(queueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", iMainMsgQtoWrkShutdown); - setQPROP(queueSettoEnq, "$MainMsgQueueTimeoutEnqueue", iMainMsgQtoEnq); - setQPROP(queueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", iMainMsgQHighWtrMark); - setQPROP(queueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", iMainMsgQLowWtrMark); - setQPROP(queueSetiDiscardMrk, "$MainMsgQueueDiscardMark", iMainMsgQDiscardMark); - setQPROP(queueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", iMainMsgQDiscardSeverity); - setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs); - setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown); - setQPROP(queueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", iMainMsgQDeqSlowdown); - setQPROP(queueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", iMainMsgQueueDeqtWinFromHr); - setQPROP(queueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", iMainMsgQueueDeqtWinToHr); + setQPROP(qqueueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize); + setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace); + setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName); + setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt); + setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown ); + setQPROP(qqueueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown); + setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", iMainMsgQtoWrkShutdown); + setQPROP(qqueueSettoEnq, "$MainMsgQueueTimeoutEnqueue", iMainMsgQtoEnq); + setQPROP(qqueueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", iMainMsgQHighWtrMark); + setQPROP(qqueueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", iMainMsgQLowWtrMark); + setQPROP(qqueueSetiDiscardMrk, "$MainMsgQueueDiscardMark", iMainMsgQDiscardMark); + setQPROP(qqueueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", iMainMsgQDiscardSeverity); + setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs); + setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown); + setQPROP(qqueueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", iMainMsgQDeqSlowdown); + setQPROP(qqueueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", iMainMsgQueueDeqtWinFromHr); + setQPROP(qqueueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", iMainMsgQueueDeqtWinToHr); # undef setQPROP # undef setQPROPstr /* ... and finally start the queue! */ - CHKiRet_Hdlr(queueStart(pMsgQueue)) { + CHKiRet_Hdlr(qqueueStart(pMsgQueue)) { /* no queue is fatal, we need to give up in that case... */ fprintf(stderr, "fatal error %d: could not start message queue - rsyslogd can not run!\n", iRet); exit(1); @@ -3095,7 +3100,7 @@ GlobalClassExit(void) CHKiRet(strmClassInit(NULL)); CHKiRet(wtiClassInit(NULL)); CHKiRet(wtpClassInit(NULL)); - CHKiRet(queueClassInit(NULL)); + CHKiRet(qqueueClassInit(NULL)); CHKiRet(vmstkClassInit(NULL)); CHKiRet(sysvarClassInit(NULL)); CHKiRet(vmClassInit(NULL)); |