summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog7
-rw-r--r--action.c44
-rw-r--r--action.h2
-rw-r--r--configure.ac12
-rw-r--r--runtime/conf.c11
-rw-r--r--runtime/ctok.c17
-rw-r--r--runtime/expr.c63
-rw-r--r--runtime/modules.c4
-rw-r--r--runtime/net.c5
-rw-r--r--runtime/queue.c425
-rw-r--r--runtime/queue.h50
-rw-r--r--runtime/rsyslog.c2
-rw-r--r--runtime/rsyslog.h4
-rw-r--r--runtime/srutils.c2
-rw-r--r--runtime/vm.c29
-rw-r--r--runtime/vmop.c60
-rw-r--r--runtime/vmop.h33
-rw-r--r--runtime/vmprg.c2
-rw-r--r--runtime/wti.c5
-rw-r--r--runtime/wtp.c5
-rw-r--r--tools/Makefile.am2
-rw-r--r--tools/omfile.c4
-rw-r--r--tools/syslogd.c63
23 files changed, 520 insertions, 331 deletions
diff --git a/ChangeLog b/ChangeLog
index 18bb0e53..3488dfb2 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,10 @@
+- implemented function support in RainerScript. That means the engine
+ parses and compile functions, as well as executes a few build-in
+ ones. Dynamic loading and registration of functions is not yet
+ supported - but we now have a good foundation to do that later on.
+ NOTE: nested function calls are not yet supported due to a design
+ issue with the function call VM instruction set design.
+- implemented the strlen() RainerScript function
---------------------------------------------------------------------------
Version 4.1.6 [DEVEL] (rgerhards), 2009-03-??
---------------------------------------------------------------------------
diff --git a/action.c b/action.c
index a41f976c..cd4ba240 100644
--- a/action.c
+++ b/action.c
@@ -180,7 +180,7 @@ rsRetVal actionDestruct(action_t *pThis)
ASSERT(pThis != NULL);
if(pThis->pQueue != NULL) {
- queueDestruct(&pThis->pQueue);
+ qqueueDestruct(&pThis->pQueue);
}
if(pThis->pMod != NULL)
@@ -255,7 +255,7 @@ actionConstructFinalize(action_t *pThis)
* to be run on multiple threads. So far, this is forbidden by the interface
* spec. -- rgerhards, 2008-01-30
*/
- CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction));
+ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction));
obj.SetName((obj_t*) pThis->pQueue, pszQName);
/* ... set some properties ... */
@@ -268,24 +268,24 @@ actionConstructFinalize(action_t *pThis)
errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
}
- queueSetpUsr(pThis->pQueue, pThis);
- setQPROP(queueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace);
- setQPROP(queueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
- setQPROPstr(queueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
- setQPROP(queueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
- setQPROP(queueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown );
- setQPROP(queueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown);
- setQPROP(queueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown);
- setQPROP(queueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq);
- setQPROP(queueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark);
- setQPROP(queueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark);
- setQPROP(queueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark);
- setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity);
- setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs);
- setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown);
- setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown);
- setQPROP(queueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr);
- setQPROP(queueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr);
+ qqueueSetpUsr(pThis->pQueue, pThis);
+ setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace);
+ setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
+ setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
+ setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
+ setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown );
+ setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown);
+ setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown);
+ setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq);
+ setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark);
+ setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark);
+ setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark);
+ setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity);
+ setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs);
+ setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown);
+ setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown);
+ setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr);
+ setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr);
# undef setQPROP
# undef setQPROPstr
@@ -294,7 +294,7 @@ actionConstructFinalize(action_t *pThis)
bActionQSaveOnShutdown, iActionQueMaxDiskSpace);
- CHKiRet(queueStart(pThis->pQueue));
+ CHKiRet(qqueueStart(pThis->pQueue));
dbgprintf("Action %p: queue %p created\n", pThis, pThis->pQueue);
/* and now reset the queue params (see comment in its function header!) */
@@ -675,7 +675,7 @@ actionWriteToAction(action_t *pAction)
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- iRet = queueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg));
+ iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg));
if(iRet == RS_RET_OK)
pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
diff --git a/action.h b/action.h
index e35e634c..dc9bbd74 100644
--- a/action.h
+++ b/action.h
@@ -68,7 +68,7 @@ struct action_s {
* content later). This is preserved after the message has been
* processed - it is also used to detect duplicates.
*/
- queue_t *pQueue; /* action queue */
+ qqueue_t *pQueue; /* action queue */
SYNC_OBJ_TOOL; /* required for mutex support */
pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */
};
diff --git a/configure.ac b/configure.ac
index 00b65410..4763fcd5 100644
--- a/configure.ac
+++ b/configure.ac
@@ -35,6 +35,13 @@ case "${host}" in
# do not DEFINE OS_BSD
os_type="bsd"
;;
+ *-*-solaris*)
+ os_type="solaris"
+ AC_DEFINE([OS_SOLARIS], [1], [Indicator for a Solaris OS])
+ AC_DEFINE([_POSIX_PTHREAD_SEMANTICS], [1], [Use POSIX pthread semantics])
+ SOL_LIBS="-lsocket -lnsl"
+ AC_SUBST(SOL_LIBS)
+ ;;
esac
AC_DEFINE_UNQUOTED([HOSTENV], "$host", [the host environment, can be queried via a system variable])
@@ -233,7 +240,10 @@ if test "x$enable_pthreads" != "xno"; then
[
AC_DEFINE([USE_PTHREADS], [1], [Multithreading support enabled.])
PTHREADS_LIBS="-lpthread"
- PTHREADS_CFLAGS="-pthread"
+ case "${os_type}" in
+ solaris) PTHREADS_CFLAGS="-pthreads" ;;
+ *) PTHREADS_CFLAGS="-pthread" ;;
+ esac
AC_SUBST(PTHREADS_LIBS)
AC_SUBST(PTHREADS_CFLAGS)
],
diff --git a/runtime/conf.c b/runtime/conf.c
index c5208d86..ede15cc7 100644
--- a/runtime/conf.c
+++ b/runtime/conf.c
@@ -46,7 +46,9 @@
#include <glob.h>
#include <sys/types.h>
#ifdef HAVE_LIBGEN_H
-# include <libgen.h>
+# ifndef OS_SOLARIS
+# include <libgen.h>
+# endif
#endif
#include "rsyslog.h"
@@ -68,6 +70,9 @@
#include "ctok.h"
#include "ctok_token.h"
+#ifdef OS_SOLARIS
+# define NAME_MAX MAXNAMELEN
+#endif
/* forward definitions */
static rsRetVal cfline(uchar *line, selector_t **pfCurr);
@@ -789,6 +794,10 @@ dbgprintf("calling expression parser, pp %p ('%s')\n", *pline, *pline);
CHKiRet(ctok.Getpp(tok, pline));
CHKiRet(ctok.Destruct(&tok));
+ /* debug support - print vmprg after construction (uncomment to use) */
+ /* vmprgDebugPrint(f->f_filterData.f_expr->pVmprg); */
+ vmprgDebugPrint(f->f_filterData.f_expr->pVmprg);
+
/* we now need to skip whitespace to the action part, else we confuse
* the legacy rsyslog conf parser. -- rgerhards, 2008-02-25
*/
diff --git a/runtime/ctok.c b/runtime/ctok.c
index ceab15bd..d2cd8bbd 100644
--- a/runtime/ctok.c
+++ b/runtime/ctok.c
@@ -259,12 +259,17 @@ ctokGetVar(ctok_t *pThis, ctok_token_t *pToken)
}
CHKiRet(rsCStrConstruct(&pstrVal));
- /* this loop is quite simple, a variable name is terminated by whitespace. */
- while(!isspace(c)) {
+ /* this loop is quite simple, a variable name is terminated when a non-supported
+ * character is detected. Note that we currently permit a numerical digit as the
+ * first char, which is not permitted by ABNF. -- rgerhards, 2009-03-10
+ */
+ while(isalpha(c) || isdigit(c) || (c == '_') || (c == '-')) {
CHKiRet(rsCStrAppendChar(pstrVal, tolower(c)));
CHKiRet(ctokGetCharFromStream(pThis, &c));
}
- CHKiRet(rsCStrFinish(pStrB));
+ CHKiRet(ctokUngetCharFromStream(pThis, c)); /* put not processed char back */
+
+ CHKiRet(rsCStrFinish(pstrVal));
CHKiRet(var.SetString(pToken->pVar, pstrVal));
pstrVal = NULL;
@@ -389,6 +394,7 @@ ctokGetToken(ctok_t *pThis, ctok_token_t **ppToken)
uchar c;
uchar szWord[128];
int bRetry = 0; /* retry parse? Only needed for inline comments... */
+ cstr_t *pstrVal;
ISOBJ_TYPE_assert(pThis, ctok);
ASSERT(ppToken != NULL);
@@ -512,7 +518,10 @@ ctokGetToken(ctok_t *pThis, ctok_token_t **ppToken)
/* push c back, higher level parser needs it */
CHKiRet(ctokUngetCharFromStream(pThis, c));
pToken->tok = ctok_FUNCTION;
- /* TODO: fill function name */
+ /* fill function name */
+ CHKiRet(rsCStrConstruct(&pstrVal));
+ CHKiRet(rsCStrSetSzStr(pstrVal, szWord));
+ CHKiRet(var.SetString(pToken->pVar, pstrVal));
} else { /* give up... */
dbgprintf("parser has an invalid word (token) '%s'\n", szWord);
pToken->tok = ctok_INVALID;
diff --git a/runtime/expr.c b/runtime/expr.c
index ee5b9e2c..38ed1c68 100644
--- a/runtime/expr.c
+++ b/runtime/expr.c
@@ -55,11 +55,63 @@ DEFobjCurrIf(ctok)
* rgerhards, 2008-02-19
*/
-/* forward definiton - thanks to recursive ABNF, we can not avoid at least one ;) */
+/* forward definition - thanks to recursive ABNF, we can not avoid at least one ;) */
static rsRetVal expr(expr_t *pThis, ctok_t *tok);
static rsRetVal
+function(expr_t *pThis, ctok_t *tok)
+{
+ DEFiRet;
+ ctok_token_t *pToken = NULL;
+ int iNumArgs = 0;
+ var_t *pVar;
+
+ ISOBJ_TYPE_assert(pThis, expr);
+ ISOBJ_TYPE_assert(tok, ctok);
+
+ CHKiRet(ctok.GetToken(tok, &pToken));
+ /* note: pToken is destructed in finalize_it */
+
+ if(pToken->tok == ctok_LPAREN) {
+ CHKiRet(ctok_token.Destruct(&pToken)); /* token processed, "eat" it */
+ CHKiRet(ctok.GetToken(tok, &pToken)); /* get next one */
+ } else
+ ABORT_FINALIZE(RS_RET_FUNC_NO_LPAREN);
+
+ /* we first push all the params on the stack. Then we call the function */
+ while(pToken->tok != ctok_RPAREN) {
+ ++iNumArgs;
+ CHKiRet(ctok.UngetToken(tok, pToken)); /* not for us, so let others process it */
+ CHKiRet(expr(pThis, tok));
+ CHKiRet(ctok.GetToken(tok, &pToken)); /* get next one, needed for while() check */
+ if(pToken->tok == ctok_COMMA) {
+ CHKiRet(ctok_token.Destruct(&pToken)); /* token processed, "eat" it */
+ CHKiRet(ctok.GetToken(tok, &pToken)); /* get next one */
+ if(pToken->tok == ctok_RPAREN) {
+ ABORT_FINALIZE(RS_RET_FUNC_MISSING_EXPR);
+ }
+ }
+ }
+
+
+ /* now push number of arguments - this must be on top of the stack */
+ CHKiRet(var.Construct(&pVar));
+ CHKiRet(var.ConstructFinalize(pVar));
+ CHKiRet(var.SetNumber(pVar, iNumArgs));
+ CHKiRet(vmprg.AddVarOperation(pThis->pVmprg, opcode_PUSHCONSTANT, pVar)); /* add to program */
+
+
+finalize_it:
+ if(pToken != NULL) {
+ ctok_token.Destruct(&pToken); /* "eat" processed token */
+ }
+
+ RETiRet;
+}
+
+
+static rsRetVal
terminal(expr_t *pThis, ctok_t *tok)
{
DEFiRet;
@@ -85,8 +137,12 @@ terminal(expr_t *pThis, ctok_t *tok)
break;
case ctok_FUNCTION:
dbgoprint((obj_t*) pThis, "function\n");
- /* TODO: vm: call - well, need to implement that first */
- ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED);
+ CHKiRet(function(pThis, tok)); /* this creates the stack call frame */
+ /* ... but we place the call instruction onto the stack ourselfs (because
+ * we have all relevant information)
+ */
+ CHKiRet(ctok_token.UnlinkVar(pToken, &pVar));
+ CHKiRet(vmprg.AddVarOperation(pThis->pVmprg, opcode_FUNC_CALL, pVar)); /* add to program */
break;
case ctok_MSGVAR:
dbgoprint((obj_t*) pThis, "MSGVAR\n");
@@ -406,6 +462,7 @@ ENDobjQueryInterface(expr)
*/
BEGINObjClassInit(expr, 1, OBJ_IS_CORE_MODULE) /* class, version */
/* request objects we use */
+ CHKiRet(objUse(var, CORE_COMPONENT));
CHKiRet(objUse(vmprg, CORE_COMPONENT));
CHKiRet(objUse(var, CORE_COMPONENT));
CHKiRet(objUse(ctok_token, CORE_COMPONENT));
diff --git a/runtime/modules.c b/runtime/modules.c
index 169d234b..d548a949 100644
--- a/runtime/modules.c
+++ b/runtime/modules.c
@@ -49,6 +49,10 @@
#include <unistd.h>
#include <sys/file.h>
+#ifdef OS_SOLARIS
+# define PATH_MAX MAXPATHLEN
+#endif
+
#include "cfsysline.h"
#include "modules.h"
#include "errmsg.h"
diff --git a/runtime/net.c b/runtime/net.c
index 4e6d54a1..db2d7e37 100644
--- a/runtime/net.c
+++ b/runtime/net.c
@@ -63,6 +63,11 @@
#include "errmsg.h"
#include "net.h"
+#ifdef OS_SOLARIS
+# define s6_addr32 _S6_un._S6_u32
+ typedef unsigned int u_int32_t;
+#endif
+
MODULE_TYPE_LIB
/* static data */
diff --git a/runtime/queue.c b/runtime/queue.c
index a3e29a96..c4a0fad2 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -51,19 +51,24 @@
#include "wti.h"
#include "atomic.h"
+#ifdef OS_SOLARIS
+# include <sched.h>
+# define pthread_yield() sched_yield()
+#endif
+
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(glbl)
/* forward-definitions */
-rsRetVal queueChkPersist(queue_t *pThis);
-static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex);
-static rsRetVal queueRateLimiter(queue_t *pThis);
-static int queueChkStopWrkrDA(queue_t *pThis);
-static int queueIsIdleDA(queue_t *pThis);
-static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave);
-static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2);
-static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex);
+rsRetVal qqueueChkPersist(qqueue_t *pThis);
+static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
+static rsRetVal qqueueRateLimiter(qqueue_t *pThis);
+static int qqueueChkStopWrkrDA(qqueue_t *pThis);
+static int qqueueIsIdleDA(qqueue_t *pThis);
+static rsRetVal qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
+static rsRetVal qqueueConsumerCancelCleanup(void *arg1, void *arg2);
+static rsRetVal qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -77,7 +82,7 @@ static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex);
* rgerhards, 2008-01-29
*/
static inline int
-queueGetOverallQueueSize(queue_t *pThis)
+qqueueGetOverallQueueSize(qqueue_t *pThis)
{
#if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */
BEGINfunc
@@ -96,7 +101,7 @@ ENDfunc
* This function returns void, as it makes no sense to communicate an error back, even if
* it happens.
*/
-static inline void queueDrain(queue_t *pThis)
+static inline void queueDrain(qqueue_t *pThis)
{
void *pUsr;
@@ -119,26 +124,26 @@ static inline void queueDrain(queue_t *pThis)
* this point in time. The mutex must be locked when
* ths function is called. -- rgerhards, 2008-01-25
*/
-static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
+static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
{
DEFiRet;
int iMaxWorkers;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(!pThis->bEnqOnly) {
if(pThis->bRunsDA) {
/* if we have not yet reached the high water mark, there is no need to start a
* worker. -- rgerhards, 2008-01-26
*/
- if(queueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
+ if(qqueueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
}
} else {
if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
iMaxWorkers = 1;
} else {
- iMaxWorkers = queueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
+ iMaxWorkers = qqueueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
}
wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
}
@@ -153,11 +158,11 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
* rgerhards, 2008-02-27
*/
static rsRetVal
-queueWaitDAModeInitialized(queue_t *pThis)
+qqueueWaitDAModeInitialized(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->bRunsDA);
while(pThis->bRunsDA != 2) {
@@ -179,17 +184,17 @@ queueWaitDAModeInitialized(queue_t *pThis)
* rgerhards, 2008-01-15
*/
static rsRetVal
-queueTurnOffDAMode(queue_t *pThis)
+qqueueTurnOffDAMode(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->bRunsDA);
/* at this point, we need a fully initialized DA queue. So if it isn't, we finally need
* to wait for its startup... -- rgerhards, 2008-01-25
*/
- queueWaitDAModeInitialized(pThis);
+ qqueueWaitDAModeInitialized(pThis);
/* if we need to pull any data that we still need from the (child) disk queue,
* now would be the time to do so. At present, we do not need this, but I'd like to
@@ -208,15 +213,15 @@ queueTurnOffDAMode(queue_t *pThis)
/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
* this will be quick.
*/
- queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
+ qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
iRet);
/* now we need to check if the regular queue has some messages. This may be the case
* when it is waiting that the high water mark is reached again. If so, we need to start up
* a regular worker. -- rgerhards, 2008-01-26
*/
- if(queueGetOverallQueueSize(pThis) > 0) {
- queueAdviseMaxWorkers(pThis);
+ if(qqueueGetOverallQueueSize(pThis) > 0) {
+ qqueueAdviseMaxWorkers(pThis);
}
}
@@ -232,11 +237,11 @@ queueTurnOffDAMode(queue_t *pThis)
* rgerhards, 2008-01-14
*/
static rsRetVal
-queueChkIsDA(queue_t *pThis)
+qqueueChkIsDA(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->pszFilePrefix != NULL) {
pThis->bIsDA = 1;
dbgoprint((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n");
@@ -260,18 +265,18 @@ queueChkIsDA(queue_t *pThis)
* rgerhards, 2008-01-15
*/
static rsRetVal
-queueStartDA(queue_t *pThis)
+qqueueStartDA(qqueue_t *pThis)
{
DEFiRet;
uchar pszDAQName[128];
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */
FINALIZE; /* ... then we are already done! */
/* create message queue */
- CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
+ CHKiRet(qqueueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
/* give it a name */
snprintf((char*) pszDAQName, sizeof(pszDAQName)/sizeof(uchar), "%s[DA]", obj.GetName((obj_t*) pThis));
@@ -282,30 +287,30 @@ queueStartDA(queue_t *pThis)
*/
pThis->pqDA->pqParent = pThis;
- CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr));
- CHKiRet(queueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax));
- CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
- CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
- CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
- CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
- CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
- CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
- CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
- CHKiRet(queueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
- CHKiRet(queueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
- CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
- CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
+ CHKiRet(qqueueSetpUsr(pThis->pqDA, pThis->pUsr));
+ CHKiRet(qqueueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax));
+ CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
+ CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
+ CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
+ CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
+ CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq));
+ CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
+ CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
+ CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
+ CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0));
+ CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0));
if(pThis->toQShutdown == 0) {
- CHKiRet(queueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
+ CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
} else {
/* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND
* have an obviously large backlog, we can't finish it in any case. So there is no point
* in holding shutdown longer than necessary. -- rgerhards, 2008-01-15
*/
- CHKiRet(queueSettoQShutdown(pThis->pqDA, 1));
+ CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 1));
}
- iRet = queueStart(pThis->pqDA);
+ iRet = qqueueStart(pThis->pqDA);
/* file not found is expected, that means it is no previous QIF available */
if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
FINALIZE; /* something is wrong */
@@ -323,12 +328,12 @@ queueStartDA(queue_t *pThis)
pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n",
- queueGetID(pThis->pqDA));
+ qqueueGetID(pThis->pqDA));
finalize_it:
if(iRet != RS_RET_OK) {
if(pThis->pqDA != NULL) {
- queueDestruct(&pThis->pqDA);
+ qqueueDestruct(&pThis->pqDA);
}
dbgoprint((obj_t*) pThis, "error %d creating disk queue - giving up.\n", iRet);
pThis->bIsDA = 0;
@@ -345,7 +350,7 @@ finalize_it:
* rgerhards, 2008-01-16
*/
static inline rsRetVal
-queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
+qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
@@ -363,12 +368,12 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) queueConsumerCancelCleanup));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueStartDA));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueTurnOffDAMode));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerDA));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) qqueueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
@@ -401,14 +406,14 @@ finalize_it:
* rgerhards, 2008-01-14
*/
static inline rsRetVal
-queueChkStrtDA(queue_t *pThis)
+qqueueChkStrtDA(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
/* if we do not hit the high water mark, we have nothing to do */
- if(queueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk)
+ if(qqueueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk)
ABORT_FINALIZE(RS_RET_OK);
if(pThis->bRunsDA) {
@@ -422,15 +427,15 @@ queueChkStrtDA(queue_t *pThis)
* we need at least one).
*/
dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n",
- queueGetOverallQueueSize(pThis));
- queueAdviseMaxWorkers(pThis);
+ qqueueGetOverallQueueSize(pThis));
+ qqueueAdviseMaxWorkers(pThis);
} else {
/* this is the case when we are currently not running in DA mode. So it is time
* to turn it back on.
*/
dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n",
- queueGetOverallQueueSize(pThis));
- queueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
+ qqueueGetOverallQueueSize(pThis));
+ qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
}
finalize_it:
@@ -448,7 +453,7 @@ finalize_it:
*/
/* -------------------- fixed array -------------------- */
-static rsRetVal qConstructFixedArray(queue_t *pThis)
+static rsRetVal qConstructFixedArray(qqueue_t *pThis)
{
DEFiRet;
@@ -464,14 +469,14 @@ static rsRetVal qConstructFixedArray(queue_t *pThis)
pThis->tVars.farray.head = 0;
pThis->tVars.farray.tail = 0;
- queueChkIsDA(pThis);
+ qqueueChkIsDA(pThis);
finalize_it:
RETiRet;
}
-static rsRetVal qDestructFixedArray(queue_t *pThis)
+static rsRetVal qDestructFixedArray(qqueue_t *pThis)
{
DEFiRet;
@@ -486,7 +491,7 @@ static rsRetVal qDestructFixedArray(queue_t *pThis)
}
-static rsRetVal qAddFixedArray(queue_t *pThis, void* in)
+static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in)
{
DEFiRet;
@@ -499,7 +504,7 @@ static rsRetVal qAddFixedArray(queue_t *pThis, void* in)
RETiRet;
}
-static rsRetVal qDelFixedArray(queue_t *pThis, void **out)
+static rsRetVal qDelFixedArray(qqueue_t *pThis, void **out)
{
DEFiRet;
@@ -518,7 +523,7 @@ static rsRetVal qDelFixedArray(queue_t *pThis, void **out)
/* first some generic functions which are also used for the unget linked list */
-static inline rsRetVal queueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr)
+static inline rsRetVal qqueueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr)
{
DEFiRet;
qLinkedList_t *pEntry;
@@ -544,7 +549,7 @@ finalize_it:
RETiRet;
}
-static inline rsRetVal queueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr)
+static inline rsRetVal qqueueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr)
{
DEFiRet;
qLinkedList_t *pEntry;
@@ -571,7 +576,7 @@ static inline rsRetVal queueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t
/* end generic functions which are also used for the unget linked list */
-static rsRetVal qConstructLinkedList(queue_t *pThis)
+static rsRetVal qConstructLinkedList(qqueue_t *pThis)
{
DEFiRet;
@@ -580,13 +585,13 @@ static rsRetVal qConstructLinkedList(queue_t *pThis)
pThis->tVars.linklist.pRoot = 0;
pThis->tVars.linklist.pLast = 0;
- queueChkIsDA(pThis);
+ qqueueChkIsDA(pThis);
RETiRet;
}
-static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis)
+static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis)
{
DEFiRet;
@@ -599,11 +604,11 @@ static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis)
RETiRet;
}
-static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr)
+static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
{
DEFiRet;
- iRet = queueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr);
+ iRet = qqueueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr);
#if 0
qLinkedList_t *pEntry;
@@ -627,10 +632,10 @@ finalize_it:
RETiRet;
}
-static rsRetVal qDelLinkedList(queue_t *pThis, obj_t **ppUsr)
+static rsRetVal qDelLinkedList(qqueue_t *pThis, obj_t **ppUsr)
{
DEFiRet;
- iRet = queueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr);
+ iRet = qqueueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr);
#if 0
qLinkedList_t *pEntry;
@@ -657,11 +662,11 @@ static rsRetVal qDelLinkedList(queue_t *pThis, obj_t **ppUsr)
static rsRetVal
-queueLoadPersStrmInfoFixup(strm_t *pStrm, queue_t __attribute__((unused)) *pThis)
+qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pThis)
{
DEFiRet;
ISOBJ_TYPE_assert(pStrm, strm);
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
CHKiRet(strmSetDir(pStrm, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
finalize_it:
RETiRet;
@@ -673,14 +678,14 @@ finalize_it:
* rgerhards, 2008-01-15
*/
static rsRetVal
-queueHaveQIF(queue_t *pThis)
+qqueueHaveQIF(qqueue_t *pThis)
{
DEFiRet;
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
struct stat stat_buf;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->pszFilePrefix == NULL)
ABORT_FINALIZE(RS_RET_NO_FILEPREFIX);
@@ -710,7 +715,7 @@ finalize_it:
* rgerhards, 2008-01-11
*/
static rsRetVal
-queueTryLoadPersistedInfo(queue_t *pThis)
+qqueueTryLoadPersistedInfo(qqueue_t *pThis)
{
DEFiRet;
strm_t *psQIF = NULL;
@@ -720,7 +725,7 @@ queueTryLoadPersistedInfo(queue_t *pThis)
int iUngottenObjs;
obj_t *pUsr;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
@@ -755,15 +760,15 @@ queueTryLoadPersistedInfo(queue_t *pThis)
while(iUngottenObjs > 0) {
/* fill the queue from disk */
CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL));
- queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
+ qqueueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
--iUngottenObjs; /* one less */
}
/* and now the stream objects (some order as when persisted!) */
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF,
- (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
+ (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pRead, (uchar*) "strm", psQIF,
- (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
+ (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite));
CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead));
@@ -793,7 +798,7 @@ finalize_it:
* allowed file size at this point - that should be a config setting...
* rgerhards, 2008-01-10
*/
-static rsRetVal qConstructDisk(queue_t *pThis)
+static rsRetVal qConstructDisk(qqueue_t *pThis)
{
DEFiRet;
int bRestarted = 0;
@@ -801,7 +806,7 @@ static rsRetVal qConstructDisk(queue_t *pThis)
ASSERT(pThis != NULL);
/* and now check if there is some persistent information that needs to be read in */
- iRet = queueTryLoadPersistedInfo(pThis);
+ iRet = qqueueTryLoadPersistedInfo(pThis);
if(iRet == RS_RET_OK)
bRestarted = 1;
else if(iRet != RS_RET_FILE_NOT_FOUND)
@@ -843,7 +848,7 @@ finalize_it:
}
-static rsRetVal qDestructDisk(queue_t *pThis)
+static rsRetVal qDestructDisk(qqueue_t *pThis)
{
DEFiRet;
@@ -855,7 +860,7 @@ static rsRetVal qDestructDisk(queue_t *pThis)
RETiRet;
}
-static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
+static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
{
DEFiRet;
number_t nWriteCount;
@@ -882,7 +887,7 @@ finalize_it:
RETiRet;
}
-static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
+static rsRetVal qDelDisk(qqueue_t *pThis, void **ppUsr)
{
DEFiRet;
@@ -913,18 +918,18 @@ finalize_it:
}
/* -------------------- direct (no queueing) -------------------- */
-static rsRetVal qConstructDirect(queue_t __attribute__((unused)) *pThis)
+static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis)
{
return RS_RET_OK;
}
-static rsRetVal qDestructDirect(queue_t __attribute__((unused)) *pThis)
+static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
{
return RS_RET_OK;
}
-static rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
+static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
DEFiRet;
@@ -941,7 +946,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
RETiRet;
}
-static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
+static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
{
return RS_RET_OK;
}
@@ -956,12 +961,12 @@ static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__
* rgerhards, 2008-01-20
*/
static rsRetVal
-queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex)
+qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_assert(pUsr); /* TODO: we aborted right at this place at least 3 times -- race? 2008-02-28, -03-10, -03-15
The second time I noticed it the queue was in destruction with NO worker threads
running. The pUsr ptr was totally off and provided no clue what it may be pointing
@@ -970,7 +975,7 @@ queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex)
dbgoprint((obj_t*) pThis, "ungetting user object %s\n", obj.GetName(pUsr));
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
- iRet = queueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr);
+ iRet = qqueueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr);
++pThis->iUngottenObjs; /* indicate one more */
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -986,14 +991,14 @@ queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex)
* rgerhards, 2008-01-29
*/
static rsRetVal
-queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr)
+qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(ppUsr != NULL);
- iRet = queueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
+ iRet = qqueueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
--pThis->iUngottenObjs; /* indicate one less */
dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", obj.GetName(*ppUsr));
@@ -1007,7 +1012,7 @@ queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr)
* things truely different. -- rgerhards, 2008-02-12
*/
static rsRetVal
-queueAdd(queue_t *pThis, void *pUsr)
+qqueueAdd(qqueue_t *pThis, void *pUsr)
{
DEFiRet;
@@ -1030,7 +1035,7 @@ finalize_it:
* ungotten list and, if so, dequeue it first.
*/
static rsRetVal
-queueDel(queue_t *pThis, void *pUsr)
+qqueueDel(qqueue_t *pThis, void *pUsr)
{
DEFiRet;
@@ -1042,7 +1047,7 @@ queueDel(queue_t *pThis, void *pUsr)
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
if(pThis->iUngottenObjs > 0) {
- iRet = queueGetUngottenObj(pThis, (obj_t**) pUsr);
+ iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr);
} else {
iRet = pThis->qDel(pThis, pUsr);
ATOMIC_DEC(pThis->iQueueSize);
@@ -1066,14 +1071,14 @@ queueDel(queue_t *pThis, void *pUsr)
* complex) if each would have its own shutdown. The function does not self check
* this condition - the caller must make sure it is not called with a parent.
*/
-static rsRetVal queueShutdownWorkers(queue_t *pThis)
+static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
{
DEFiRet;
DEFVARS_mutexProtection;
struct timespec tTimeout;
rsRetVal iRetLocal;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
@@ -1087,7 +1092,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* first try to shutdown the queue within the regular shutdown period */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- if(queueGetOverallQueueSize(pThis) > 0) {
+ if(qqueueGetOverallQueueSize(pThis) > 0) {
if(pThis->bRunsDA) {
/* We may have waited on the low water mark. As it may have changed, we
* see if we reactivate the worker.
@@ -1125,7 +1130,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
if(pThis->bRunsDA) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n",
- queueGetID(pThis->pqDA));
+ qqueueGetID(pThis->pqDA));
/* we use the same absolute timeout as above, so we do not use more than the configured
* timeout interval!
*/
@@ -1154,19 +1159,19 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
if(pThis->bRunsDA)
- queueWaitDAModeInitialized(pThis);
+ qqueueWaitDAModeInitialized(pThis);
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
/* optimize parameters for shutdown of DA-enabled queues */
- if(pThis->bIsDA && queueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
+ if(pThis->bIsDA && qqueueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
/* switch to enqueue-only mode so that no more actions happen */
if(pThis->bRunsDA == 0) {
- queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
+ qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
} else {
/* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1)
* but is not yet running (state 2). In this case, pThis->pqDA is NULL! rgerhards, 2008-02-27
*/
- queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
+ qqueueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
}
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
/* make sure we do not timeout before we are done */
@@ -1188,7 +1193,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
* they will automatically terminate as there no longer is any message left to process.
*/
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- if(queueGetOverallQueueSize(pThis) > 0) {
+ if(qqueueGetOverallQueueSize(pThis) > 0) {
timeoutComp(&tTimeout, pThis->toActShutdown);
if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -1257,7 +1262,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
* may still be running.
*/
- dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", queueGetOverallQueueSize(pThis));
+ dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", qqueueGetOverallQueueSize(pThis));
RETiRet;
}
@@ -1269,17 +1274,17 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
* is done by queueStart(). The reason is that we want to give the caller a chance
* to modify some parameters before the queue is actually started.
*/
-rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
+rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*))
{
DEFiRet;
- queue_t *pThis;
+ qqueue_t *pThis;
ASSERT(ppThis != NULL);
ASSERT(pConsumer != NULL);
ASSERT(iWorkerThreads >= 0);
- if((pThis = (queue_t *)calloc(1, sizeof(queue_t))) == NULL) {
+ if((pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))) == NULL) {
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
@@ -1316,7 +1321,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
pThis->qConstruct = qConstructLinkedList;
pThis->qDestruct = qDestructLinkedList;
pThis->qAdd = qAddLinkedList;
- pThis->qDel = (rsRetVal (*)(queue_t*,void**)) qDelLinkedList;
+ pThis->qDel = (rsRetVal (*)(qqueue_t*,void**)) qDelLinkedList;
break;
case QUEUETYPE_DISK:
pThis->qConstruct = qConstructDisk;
@@ -1343,25 +1348,25 @@ finalize_it:
/* cancellation cleanup handler for queueWorker ()
* Updates admin structure and frees ressources.
* Params:
- * arg1 - user pointer (in this case a queue_t)
+ * arg1 - user pointer (in this case a qqueue_t)
* arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!])
* Note that arg2 may be NULL, in which case no dequeued but unprocessed pUsr exists!
* rgerhards, 2008-01-16
*/
static rsRetVal
-queueConsumerCancelCleanup(void *arg1, void *arg2)
+qqueueConsumerCancelCleanup(void *arg1, void *arg2)
{
DEFiRet;
- queue_t *pThis = (queue_t*) arg1;
+ qqueue_t *pThis = (qqueue_t*) arg1;
obj_t *pUsr = (obj_t*) arg2;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pUsr != NULL) {
/* make sure the data element is not lost */
dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
- CHKiRet(queueUngetObj(pThis, pUsr, LOCK_MUTEX));
+ CHKiRet(qqueueUngetObj(pThis, pUsr, LOCK_MUTEX));
}
finalize_it:
@@ -1383,13 +1388,13 @@ finalize_it:
* the return state!
* rgerhards, 2008-01-24
*/
-static int queueChkDiscardMsg(queue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr)
+static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr)
{
DEFiRet;
rsRetVal iRetLocal;
int iSeverity;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_assert(pUsr);
if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) {
@@ -1414,7 +1419,7 @@ finalize_it:
* rgerhards, 2008-10-21
*/
static rsRetVal
-queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
+qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
void *pUsr;
@@ -1422,9 +1427,9 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
int bRunsDA; /* cache for early mutex release */
/* dequeue element (still protected from mutex) */
- iRet = queueDel(pThis, &pUsr);
- queueChkPersist(pThis);
- iQueueSize = queueGetOverallQueueSize(pThis); /* cache this for after mutex release */
+ iRet = qqueueDel(pThis, &pUsr);
+ qqueueChkPersist(pThis);
+ iQueueSize = qqueueGetOverallQueueSize(pThis); /* cache this for after mutex release */
bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
/* We now need to save the user pointer for the cancel cleanup handler, BUT ONLY
@@ -1475,7 +1480,7 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
* provide real-time creation of spool files.
* Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong.
*/
- CHKiRet(queueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr));
+ CHKiRet(qqueueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr));
finalize_it:
if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
@@ -1524,7 +1529,7 @@ finalize_it:
* but you get the idea from the code above.
*/
static rsRetVal
-queueRateLimiter(queue_t *pThis)
+qqueueRateLimiter(qqueue_t *pThis)
{
DEFiRet;
int iDelay;
@@ -1532,7 +1537,7 @@ queueRateLimiter(queue_t *pThis)
time_t tCurr;
struct tm m;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
iDelay = 0;
if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */
@@ -1587,14 +1592,14 @@ queueRateLimiter(queue_t *pThis)
* rgerhards, 2008-01-21
*/
static rsRetVal
-queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
+qqueueConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp));
/* we now need to check if we should deliberately delay processing a bit
@@ -1621,15 +1626,15 @@ finalize_it:
* rgerhards, 2008-01-14
*/
static rsRetVal
-queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
+qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(queueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
+ CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
finalize_it:
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
@@ -1645,7 +1650,7 @@ finalize_it:
* the DA queue
*/
static int
-queueChkStopWrkrDA(queue_t *pThis)
+qqueueChkStopWrkrDA(qqueue_t *pThis)
{
/* if our queue is in destruction, we drain to the DA queue and so we shall not terminate
* until we have done so.
@@ -1664,7 +1669,7 @@ queueChkStopWrkrDA(queue_t *pThis)
&& pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
/* this queue can never grow, so we can give up... */
bStopWrkr = 1;
- } else if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
+ } else if(qqueueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
bStopWrkr = 1;
} else {
bStopWrkr = 0;
@@ -1687,9 +1692,9 @@ queueChkStopWrkrDA(queue_t *pThis)
* the DA queue
*/
static int
-queueChkStopWrkrReg(queue_t *pThis)
+qqueueChkStopWrkrReg(qqueue_t *pThis)
{
- return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && queueGetOverallQueueSize(pThis) == 0);
+ return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0);
}
@@ -1697,26 +1702,26 @@ queueChkStopWrkrReg(queue_t *pThis)
* are not stable! DA queue version
*/
static int
-queueIsIdleDA(queue_t *pThis)
+qqueueIsIdleDA(qqueue_t *pThis)
{
/* remember: iQueueSize is the DA queue size, not the main queue! */
/* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */
- return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
+ return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
}
/* must only be called when the queue mutex is locked, else results
* are not stable! Regular queue version
*/
static int
-queueIsIdleReg(queue_t *pThis)
+qqueueIsIdleReg(qqueue_t *pThis)
{
#if 0 /* enable for performance testing */
int ret;
- ret = queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk);
+ ret = qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk);
if(ret) fprintf(stderr, "queue is idle\n");
return ret;
#else
/* regular code! */
- return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
+ return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
#endif
}
@@ -1735,11 +1740,11 @@ queueIsIdleReg(queue_t *pThis)
* I am telling this, because I, too, always get confused by those...
*/
static rsRetVal
-queueRegOnWrkrShutdown(queue_t *pThis)
+qqueueRegOnWrkrShutdown(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->pqParent != NULL) {
pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
@@ -1756,11 +1761,11 @@ queueRegOnWrkrShutdown(queue_t *pThis)
* hook to indicate in the parent queue (if we are a child) that we are not done yet.
*/
static rsRetVal
-queueRegOnWrkrStartup(queue_t *pThis)
+qqueueRegOnWrkrStartup(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->pqParent != NULL) {
pThis->pqParent->bChildIsDone = 0;
@@ -1773,7 +1778,7 @@ queueRegOnWrkrStartup(queue_t *pThis)
/* start up the queue - it must have been constructed and parameters defined
* before.
*/
-rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
+rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
{
DEFiRet;
rsRetVal iRetLocal;
@@ -1811,7 +1816,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d starting\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
- queueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1);
+ qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1);
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
@@ -1822,13 +1827,13 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
- CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRateLimiter));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))queueConsumerCancelCleanup));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrStartup));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrShutdown));
+ CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRateLimiter));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrReg));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleReg));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerReg));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))qqueueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrStartup));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
@@ -1841,10 +1846,10 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
/* If we are disk-assisted, we need to check if there is a QIF file
* which we need to load. -- rgerhards, 2008-01-15
*/
- iRetLocal = queueHaveQIF(pThis);
+ iRetLocal = qqueueHaveQIF(pThis);
if(iRetLocal == RS_RET_OK) {
dbgoprint((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n");
- queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
+ qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
bInitialized = 1; /* we are done */
} else {
/* TODO: use logerror? -- rgerhards, 2008-01-16 */
@@ -1861,7 +1866,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
/* if the queue already contains data, we need to start the correct number of worker threads. This can be
* the case when a disk queue has been loaded. If we did not start it here, it would never start.
*/
- queueAdviseMaxWorkers(pThis);
+ qqueueAdviseMaxWorkers(pThis);
pThis->bQueueStarted = 1;
finalize_it:
@@ -1876,7 +1881,7 @@ finalize_it:
* and 0 otherwise.
* rgerhards, 2008-01-10
*/
-static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
+static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
{
DEFiRet;
strm_t *psQIF = NULL; /* Queue Info File */
@@ -1887,7 +1892,7 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
ASSERT(pThis != NULL);
if(pThis->qType != QUEUETYPE_DISK) {
- if(queueGetOverallQueueSize(pThis) > 0) {
+ if(qqueueGetOverallQueueSize(pThis) > 0) {
/* This error code is OK, but we will probably not implement this any time
* The reason is that persistence happens via DA queues. But I would like to
* leave the code as is, as we so have a hook in case we need one.
@@ -1898,13 +1903,13 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
FINALIZE; /* if the queue is empty, we are happy and done... */
}
- dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", queueGetOverallQueueSize(pThis));
+ dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", qqueueGetOverallQueueSize(pThis));
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
(char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
- if((bIsCheckpoint != QUEUE_CHECKPOINT) && (queueGetOverallQueueSize(pThis) == 0)) {
+ if((bIsCheckpoint != QUEUE_CHECKPOINT) && (qqueueGetOverallQueueSize(pThis) == 0)) {
if(pThis->bNeedDelQIF) {
unlink((char*)pszQIFNam);
pThis->bNeedDelQIF = 0;
@@ -1938,7 +1943,7 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
* to the regular files. -- rgerhards, 2008-01-29
*/
while(pThis->iUngottenObjs > 0) {
- CHKiRet(queueGetUngottenObj(pThis, &pUsr));
+ CHKiRet(qqueueGetUngottenObj(pThis, &pUsr));
CHKiRet((objSerialize(pUsr))(pUsr, psQIF));
objDestruct(pUsr);
}
@@ -1972,14 +1977,14 @@ finalize_it:
* abide to our regular call interface)...
* rgerhards, 2008-01-13
*/
-rsRetVal queueChkPersist(queue_t *pThis)
+rsRetVal qqueueChkPersist(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
- queuePersist(pThis, QUEUE_CHECKPOINT);
+ qqueuePersist(pThis, QUEUE_CHECKPOINT);
pThis->iUpdsSincePersist = 0;
}
@@ -1988,8 +1993,8 @@ rsRetVal queueChkPersist(queue_t *pThis)
/* destructor for the queue object */
-BEGINobjDestruct(queue) /* be sure to specify the object type also in END and CODESTART macros! */
-CODESTARTobjDestruct(queue)
+BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */
+CODESTARTobjDestruct(qqueue)
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
/* shut down all workers (handles *all* of the persistence logic)
@@ -1999,7 +2004,7 @@ CODESTARTobjDestruct(queue)
* with a child! -- rgerhards, 2008-01-28
*/
if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL)
- queueShutdownWorkers(pThis);
+ qqueueShutdownWorkers(pThis);
/* finally destruct our (regular) worker thread pool
* Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen,
@@ -2024,7 +2029,7 @@ CODESTARTobjDestruct(queue)
wtpDestruct(&pThis->pWtpDA);
}
if(pThis->pqDA != NULL) {
- queueDestruct(&pThis->pqDA);
+ qqueueDestruct(&pThis->pqDA);
}
/* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty)
@@ -2034,7 +2039,7 @@ CODESTARTobjDestruct(queue)
* disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here
* if need arises (what I doubt...) -- rgerhards, 2008-01-25
*/
- CHKiRet_Hdlr(queuePersist(pThis, QUEUE_NO_CHECKPOINT)) {
+ CHKiRet_Hdlr(qqueuePersist(pThis, QUEUE_NO_CHECKPOINT)) {
dbgoprint((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet);
}
@@ -2059,7 +2064,7 @@ CODESTARTobjDestruct(queue)
if(pThis->pszSpoolDir != NULL)
free(pThis->pszSpoolDir);
-ENDobjDestruct(queue)
+ENDobjDestruct(qqueue)
/* set the queue's file prefix
@@ -2068,7 +2073,7 @@ ENDobjDestruct(queue)
* rgerhards, 2008-01-09
*/
rsRetVal
-queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
+qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
{
DEFiRet;
@@ -2091,11 +2096,11 @@ finalize_it:
* rgerhards, 2008-01-09
*/
rsRetVal
-queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize)
+qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(iMaxFileSize < 1024) {
ABORT_FINALIZE(RS_RET_VALUE_TOO_LOW);
@@ -2112,13 +2117,13 @@ finalize_it:
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
-queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
+qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
{
DEFiRet;
int iCancelStateSave;
struct timespec t;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
/* first check if we need to discard this message (which will cause CHKiRet() to exit)
* rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The iQueueSize
@@ -2127,7 +2132,7 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
* threading tools may point this access to be an error, but this is done
* intentional. I do not see this causes problems to us.
*/
- CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
+ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
/* Please note that this function is not cancel-safe and consequently
* sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
@@ -2142,7 +2147,7 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
/* then check if we need to add an assistance disk queue */
if(pThis->bIsDA)
- CHKiRet(queueChkStrtDA(pThis));
+ CHKiRet(qqueueChkStrtDA(pThis));
/* handle flow control
* There are two different flow control mechanisms: basic and advanced flow control.
@@ -2195,13 +2200,13 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
}
/* and finally enqueue the message */
- CHKiRet(queueAdd(pThis, pUsr));
- queueChkPersist(pThis);
+ CHKiRet(qqueueAdd(pThis, pUsr));
+ qqueueChkPersist(pThis);
finalize_it:
if(pThis->qType != QUEUETYPE_DIRECT) {
/* make sure at least one worker is running. */
- queueAdviseMaxWorkers(pThis);
+ qqueueAdviseMaxWorkers(pThis);
/* and release the mutex */
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -2228,12 +2233,12 @@ finalize_it:
* rgerhards, 2008-01-16
*/
static rsRetVal
-queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex)
+qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
/* for simplicity, we do one big mutex lock. This method is extremely seldom
* called, so that doesn't matter... -- rgerhards, 2008-01-16
@@ -2272,24 +2277,24 @@ finalize_it:
/* some simple object access methods */
-DEFpropSetMeth(queue, iPersistUpdCnt, int)
-DEFpropSetMeth(queue, iDeqtWinFromHr, int)
-DEFpropSetMeth(queue, iDeqtWinToHr, int)
-DEFpropSetMeth(queue, toQShutdown, long)
-DEFpropSetMeth(queue, toActShutdown, long)
-DEFpropSetMeth(queue, toWrkShutdown, long)
-DEFpropSetMeth(queue, toEnq, long)
-DEFpropSetMeth(queue, iHighWtrMrk, int)
-DEFpropSetMeth(queue, iLowWtrMrk, int)
-DEFpropSetMeth(queue, iDiscardMrk, int)
-DEFpropSetMeth(queue, iFullDlyMrk, int)
-DEFpropSetMeth(queue, iDiscardSeverity, int)
-DEFpropSetMeth(queue, bIsDA, int)
-DEFpropSetMeth(queue, iMinMsgsPerWrkr, int)
-DEFpropSetMeth(queue, bSaveOnShutdown, int)
-DEFpropSetMeth(queue, pUsr, void*)
-DEFpropSetMeth(queue, iDeqSlowdown, int)
-DEFpropSetMeth(queue, sizeOnDiskMax, int64)
+DEFpropSetMeth(qqueue, iPersistUpdCnt, int)
+DEFpropSetMeth(qqueue, iDeqtWinFromHr, int)
+DEFpropSetMeth(qqueue, iDeqtWinToHr, int)
+DEFpropSetMeth(qqueue, toQShutdown, long)
+DEFpropSetMeth(qqueue, toActShutdown, long)
+DEFpropSetMeth(qqueue, toWrkShutdown, long)
+DEFpropSetMeth(qqueue, toEnq, long)
+DEFpropSetMeth(qqueue, iHighWtrMrk, int)
+DEFpropSetMeth(qqueue, iLowWtrMrk, int)
+DEFpropSetMeth(qqueue, iDiscardMrk, int)
+DEFpropSetMeth(qqueue, iFullDlyMrk, int)
+DEFpropSetMeth(qqueue, iDiscardSeverity, int)
+DEFpropSetMeth(qqueue, bIsDA, int)
+DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int)
+DEFpropSetMeth(qqueue, bSaveOnShutdown, int)
+DEFpropSetMeth(qqueue, pUsr, void*)
+DEFpropSetMeth(qqueue, iDeqSlowdown, int)
+DEFpropSetMeth(qqueue, sizeOnDiskMax, int64)
/* This function can be used as a generic way to set properties. Only the subset
@@ -2298,11 +2303,11 @@ DEFpropSetMeth(queue, sizeOnDiskMax, int64)
* rgerhards, 2008-01-11
*/
#define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, (uchar*) name, sizeof(name) - 1)
-static rsRetVal queueSetProperty(queue_t *pThis, var_t *pProp)
+static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pProp != NULL);
if(isProp("iQueueSize")) {
@@ -2324,19 +2329,19 @@ finalize_it:
#undef isProp
/* dummy */
-rsRetVal queueQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
+rsRetVal qqueueQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
/* Initialize the stream class. Must be called as the very first method
* before anything else is called inside this class.
* rgerhards, 2008-01-09
*/
-BEGINObjClassInit(queue, 1, OBJ_IS_CORE_MODULE)
+BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE)
/* request objects we use */
CHKiRet(objUse(glbl, CORE_COMPONENT));
/* now set our own handlers */
- OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty);
-ENDObjClassInit(queue)
+ OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty);
+ENDObjClassInit(qqueue)
/* vi:set ai:
*/
diff --git a/runtime/queue.h b/runtime/queue.h
index a2dd594f..a267862d 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -160,7 +160,7 @@ typedef struct queue_s {
strm_t *pRead; /* current file to be read */
} disk;
} tVars;
-} queue_t;
+} qqueue_t;
/* some symbolic constants for easier reference */
#define QUEUE_MODE_ENQDEQ 0
@@ -177,30 +177,30 @@ typedef struct queue_s {
#define QUEUE_TIMEOUT_ETERNAL 24 * 60 * 60 * 1000
/* prototypes */
-rsRetVal queueDestruct(queue_t **ppThis);
-rsRetVal queueEnqObj(queue_t *pThis, flowControl_t flwCtlType, void *pUsr);
-rsRetVal queueStart(queue_t *pThis);
-rsRetVal queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize);
-rsRetVal queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
-rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
+rsRetVal qqueueDestruct(qqueue_t **ppThis);
+rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr);
+rsRetVal qqueueStart(qqueue_t *pThis);
+rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
+rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
+rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*));
-PROTOTYPEObjClassInit(queue);
-PROTOTYPEpropSetMeth(queue, iPersistUpdCnt, int);
-PROTOTYPEpropSetMeth(queue, iDeqtWinFromHr, int);
-PROTOTYPEpropSetMeth(queue, iDeqtWinToHr, int);
-PROTOTYPEpropSetMeth(queue, toQShutdown, long);
-PROTOTYPEpropSetMeth(queue, toActShutdown, long);
-PROTOTYPEpropSetMeth(queue, toWrkShutdown, long);
-PROTOTYPEpropSetMeth(queue, toEnq, long);
-PROTOTYPEpropSetMeth(queue, iHighWtrMrk, int);
-PROTOTYPEpropSetMeth(queue, iLowWtrMrk, int);
-PROTOTYPEpropSetMeth(queue, iDiscardMrk, int);
-PROTOTYPEpropSetMeth(queue, iDiscardSeverity, int);
-PROTOTYPEpropSetMeth(queue, iMinMsgsPerWrkr, int);
-PROTOTYPEpropSetMeth(queue, bSaveOnShutdown, int);
-PROTOTYPEpropSetMeth(queue, pUsr, void*);
-PROTOTYPEpropSetMeth(queue, iDeqSlowdown, int);
-PROTOTYPEpropSetMeth(queue, sizeOnDiskMax, int64);
-#define queueGetID(pThis) ((unsigned long) pThis)
+PROTOTYPEObjClassInit(qqueue);
+PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
+PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int);
+PROTOTYPEpropSetMeth(qqueue, iDeqtWinToHr, int);
+PROTOTYPEpropSetMeth(qqueue, toQShutdown, long);
+PROTOTYPEpropSetMeth(qqueue, toActShutdown, long);
+PROTOTYPEpropSetMeth(qqueue, toWrkShutdown, long);
+PROTOTYPEpropSetMeth(qqueue, toEnq, long);
+PROTOTYPEpropSetMeth(qqueue, iHighWtrMrk, int);
+PROTOTYPEpropSetMeth(qqueue, iLowWtrMrk, int);
+PROTOTYPEpropSetMeth(qqueue, iDiscardMrk, int);
+PROTOTYPEpropSetMeth(qqueue, iDiscardSeverity, int);
+PROTOTYPEpropSetMeth(qqueue, iMinMsgsPerWrkr, int);
+PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int);
+PROTOTYPEpropSetMeth(qqueue, pUsr, void*);
+PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int);
+PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64);
+#define qqueueGetID(pThis) ((unsigned long) pThis)
#endif /* #ifndef QUEUE_H_INCLUDED */
diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c
index 54db12c2..8df100a1 100644
--- a/runtime/rsyslog.c
+++ b/runtime/rsyslog.c
@@ -157,7 +157,7 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF)
if(ppErrObj != NULL) *ppErrObj = "wtp";
CHKiRet(wtpClassInit(NULL));
if(ppErrObj != NULL) *ppErrObj = "queue";
- CHKiRet(queueClassInit(NULL));
+ CHKiRet(qqueueClassInit(NULL));
if(ppErrObj != NULL) *ppErrObj = "vmstk";
CHKiRet(vmstkClassInit(NULL));
if(ppErrObj != NULL) *ppErrObj = "sysvar";
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 00290ee5..899f5e13 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -254,6 +254,10 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_INVLD_TIME = -2107, /**< invalid timestamp (e.g. could not be parsed) */
RS_RET_NO_ZIP = -2108, /**< ZIP functionality is not present */
RS_RET_CODE_ERR = -2109, /**< program code (internal) error */
+ RS_RET_FUNC_NO_LPAREN = -2110, /**< left parenthesis missing after function call (rainerscript) */
+ RS_RET_FUNC_MISSING_EXPR = -2111, /**< no expression after comma in function call (rainerscript) */
+ RS_RET_INVLD_NBR_ARGUMENTS = -2112, /**< invalid number of arguments for function call (rainerscript) */
+ RS_RET_INVLD_FUNC = -2113, /**< invalid function name for function call (rainerscript) */
/* RainerScript error messages (range 1000.. 1999) */
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */
diff --git a/runtime/srutils.c b/runtime/srutils.c
index 1280e40d..d01ca20d 100644
--- a/runtime/srutils.c
+++ b/runtime/srutils.c
@@ -458,7 +458,7 @@ srSleep(int iSeconds, int iuSeconds)
* Added 2008-01-30
*/
char *rs_strerror_r(int errnum, char *buf, size_t buflen) {
-#ifdef __hpux
+#ifndef HAVE_STRERROR_R
char *pszErr;
pszErr = strerror(errnum);
snprintf(buf, buflen, "%s", pszErr);
diff --git a/runtime/vm.c b/runtime/vm.c
index bc6c3dd2..a25476c2 100644
--- a/runtime/vm.c
+++ b/runtime/vm.c
@@ -331,6 +331,34 @@ finalize_it:
ENDop(PUSHSYSVAR)
+/* The function call operation is only very roughly implemented. While the plumbing
+ * to reach this instruction is fine, the instruction itself currently supports only
+ * functions with a single argument AND with a name that we know.
+ * TODO: later, we can add here the real logic, that involves looking up function
+ * names, loading them dynamically ... and all that...
+ * implementation begun 2009-03-10 by rgerhards
+ */
+BEGINop(FUNC_CALL) /* remember to set the instruction also in the ENDop macro! */
+ var_t *numOperands;
+ var_t *operand1;
+ int iStrlen;
+CODESTARTop(FUNC_CALL)
+ vmstk.PopNumber(pThis->pStk, &numOperands);
+ if(numOperands->val.num != 1)
+ ABORT_FINALIZE(RS_RET_INVLD_NBR_ARGUMENTS);
+ vmstk.PopString(pThis->pStk, &operand1); /* guess there's just one ;) */
+ if(!rsCStrSzStrCmp(pOp->operand.pVar->val.pStr, (uchar*) "strlen", 6)) { /* only one supported so far ;) */
+RUNLOG_VAR("%s", rsCStrGetSzStr(operand1->val.pStr));
+ iStrlen = strlen((char*) rsCStrGetSzStr(operand1->val.pStr));
+RUNLOG_VAR("%d", iStrlen);
+ } else
+ ABORT_FINALIZE(RS_RET_INVLD_FUNC);
+ PUSHRESULTop(operand1, iStrlen); // TODO: dummy, FIXME
+ var.Destruct(&numOperands); /* no longer needed */
+finalize_it:
+ENDop(FUNC_CALL)
+
+
/* ------------------------------ end instruction set implementation ------------------------------ */
@@ -412,6 +440,7 @@ execProg(vm_t *pThis, vmprg_t *pProg)
doOP(DIV);
doOP(MOD);
doOP(UNARY_MINUS);
+ doOP(FUNC_CALL);
default:
ABORT_FINALIZE(RS_RET_INVALID_VMOP);
dbgoprint((obj_t*) pThis, "invalid instruction %d in vmprg\n", pCurrOp->opcode);
diff --git a/runtime/vmop.c b/runtime/vmop.c
index fcacb15b..a343481e 100644
--- a/runtime/vmop.c
+++ b/runtime/vmop.c
@@ -61,24 +61,25 @@ rsRetVal vmopConstructFinalize(vmop_t __attribute__((unused)) *pThis)
/* destructor for the vmop object */
BEGINobjDestruct(vmop) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(vmop)
- if( pThis->opcode == opcode_PUSHSYSVAR
- || pThis->opcode == opcode_PUSHMSGVAR
- || pThis->opcode == opcode_PUSHCONSTANT) {
- if(pThis->operand.pVar != NULL)
- var.Destruct(&pThis->operand.pVar);
- }
+ if(pThis->operand.pVar != NULL)
+ var.Destruct(&pThis->operand.pVar);
ENDobjDestruct(vmop)
/* DebugPrint support for the vmop object */
BEGINobjDebugPrint(vmop) /* be sure to specify the object type also in END and CODESTART macros! */
uchar *pOpcodeName;
+ cstr_t *pStrVar;
CODESTARTobjDebugPrint(vmop)
vmopOpcode2Str(pThis, &pOpcodeName);
- dbgoprint((obj_t*) pThis, "opcode: %d\t(%s), next %p, var in next line\n", (int) pThis->opcode, pOpcodeName,
- pThis->pNext);
- if(pThis->operand.pVar != NULL)
- var.DebugPrint(pThis->operand.pVar);
+ CHKiRet(rsCStrConstruct(&pStrVar));
+ CHKiRet(rsCStrFinish(&pStrVar));
+ if(pThis->operand.pVar != NULL) {
+ CHKiRet(var.Obj2Str(pThis->operand.pVar, pStrVar));
+ }
+ dbgoprint((obj_t*) pThis, "%.12s\t%s\n", pOpcodeName, rsCStrGetSzStrNoNULL(pStrVar));
+ rsCStrDestruct(&pStrVar);
+finalize_it:
ENDobjDebugPrint(vmop)
@@ -158,37 +159,37 @@ vmopOpcode2Str(vmop_t *pThis, uchar **ppName)
*ppName = (uchar*) "and";
break;
case opcode_PLUS:
- *ppName = (uchar*) "+";
+ *ppName = (uchar*) "add";
break;
case opcode_MINUS:
- *ppName = (uchar*) "-";
+ *ppName = (uchar*) "sub";
break;
case opcode_TIMES:
- *ppName = (uchar*) "*";
+ *ppName = (uchar*) "mul";
break;
case opcode_DIV:
- *ppName = (uchar*) "/";
+ *ppName = (uchar*) "div";
break;
case opcode_MOD:
- *ppName = (uchar*) "%";
+ *ppName = (uchar*) "mod";
break;
case opcode_NOT:
*ppName = (uchar*) "not";
break;
case opcode_CMP_EQ:
- *ppName = (uchar*) "==";
+ *ppName = (uchar*) "cmp_==";
break;
case opcode_CMP_NEQ:
- *ppName = (uchar*) "!=";
+ *ppName = (uchar*) "cmp_!=";
break;
case opcode_CMP_LT:
- *ppName = (uchar*) "<";
+ *ppName = (uchar*) "cmp_<";
break;
case opcode_CMP_GT:
- *ppName = (uchar*) ">";
+ *ppName = (uchar*) "cmp_>";
break;
case opcode_CMP_LTEQ:
- *ppName = (uchar*) "<=";
+ *ppName = (uchar*) "cmp_<=";
break;
case opcode_CMP_CONTAINS:
*ppName = (uchar*) "contains";
@@ -197,28 +198,31 @@ vmopOpcode2Str(vmop_t *pThis, uchar **ppName)
*ppName = (uchar*) "startswith";
break;
case opcode_CMP_GTEQ:
- *ppName = (uchar*) ">=";
+ *ppName = (uchar*) "cmp_>=";
break;
case opcode_PUSHSYSVAR:
- *ppName = (uchar*) "PUSHSYSVAR";
+ *ppName = (uchar*) "push_sysvar";
break;
case opcode_PUSHMSGVAR:
- *ppName = (uchar*) "PUSHMSGVAR";
+ *ppName = (uchar*) "push_msgvar";
break;
case opcode_PUSHCONSTANT:
- *ppName = (uchar*) "PUSHCONSTANT";
+ *ppName = (uchar*) "push_const";
break;
case opcode_POP:
- *ppName = (uchar*) "POP";
+ *ppName = (uchar*) "pop";
break;
case opcode_UNARY_MINUS:
- *ppName = (uchar*) "UNARY_MINUS";
+ *ppName = (uchar*) "unary_minus";
break;
case opcode_STRADD:
- *ppName = (uchar*) "STRADD";
+ *ppName = (uchar*) "strconcat";
+ break;
+ case opcode_FUNC_CALL:
+ *ppName = (uchar*) "func_call";
break;
default:
- *ppName = (uchar*) "INVALID opcode";
+ *ppName = (uchar*) "!invalid_opcode!";
break;
}
diff --git a/runtime/vmop.h b/runtime/vmop.h
index c3d5d5f4..938b08fd 100644
--- a/runtime/vmop.h
+++ b/runtime/vmop.h
@@ -59,17 +59,44 @@ typedef enum { /* do NOT start at 0 to detect uninitialized types after calloc(
opcode_PUSHMSGVAR = 1002, /* requires var operand */
opcode_PUSHCONSTANT = 1003, /* requires var operand */
opcode_UNARY_MINUS = 1010,
- opcode_END_PROG = 1011
+ opcode_FUNC_CALL = 1012,
+ opcode_END_PROG = 2000
} opcode_t;
+/* Additional doc, operation specific
+
+ FUNC_CALL
+ All parameter passing is via the stack. Parameters are placed onto the stack in reverse order,
+ that means the last parameter is on top of the stack, the first at the bottom location.
+ At the actual top of the stack is the number of parameters. This permits functions to be
+ called with variable number of arguments. The function itself is responsible for poping
+ the right number of parameters of the stack and complaining if the number is incorrect.
+ On exit, a single return value must be pushed onto the stack. The FUNC_CALL operation
+ is generic. Its pVar argument contains the function name string (TODO: very slow, make
+ faster in later releases).
+
+ Sample Function call: sampleFunc(p1, p2, p3) ; returns number 4711 (sample)
+ Stacklayout on entry (order is top to bottom):
+ 3
+ p3
+ p2
+ p1
+ ... other vars ...
+
+ Stack on exit
+ 4711
+ ... other vars ...
+
+ */
+
+
/* the vmop object */
typedef struct vmop_s {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
opcode_t opcode;
union {
- var_t *pVar;
- /* TODO: add function pointer */
+ var_t *pVar; /* for function call, this is the name (string) of function to be called */
} operand;
struct vmop_s *pNext; /* next operation or NULL, if end of program (logically this belongs to vmprg) */
} vmop_t;
diff --git a/runtime/vmprg.c b/runtime/vmprg.c
index 705e6948..75915025 100644
--- a/runtime/vmprg.c
+++ b/runtime/vmprg.c
@@ -74,7 +74,7 @@ ENDobjDestruct(vmprg)
BEGINobjDebugPrint(vmprg) /* be sure to specify the object type also in END and CODESTART macros! */
vmop_t *pOp;
CODESTARTobjDebugPrint(vmprg)
- dbgoprint((obj_t*) pThis, "program contents:\n");
+ dbgoprint((obj_t*) pThis, "VM Program:\n");
for(pOp = pThis->vmopRoot ; pOp != NULL ; pOp = pOp->pNext) {
vmop.DebugPrint(pOp);
}
diff --git a/runtime/wti.c b/runtime/wti.c
index e8a77474..9b3450e6 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -39,6 +39,11 @@
#include <pthread.h>
#include <errno.h>
+#ifdef OS_SOLARIS
+# include <sched.h>
+# define pthread_yield() sched_yield()
+#endif
+
#include "rsyslog.h"
#include "stringbuf.h"
#include "srUtils.h"
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 06173e04..41903576 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -40,6 +40,11 @@
#include <unistd.h>
#include <errno.h>
+#ifdef OS_SOLARIS
+# include <sched.h>
+# define pthread_yield() sched_yield()
+#endif
+
#include "rsyslog.h"
#include "stringbuf.h"
#include "srUtils.h"
diff --git a/tools/Makefile.am b/tools/Makefile.am
index 776279a5..e523b854 100644
--- a/tools/Makefile.am
+++ b/tools/Makefile.am
@@ -22,7 +22,7 @@ rsyslogd_SOURCES = \
\
../dirty.h
rsyslogd_CPPFLAGS = $(PTHREADS_CFLAGS) $(RSRT_CFLAGS)
-rsyslogd_LDADD = $(ZLIB_LIBS) $(PTHREADS_LIBS) $(RSRT_LIBS)
+rsyslogd_LDADD = $(ZLIB_LIBS) $(PTHREADS_LIBS) $(RSRT_LIBS) $(SOL_LIBS)
rsyslogd_LDFLAGS = -export-dynamic
if ENABLE_DIAGTOOLS
diff --git a/tools/omfile.c b/tools/omfile.c
index 1539ae19..ea91d6ef 100644
--- a/tools/omfile.c
+++ b/tools/omfile.c
@@ -44,6 +44,10 @@
#include <unistd.h>
#include <sys/file.h>
+#ifdef OS_SOLARIS
+# include <fcntl.h>
+#endif
+
#include "syslogd.h"
#include "syslogd-types.h"
#include "srUtils.h"
diff --git a/tools/syslogd.c b/tools/syslogd.c
index eb496521..235bc52e 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -72,13 +72,18 @@
#include <stdarg.h>
#include <time.h>
#include <assert.h>
-#include <libgen.h>
-#ifdef __sun
+#ifdef OS_SOLARIS
# include <errno.h>
+# include <fcntl.h>
+# include <stropts.h>
+# include <sys/termios.h>
+# include <sys/types.h>
#else
+# include <libgen.h>
# include <sys/errno.h>
#endif
+
#include <sys/ioctl.h>
#include <sys/wait.h>
#include <sys/file.h>
@@ -279,7 +284,7 @@ static int gidDropPriv = 0; /* group-id to which priveleges should be dropped to
extern int errno;
/* main message queue and its configuration parameters */
-static queue_t *pMsgQueue = NULL; /* the main message queue */
+static qqueue_t *pMsgQueue = NULL; /* the main message queue */
static int iMainMsgQueueSize = 10000; /* size of the main message queue above */
static int iMainMsgQHighWtrMark = 8000; /* high water mark for disk-assisted queues */
static int iMainMsgQLowWtrMark = 2000; /* low water mark for disk-assisted queues */
@@ -1622,7 +1627,7 @@ submitMsg(msg_t *pMsg)
ISOBJ_TYPE_assert(pMsg, msg);
MsgPrepareEnqueue(pMsg);
- queueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg);
+ qqueueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg);
RETiRet;
}
@@ -1683,7 +1688,7 @@ logmsg(msg_t *pMsg, int flags)
/* now submit the message to the main queue - then we are done */
pMsg->msgFlags = flags;
MsgPrepareEnqueue(pMsg);
- queueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg);
+ qqueueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg);
ENDfunc
}
@@ -1981,7 +1986,7 @@ die(int sig)
/* drain queue (if configured so) and stop main queue worker thread pool */
dbgprintf("Terminating main queue...\n");
- queueDestruct(&pMsgQueue);
+ qqueueDestruct(&pMsgQueue);
pMsgQueue = NULL;
/* Free ressources and close connections. This includes flushing any remaining
@@ -2271,8 +2276,8 @@ static void dbgPrintInitInfo(void)
static int iMainMsgQtoWrkMinMsgs = 100;
static int iMainMsgQbSaveOnShutdown = 1;
iMainMsgQueMaxDiskSpace = 0;
- setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100);
- setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1);
+ setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100);
+ setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1);
*/
dbgprintf("Work Directory: '%s'.\n", glbl.GetWorkDir());
}
@@ -2334,7 +2339,7 @@ init(void)
/* delete the message queue, which also flushes all messages left over */
if(pMsgQueue != NULL) {
dbgprintf("deleting main message queue\n");
- queueDestruct(&pMsgQueue); /* delete pThis here! */
+ qqueueDestruct(&pMsgQueue); /* delete pThis here! */
pMsgQueue = NULL;
}
@@ -2446,7 +2451,7 @@ init(void)
}
/* create message queue */
- CHKiRet_Hdlr(queueConstruct(&pMsgQueue, MainMsgQueType, iMainMsgQueueNumWorkers, iMainMsgQueueSize, msgConsumer)) {
+ CHKiRet_Hdlr(qqueueConstruct(&pMsgQueue, MainMsgQueType, iMainMsgQueueNumWorkers, iMainMsgQueueSize, msgConsumer)) {
/* no queue is fatal, we need to give up in that case... */
fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet);
exit(1);
@@ -2464,29 +2469,29 @@ init(void)
errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
}
- setQPROP(queueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize);
- setQPROP(queueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace);
- setQPROPstr(queueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName);
- setQPROP(queueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt);
- setQPROP(queueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown );
- setQPROP(queueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown);
- setQPROP(queueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", iMainMsgQtoWrkShutdown);
- setQPROP(queueSettoEnq, "$MainMsgQueueTimeoutEnqueue", iMainMsgQtoEnq);
- setQPROP(queueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", iMainMsgQHighWtrMark);
- setQPROP(queueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", iMainMsgQLowWtrMark);
- setQPROP(queueSetiDiscardMrk, "$MainMsgQueueDiscardMark", iMainMsgQDiscardMark);
- setQPROP(queueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", iMainMsgQDiscardSeverity);
- setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs);
- setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown);
- setQPROP(queueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", iMainMsgQDeqSlowdown);
- setQPROP(queueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", iMainMsgQueueDeqtWinFromHr);
- setQPROP(queueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", iMainMsgQueueDeqtWinToHr);
+ setQPROP(qqueueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize);
+ setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace);
+ setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName);
+ setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt);
+ setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown );
+ setQPROP(qqueueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown);
+ setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", iMainMsgQtoWrkShutdown);
+ setQPROP(qqueueSettoEnq, "$MainMsgQueueTimeoutEnqueue", iMainMsgQtoEnq);
+ setQPROP(qqueueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", iMainMsgQHighWtrMark);
+ setQPROP(qqueueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", iMainMsgQLowWtrMark);
+ setQPROP(qqueueSetiDiscardMrk, "$MainMsgQueueDiscardMark", iMainMsgQDiscardMark);
+ setQPROP(qqueueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", iMainMsgQDiscardSeverity);
+ setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs);
+ setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown);
+ setQPROP(qqueueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", iMainMsgQDeqSlowdown);
+ setQPROP(qqueueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", iMainMsgQueueDeqtWinFromHr);
+ setQPROP(qqueueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", iMainMsgQueueDeqtWinToHr);
# undef setQPROP
# undef setQPROPstr
/* ... and finally start the queue! */
- CHKiRet_Hdlr(queueStart(pMsgQueue)) {
+ CHKiRet_Hdlr(qqueueStart(pMsgQueue)) {
/* no queue is fatal, we need to give up in that case... */
fprintf(stderr, "fatal error %d: could not start message queue - rsyslogd can not run!\n", iRet);
exit(1);
@@ -3095,7 +3100,7 @@ GlobalClassExit(void)
CHKiRet(strmClassInit(NULL));
CHKiRet(wtiClassInit(NULL));
CHKiRet(wtpClassInit(NULL));
- CHKiRet(queueClassInit(NULL));
+ CHKiRet(qqueueClassInit(NULL));
CHKiRet(vmstkClassInit(NULL));
CHKiRet(sysvarClassInit(NULL));
CHKiRet(vmClassInit(NULL));