summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-24 17:55:09 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-24 17:55:09 +0000
commit5c686c8adcc473cbdbb14e4b2d736f9123210ee6 (patch)
treeeb83fbca0d98ac4948b6d9ca22d8a0e4828815a9
parent76782c240db52c81825c907c40c31ca8b48218de (diff)
downloadrsyslog-5c686c8adcc473cbdbb14e4b2d736f9123210ee6.tar.gz
rsyslog-5c686c8adcc473cbdbb14e4b2d736f9123210ee6.tar.xz
rsyslog-5c686c8adcc473cbdbb14e4b2d736f9123210ee6.zip
redesigned queue to utilize helper classes for threading support. This is
finally in a running state for regular (non disk-assisted) queues, with a minor nit at shutdown. So I can finally commit the work again to CVS...
-rw-r--r--Makefile.am6
-rw-r--r--action.c10
-rw-r--r--cfsysline.c38
-rw-r--r--debug.c6
-rw-r--r--doc/status.html4
-rw-r--r--iminternal.c12
-rw-r--r--linkedlist.c24
-rw-r--r--module-template.h48
-rw-r--r--modules.c12
-rw-r--r--msg.c36
-rw-r--r--net.c28
-rw-r--r--obj-types.h38
-rw-r--r--obj.c36
-rw-r--r--obj.h14
-rw-r--r--objomsr.c2
-rw-r--r--omfile.c28
-rw-r--r--omfwd.c8
-rw-r--r--parse.c50
-rw-r--r--plugins/imuxsock/imuxsock.c9
-rw-r--r--queue.c1579
-rw-r--r--queue.h40
-rw-r--r--rsyslog.h11
-rwxr-xr-xsrUtils.c4
-rw-r--r--stream.c46
-rwxr-xr-xstringbuf.c36
-rw-r--r--syslogd.c156
-rw-r--r--tcpsyslog.c4
-rw-r--r--template.c2
-rw-r--r--threads.c9
-rw-r--r--wti.c53
-rw-r--r--wti.h9
-rw-r--r--wtp.c312
-rw-r--r--wtp.h31
33 files changed, 1012 insertions, 1689 deletions
diff --git a/Makefile.am b/Makefile.am
index f15fe567..06602861 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -5,6 +5,8 @@ rfc3195d_SOURCES = rfc3195d.c rsyslog.h
rsyslogd_SOURCES = \
syslogd.c \
syslogd.h \
+ debug.c \
+ debug.h \
glbl.h \
pidfile.c \
pidfile.h \
@@ -24,6 +26,10 @@ rsyslogd_SOURCES = \
threads.h \
stream.c \
stream.h \
+ wtp.c \
+ wtp.h \
+ wti.c \
+ wti.h \
queue.c \
queue.h \
sync.c \
diff --git a/action.c b/action.c
index 04e72a94..2410c76e 100644
--- a/action.c
+++ b/action.c
@@ -84,7 +84,7 @@ rsRetVal actionConstruct(action_t **ppThis)
finalize_it:
*ppThis = pThis;
- return iRet;
+ RETiRet;
}
@@ -97,7 +97,7 @@ static rsRetVal actionResume(action_t *pThis)
assert(pThis != NULL);
pThis->bSuspended = 0;
- return iRet;
+ RETiRet;
}
@@ -121,7 +121,7 @@ rsRetVal actionSuspend(action_t *pThis)
pThis->ttResumeRtry = time(NULL) + pThis->iResumeInterval;
pThis->iNbrResRtry = 0; /* tell that we did not yet retry to resume */
- return iRet;
+ RETiRet;
}
/* try to resume an action -- rgerhards, 2007-08-02
@@ -162,7 +162,7 @@ rsRetVal actionTryResume(action_t *pThis)
dbgprintf("actionTryResume: iRet: %d, next retry (if applicable): %u [now %u]\n",
iRet, (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
- return iRet;
+ RETiRet;
}
@@ -187,7 +187,7 @@ rsRetVal actionDbgPrint(action_t *pThis)
printf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
printf("\n");
- return iRet;
+ RETiRet;
}
diff --git a/cfsysline.c b/cfsysline.c
index 337d362a..7cbd5884 100644
--- a/cfsysline.c
+++ b/cfsysline.c
@@ -74,7 +74,7 @@ static rsRetVal doGetChar(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *
}
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -92,7 +92,7 @@ static rsRetVal doCustomHdlr(uchar **pp, rsRetVal (*pSetHdlr)(uchar**, void*), v
CHKiRet(pSetHdlr(pp, pVal));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -128,7 +128,7 @@ static rsRetVal parseIntVal(uchar **pp, size_t *pVal)
*pp = p;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -158,7 +158,7 @@ static rsRetVal doGetInt(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *p
*pp = p;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -209,7 +209,7 @@ static rsRetVal doGetSize(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *
}
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -270,7 +270,7 @@ static rsRetVal doFileCreateMode(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t),
*pp = p;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -347,7 +347,7 @@ static rsRetVal doGetGID(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *p
skipWhiteSpace(pp); /* skip over any whitespace */
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -389,7 +389,7 @@ static rsRetVal doGetUID(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *p
skipWhiteSpace(pp); /* skip over any whitespace */
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -420,7 +420,7 @@ static rsRetVal doBinaryOptionLine(uchar **pp, rsRetVal (*pSetHdlr)(void*, int),
skipWhiteSpace(pp); /* skip over any whitespace */
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -481,7 +481,7 @@ finalize_it:
rsCStrDestruct(pStrB);
}
- return iRet;
+ RETiRet;
}
@@ -514,7 +514,7 @@ static rsRetVal cslchConstruct(cslCmdHdlr_t **ppThis)
finalize_it:
*ppThis = pThis;
- return iRet;
+ RETiRet;
}
/* destructor for linked list keys. As we do not use any dynamic memory,
@@ -606,7 +606,7 @@ static rsRetVal cslchCallHdlr(cslCmdHdlr_t *pThis, uchar **ppConfLine)
CHKiRet(pHdlr(ppConfLine, pThis->cslCmdHdlr, pThis->pData));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -655,7 +655,7 @@ static rsRetVal cslcConstruct(cslCmd_t **ppThis, int bChainingPermitted)
finalize_it:
*ppThis = pThis;
- return iRet;
+ RETiRet;
}
@@ -678,7 +678,7 @@ finalize_it:
cslchDestruct(pCmdHdlr);
}
- return iRet;
+ RETiRet;
}
@@ -692,7 +692,7 @@ rsRetVal cfsyslineInit(void)
CHKiRet(llInit(&llCmdList, cslcDestruct, cslcKeyDestruct, strcasecmp));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -740,7 +740,7 @@ rsRetVal regCfSysLineHdlr(uchar *pCmdName, int bChainingPermitted, ecslCmdHdrlTy
}
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -774,7 +774,7 @@ DEFFUNC_llExecFunc(unregHdlrsHeadExec)
}
finalize_it:
- return iRet;
+ RETiRet;
}
/* unregister and destroy cfSysLineHandlers for a specific owner. This method is
* most importantly used before unloading a loadable module providing some handlers.
@@ -790,7 +790,7 @@ rsRetVal unregCfSysLineHdlrs4Owner(void *pOwnerCookie)
*/
iRet = llExecFunc(&llCmdList, unregHdlrsHeadExec, pOwnerCookie);
- return iRet;
+ RETiRet;
}
@@ -844,7 +844,7 @@ rsRetVal processCfSysLineCommand(uchar *pCmdName, uchar **p)
iRet = iRetLL;
finalize_it:
- return iRet;
+ RETiRet;
}
diff --git a/debug.c b/debug.c
index 1ccb3d60..e473504e 100644
--- a/debug.c
+++ b/debug.c
@@ -289,11 +289,6 @@ dbgMutLog_t *dbgMutLogAddEntry(pthread_mutex_t *pmut, short mutexOp, dbgFuncDB_t
pLog->pFuncDB = pFuncDB;
DLL_Add(MutLog, pLog);
-//RUNLOG_VAR("%p", pLog);
-//RUNLOG_VAR("%p", dbgMutLogListRoot);
-//RUNLOG_VAR("%p", dbgMutLogListLast);
-//RUNLOG_VAR("%p", pLog->pNext);
-//RUNLOG_VAR("%p", pLog->pPrev);
return pLog;
}
@@ -520,7 +515,6 @@ int dbgCondTimedWait(pthread_cond_t *cond, pthread_mutex_t *pmut, const struct t
dbgprintf("%s:%d:%s: mutex %p waiting on condition %p (with timeout)\n", pFuncDB->file, pFuncDB->line, pFuncDB->func,
(void*)pmut, (void*)cond);
ret = pthread_cond_timedwait(cond, pmut, abstime);
-RUNLOG;
dbgMutexLockLog(pmut, pFuncDB, ln);
return ret;
}
diff --git a/doc/status.html b/doc/status.html
index 7950cfdb..226a2eb2 100644
--- a/doc/status.html
+++ b/doc/status.html
@@ -13,8 +13,8 @@
rsyslog v3 compatibility document!</a></font></b><br>
Documentation for 3.x is currently sparse. If you need assistance, please
<a href="http://www.rsyslog.com/PNphpBB2.phtml">post in the rsyslog forums</a>!</p>
-<p><b>stable:</b> 2.0.0 - <a href="http://www.rsyslog.com/Article155.phtml">change log</a> -
-<a href="http://www.rsyslog.com/Downloads-index-req-getit-lid-70.phtml">download</a></p>
+<p><b>stable:</b> 2.0.1 - <a href="http://www.rsyslog.com/Article165.phtml">change log</a> -
+<a href="http://www.rsyslog.com/Downloads-index-req-getit-lid-73.phtml">download</a></p>
<p>&nbsp;(<a href="version_naming.html">How are versions named?</a>)</p>
<h2>Platforms</h2>
<p>Thankfully, a number of folks have begin to build packages and help port
diff --git a/iminternal.c b/iminternal.c
index 86d5097c..fb70d062 100644
--- a/iminternal.c
+++ b/iminternal.c
@@ -53,7 +53,7 @@ static rsRetVal iminternalDestruct(iminternal_t *pThis)
free(pThis);
- return iRet;
+ RETiRet;
}
@@ -78,7 +78,7 @@ finalize_it:
*ppThis = pThis;
- return iRet;
+ RETiRet;
}
@@ -111,7 +111,7 @@ finalize_it:
iminternalDestruct(pThis);
}
- return iRet;
+ RETiRet;
}
@@ -142,7 +142,7 @@ rsRetVal iminternalRemoveMsg(int *pPri, msg_t **ppMsg, int *pFlags)
}
finalize_it:
- return iRet;
+ RETiRet;
}
/* tell the caller if we have any messages ready for processing.
@@ -166,7 +166,7 @@ rsRetVal modInitIminternal(void)
iRet = llInit(&llMsgs, iminternalDestruct, NULL, NULL);
- return iRet;
+ RETiRet;
}
@@ -182,7 +182,7 @@ rsRetVal modExitIminternal(void)
iRet = llDestroy(&llMsgs);
- return iRet;
+ RETiRet;
}
/*
diff --git a/linkedlist.c b/linkedlist.c
index ff1320e5..9adf40c4 100644
--- a/linkedlist.c
+++ b/linkedlist.c
@@ -79,7 +79,7 @@ static rsRetVal llDestroyElt(linkedList_t *pList, llElt_t *pElt)
free(pElt);
pList->iNumElts--; /* one less */
- return iRet;
+ RETiRet;
}
@@ -106,7 +106,7 @@ rsRetVal llDestroy(linkedList_t *pThis)
pThis->pRoot = NULL;
pThis->pLast = NULL;
- return iRet;
+ RETiRet;
}
/* llDestroyRootElt - destroy the root element but otherwise
@@ -134,7 +134,7 @@ rsRetVal llDestroyRootElt(linkedList_t *pThis)
CHKiRet(llDestroyElt(pThis, pPrev));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -166,7 +166,7 @@ rsRetVal llGetNextElt(linkedList_t *pThis, linkedListCookie_t *ppElt, void **ppU
*ppElt = pElt;
- return iRet;
+ RETiRet;
}
@@ -204,7 +204,7 @@ static rsRetVal llEltConstruct(llElt_t **ppThis, void *pKey, void *pData)
finalize_it:
*ppThis = pThis;
- return iRet;
+ RETiRet;
}
@@ -227,7 +227,7 @@ rsRetVal llAppend(linkedList_t *pThis, void *pKey, void *pData)
pThis->pLast = pElt;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -267,7 +267,7 @@ static rsRetVal llUnlinkAndDelteElt(linkedList_t *pThis, llElt_t *pElt, llElt_t
CHKiRet(llDestroyElt(pThis, pElt));
finalize_it:
- return iRet;
+ RETiRet;
}
/* find a user element based on the provided key - this is the
@@ -305,7 +305,7 @@ static rsRetVal llFindElt(linkedList_t *pThis, void *pKey, llElt_t **ppElt, llEl
} else
iRet = RS_RET_NOT_FOUND;
- return iRet;
+ RETiRet;
}
@@ -323,7 +323,7 @@ rsRetVal llFind(linkedList_t *pThis, void *pKey, void **ppData)
*ppData = pElt->pData;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -345,7 +345,7 @@ rsRetVal llFindAndDelete(linkedList_t *pThis, void *pKey)
CHKiRet(llUnlinkAndDelteElt(pThis, pElt, pEltPrev));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -360,7 +360,7 @@ rsRetVal llGetNumElts(linkedList_t *pThis, int *piCnt)
*piCnt = pThis->iNumElts;
- return iRet;
+ RETiRet;
}
@@ -404,7 +404,7 @@ rsRetVal llExecFunc(linkedList_t *pThis, rsRetVal (*pFunc)(void*, void*), void*
iRet = iRetLL;
finalize_it:
- return iRet;
+ RETiRet;
}
diff --git a/module-template.h b/module-template.h
index ecd6fe6d..c92ec25a 100644
--- a/module-template.h
+++ b/module-template.h
@@ -100,12 +100,13 @@ static rsRetVal createInstance(instanceData **ppData)\
#define CODESTARTcreateInstance \
if((pData = calloc(1, sizeof(instanceData))) == NULL) {\
*ppData = NULL;\
+ ENDfunc \
return RS_RET_OUT_OF_MEMORY;\
}
#define ENDcreateInstance \
*ppData = pData;\
- return iRet;\
+ RETiRet;\
}
/* freeInstance()
@@ -129,7 +130,7 @@ static rsRetVal freeInstance(void* pModData)\
#define ENDfreeInstance \
if(pData != NULL)\
free(pData); /* we need to free this in any case */\
- return iRet;\
+ RETiRet;\
}
/* isCompatibleWithFeature()
@@ -137,12 +138,13 @@ static rsRetVal freeInstance(void* pModData)\
#define BEGINisCompatibleWithFeature \
static rsRetVal isCompatibleWithFeature(syslogFeature __attribute__((unused)) eFeat)\
{\
- rsRetVal iRet = RS_RET_INCOMPATIBLE;
+ rsRetVal iRet = RS_RET_INCOMPATIBLE; \
+ BEGINfunc
#define CODESTARTisCompatibleWithFeature
#define ENDisCompatibleWithFeature \
- return iRet;\
+ RETiRet;\
}
/* doAction()
@@ -156,7 +158,7 @@ static rsRetVal doAction(uchar __attribute__((unused)) **ppString, unsigned __at
/* ppString may be NULL if the output module requested no strings */
#define ENDdoAction \
- return iRet;\
+ RETiRet;\
}
@@ -174,7 +176,7 @@ static rsRetVal dbgPrintInstInfo(void *pModData)\
pData = (instanceData*) pModData;
#define ENDdbgPrintInstInfo \
- return iRet;\
+ RETiRet;\
}
@@ -189,13 +191,14 @@ static rsRetVal dbgPrintInstInfo(void *pModData)\
static rsRetVal needUDPSocket(void *pModData)\
{\
rsRetVal iRet = RS_RET_FALSE;\
- instanceData *pData = NULL;
+ instanceData *pData = NULL; \
+ BEGINfunc
#define CODESTARTneedUDPSocket \
pData = (instanceData*) pModData;
#define ENDneedUDPSocket \
- return iRet;\
+ RETiRet;\
}
@@ -245,7 +248,7 @@ finalize_it:\
}
#define ENDparseSelectorAct \
- return iRet;\
+ RETiRet;\
}
@@ -267,7 +270,7 @@ static rsRetVal tryResume(instanceData __attribute__((unused)) *pData)\
assert(pData != NULL);
#define ENDtryResume \
- return iRet;\
+ RETiRet;\
}
@@ -281,14 +284,16 @@ static rsRetVal queryEtryPt(uchar *name, rsRetVal (**pEtryPoint)())\
DEFiRet;
#define CODESTARTqueryEtryPt \
- if((name == NULL) || (pEtryPoint == NULL))\
+ if((name == NULL) || (pEtryPoint == NULL)) {\
+ ENDfunc \
return RS_RET_PARAM_ERROR;\
+ } \
*pEtryPoint = NULL;
#define ENDqueryEtryPt \
if(iRet == RS_RET_OK)\
iRet = (*pEtryPoint == NULL) ? RS_RET_NOT_FOUND : RS_RET_OK;\
- return iRet;\
+ RETiRet;\
}
/* the following definition is the standard block for queryEtryPt for all types
@@ -372,13 +377,15 @@ rsRetVal modInit##uniqName(int iIFVersRequested __attribute__((unused)), int *ip
#define CODESTARTmodInit \
assert(pHostQueryEtryPt != NULL);\
- if((pQueryEtryPt == NULL) || (ipIFVersProvided == NULL))\
- return RS_RET_PARAM_ERROR;
+ if((pQueryEtryPt == NULL) || (ipIFVersProvided == NULL)) {\
+ ENDfunc \
+ return RS_RET_PARAM_ERROR; \
+ }
#define ENDmodInit \
finalize_it:\
*pQueryEtryPt = queryEtryPt;\
- return iRet;\
+ RETiRet;\
}
@@ -408,7 +415,7 @@ static rsRetVal modExit(void)\
#define CODESTARTmodExit
#define ENDmodExit \
- return iRet;\
+ RETiRet;\
}
@@ -423,10 +430,11 @@ static rsRetVal runInput(thrdInfo_t __attribute__((unused)) *pThrd)\
{\
DEFiRet;
-#define CODESTARTrunInput
+#define CODESTARTrunInput \
+ dbgSetThrdName((uchar*)__FILE__); /* we need to provide something better later */
#define ENDrunInput \
- return iRet;\
+ RETiRet;\
}
@@ -446,7 +454,7 @@ static rsRetVal willRun(void)\
#define CODESTARTwillRun
#define ENDwillRun \
- return iRet;\
+ RETiRet;\
}
@@ -465,7 +473,7 @@ static rsRetVal afterRun(void)\
#define CODESTARTafterRun
#define ENDafterRun \
- return iRet;\
+ RETiRet;\
}
diff --git a/modules.c b/modules.c
index b8fc54ef..5c252dd3 100644
--- a/modules.c
+++ b/modules.c
@@ -1,6 +1,6 @@
/* modules.c
* This is the implementation of syslogd modules object.
- * This object handles plug-ins and buil-in modules of all kind.
+ * This object handles plug-ins and build-in modules of all kind.
*
* File begun on 2007-07-22 by RGerhards
*
@@ -99,7 +99,7 @@ rsRetVal queryHostEtryPt(uchar *name, rsRetVal (**pEtryPoint)())
if(iRet == RS_RET_OK)
iRet = (*pEtryPoint == NULL) ? RS_RET_NOT_FOUND : RS_RET_OK;
- return iRet;
+ RETiRet;
}
@@ -204,7 +204,7 @@ static rsRetVal modPrepareUnload(modInfo_t *pThis)
/* END DEVEL */
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -288,7 +288,7 @@ finalize_it:
moduleDestruct(pNew);
}
- return iRet;
+ RETiRet;
}
/* Print loaded modules. This is more or less a
@@ -348,7 +348,7 @@ rsRetVal modUnloadAndDestructAll(void)
moduleDestruct(pModPrev);
}
- return iRet;
+ RETiRet;
}
@@ -374,7 +374,7 @@ rsRetVal modUnloadAndDestructDynamic(void)
}
}
- return iRet;
+ RETiRet;
}
/* vi:set ai:
*/
diff --git a/msg.c b/msg.c
index 0961afd4..3bdc1f92 100644
--- a/msg.c
+++ b/msg.c
@@ -225,7 +225,7 @@ rsRetVal MsgConstruct(msg_t **ppThis)
*ppThis = pM;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -422,7 +422,7 @@ static rsRetVal MsgSerialize(msg_t *pThis, strm_t *pStrm)
CHKiRet(objEndSerialize(pStrm));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -457,7 +457,7 @@ msg_t *MsgAddRef(msg_t *pM)
static rsRetVal aquirePROCIDFromTAG(msg_t *pM)
{
register int i;
- int iRet;
+ DEFiRet;
assert(pM != NULL);
if(pM->pCSPROCID != NULL)
@@ -480,8 +480,9 @@ static rsRetVal aquirePROCIDFromTAG(msg_t *pM)
return RS_RET_OBJ_CREATION_FAILED; /* best we can do... */
rsCStrSetAllocIncrement(pM->pCSPROCID, 16);
while((i < pM->iLenTAG) && (pM->pszTAG[i] != ']')) {
- if((iRet = rsCStrAppendChar(pM->pCSPROCID, pM->pszTAG[i])) != RS_RET_OK)
- return iRet;
+ if((iRet = rsCStrAppendChar(pM->pCSPROCID, pM->pszTAG[i])) != RS_RET_OK) {
+ RETiRet;
+ }
++i;
}
@@ -497,10 +498,11 @@ static rsRetVal aquirePROCIDFromTAG(msg_t *pM)
}
/* OK, finaally we could obtain a PROCID. So let's use it ;) */
- if((iRet = rsCStrFinish(pM->pCSPROCID)) != RS_RET_OK)
- return iRet;
+ if((iRet = rsCStrFinish(pM->pCSPROCID)) != RS_RET_OK) {
+ RETiRet;
+ }
- return RS_RET_OK;
+ RETiRet;
}
@@ -523,7 +525,7 @@ static rsRetVal aquirePROCIDFromTAG(msg_t *pM)
static rsRetVal aquireProgramName(msg_t *pM)
{
register int i;
- int iRet;
+ DEFiRet;
assert(pM != NULL);
if(pM->pCSProgName == NULL) {
@@ -538,13 +540,15 @@ static rsRetVal aquireProgramName(msg_t *pM)
&& (pM->pszTAG[i] != '\0') && (pM->pszTAG[i] != ':')
&& (pM->pszTAG[i] != '[') && (pM->pszTAG[i] != '/')
; ++i) {
- if((iRet = rsCStrAppendChar(pM->pCSProgName, pM->pszTAG[i])) != RS_RET_OK)
- return iRet;
+ if((iRet = rsCStrAppendChar(pM->pCSProgName, pM->pszTAG[i])) != RS_RET_OK) {
+ RETiRet;
+ }
+ }
+ if((iRet = rsCStrFinish(pM->pCSProgName)) != RS_RET_OK) {
+ RETiRet;
}
- if((iRet = rsCStrFinish(pM->pCSProgName)) != RS_RET_OK)
- return iRet;
}
- return RS_RET_OK;
+ RETiRet;
}
@@ -924,7 +928,7 @@ rsRetVal MsgSetAPPNAME(msg_t *pMsg, char* pszAPPNAME)
/* if we reach this point, we have the object */
iRet = rsCStrSetSzStr(pMsg->pCSAPPNAME, (uchar*) pszAPPNAME);
- return iRet;
+ RETiRet;
}
@@ -2128,7 +2132,7 @@ rsRetVal MsgSetProperty(msg_t *pThis, property_t *pProp)
memcpy(&pThis->tTIMESTAMP, &pProp->val.vSyslogTime, sizeof(struct syslogTime));
}
- return iRet;
+ RETiRet;
}
#undef isProp
diff --git a/net.c b/net.c
index d5785e48..e5807ec3 100644
--- a/net.c
+++ b/net.c
@@ -214,7 +214,7 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS
*/
logerrorInt("Internal error caused AllowedSender to be ignored, AF = %d",
iAllow->addr.NetAddr->sa_family);
- return RS_RET_ERR;
+ ABORT_FINALIZE(RS_RET_ERR);
}
/* OK, entry constructed, now lets add it to the ACL list */
iRet = AddAllowedSenderEntry(ppRoot, ppLast, iAllow, iSignificantBits);
@@ -222,7 +222,7 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS
/* we need to process a hostname ACL */
if (DisableDNS) {
logerror ("Ignoring hostname based ACLs because DNS is disabled.");
- return RS_RET_OK;
+ ABORT_FINALIZE(RS_RET_OK);
}
if (!strchr (iAllow->addr.HostWildcard, '*') &&
@@ -246,10 +246,11 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS
if (ACLAddHostnameOnFail) {
logerrorSz("Adding hostname \"%s\" to ACL as a wildcard entry.", iAllow->addr.HostWildcard);
- return AddAllowedSenderEntry(ppRoot, ppLast, iAllow, iSignificantBits);
+ iRet = AddAllowedSenderEntry(ppRoot, ppLast, iAllow, iSignificantBits);
+ FINALIZE;
} else {
logerrorSz("Hostname \"%s\" WON\'T be added to ACL.", iAllow->addr.HostWildcard);
- return RS_RET_NOENTRY;
+ ABORT_FINALIZE(RS_RET_NOENTRY);
}
}
@@ -260,13 +261,13 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS
allowIP.flags = 0;
if((allowIP.addr.NetAddr = malloc(res->ai_addrlen)) == NULL) {
glblHadMemShortage = 1;
- return RS_RET_OUT_OF_MEMORY;
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
memcpy(allowIP.addr.NetAddr, res->ai_addr, res->ai_addrlen);
if((iRet = AddAllowedSenderEntry(ppRoot, ppLast, &allowIP, iSignificantBits))
!= RS_RET_OK)
- return(iRet);
+ FINALIZE;
break;
case AF_INET6: /* IPv6 - but need to check if it is a v6-mapped IPv4 */
if(IN6_IS_ADDR_V4MAPPED (&SIN6(res->ai_addr)->sin6_addr)) {
@@ -277,7 +278,7 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS
if((allowIP.addr.NetAddr = malloc(sizeof(struct sockaddr_in)))
== NULL) {
glblHadMemShortage = 1;
- return RS_RET_OUT_OF_MEMORY;
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
SIN(allowIP.addr.NetAddr)->sin_family = AF_INET;
#ifdef HAVE_STRUCT_SOCKADDR_SA_LEN
@@ -291,7 +292,7 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS
if((iRet = AddAllowedSenderEntry(ppRoot, ppLast, &allowIP,
iSignificantBits))
!= RS_RET_OK)
- return(iRet);
+ FINALIZE;
} else {
/* finally add IPv6 */
@@ -299,14 +300,14 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS
allowIP.flags = 0;
if((allowIP.addr.NetAddr = malloc(res->ai_addrlen)) == NULL) {
glblHadMemShortage = 1;
- return RS_RET_OUT_OF_MEMORY;
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
memcpy(allowIP.addr.NetAddr, res->ai_addr, res->ai_addrlen);
if((iRet = AddAllowedSenderEntry(ppRoot, ppLast, &allowIP,
iSignificantBits))
!= RS_RET_OK)
- return(iRet);
+ FINALIZE;
}
break;
}
@@ -321,7 +322,8 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS
}
}
- return iRet;
+finalize_it:
+ RETiRet;
}
@@ -700,7 +702,7 @@ rsRetVal gethname(struct sockaddr_storage *f, uchar *pszHostFQDN)
}
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -833,7 +835,7 @@ rsRetVal cvthname(struct sockaddr_storage *f, uchar *pszHost, uchar *pszHostFQDN
}
finalize_it:
- return iRet;
+ RETiRet;
}
diff --git a/obj-types.h b/obj-types.h
index 9d430b72..7eff503d 100644
--- a/obj-types.h
+++ b/obj-types.h
@@ -58,9 +58,11 @@ typedef enum { /* IDs of known object "types/classes" */
OBJNull = 0, /* no valid object (we do not start at zero so we can detect calloc()) */
OBJMsg = 1,
OBJstrm = 2,
- OBJqueue = 3 /* remeber to UPDATE OBJ_NUM_IDS (below) if you add one! */
+ OBJwtp = 3,
+ OBJwti = 4,
+ OBJqueue = 5 /* remeber to UPDATE OBJ_NUM_IDS (below) if you add one! */
} objID_t;
-#define OBJ_NUM_IDS 4
+#define OBJ_NUM_IDS 6
typedef enum { /* IDs of base methods supported by all objects - used for jump table, so
* they must start at zero and be incremented. -- rgerahrds, 2008-01-04
@@ -114,6 +116,31 @@ typedef struct obj { /* the dummy struct that each derived class can be casted t
# define ISOBJ_assert(pObj)
#endif
+#define DEFpropSetMethPTR(obj, prop, dataType)\
+ rsRetVal obj##Set##prop(obj##_t *pThis, dataType *pVal)\
+ { \
+ /* DEV debug: dbgprintf("%sSet%s()\n", #obj, #prop); */\
+ pThis->prop = pVal; \
+ return RS_RET_OK; \
+ }
+#define PROTOTYPEpropSetMethPTR(obj, prop, dataType)\
+ rsRetVal obj##Set##prop(obj##_t *pThis, dataType*)
+#define DEFpropSetMeth(obj, prop, dataType)\
+ rsRetVal obj##Set##prop(obj##_t *pThis, dataType pVal)\
+ { \
+ /* DEV debug: dbgprintf("%sSet%s()\n", #obj, #prop); */\
+ pThis->prop = pVal; \
+ return RS_RET_OK; \
+ }
+#define DEFpropSetMethFP(obj, prop, dataType)\
+ rsRetVal obj##Set##prop(obj##_t *pThis, dataType)\
+ { \
+ /* DEV debug: dbgprintf("%sSet%s()\n", #obj, #prop); */\
+ pThis->prop = pVal; \
+ return RS_RET_OK; \
+ }
+#define PROTOTYPEpropSetMethFP(obj, prop, dataType)\
+ rsRetVal obj##Set##prop(obj##_t *pThis, dataType)
#define DEFpropSetMeth(obj, prop, dataType)\
rsRetVal obj##Set##prop(obj##_t *pThis, dataType pVal)\
{ \
@@ -135,7 +162,7 @@ rsRetVal objName##ClassInit(void) \
#define ENDObjClassInit(objName) \
objRegisterObj(OBJ##objName, pObjInfoOBJ); \
finalize_it: \
- return iRet; \
+ RETiRet; \
}
/* this defines both the constructor and initializer
@@ -148,7 +175,7 @@ finalize_it: \
#define ENDobjConstruct(obj) \
/* use finalize_it: before calling the macro (if you need it)! */ \
- return iRet; \
+ RETiRet; \
} \
rsRetVal obj##Construct(obj##_t **ppThis) \
{ \
@@ -166,8 +193,9 @@ finalize_it: \
\
finalize_it: \
OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP \
- return iRet; \
+ RETiRet; \
}
+
#endif /* #ifndef OBJ_TYPES_H_INCLUDED */
diff --git a/obj.c b/obj.c
index e28c2747..dfd3edd0 100644
--- a/obj.c
+++ b/obj.c
@@ -100,7 +100,7 @@ rsRetVal objInfoConstruct(objInfo_t **ppThis, objID_t objID, uchar *pszName, int
*ppThis = pThis;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -152,7 +152,7 @@ static rsRetVal objSerializeHeader(strm_t *pStrm, obj_t *pObj, uchar *pszRecType
CHKiRet(strmWriteChar(pStrm, '\n'));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -171,7 +171,7 @@ dbgprintf("objBeginSerialize obj type: %x\n", objGetObjID(pStrm));
CHKiRet(objSerializeHeader(pStrm, pObj, (uchar*) "Obj"));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -195,7 +195,7 @@ rsRetVal objBeginSerializePropBag(strm_t *pStrm, obj_t *pObj)
CHKiRet(objSerializeHeader(pStrm, pObj, (uchar*) "OPB"));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -286,7 +286,7 @@ rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propertyType_t prop
CHKiRet(strmWriteChar(pStrm, '\n'));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -307,7 +307,7 @@ rsRetVal objEndSerialize(strm_t *pStrm)
CHKiRet(strmRecordEnd(pStrm));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -334,7 +334,7 @@ static rsRetVal objDeserializeLong(long *pInt, strm_t *pStrm)
*pInt = i;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -368,7 +368,7 @@ finalize_it:
if(iRet != RS_RET_OK && pCStr != NULL)
rsCStrDestruct(pCStr);
- return iRet;
+ RETiRet;
}
@@ -400,7 +400,7 @@ static rsRetVal objDeserializeSyslogTime(syslogTime_t *pTime, strm_t *pStrm)
GETVAL(OffsetMinute);
finalize_it:
- return iRet;
+ RETiRet;
}
#undef GETVAL
@@ -443,7 +443,7 @@ static rsRetVal objDeserializeHeader(uchar *pszRecType, objID_t *poID, int* poVe
*poVers = oVers;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -515,7 +515,7 @@ static rsRetVal objDeserializeProperty(property_t *pProp, strm_t *pStrm)
if(c != '\n') ABORT_FINALIZE(RS_RET_INVALID_PROPFRAME);
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -538,7 +538,7 @@ static rsRetVal objDeserializeTrailer(strm_t *pStrm)
NEXTC; if(c != '\n') ABORT_FINALIZE(RS_RET_INVALID_TRAILER);
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -578,7 +578,7 @@ static rsRetVal objDeserializeTryRecover(strm_t *pStrm)
finalize_it:
dbgprintf("deserializer has possibly been able to re-sync and recover, state %d\n", iRet);
- return iRet;
+ RETiRet;
}
@@ -608,7 +608,7 @@ static rsRetVal objDeserializeProperties(obj_t *pObj, objID_t oID, strm_t *pStrm
CHKiRet(objDeserializeTrailer(pStrm)); /* do trailer checks */
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -668,7 +668,7 @@ finalize_it:
if(iRet != RS_RET_OK && pObj != NULL)
free(pObj); // TODO: check if we can call destructor 2008-01-13 rger
- return iRet;
+ RETiRet;
}
@@ -711,7 +711,7 @@ dbgprintf("objDese...AsPropBag 2\n");
CHKiRet(objDeserializeProperties(pObj, oID, pStrm));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -756,7 +756,7 @@ rsRetVal objDeserializePropBag(obj_t *pObj, strm_t *pStrm)
CHKiRet(objDeserializeProperties(pObj, oID, pStrm));
finalize_it:
- return iRet;
+ RETiRet;
}
#undef NEXTC /* undef helper macro */
@@ -780,7 +780,7 @@ rsRetVal objRegisterObj(objID_t oID, objInfo_t *pInfo)
arrObjInfo[oID] = pInfo;
finalize_it:
- return iRet;
+ RETiRet;
}
diff --git a/obj.h b/obj.h
index 1f2aae63..6b965e10 100644
--- a/obj.h
+++ b/obj.h
@@ -85,20 +85,6 @@
#define OBJSetMethodHandler(methodID, pHdlr) \
CHKiRet(objInfoSetMethod(pObjInfoOBJ, methodID, (rsRetVal (*)(void*)) pHdlr))
-/* debug aides */
-#if 0
-#define d_pthread_mutex_lock(x) {dbgprintf("mutex %p lock %s, %s(), line %d\n", (void*)x, __FILE__, __func__, __LINE__); \
- pthread_mutex_lock(x); \
- if(1)dbgprintf("mutex %p lock aquired %s, %s(), line %d\n",(void*)x, __FILE__, __func__, __LINE__); \
- }
-#define d_pthread_mutex_unlock(x) {dbgprintf("mutex %p UNlock %s, %s(), line %d\n", (void*)x ,__FILE__, __func__, __LINE__);\
- pthread_mutex_unlock(x); \
- if(1)dbgprintf("mutex %p UNlock done %s, %s(), line %d\n", (void*)x, __FILE__, __func__, __LINE__); \
- }
-#else
-#define d_pthread_mutex_lock(x) pthread_mutex_lock(x)
-#define d_pthread_mutex_unlock(x) pthread_mutex_unlock(x)
-#endif
/* prototypes */
rsRetVal objInfoConstruct(objInfo_t **ppThis, objID_t objID, uchar *pszName, int iObjVers, rsRetVal (*pConstruct)(void *), rsRetVal (*pDestruct)(void *));
rsRetVal objInfoSetMethod(objInfo_t *pThis, objMethod_t objMethod, rsRetVal (*pHandler)(void*));
diff --git a/objomsr.c b/objomsr.c
index 0158c856..6a617ad1 100644
--- a/objomsr.c
+++ b/objomsr.c
@@ -92,7 +92,7 @@ rsRetVal OMSRconstruct(omodStringRequest_t **ppThis, int iNumEntries)
abort_it:
*ppThis = pThis;
- return iRet;
+ RETiRet;
}
/* set a template name and option to the object. Index must be given. The pTplName must be
diff --git a/omfile.c b/omfile.c
index bcfaeabc..36627574 100644
--- a/omfile.c
+++ b/omfile.c
@@ -172,7 +172,7 @@ rsRetVal setDynaFileCacheSize(void __attribute__((unused)) *pVal, int iNewVal)
iDynaFileCacheSize = iNewVal;
dbgprintf("DynaFileCacheSize changed to %d.\n", iNewVal);
- return iRet;
+ RETiRet;
}
@@ -218,7 +218,7 @@ static rsRetVal cflineParseOutchannel(instanceData *pData, uchar* p, omodStringR
"outchannel '%s' not found - ignoring action line",
szBuf);
logerror(errMsg);
- return RS_RET_NOT_FOUND;
+ ABORT_FINALIZE(RS_RET_NOT_FOUND);
}
/* check if there is a file name in the outchannel... */
@@ -229,7 +229,7 @@ static rsRetVal cflineParseOutchannel(instanceData *pData, uchar* p, omodStringR
"outchannel '%s' has no file name template - ignoring action line",
szBuf);
logerror(errMsg);
- return RS_RET_ERR;
+ ABORT_FINALIZE(RS_RET_ERR);
}
/* OK, we finally got a correct template. So let's use it... */
@@ -242,7 +242,8 @@ static rsRetVal cflineParseOutchannel(instanceData *pData, uchar* p, omodStringR
iRet = cflineParseTemplateName(&p, pOMSR, iEntry, iTplOpts, (uchar*) " TradFmt");
- return(iRet);
+finalize_it:
+ RETiRet;
}
@@ -516,7 +517,7 @@ static rsRetVal writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pDa
*/
if(pData->bDynamicName) {
if(prepareDynFile(pData, ppString[1], iMsgOpts) != 0)
- return RS_RET_ERR;
+ ABORT_FINALIZE(RS_RET_ERR);
}
/* create the message based on format specified */
@@ -541,7 +542,7 @@ again:
pData->f_fname, (long long) pData->f_sizeLimit, (long long) actualFileSize);
errno = 0;
logerror(errMsg);
- return RS_RET_DISABLE_ACTION;
+ ABORT_FINALIZE(RS_RET_DISABLE_ACTION);
} else {
snprintf(errMsg, sizeof(errMsg),
"file %s had grown beyond configured file size of %lld bytes, actual size was %lld - configured command resolved situation",
@@ -558,14 +559,14 @@ again:
/* If a named pipe is full, just ignore it for now
- mrn 24 May 96 */
if (pData->fileType == eTypePIPE && e == EAGAIN)
- return RS_RET_OK;
+ ABORT_FINALIZE(RS_RET_OK);
/* If the filesystem is filled up, just ignore
* it for now and continue writing when possible
* based on patch for sysklogd by Martin Schulze on 2007-05-24
*/
if (pData->fileType == eTypeFILE && e == ENOSPC)
- return RS_RET_OK;
+ ABORT_FINALIZE(RS_RET_OK);
(void) close(pData->fd);
/*
@@ -593,7 +594,9 @@ again:
}
} else if (pData->bSyncFile)
fsync(pData->fd);
- return(iRet);
+
+finalize_it:
+ RETiRet;
}
@@ -638,12 +641,15 @@ CODESTARTparseSelectorAct
* the code further changes. -- rgerhards, 2007-07-25
*/
if(*p == '$' || *p == '?' || *p == '|' || *p == '/' || *p == '-') {
- if((iRet = createInstance(&pData)) != RS_RET_OK)
- return iRet;
+ if((iRet = createInstance(&pData)) != RS_RET_OK) {
+ ENDfunc
+ return iRet; /* this can not use RET_iRet! */
+ }
} else {
/* this is not clean, but we need it for the time being
* TODO: remove when cleaning up modularization
*/
+ ENDfunc
return RS_RET_CONFLINE_UNPROCESSED;
}
diff --git a/omfwd.c b/omfwd.c
index a4a546df..66b8a055 100644
--- a/omfwd.c
+++ b/omfwd.c
@@ -201,7 +201,7 @@ static rsRetVal UDPSend(instanceData *pData, char *msg, size_t len)
}
}
- return iRet;
+ RETiRet;
}
/* CODE FOR SENDING TCP MESSAGES */
@@ -241,7 +241,7 @@ static rsRetVal TCPSendFrame(void *pvData, char *msg, size_t len)
/* TODO: we need to revisit this code -- rgerhards, 2007-12-28 */
}
- return iRet;
+ RETiRet;
}
@@ -274,7 +274,7 @@ static rsRetVal TCPSendInit(void *pvData)
iRet = RS_RET_TCP_SOCKCREATE_ERR;
}
- return iRet;
+ RETiRet;
}
@@ -322,7 +322,7 @@ static rsRetVal doTryResume(instanceData *pData)
break;
}
- return iRet;
+ RETiRet;
}
diff --git a/parse.c b/parse.c
index f8c8e785..3132b570 100644
--- a/parse.c
+++ b/parse.c
@@ -179,7 +179,7 @@ rsRetVal parsInt(rsParsObj *pThis, int* pInt)
rsRetVal parsSkipAfterChar(rsParsObj *pThis, char c)
{
register unsigned char *pC;
- rsRetVal iRet;
+ DEFiRet;
rsCHECKVALIDOBJECT(pThis, OIDrsPars);
@@ -203,7 +203,7 @@ rsRetVal parsSkipAfterChar(rsParsObj *pThis, char c)
iRet = RS_RET_NOT_FOUND;
}
- return iRet;
+ RETiRet;
}
/* Skip whitespace. Often used to trim parsable entries.
@@ -243,12 +243,12 @@ rsRetVal parsDelimCStr(rsParsObj *pThis, rsCStrObj **ppCStr, char cDelim, int bT
{
register unsigned char *pC;
rsCStrObj *pCStr;
- rsRetVal iRet;
+ DEFiRet;
rsCHECKVALIDOBJECT(pThis, OIDrsPars);
if((pCStr = rsCStrConstruct()) == NULL)
- return RS_RET_OUT_OF_MEMORY;
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
if(bTrimLeading)
parsSkipWhitespace(pThis);
@@ -274,20 +274,21 @@ rsRetVal parsDelimCStr(rsParsObj *pThis, rsCStrObj **ppCStr, char cDelim, int bT
*/
if((iRet = rsCStrFinish(pCStr)) != RS_RET_OK) {
rsCStrDestruct (pCStr);
- return(iRet);
+ FINALIZE;
}
if(bTrimTrailing) {
if((iRet = rsCStrTrimTrailingWhiteSpace(pCStr))
!= RS_RET_OK) {
rsCStrDestruct (pCStr);
- return iRet;
+ FINALIZE;
}
}
/* done! */
*ppCStr = pCStr;
- return RS_RET_OK;
+finalize_it:
+ RETiRet;
}
/* Parse a quoted string ("-some-data") from the given position.
@@ -309,17 +310,17 @@ rsRetVal parsQuotedCStr(rsParsObj *pThis, rsCStrObj **ppCStr)
{
register unsigned char *pC;
rsCStrObj *pCStr;
- rsRetVal iRet;
+ DEFiRet;
rsCHECKVALIDOBJECT(pThis, OIDrsPars);
if((iRet = parsSkipAfterChar(pThis, '"')) != RS_RET_OK)
- return iRet;
+ FINALIZE;
pC = rsCStrGetBufBeg(pThis->pCStr) + pThis->iCurrPos;
/* OK, we most probably can obtain a value... */
if((pCStr = rsCStrConstruct()) == NULL)
- return RS_RET_OUT_OF_MEMORY;
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
while(pThis->iCurrPos < rsCStrLen(pThis->pCStr)) {
if(*pC == '"') {
@@ -334,13 +335,13 @@ rsRetVal parsQuotedCStr(rsParsObj *pThis, rsCStrObj **ppCStr)
*/
if((iRet = rsCStrAppendChar(pCStr, *pC)) != RS_RET_OK) {
rsCStrDestruct (pCStr);
- return(iRet);
+ FINALIZE;
}
}
} else { /* regular character */
if((iRet = rsCStrAppendChar(pCStr, *pC)) != RS_RET_OK) {
rsCStrDestruct (pCStr);
- return(iRet);
+ FINALIZE;
}
}
++pThis->iCurrPos;
@@ -352,18 +353,19 @@ rsRetVal parsQuotedCStr(rsParsObj *pThis, rsCStrObj **ppCStr)
} else {
/* error - improperly quoted string! */
rsCStrDestruct (pCStr);
- return RS_RET_MISSING_TRAIL_QUOTE;
+ ABORT_FINALIZE(RS_RET_MISSING_TRAIL_QUOTE);
}
/* We got the string, let's finish it... */
if((iRet = rsCStrFinish(pCStr)) != RS_RET_OK) {
rsCStrDestruct (pCStr);
- return(iRet);
+ FINALIZE;
}
/* done! */
*ppCStr = pCStr;
- return RS_RET_OK;
+finalize_it:
+ RETiRet;
}
/*
@@ -382,7 +384,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits)
uchar *pszTmp;
struct addrinfo hints, *res = NULL;
rsCStrObj *pCStr;
- rsRetVal iRet;
+ DEFiRet;
rsCHECKVALIDOBJECT(pThis, OIDrsPars);
assert(pIP != NULL);
@@ -401,7 +403,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits)
&& *pC != '/' && *pC != ',' && !isspace((int)*pC)) {
if((iRet = rsCStrAppendChar(pCStr, *pC)) != RS_RET_OK) {
rsCStrDestruct (pCStr);
- return(iRet);
+ FINALIZE;
}
++pThis->iCurrPos;
++pC;
@@ -410,7 +412,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits)
/* We got the string, let's finish it... */
if((iRet = rsCStrFinish(pCStr)) != RS_RET_OK) {
rsCStrDestruct (pCStr);
- return(iRet);
+ FINALIZE;
}
/* now we have the string and must check/convert it to
@@ -424,7 +426,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits)
pszTmp = (uchar*)strchr ((char*)pszIP, ']');
if (pszTmp == NULL) {
free (pszIP);
- return RS_RET_INVALID_IP;
+ ABORT_FINALIZE(RS_RET_INVALID_IP);
}
*pszTmp = '\0';
@@ -449,7 +451,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits)
default:
free (pszIP);
free (*pIP);
- return RS_RET_ERR;
+ ABORT_FINALIZE(RS_RET_ERR);
}
if(*pC == '/') {
@@ -458,7 +460,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits)
if((iRet = parsInt(pThis, pBits)) != RS_RET_OK) {
free (pszIP);
free (*pIP);
- return(iRet);
+ FINALIZE;
}
/* we need to refresh pointer (changed by parsInt()) */
pC = rsCStrGetBufBeg(pThis->pCStr) + pThis->iCurrPos;
@@ -488,7 +490,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits)
default:
free (pszIP);
free (*pIP);
- return RS_RET_ERR;
+ ABORT_FINALIZE(RS_RET_ERR);
}
if(*pC == '/') {
@@ -497,7 +499,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits)
if((iRet = parsInt(pThis, pBits)) != RS_RET_OK) {
free (pszIP);
free (*pIP);
- return(iRet);
+ FINALIZE;
}
/* we need to refresh pointer (changed by parsInt()) */
pC = rsCStrGetBufBeg(pThis->pCStr) + pThis->iCurrPos;
@@ -518,7 +520,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits)
iRet = RS_RET_OK;
finalize_it:
- return iRet;
+ RETiRet;
}
#endif /* #ifdef SYSLOG_INET */
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index 863f0d50..530bb2d4 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -170,12 +170,11 @@ static rsRetVal readSocket(int fd, int bParseHost)
} else if (iRcvd < 0 && errno != EINTR) {
char errStr[1024];
strerror_r(errno, errStr, sizeof(errStr));
- dbgprintf("UNIX socket error: %d = %s.\n", \
- errno, errStr);
+ dbgprintf("UNIX socket error: %d = %s.\n", errno, errStr);
logerror("recvfrom UNIX");
}
- return iRet;
+ RETiRet;
}
@@ -228,7 +227,7 @@ CODESTARTrunInput
}
}
- return iRet;
+ RETiRet;
ENDrunInput
@@ -247,7 +246,7 @@ CODESTARTwillRun
dbgprintf("Opened UNIX socket '%s' (fd %d).\n", funixn[i], funix[i]);
}
- return RS_RET_OK;
+ RETiRet;
ENDwillRun
diff --git a/queue.c b/queue.c
index 89e0168f..c3666db2 100644
--- a/queue.c
+++ b/queue.c
@@ -52,491 +52,23 @@
#include "stringbuf.h"
#include "srUtils.h"
#include "obj.h"
+#include "wtp.h"
+#include "wti.h"
/* static data */
DEFobjStaticHelpers
-/* debug aides */
-#if 0
-#define d_pthread_mutex_lock(x) {dbgprintf("mutex %p lock %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \
- pthread_mutex_lock(x); \
- if(1)dbgprintf("mutex %p lock aquired %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \
- }
-#define d_pthread_mutex_unlock(x) {dbgprintf("mutex %p UNlock %s, %s(), line %d\n", x ,__FILE__, __func__, __LINE__);\
- pthread_mutex_unlock(x); \
- if(1)dbgprintf("mutex %p UNlock done %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \
- }
-#else
-#define d_pthread_mutex_lock(x) pthread_mutex_lock(x)
-#define d_pthread_mutex_unlock(x) pthread_mutex_unlock(x)
-#endif
-
-
/* forward-definitions */
rsRetVal queueChkPersist(queue_t *pThis);
-static void *queueWorker(void *arg);
-static rsRetVal queueChkWrkThrdChanges(queue_t *pThis);
static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly);
+static int queueChkStopWrkrDA(queue_t *pThis);
+static int queueIsIdleDA(queue_t *pThis);
+static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave);
+static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2);
/* methods */
-/* cancellation cleanup handler - frees provided mutex
- * rgerhards, 2008-01-14
- */
-static void queueMutexCleanup(void *arg)
-{
- assert(arg != NULL);
- d_pthread_mutex_unlock((pthread_mutex_t*) arg);
-}
-
-
-/* get the current worker state. For simplicity and speed, we have
- * NOT used our regular calling interface this time. I hope that won't
- * bite in the long term... -- rgerhards, 2008-01-17
- */
-static inline qWrkCmd_t
-qWrkrGetState(qWrkThrd_t *pThis)
-{
- assert(pThis != NULL);
- return pThis->tCurrCmd;
-}
-
-
-/* indicate worker thread startup
- * (it would be best if we could do this with an atomic operation)
- * rgerhards, 2008-01-19
- */
-static void
-queueWrkrThrdStartupIndication(queue_t *pThis)
-{
- int iCancelStateSave;
-
- ISOBJ_TYPE_assert(pThis, queue);
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pThis->mutThrdMgmt);
- pThis->iCurNumWrkThrd++;
- d_pthread_mutex_unlock(&pThis->mutThrdMgmt);
- pthread_setcancelstate(iCancelStateSave, NULL);
-}
-
-
-/* indicate worker thread shutdown
- * (it would be best if we could do this with an atomic operation)
- * rgerhards, 2008-01-19
- */
-static void
-queueWrkrThrdShutdownIndication(queue_t *pThis)
-{
- int iCancelStateSave;
-
- ISOBJ_TYPE_assert(pThis, queue);
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pThis->mutThrdMgmt);
- pThis->iCurNumWrkThrd--;
- pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
- d_pthread_mutex_unlock(&pThis->mutThrdMgmt);
- pthread_setcancelstate(iCancelStateSave, NULL);
-}
-
-
-/* send a command to a specific thread
- */
-static rsRetVal
-qWrkrSetState(qWrkThrd_t *pThis, qWrkCmd_t tCmd)
-{
- DEFiRet;
-
- assert(pThis != NULL);
-
-dbgprintf("Queue 0x%lx: trying to send command %d to thread %d\n", queueGetID(pThis->pQueue), tCmd, pThis->iThrd);
- if(pThis->tCurrCmd == eWRKTHRD_SHUTDOWN_IMMEDIATE && tCmd != eWRKTHRD_TERMINATING)
- FINALIZE;
-
- dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis->pQueue), tCmd, pThis->iThrd);
-
- /* change some admin structures */
- switch(tCmd) {
- case eWRKTHRD_TERMINATING:
- pthread_cond_destroy(&pThis->condInitDone);
- pthread_mutex_destroy(&pThis->mut);
- dbgprintf("Queue 0x%lx/w%d: thread terminating with %d entries left in queue, %d workers running.\n",
- queueGetID(pThis->pQueue), pThis->iThrd, pThis->pQueue->iQueueSize,
- pThis->pQueue->iCurNumWrkThrd);
- break;
- case eWRKTHRD_RUN_CREATED:
- pthread_cond_init(&pThis->condInitDone, NULL);
- pthread_mutex_init(&pThis->mut, NULL);
- break;
- case eWRKTHRD_RUN_INIT:
- break;
- case eWRKTHRD_RUNNING:
- pthread_cond_signal(&pThis->condInitDone);
- break;
- /* these cases just to satisfy the compiler, we do (yet) not act an them: */
- case eWRKTHRD_STOPPED:
- case eWRKTHRD_SHUTDOWN:
- case eWRKTHRD_SHUTDOWN_IMMEDIATE:
- /* DO NOTHING */
- break;
- }
-
- pThis->tCurrCmd = tCmd;
-
-finalize_it:
- return iRet;
-}
-
-/* send a command to a specific active thread. If the thread is not
- * active, the command is not sent.
- */
-static inline rsRetVal
-queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, queue);
- assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads);
-
- if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRD_RUN_CREATED) {
- qWrkrSetState(&pThis->pWrkThrds[iIdx], tCmd);
- } else {
- dbgprintf("Queue 0x%lx: command %d NOT sent to inactive thread %d\n", queueGetID(pThis), tCmd, iIdx);
- }
-
- return iRet;
-}
-
-
-/* Finalize construction of a wWrkrThrd_t "object"
- * rgerhards, 2008-01-17
- */
-static inline rsRetVal
-qWrkrConstructFinalize(qWrkThrd_t *pThis, queue_t *pQueue, int i)
-{
- assert(pThis != NULL);
- ISOBJ_TYPE_assert(pQueue, queue);
-
- dbgprintf("Queue 0x%lx: finalizing construction of worker %d instance data\n", queueGetID(pQueue), i);
-
- /* initialize our thread instance descriptor */
- pThis = pQueue->pWrkThrds + i;
- pThis->pQueue = pQueue;
- pThis->iThrd = i;
- pThis->pUsr = NULL;
-
- qWrkrSetState(pThis, eWRKTHRD_STOPPED);
-
- return RS_RET_OK;
-}
-
-
-/* Waits until the specified worker thread
- * changed to full running state (aka have started up). This function
- * MUST NOT be called while the queue mutex is locked as it does
- * this itself. The wait is without timeout.
- * rgerhards, 2008-01-17
- */
-static inline rsRetVal
-qWrkrWaitStartup(qWrkThrd_t *pThis)
-{
- int iCancelStateSave;
-
- assert(pThis != NULL);
-
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(pThis->pQueue->mut);
- if((pThis->tCurrCmd == eWRKTHRD_RUN_CREATED) || (pThis->tCurrCmd == eWRKTHRD_RUN_CREATED)) {
- dbgprintf("Queue 0x%lx: waiting on worker thread %d startup\n", queueGetID(pThis->pQueue),
- pThis->iThrd);
- pthread_cond_wait(&pThis->condInitDone, pThis->pQueue->mut);
-dbgprintf("worker startup done!\n");
- }
- d_pthread_mutex_unlock(pThis->pQueue->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
-
- return RS_RET_OK;
-}
-
-
-/* waits until all worker threads that a currently initializing are fully started up
- * rgerhards, 2008-01-18
- */
-static rsRetVal
-qWrkrWaitAllWrkrStartup(queue_t *pThis)
-{
- DEFiRet;
- int i;
-
- for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
- qWrkrWaitStartup(pThis->pWrkThrds + i);
- }
-
- return iRet;
-}
-
-
-/* initialize the qWrkThrd_t structure - this MUST be called right after
- * startup of a worker thread. -- rgerhards, 2008-01-17
- */
-static inline rsRetVal
-qWrkrInit(qWrkThrd_t **ppThis, queue_t *pQueue)
-{
- qWrkThrd_t *pThis;
- int i;
-
- assert(ppThis != NULL);
- ISOBJ_TYPE_assert(pQueue, queue);
-
- /* find myself in the queue's thread table */
- for(i = 0 ; i <= pQueue->iNumWorkerThreads ; ++i)
- if(pQueue->pWrkThrds[i].thrdID == pthread_self())
- break;
-dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pQueue,
- (unsigned) pQueue->pWrkThrds[i].thrdID, i, (unsigned) pthread_self());
- assert(pQueue->pWrkThrds[i].thrdID == pthread_self());
-
- /* initialize our thread instance descriptor */
- pThis = pQueue->pWrkThrds + i;
- pThis->pQueue = pQueue;
- pThis->iThrd = i;
- pThis->pUsr = NULL;
-
- *ppThis = pThis;
- qWrkrSetState(pThis, eWRKTHRD_RUN_INIT);
-
- return RS_RET_OK;
-}
-
-
-/* join a specific worker thread
- */
-static inline rsRetVal
-queueJoinWrkThrd(queue_t *pThis, int iIdx)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, queue);
- assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads);
- assert(pThis->pWrkThrds[iIdx].tCurrCmd != eWRKTHRD_STOPPED);
-
- dbgprintf("Queue 0x%lx: thread %d state %d, waiting for exit\n", queueGetID(pThis), iIdx,
- pThis->pWrkThrds[iIdx].tCurrCmd);
- pthread_join(pThis->pWrkThrds[iIdx].thrdID, NULL);
- qWrkrSetState(&pThis->pWrkThrds[iIdx], eWRKTHRD_STOPPED); /* back to virgin... */
- pThis->pWrkThrds[iIdx].thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
- dbgprintf("Queue 0x%lx: thread %d state %d, has stopped\n", queueGetID(pThis), iIdx,
- pThis->pWrkThrds[iIdx].tCurrCmd);
-
- return iRet;
-}
-
-
-/* Starts a worker thread (on a specific index [i]!)
- */
-static inline rsRetVal
-queueStrtWrkThrd(queue_t *pThis, int i)
-{
- DEFiRet;
- int iState;
-
- ISOBJ_TYPE_assert(pThis, queue);
- assert(i >= 0 && i <= pThis->iNumWorkerThreads);
- assert(pThis->pWrkThrds[i].tCurrCmd < eWRKTHRD_RUN_CREATED);
-
- qWrkrSetState(&pThis->pWrkThrds[i], eWRKTHRD_RUN_CREATED);
- iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis);
- dbgprintf("Queue 0x%lx: starting Worker thread %x, index %d with state %d.\n",
- (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState);
-
- return iRet;
-}
-
-
-/* start the DA worker thread (if not already running)
- */
-static inline rsRetVal
-queueStrtDAWrkr(queue_t *pThis)
-{
- DEFiRet;
- int iCancelStateSave;
-
- ISOBJ_TYPE_assert(pThis, queue);
-
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pThis->pWrkThrds[0].mut);
- if(pThis->pWrkThrds[0].tCurrCmd == eWRKTHRD_STOPPED) {
- iRet = queueStrtWrkThrd(pThis, 0);
- }
- d_pthread_mutex_unlock(&pThis->pWrkThrds[0].mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
-
- return iRet;
-}
-
-/* Starts a *new* worker thread. Function searches itself for a free index spot. It must only
- * be called when we have less than max workers active. Pending wrkr thread requests MUST have
- * been processed before calling this function. -- rgerhards, 2008-01-16
- */
-static inline rsRetVal
-queueStrtNewWrkThrd(queue_t *pThis)
-{
- DEFiRet;
- int i;
- int iStartingUp;
- int iState;
-
- ISOBJ_TYPE_assert(pThis, queue);
-
- /* find free spot in thread table. If we find at least one worker that is in initializiation,
- * we do NOT start a new one. Let's give the other one a chance, first.
- */
- iStartingUp = -1;
- for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) {
-dbgprintf("Queue %p: search thrd tbl slot: i %d, CuccCmd %d\n", pThis, i, pThis->pWrkThrds[i].tCurrCmd);
- if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_STOPPED) {
- break;
- } else if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_RUN_CREATED) {
- iStartingUp = i;
- break;
- }
- }
-
-dbgprintf("Queue %p: after thrd search: i %d, iStartingUp %d\n", pThis, i, iStartingUp);
- if(iStartingUp > -1)
- ABORT_FINALIZE(RS_RET_ALREADY_STARTING);
-
- assert(i <= pThis->iNumWorkerThreads); /* now there must be a free spot, else something is really wrong! */
-
- qWrkrSetState(&pThis->pWrkThrds[i], eWRKTHRD_RUN_CREATED);
- iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis);
- dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n",
- (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState);
- /* we try to give the starting worker a little boost. It won't help much as we still
- * hold the queue's mutex, but at least it has a chance to start on a single-CPU system.
- */
- pthread_yield();
-
-finalize_it:
- return iRet;
-}
-
-
-/* send a command to all active worker threads. A start index can be
- * given. Usually, this is 0 or 1. Thread 0 is reserved to disk-assisted
- * mode and this start index take care of the special handling it needs to
- * receive. -- rgerhards, 2008-01-16
- */
-static inline rsRetVal
-queueTellActWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd)
-{
- DEFiRet;
- int i;
-
- ISOBJ_TYPE_assert(pThis, queue);
- assert(iStartIdx == 0 || iStartIdx == 1);
-
- /* tell the workers our request */
- for(i = iStartIdx ; i <= pThis->iNumWorkerThreads ; ++i)
- if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING)
- queueTellActWrkThrd(pThis, i, tCmd);
-
- return iRet;
-}
-
-
-/* compute an absolute time timeout suitable for calls to pthread_cond_timedwait()
- * rgerhards, 2008-01-14
- */
-static rsRetVal
-queueTimeoutComp(struct timespec *pt, int iTimeout)
-{
- assert(pt != NULL);
- /* compute timeout */
- clock_gettime(CLOCK_REALTIME, pt);
- pt->tv_nsec += (iTimeout % 1000) * 1000000; /* think INTEGER arithmetic! */
- if(pt->tv_nsec > 999999999) { /* overrun? */
- pt->tv_nsec -= 1000000000;
- ++pt->tv_sec;
- }
- pt->tv_sec += iTimeout / 1000;
- return RS_RET_OK; /* so far, this is static... */
-}
-
-
-/* wake up all worker threads. Param bWithDAWrk tells if the DA worker
- * is to be awaken, too. It needs special handling because it waits on
- * two different conditions depending on processing state.
- * rgerhards, 2008-01-16
- */
-static inline rsRetVal
-queueWakeupWrkThrds(queue_t *pThis, int bWithDAWrk)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, queue);
-
- pthread_cond_broadcast(&pThis->notEmpty);
- if(bWithDAWrk && pThis->qRunsDA != QRUNS_REGULAR) {
- /* if running disk-assisted, workers may wait on that condition, too */
- pthread_cond_broadcast(&pThis->condDA);
- }
-
- return iRet;
-}
-
-
-/* This function checks if (another) worker threads needs to be started. It
- * must be called while the caller holds a lock on the queue mutex. So it must not
- * do anything that either reaquires the mutex or forces somebody else to aquire
- * it (that would lead to a deadlock).
- * rgerhards, 2008-01-16
- */
-static inline rsRetVal
-queueChkAndStrtWrk(queue_t *pThis)
-{
- DEFiRet;
- int iCancelStateSave;
-
- ISOBJ_TYPE_assert(pThis, queue);
-
- /* process any pending thread requests */
- queueChkWrkThrdChanges(pThis);
-
- if(pThis->bEnqOnly == 1)
- FINALIZE; /* in enqueue-only mode we have no workers */
-
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pThis->mutThrdMgmt);
- pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt);
- pthread_setcancelstate(iCancelStateSave, NULL);
- /* check if we need to start up another worker */
- if(pThis->qRunsDA == QRUNS_REGULAR) {
- if(pThis->iCurNumWrkThrd < pThis->iNumWorkerThreads) {
-dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d, qRunsDA: %d\n",
- pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA);
- /* check if we satisfy the min nbr of messages per worker to start a new one */
- if(pThis->iCurNumWrkThrd == 0 ||
- pThis->iQueueSize / pThis->iCurNumWrkThrd > pThis->iMinMsgsPerWrkr) {
- dbgprintf("Queue 0x%lx: high activity - starting additional worker thread.\n",
- queueGetID(pThis));
- queueStrtNewWrkThrd(pThis);
- }
- }
- } else {
- if(pThis->iCurNumWrkThrd == 0 && pThis->bEnqOnly == 0) {
-dbgprintf("Queue %p: DA worker is no longer running, restarting, qsize %d, workers %d, qRunsDA: %d\n",
- pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA);
- /* DA worker has timed out and needs to be restarted */
- iRet = queueStrtDAWrkr(pThis);
- }
- }
- pthread_cleanup_pop(1);
-
-finalize_it:
- return iRet;
-}
-
/* --------------- code for disk-assisted (DA) queue modes -------------------- */
@@ -555,7 +87,7 @@ queueTurnOffDAMode(queue_t *pThis)
DEFiRet;
ISOBJ_TYPE_assert(pThis, queue);
- assert(pThis->qRunsDA != QRUNS_REGULAR);
+ assert(pThis->bRunsDA);
/* if we need to pull any data that we still need from the (child) disk queue,
* now would be the time to do so. At present, we do not need this, but I'd like to
@@ -572,62 +104,18 @@ queueTurnOffDAMode(queue_t *pThis)
dbgprintf("Queue 0x%lx: disk-assistance being been turned off, bEnqOnly %d, bQueInDestr %d, NumWrkd %d\n",
queueGetID(pThis),
pThis->bEnqOnly,pThis->bQueueInDestruction,pThis->iCurNumWrkThrd);
- // TODO: think about this code - there is a race
- if(pThis->bEnqOnly == 0 && pThis->bQueueInDestruction == 0 && pThis->iCurNumWrkThrd < 2)
- queueStrtNewWrkThrd(pThis);
- pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */
+ // TODO: mutex?
+ pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
* this will be quick.
*/
queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
- /* now free the remaining resources */
- pthread_mutex_destroy(&pThis->mutDA);
- pthread_cond_destroy(&pThis->condDA);
-
- queueTellActWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */
dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
queueGetID(pThis), iRet);
- return iRet;
-}
-
-/* check if we had any worker thread changes and, if so, act
- * on them. At a minimum, terminated threads are harvested (joined).
- * This function MUST NEVER block on the queue mutex!
- */
-static rsRetVal
-queueChkWrkThrdChanges(queue_t *pThis)
-{
- DEFiRet;
- int i;
-
- ISOBJ_TYPE_assert(pThis, queue);
-
- if(pThis->bThrdStateChanged == 0)
- FINALIZE;
-
- /* go through all threads (including DA thread) */
- for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
- switch(pThis->pWrkThrds[i].tCurrCmd) {
- case eWRKTHRD_TERMINATING:
- queueJoinWrkThrd(pThis, i);
- break;
- /* these cases just to satisfy the compiler, we do not act an them: */
- case eWRKTHRD_STOPPED:
- case eWRKTHRD_RUN_CREATED:
- case eWRKTHRD_RUN_INIT:
- case eWRKTHRD_RUNNING:
- case eWRKTHRD_SHUTDOWN:
- case eWRKTHRD_SHUTDOWN_IMMEDIATE:
- /* DO NOTHING */
- break;
- }
- }
-
-finalize_it:
- return iRet;
+ RETiRet;
}
@@ -644,6 +132,7 @@ queueChkIsDA(queue_t *pThis)
DEFiRet;
ISOBJ_TYPE_assert(pThis, queue);
+RUNLOG_VAR("%s", pThis->pszFilePrefix);
if(pThis->pszFilePrefix != NULL) {
pThis->bIsDA = 1;
dbgprintf("Queue 0x%lx: is disk-assisted, disk will be used on demand\n", queueGetID(pThis));
@@ -651,7 +140,7 @@ queueChkIsDA(queue_t *pThis)
dbgprintf("Queue 0x%lx: is NOT disk-assisted\n", queueGetID(pThis));
}
- return iRet;
+ RETiRet;
}
@@ -660,11 +149,11 @@ queueChkIsDA(queue_t *pThis)
* chore of this function is to create the DA queue object. If that function fails,
* the DA worker should return with an appropriate state, which in turn should lead to
* a re-set to non-DA mode in the Enq process. The queue mutex must be locked when this
- * function is called, else a race on pThis->qRunsDA may happen.
+ * function is called, else a race on pThis->bRunsDA may happen.
* rgerhards, 2008-01-15
*/
static rsRetVal
-queueStrtDA(queue_t *pThis)
+queueStartDA(queue_t *pThis)
{
DEFiRet;
@@ -682,30 +171,19 @@ dbgprintf("Queue %p: queueSTrtDA after child queue construct, q %p\n", pThis, pT
/* as the created queue is the same object class, we take the
* liberty to access its properties directly.
*/
- pThis->pqDA->condSignalOnEmpty = &pThis->condDA;
- pThis->pqDA->mutSignalOnEmpty = &pThis->mutDA;
- pThis->pqDA->condSignalOnEmpty2 = &pThis->notEmpty;
- pThis->pqDA->bSignalOnEmpty = 2;
pThis->pqDA->pqParent = pThis;
-dbgprintf("Queue %p: queueSTrtDA after assign\n", pThis);
CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
-dbgprintf("Queue %p: queueSTrtDA 10\n", pThis);
CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly));
-dbgprintf("Queue %p: queueSTrtDA 15\n", pThis);
CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
-dbgprintf("Queue %p: queueSTrtDA 20\n", pThis);
CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
-dbgprintf("Queue %p: queueSTrtDA 25\n", pThis);
if(pThis->toQShutdown == 0) {
-dbgprintf("Queue %p: queueSTrtDA 30a\n", pThis);
CHKiRet(queueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
} else {
-dbgprintf("Queue %p: queueSTrtDA 30b\n", pThis);
/* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND
* have an obviously large backlog, we can't finish it in any case. So there is no point
* in holding shutdown longer than necessary. -- rgerhards, 2008-01-15
@@ -713,30 +191,21 @@ dbgprintf("Queue %p: queueSTrtDA 30b\n", pThis);
CHKiRet(queueSettoQShutdown(pThis->pqDA, 1));
}
-dbgprintf("Queue %p: queueSTrtDA pre start\n", pThis);
+dbgprintf("Queue %p: queueStartDA pre start\n", pThis);
iRet = queueStart(pThis->pqDA);
/* file not found is expected, that means it is no previous QIF available */
if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
FINALIZE; /* something is wrong */
- /* tell our fellow workers to shut down
- * NOTE: we do NOT join them by intension! If we did, we would hold draining
- * the queue until some potentially long-running actions are finished. Having
- * the ability to immediatly drain the queue was the primary intension of
- * reserving worker thread 0 for DA queues. So if we would join the other
- * workers, we would screw up and do against our design goal.
- */
- CHKiRet(queueTellActWrkThrds(pThis, 1, eWRKTHRD_SHUTDOWN_IMMEDIATE));
-
/* as we are right now starting DA mode because we are so busy, it is
- * extremely unlikely that any worker is sleeping on empty queue. HOWEVER,
+ * extremely unlikely that any regular worker is sleeping on empty queue. HOWEVER,
* we want to be on the safe side, and so we awake anyone that is waiting
* on one. So even if the scheduler plays badly with us, things should be
* quite well. -- rgerhards, 2008-01-15
*/
- queueWakeupWrkThrds(pThis, 0); /* awake all workers, but not ourselves ;) */
+ wtpWakeupWrkr(pThis->pWtpReg); /* awake all workers, but not ourselves ;) */
- pThis->qRunsDA = QRUNS_DA; /* we are now in DA mode! */
+ pThis->bRunsDA = 1; /* we are now in DA mode! */
dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n",
queueGetID(pThis), queueGetID(pThis->pqDA));
@@ -751,31 +220,63 @@ finalize_it:
pThis->bIsDA = 0;
}
- return iRet;
+ RETiRet;
}
/* initiate DA mode
* param bEnqOnly tells if the disk queue is to be run in enqueue-only mode. This may
* be needed during shutdown of memory queues which need to be persisted to disk.
+ * If this function fails (should not happen), DA mode is not turned on.
* rgerhards, 2008-01-16
*/
static inline rsRetVal
-queueInitDA(queue_t *pThis, int bEnqOnly)
+queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
{
DEFiRet;
+ DEFVARS_mutexProtection;
+ uchar pszBuf[64];
+ size_t lenBuf;
+
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
+ /* check if we already have a DA worker pool. If not, initiate one. Please note that the
+ * pool is created on first need but never again destructed (until the queue is). This
+ * is intentional. We assume that when we need it once, we may also need it on another
+ * occasion. Ressources used are quite minimal when no worker is running.
+ * rgerhards, 2008-01-24
+ */
+ if(pThis->pWtpDA == NULL) {
+ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx/DA", (unsigned long) pThis);
+ CHKiRet(wtpConstruct (&pThis->pWtpDA));
+ CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, queueChkStopWrkrDA));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, queueIsIdleDA));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, queueConsumerDA));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, queueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, queueStartDA));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, queueTurnOffDAMode));
+ CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
+ CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
+ CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
+ CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis));
+ CHKiRet(wtpConstructFinalize (pThis->pWtpDA));
+ }
+ /* if we reach this point, we have a "good" DA worker pool */
/* indicate we now run in DA mode - this is reset by the DA worker if it fails */
- pThis->qRunsDA = QRUNS_DA_INIT;
+ pThis->bRunsDA = 1;
pThis->bDAEnqOnly = bEnqOnly;
- /* now we must start our DA worker thread - it does the rest of the initialization
- * In enqueue-only mode, we do not start any workers.
+ /* now we must now adivse the wtp that we need one worker. If none is yet active,
+ * that will also start one up. If we forgot that step, everything would be stalled
+ * until the next enqueue request.
*/
if(pThis->bEnqOnly == 0)
- iRet = queueStrtDAWrkr(pThis);
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* one worker only for disk queues! */
- return iRet;
+finalize_it:
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ RETiRet;
}
@@ -783,7 +284,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly)
* keep it running if we are already in it.
* rgerhards, 2008-01-14
*/
-static rsRetVal
+static inline rsRetVal
queueChkStrtDA(queue_t *pThis)
{
DEFiRet;
@@ -794,35 +295,31 @@ queueChkStrtDA(queue_t *pThis)
if(pThis->iQueueSize != pThis->iHighWtrMrk)
ABORT_FINALIZE(RS_RET_OK);
- if(pThis->qRunsDA != QRUNS_REGULAR) {
+dbgprintf("Queue %p: chkStartDA\n", pThis);
+ if(pThis->bRunsDA) {
/* then we need to signal that we are at the high water mark again. If that happens
* on our way down the queue, that doesn't matter, because then nobody is waiting
* on the condition variable.
+ * (Remember that a DA queue stops draining the queue once it has reached the low
+ * water mark and restarts it when the high water mark is reached again - this is
+ * what this code here is responsible for. Please note that all workers may have been
+ * terminated due to the inactivity timeout, thus we need to advise the pool that
+ * we need at least one).
*/
dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n",
queueGetID(pThis), pThis->iQueueSize);
- //pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- // TODO: mutex call order check! this must aquire the queue mutex
- //d_pthread_mutex_lock(&pThis->mutDA);
- pthread_cond_signal(&pThis->condDA);
- //d_pthread_mutex_unlock(&pThis->mutDA);
- //pthread_setcancelstate(iCancelStateSave, NULL);
- queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */
- }
-
- /* we need to re-check if we run disk-assisted, because that status may have changed
- * in our high water mark processing.
- */
- if(pThis->qRunsDA == QRUNS_REGULAR) {
- /* if we reach this point, we are NOT currently running in DA mode. */
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* run again [see comment above] ;) */
+ } else {
+ /* this is the case when we are currently not running in DA mode. So it is time
+ * to turn it back on.
+ */
dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n",
queueGetID(pThis), pThis->iQueueSize);
-
- queueInitDA(pThis, QUEUE_MODE_ENQDEQ); /* initiate DA mode */
+ queueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
}
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -855,7 +352,7 @@ static rsRetVal qConstructFixedArray(queue_t *pThis)
queueChkIsDA(pThis);
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -868,7 +365,7 @@ static rsRetVal qDestructFixedArray(queue_t *pThis)
if(pThis->tVars.farray.pBuf != NULL)
free(pThis->tVars.farray.pBuf);
- return iRet;
+ RETiRet;
}
static rsRetVal qAddFixedArray(queue_t *pThis, void* in)
@@ -881,7 +378,7 @@ static rsRetVal qAddFixedArray(queue_t *pThis, void* in)
if (pThis->tVars.farray.tail == pThis->iMaxQueueSize)
pThis->tVars.farray.tail = 0;
- return iRet;
+ RETiRet;
}
static rsRetVal qDelFixedArray(queue_t *pThis, void **out)
@@ -895,7 +392,7 @@ static rsRetVal qDelFixedArray(queue_t *pThis, void **out)
if (pThis->tVars.farray.head == pThis->iMaxQueueSize)
pThis->tVars.farray.head = 0;
- return iRet;
+ RETiRet;
}
@@ -911,7 +408,7 @@ static rsRetVal qConstructLinkedList(queue_t *pThis)
queueChkIsDA(pThis);
- return iRet;
+ RETiRet;
}
@@ -925,7 +422,7 @@ static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis)
* dynamic left with the linked list.
*/
- return iRet;
+ RETiRet;
}
static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr)
@@ -949,7 +446,7 @@ static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr)
}
finalize_it:
- return iRet;
+ RETiRet;
}
static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
@@ -971,7 +468,7 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
}
free(pEntry);
- return iRet;
+ RETiRet;
}
@@ -986,7 +483,7 @@ queueLoadPersStrmInfoFixup(strm_t *pStrm, queue_t *pThis)
ISOBJ_TYPE_assert(pThis, queue);
CHKiRet(strmSetDir(pStrm, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -1024,7 +521,7 @@ queueHaveQIF(queue_t *pThis)
/* If we reach this point, we have a .qi file */
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -1092,7 +589,7 @@ finalize_it:
queueGetID(pThis), iRet);
}
- return iRet;
+ RETiRet;
}
@@ -1150,7 +647,7 @@ CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize));
CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -1166,7 +663,7 @@ static rsRetVal qDestructDisk(queue_t *pThis)
if(pThis->pszSpoolDir != NULL)
free(pThis->pszSpoolDir);
- return iRet;
+ RETiRet;
}
static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
@@ -1179,7 +676,7 @@ static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
CHKiRet(strmFlush(pThis->tVars.disk.pWrite));
finalize_it:
- return iRet;
+ RETiRet;
}
static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
@@ -1216,7 +713,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
* queue anything ;)
*/
- return iRet;
+ RETiRet;
}
static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
@@ -1228,6 +725,7 @@ static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__
/* --------------- end type-specific handlers -------------------- */
+
/* generic code to add a queue entry */
static rsRetVal
queueAdd(queue_t *pThis, void *pUsr)
@@ -1242,7 +740,7 @@ queueAdd(queue_t *pThis, void *pUsr)
dbgprintf("Queue 0x%lx: entry added, size now %d entries\n", queueGetID(pThis), pThis->iQueueSize);
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -1265,121 +763,31 @@ queueDel(queue_t *pThis, void *pUsr)
dbgprintf("Queue 0x%lx: entry deleted, state %d, size now %d entries\n",
queueGetID(pThis), iRet, pThis->iQueueSize);
- return iRet;
-}
-
-
-/* Send a shutdown command to all workers and awake them. This function
- * does NOT wait for them to terminate. Set bIncludeDAWRk to send the
- * termination command to the DA worker, too (else this does not happen).
- * rgerhards, 2008-01-16
- */
-static inline rsRetVal
-queueWrkThrdReqTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int bIncludeDAWrk)
-{
- DEFiRet;
-
- if(bIncludeDAWrk) {
- queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */
- queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
- } else {
- queueTellActWrkThrds(pThis, 1, tShutdownCmd);/* first tell the workers our request */
- queueWakeupWrkThrds(pThis, 0); /* awake all workers but not DA-worker */
- }
-
- return iRet;
-}
-
-
-/* Send a shutdown command to all workers and see if they terminate.
- * A timeout may be specified.
- * rgerhards, 2008-01-14
- */
-static rsRetVal
-queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout)
-{
- DEFiRet;
- int bTimedOut;
- struct timespec t;
- int iCancelStateSave;
-
- queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */
- queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
- /* race: must make sure all are running! */
- queueTimeoutComp(&t, iTimeout);/* get timeout */
-
- /* and wait for their termination */
-dbgprintf("Queue %p: waiting for mutex %p\n", pThis, &pThis->mutThrdMgmt);
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pThis->mutThrdMgmt);
- pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt);
- pthread_setcancelstate(iCancelStateSave, NULL);
- bTimedOut = 0;
- while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
- dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n",
- queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd);
-
- if(pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutThrdMgmt, &t) != 0) {
- dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", queueGetID(pThis));
- bTimedOut = 1; /* we exit the loop on timeout */
- }
- }
- pthread_cleanup_pop(1);
-
- if(bTimedOut)
- iRet = RS_RET_TIMED_OUT;
-
- return iRet;
+ RETiRet;
}
-/* Unconditionally cancel all running worker threads.
- * rgerhards, 2008-01-14
- */
-static rsRetVal
-queueWrkThrdCancel(queue_t *pThis)
-{
- DEFiRet;
- int i;
- // TODO: we need to implement peek(), without it (today!) we lose one message upon
- // worker cancellation! -- rgerhards, 2008-01-14
-
- /* process any pending thread requests so that we know who actually is still running */
- queueChkWrkThrdChanges(pThis);
-
- /* awake the workers one more time, just to be sure */
- queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
-
- /* first tell the workers our request */
- for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
- if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) {
- dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i);
- pthread_cancel(pThis->pWrkThrds[i].thrdID);
- }
- }
-
- return iRet;
-}
-
-
-/* Worker thread management function carried out when the main
- * worker is about to terminate.
+/* This function shuts down all worker threads and waits until they
+ * have terminated. If they timeout, they are cancelled. Parameters have been set
+ * before this function is called so that DA queues will be fully persisted to
+ * disk (if configured to do so).
+ * rgerhards, 2008-01-24
*/
static rsRetVal queueShutdownWorkers(queue_t *pThis)
{
DEFiRet;
int i;
- assert(pThis != NULL);
+ ISOBJ_TYPE_assert(pThis, queue);
- dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", (unsigned long) pThis);
+ dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis));
/* even if the timeout count is set to 0 (run endless), we still call the queueWrkThrdTrm(). This
* is necessary so that all threads get sent the termination command. With a timeout of 0, however,
* the function returns immediate with RS_RET_TIMED_OUT. We catch that state and accept it as
* good.
*/
- iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN, pThis->toQShutdown);
+ wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, pThis->toQShutdown);
if(iRet == RS_RET_TIMED_OUT) {
if(pThis->toQShutdown == 0) {
iRet = RS_RET_OK;
@@ -1387,418 +795,69 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* OK, we now need to try force the shutdown */
dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n",
queueGetID(pThis));
- iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN_IMMEDIATE, pThis->toActShutdown);
+ iRet = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, pThis->toActShutdown);
}
}
if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */
/* still didn't work out - so we now need to cancel the workers */
dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis);
- iRet = queueWrkThrdCancel(pThis);
+ iRet = wtpCancelAll(pThis->pWtpReg);
+ }
+
+ // TODO: do it just once but right ;)
+ if(pThis->pWtpDA != NULL) {
+ wtpShutdownAll(pThis->pWtpDA, pThis->toQShutdown, pThis->toQShutdown);
+ if(iRet == RS_RET_TIMED_OUT) {
+ if(pThis->toQShutdown == 0) {
+ iRet = RS_RET_OK;
+ } else {
+ /* OK, we now need to try force the shutdown */
+ dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n",
+ queueGetID(pThis));
+ iRet = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, pThis->toActShutdown);
+ }
+ }
+
+ if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */
+ /* still didn't work out - so we now need to cancel the workers */
+ dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis);
+ iRet = wtpCancelAll(pThis->pWtpDA);
+ }
}
+
/* finally join the threads
* In case of a cancellation, this may actually take some time. This is also
* needed to clean up the thread descriptors, even with a regular termination.
* And, most importantly, this is needed if we have an indifitite termination
* time set (timeout == 0)! -- rgerhards, 2008-01-14
*/
+#if 0 // totally wrong, we must implement something along these lines in wtp!
+RUNLOG;
for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
- if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRD_STOPPED) {
- queueJoinWrkThrd(pThis, i);
+ if(pThis->pWtpReg->pWrkr[i]->tCurrCmd != eWRKTHRD_STOPPED) {
+ wtiJoinThrd(pThis->pWtpReg->pWrkr[i]);
}
}
- dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n",
- queueGetID(pThis), pThis->iQueueSize);
-
- return iRet;
-}
-
-/* This is a special consumer to feed the disk-queue in disk-assited mode.
- * When active, our own queue more or less acts as a memory buffer to the disk.
- * So this consumer just needs to drain the memory queue and submit entries
- * to the disk queue. The disk queue will then call the actual consumer from
- * the app point of view (we chain two queues here).
- * rgerhards, 2008-01-14
- */
-static inline rsRetVal
-queueDAConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst)
-{
- DEFiRet;
- int iCancelStateSave;
-
- ISOBJ_TYPE_assert(pThis, queue);
- assert(pThis->qRunsDA != QRUNS_REGULAR);
- ISOBJ_assert(pWrkrInst->pUsr);
-
-dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize);/* dirty iQueueSize! */
- CHKiRet(queueEnqObj(pThis->pqDA, pWrkrInst->pUsr));
-
- /* We check if we reached the low water mark (but only if we are not in shutdown mode)
- * Note that the child queue now in almost all cases is non-empty, because we just enqueued
- * a message. Note that we need a quick check below to see if we are still in running state.
- * If not, we do not go into the wait, because that's not a good thing to do. We do not
- * do a full termination check, as this is done when we go back to the main worker loop.
- * We need to re-aquire the queue mutex here, because we need to have a consistent
- * access to the queue's admin data.
- */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
-dbgprintf("pre mutex lock (think about CLEANUP!)\n");
- d_pthread_mutex_lock(pThis->mut);
- pthread_cleanup_push(queueMutexCleanup, pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
-dbgprintf("mutex locked (think about CLEANUP!)\n");
- if(pThis->iQueueSize <= pThis->iLowWtrMrk && pWrkrInst->tCurrCmd == eWRKTHRD_RUNNING) {
- dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n",
- queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize);
- /* wait for either passing the high water mark or the child disk queue drain */
- pthread_cond_wait(&pThis->condDA, pThis->mut);
- }
- pthread_cleanup_pop(1); /* release mutex in an atomic way via cleanup handler */
-
-finalize_it:
-dbgprintf("DAConsumer returns with iRet %d\n", iRet);
- return iRet;
-}
-
-
-/* This is a helper for queueWorker () it either calls the configured
- * consumer or the DA-consumer (if in disk-assisted mode). It is
- * protected by the queue mutex, but MUST release it as soon as possible.
- * Most importantly, it must release it before the consumer is called.
- * rgerhards, 2008-01-14
- */
-static inline rsRetVal
-queueWorkerChkAndCallConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst, int iCancelStateSave)
-{
- DEFiRet;
- rsRetVal iRetLocal;
- int iSeverity;
- int iQueueSize;
- void *pUsr;
- int qRunsDA;
- int iMyThrdIndx;
-
- ISOBJ_TYPE_assert(pThis, queue);
- assert(pWrkrInst != NULL);
-
- iMyThrdIndx = pWrkrInst->iThrd;
-
- /* dequeue element (still protected from mutex) */
- iRet = queueDel(pThis, &pUsr);
- queueChkPersist(pThis); // when we support peek(), we must do this down after the del!
- iQueueSize = pThis->iQueueSize; /* cache this for after mutex release */
- pWrkrInst->pUsr = pUsr; /* save it for the cancel cleanup handler */
- qRunsDA = pThis->qRunsDA;
- d_pthread_mutex_unlock(pThis->mut);
- pthread_cond_signal(&pThis->notFull);
- pthread_setcancelstate(iCancelStateSave, NULL);
- /* WE ARE NO LONGER PROTECTED FROM THE MUTEX */
-
- /* do actual processing (the lengthy part, runs in parallel)
- * If we had a problem while dequeing, we do not call the consumer,
- * but we otherwise ignore it. This is in the hopes that it will be
- * self-healing. However, this is really not a good thing.
- * rgerhards, 2008-01-03
- */
- if(iRet != RS_RET_OK)
- FINALIZE;
-
- /* call consumer depending on queue mode (in DA mode, we have just one thread, so it can not change) */
- if(qRunsDA == QRUNS_DA) {
- queueDAConsumer(pThis, pWrkrInst);
- } else {
- /* we are running in normal, non-disk-assisted mode
- * do a quick check if we need to drain the queue. It is OK to use the cached
- * iQueueSize here, because it does not hurt if it is slightly wrong.
- */
- if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) {
- iRetLocal = objGetSeverity(pUsr, &iSeverity);
- if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
- dbgprintf("Queue 0x%lx/w%d: dequeue/queue nearly full (%d entries), "
- "discarded severity %d message\n",
- queueGetID(pThis), iMyThrdIndx, iQueueSize, iSeverity);
- objDestruct(pUsr);
- }
- } else {
- dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n",
- queueGetID(pThis), iMyThrdIndx);
- iRetLocal = pThis->pConsumer(pUsr);
- if(iRetLocal != RS_RET_OK) {
- dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n",
- queueGetID(pThis), iMyThrdIndx, iRetLocal);
+RUNLOG;
+ if(pThis->pWtpDA != NULL) {
+ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
+ if(pThis->pWtpDA->pWrkr[i]->tCurrCmd != eWRKTHRD_STOPPED) {
+ wtiJoinThrd(pThis->pWtpDA->pWrkr[i]);
}
}
}
+#endif
-finalize_it:
- if(iRet != RS_RET_OK) {
- dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things "
- "may happen\n", queueGetID(pThis), iMyThrdIndx, iRet);
- }
-dbgprintf("CallConsumer returns %d\n", iRet);
- return iRet;
-}
-
-
-
-/* cancellation cleanup handler for queueWorker ()
- * Updates admin structure and frees ressources.
- * rgerhards, 2008-01-16
- */
-static void queueWorkerCancelCleanup(void *arg)
-{
- qWrkThrd_t *pWrkrInst = (qWrkThrd_t*) arg;
- queue_t *pThis;
- int iCancelStateSave;
-
- assert(pWrkrInst != NULL);
- ISOBJ_TYPE_assert(pWrkrInst->pQueue, queue);
- pThis = pWrkrInst->pQueue;
-
- dbgprintf("Queue 0x%lx/w%d: cancelation cleanup handler called (NOT FULLY IMPLEMENTED, one msg lost!)\n",
- queueGetID(pThis), pWrkrInst->iThrd);
-
- /* TODO: re-enqueue the data element! */
-
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pThis->mutThrdMgmt);
- qWrkrSetState(&pThis->pWrkThrds[pWrkrInst->iThrd], eWRKTHRD_TERMINATING);
- pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
-
- dbgprintf("Queue 0x%lx/w%d: thread CANCELED with %d entries left in queue, %d workers running.\n",
- queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize, pThis->iCurNumWrkThrd - 1);
-
- pThis->iCurNumWrkThrd--;
- pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
- d_pthread_mutex_unlock(&pThis->mutThrdMgmt);
- pthread_setcancelstate(iCancelStateSave, NULL);
-}
-
-
-/* This function is created to keep the code in queueWorker () short. Thus it
- * also does not abide to the usual calling conventions used in rsyslog. It is more
- * like a macro. Its sole purpose is to have a handy shortcut for the queue
- * termination condition. For the same reason, the calling parameters are a bit
- * more verbose than the need to be in theory. The reasoning is the Worker has
- * everything handy and so we do not need to access it from memory (OK, the
- * optimized would probably have created the same code, but why not do it
- * optimal right away...). The function returns 0 if the worker should terminate
- * and something else if it should continue to run.
- * rgerhards, 2008-01-18
- */
-static inline int
-queueWorkerRemainActive(queue_t *pThis, qWrkThrd_t *pWrkrInst)
-{
- register int b; /* this is a boolean! */
-
- /* first check the usual termination condition that applies to all workers */
- b = ( (qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING)
- || ((qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && (pThis->iQueueSize > 0)));
-dbgprintf("Queue %p/w%d: chk 1 pre empty queue, qsize %d (high wtr %d), cont run: %d, cmd %d, DA qsize %d\n", pThis,
- pWrkrInst->iThrd,
- pThis->iQueueSize, pThis->iHighWtrMrk, b, qWrkrGetState(pWrkrInst),
- (pThis->pqDA == NULL) ? -1 : pThis->pqDA->iQueueSize);
- if(b && pWrkrInst->iThrd == 0 && pThis->qRunsDA == QRUNS_DA) {
- b = pThis->iQueueSize >= pThis->iHighWtrMrk || pThis->pqDA->iQueueSize != 0;
- }
+ dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n",
+ queueGetID(pThis), pThis->iQueueSize);
-dbgprintf("Queue %p/w%d: pre empty queue, qsize %d, cont run: %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize, b);
- return b;
+ RETiRet;
}
-/* Each queue has at least one associated worker (consumer) thread. It will pull
- * the message from the queue and pass it to a user-defined function.
- * This function was provided on construction. It MUST be thread-safe.
- * Worker thread 0 is always reserved for disk-assisted mode (if the queue
- * is not DA, this worker will be dormant). All other workers are for
- * regular operations mode. Workers are started and stopped as need arises.
- * rgerhards, 2008-01-15
- */
-static void *
-queueWorker(void *arg)
-{
- queue_t *pThis = (queue_t*) arg;
- sigset_t sigSet;
- struct timespec t;
- int iMyThrdIndx; /* index for this thread in queue thread table */
- int iCancelStateSave;
- qWrkThrd_t *pWrkrInst; /* for cleanup handler */
- int bContinueRun;
-
- ISOBJ_TYPE_assert(pThis, queue);
-
- sigfillset(&sigSet);
- pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
-
- /* do some one-time thread initialization */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pThis->mutThrdMgmt);
-
- /* initialize our thread instance descriptor */
- qWrkrInit(&pWrkrInst, pThis);
-
- iMyThrdIndx = pWrkrInst->iThrd;
-
- dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx);
-
-dbgprintf("qRunsDA %d, check against %d\n", pThis->qRunsDA, QRUNS_DA);
- if((iMyThrdIndx == 0) && (pThis->qRunsDA != QRUNS_DA)) { /* are we the DA worker? */
- if(queueStrtDA(pThis) != RS_RET_OK) { /* then fully initialize the DA queue! */
- /* if we could not init the DA queue, we have nothing to do, so shut down. */
- queueTellActWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE);
- }
- }
-
- /* finally change to RUNNING state. We need to check if we actually should still run,
- * because someone may have requested us to shut down even before we got a chance to do
- * our init. That would be a bad race... -- rgerhards, 2008-01-16
- */
- pThis->iCurNumWrkThrd++;
- if(qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT)
- qWrkrSetState(pWrkrInst, eWRKTHRD_RUNNING); /* we are running now! */
-
- pthread_cleanup_push(queueWorkerCancelCleanup, pWrkrInst);
-
- d_pthread_mutex_unlock(&pThis->mutThrdMgmt);
- pthread_setcancelstate(iCancelStateSave, NULL);
- /* end one-time stuff */
-
- /* now we have our identity, on to real processing */
- bContinueRun = 1; /* we need this variable, because we need to check the actual termination condition
- * while protected by mutex */
- while(bContinueRun) {
-dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n",
- queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd);
- /* process any pending thread requests */
- queueChkWrkThrdChanges(pThis);
-
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(pThis->mut);
-dbgprintf("pthis 2: %p\n", pThis);
-
- if((bContinueRun = queueWorkerRemainActive(pThis, pWrkrInst)) == 0) {
-dbgprintf("pthis 2a: %p\n", pThis);
- d_pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
-dbgprintf("pthis 2b: %p\n", pThis);
- continue; /* and break loop */
- }
-
- /* if we reach this point, we are still protected by the mutex */
-dbgprintf("pthis 3: %p\n", pThis);
-
- if(pThis->iQueueSize == 0) {
- dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
- queueGetID(pThis), iMyThrdIndx);
- /* check if the parent DA worker is running and, if not, initiate it. Thanks
- * to queueStrtDAWrkr (), we do not actually need to check (that routines does
- * that for us, but we need to aquire the parent queue's mutex to call it.
- */
- if(pThis->pqParent != NULL) {
- dbgprintf("Queue %p: pre start parent %p worker\n", pThis, pThis->pqParent);
- queueStrtDAWrkr(pThis->pqParent);
- }
-
- if(pThis->bSignalOnEmpty > 0) {
- /* we need to signal our parent queue that we are empty */
- dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx);
- pthread_cond_signal(pThis->condSignalOnEmpty);
- dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx);
- }
- if(pThis->bSignalOnEmpty > 1) {
- /* no mutex associated with this condition, it's just a try (but needed
- * to wakeup a parent worker if e.g. the queue was restarted from disk) */
- pthread_cond_signal(pThis->condSignalOnEmpty2);
- }
- /* If we arrive here, we have the regular case, where we can safely assume that
- * iQueueSize and tCmd have not changed since the while().
- */
- dbgprintf("Queue %p/w%d: pre condwait ->notEmpty, worker shutdown %d\n", pThis, iMyThrdIndx, pThis->toWrkShutdown);
- /* DA worker and first worker never have an inactivity timeout */
- if(pWrkrInst->iThrd < 2 || pThis->toWrkShutdown == -1) {
- //xxx if(pThis->toWrkShutdown == -1) {
- dbgprintf("worker never times out!\n");
- /* never shut down any started worker */
- pthread_cond_wait(&pThis->notEmpty, pThis->mut);
- } else {
- queueTimeoutComp(&t, pThis->toWrkShutdown);/* get absolute timeout */
- if(pthread_cond_timedwait(&pThis->notEmpty, pThis->mut, &t) != 0) {
- dbgprintf("Queue 0x%lx/w%d: inactivity timeout, worker terminating...\n",
- queueGetID(pThis), iMyThrdIndx);
- /* we use SHUTDOWN (and not SHUTDOWN_IMMEDIATE) so that the worker
- * does not terminate if in the mean time a new message arrived.
- */
- qWrkrSetState(pWrkrInst, eWRKTHRD_SHUTDOWN);
- }
- }
- dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx);
- d_pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
- pthread_testcancel(); /* see big comment below */
- pthread_yield(); /* see big comment below */
- continue; /* request next iteration */
- }
-
- /* if we reach this point, we have a non-empty queue (and are still protected by mutex) */
- queueWorkerChkAndCallConsumer(pThis, pWrkrInst, iCancelStateSave);
-
- /* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
- * a cancellation point in itself. As we run most of the time without cancel enabled, I fear
- * we may never get cancelled if we do not create a cancellation point ourselfs.
- */
- pthread_testcancel();
- /* We now yield to give the other threads a chance to obtain the mutex. If we do not
- * do that, this thread may very well aquire the mutex again before another thread
- * has even a chance to run. The reason is that mutex operations are free to be
- * implemented in the quickest possible way (and they typically are!). That is, the
- * mutex lock/unlock most probably just does an atomic memory swap and does not necessarily
- * schedule other threads waiting on the same mutex. That can lead to the same thread
- * aquiring the mutex ever and ever again while all others are starving for it. We
- * have exactly seen this behaviour when we deliberately introduced a long-running
- * test action which basically did a sleep. I understand that with real actions the
- * likelihood of this starvation condition is very low - but it could still happen
- * and would be very hard to debug. The yield() is a sure fix, its performance overhead
- * should be well accepted given the above facts. -- rgerhards, 2008-01-10
- */
- pthread_yield();
- if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0)
- dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has "
- " %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize);
- }
-
- /* indicate termination */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
-dbgprintf("Queue %p: worker waiting for mutex\n", pThis);
- d_pthread_mutex_lock(&pThis->mutThrdMgmt);
- /* check if we are the DA worker and, if so, switch back to regular mode */
- if(pWrkrInst->iThrd == 0) {
- queueTurnOffDAMode(pThis);
- }
- pthread_cleanup_pop(0); /* remove cleanup handler */
-
- pThis->iCurNumWrkThrd--; /* one less ;) */
- /* if we ever need finalize_it, here would be the place for it! */
- if(qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN ||
- qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN_IMMEDIATE ||
- qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT ||
- qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_CREATED) {
- /* in shutdown case, we need to flag termination. All other commands
- * have a meaning to the thread harvester, so we can not overwrite them
- */
-dbgprintf("Queue 0x%lx/w%d: setting termination state\n", queueGetID(pThis), iMyThrdIndx);
- qWrkrSetState(pWrkrInst, eWRKTHRD_TERMINATING);
- }
- pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
- pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
- d_pthread_mutex_unlock(&pThis->mutThrdMgmt);
- pthread_setcancelstate(iCancelStateSave, NULL);
-
- pthread_exit(0);
-}
-
/* Constructor for the queue object
* This constructs the data structure, but does not yet start the queue. That
@@ -1811,6 +870,8 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
DEFiRet;
queue_t *pThis;
+int *pBoom = NULL;
+//*pBoom = 'A';
assert(ppThis != NULL);
assert(pConsumer != NULL);
assert(iWorkerThreads >= 0);
@@ -1866,7 +927,205 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
finalize_it:
OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
- return iRet;
+ RETiRet;
+}
+
+
+/* cancellation cleanup handler for queueWorker ()
+ * Updates admin structure and frees ressources.
+ * rgerhards, 2008-01-16
+ */
+static rsRetVal
+queueConsumerCancelCleanup(void *arg1, void *arg2)
+{
+ queue_t *pThis = (queue_t*) arg1;
+ wti_t *pWti = (wti_t*) arg2;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pWti, wti);
+
+ dbgprintf("Queue 0x%lx: cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n",
+ queueGetID(pThis));
+
+ /* TODO: re-enqueue the data element! */
+
+ return RS_RET_OK;
+}
+
+
+
+/* This function checks if the provided message shall be discarded and does so, if needed.
+ * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
+ * provide real-time creation of spool files.
+ * Note: cached copies of iQueueSize and bRunsDA are provided so that no mutex locks are required.
+ * The caller must have obtained them while the mutex was locked. Of course, these values may no
+ * longer be current, but that is OK for the discard check. At worst, the message is either processed
+ * or discarded when it should not have been. As discarding is in itself somewhat racy and erratic,
+ * that is no problems for us. This function MUST NOT lock the queue mutex, it could result in
+ * deadlocks!
+ * If the message is discarded, it can no longer be processed by the caller. So be sure to check
+ * the return state!
+ * rgerhards, 2008-01-24
+ */
+static int queueChkDiscardMsg(queue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr)
+{
+ DEFiRet;
+ rsRetVal iRetLocal;
+ int iSeverity;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_assert(pUsr);
+
+ if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) {
+ iRetLocal = objGetSeverity(pUsr, &iSeverity);
+ if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
+ dbgprintf("Queue 0x%lx: queue nearly full (%d entries), discarded severity %d message\n",
+ queueGetID(pThis), iQueueSize, iSeverity);
+ objDestruct(pUsr);
+ ABORT_FINALIZE(RS_RET_QUEUE_FULL);
+ } else {
+ dbgprintf("Queue 0x%lx: queue nearly full (%d entries), but could not drop msg "
+ "(iRet: %d, severity %d)\n", queueGetID(pThis), iQueueSize,
+ iRetLocal, iSeverity);
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* dequeue the queued object for the queue consumers.
+ * rgerhards, 2008-10-21
+ */
+static rsRetVal
+queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
+{
+ DEFiRet;
+ void *pUsr;
+ int iQueueSize;
+ int bRunsDA; /* cache for early mutex release */
+
+ /* dequeue element (still protected from mutex) */
+ iRet = queueDel(pThis, &pUsr);
+ queueChkPersist(pThis); // when we support peek(), we must do this down after the del!
+ iQueueSize = pThis->iQueueSize; /* cache this for after mutex release */
+ bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
+ pWti->pUsrp = pUsr; /* save it for the cancel cleanup handler */
+ d_pthread_mutex_unlock(pThis->mut);
+ pthread_cond_signal(&pThis->notFull);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ /* WE ARE NO LONGER PROTECTED BY THE MUTEX */
+
+ /* do actual processing (the lengthy part, runs in parallel)
+ * If we had a problem while dequeing, we do not call the consumer,
+ * but we otherwise ignore it. This is in the hopes that it will be
+ * self-healing. However, this is really not a good thing.
+ * rgerhards, 2008-01-03
+ */
+ if(iRet != RS_RET_OK)
+ FINALIZE;
+
+ /* we are running in normal, non-disk-assisted mode do a quick check if we need to drain the queue.
+ * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
+ * provide real-time creation of spool files.
+ * Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong.
+ */
+ CHKiRet(queueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr));
+
+finalize_it:
+ if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
+ dbgprintf("Queue 0x%lx/w?: error %d dequeueing element - ignoring, but strange things "
+ "may happen\n", queueGetID(pThis), iRet);
+ }
+ RETiRet;
+}
+
+
+/* This is the queue consumer in the regular (non-DA) case. It is
+ * protected by the queue mutex, but MUST release it as soon as possible.
+ * rgerhards, 2008-01-21
+ */
+static rsRetVal
+queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pWti, wti);
+
+ CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(pThis->pConsumer(pWti->pUsrp));
+
+finalize_it:
+dbgprintf("Queue %p: regular consumer returns %d\n", pThis, iRet);
+ RETiRet;
+}
+
+
+/* This is a special consumer to feed the disk-queue in disk-assited mode.
+ * When active, our own queue more or less acts as a memory buffer to the disk.
+ * So this consumer just needs to drain the memory queue and submit entries
+ * to the disk queue. The disk queue will then call the actual consumer from
+ * the app point of view (we chain two queues here).
+ * When this method is entered, the mutex is always locked and needs to be unlocked
+ * as part of the processing.
+ * rgerhards, 2008-01-14
+ */
+static rsRetVal
+queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pWti, wti);
+
+dbgprintf("Queue %p/w?: queueDAConsumer, queue size %d\n", pThis, pThis->iQueueSize);/* dirty iQueueSize! */
+ CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(queueEnqObj(pThis->pqDA, pWti->pUsrp));
+
+finalize_it:
+dbgprintf("DAConsumer returns with iRet %d\n", iRet);
+ RETiRet;
+}
+
+
+/* must only be called when the queue mutex is locked, else results
+ * are not stable!
+ * Version when running in DA mode.
+ */
+static int
+queueChkStopWrkrDA(queue_t *pThis)
+{
+ return pThis->bEnqOnly || !pThis->bRunsDA;
+}
+
+/* must only be called when the queue mutex is locked, else results
+ * are not stable!
+ * Version when running in non-DA mode.
+ */
+static int
+queueChkStopWrkrReg(queue_t *pThis)
+{
+ return pThis->bEnqOnly || pThis->bRunsDA;
+}
+
+
+/* must only be called when the queue mutex is locked, else results
+ * are not stable! DA version
+ */
+static int
+queueIsIdleDA(queue_t *pThis)
+{
+ return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk));
+}
+/* must only be called when the queue mutex is locked, else results
+ * are not stable! Regular version
+ */
+static int
+queueIsIdleReg(queue_t *pThis)
+{
+ return (pThis->iQueueSize == 0);
}
@@ -1878,7 +1137,8 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
DEFiRet;
rsRetVal iRetLocal;
int bInitialized = 0; /* is queue already initialized? */
- int i;
+ uchar pszBuf[64];
+ size_t lenBuf;
assert(pThis != NULL);
@@ -1888,7 +1148,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
if(pThis->pqParent == NULL) {
dbgprintf("Queue %p: no parent, alloc mutex\n", pThis);
pThis->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t));
- pthread_mutex_init (pThis->mut, NULL);
+ pthread_mutex_init(pThis->mut, NULL);
} else {
/* child queue, we need to use parent's mutex */
pThis->mut = pThis->pqParent->mut;
@@ -1909,15 +1169,23 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
- /* initialize worker thread instances
- * TODO: move to separate function
+ /* create worker thread pools for regular operation. The DA pool is created on an as-needed
+ * basis, which potentially means never under most circumstances.
*/
- if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads + 1, sizeof(qWrkThrd_t))) == NULL)
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- for(i = 0 ; i < pThis->iNumWorkerThreads + 1 ; ++i) {
- qWrkrConstructFinalize(&pThis->pWrkThrds[i], pThis, i);
- }
-
+ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx/Reg", (unsigned long) pThis);
+ CHKiRet(wtpConstruct (&pThis->pWtpReg));
+ CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, queueChkStopWrkrReg));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, queueIsIdleReg));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, queueConsumerReg));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, queueConsumerCancelCleanup));
+ CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
+ CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
+ CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
+ CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis));
+ CHKiRet(wtpConstructFinalize (pThis->pWtpReg));
+
+ /* initialize worker thread instances */
if(pThis->bIsDA) {
/* If we are disk-assisted, we need to check if there is a QIF file
* which we need to load. -- rgerhards, 2008-01-15
@@ -1927,7 +1195,11 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n",
queueGetID(pThis));
- queueInitDA(pThis, QUEUE_MODE_ENQDEQ); /* initiate DA mode */
+ queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
+ /* we need to start the DA worker thread so that messages will be processed. So
+ * we advise the worker pool there is at least one needed. The wtp does the rest...
+ */
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
bInitialized = 1; /* we are done */
} else {
// TODO: use logerror? -- rgerhards, 2008-01-16
@@ -1938,16 +1210,14 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
if(!bInitialized) {
dbgprintf("Queue 0x%lx: queue starts up without (loading) any disk state\n", queueGetID(pThis));
- /* fire up the worker threads */
- if(pThis->bEnqOnly == 0)
- queueStrtNewWrkThrd(pThis);
- // TODO: preforked workers! queueStrtAllWrkThrds(pThis);
+ /* we do not fire up any worker threads here, this happens automatically when they are needed */
+ // TODO: preforked workers? queueStrtAllWrkThrds(pThis);
}
pThis->bQueueStarted = 1;
finalize_it:
dbgprintf("queueStart() exit, iret %d\n", iRet);
- return iRet;
+ RETiRet;
}
@@ -2023,7 +1293,7 @@ finalize_it:
if(psQIF != NULL)
strmDestruct(&psQIF);
- return iRet;
+ RETiRet;
}
@@ -2043,15 +1313,16 @@ rsRetVal queueChkPersist(queue_t *pThis)
pThis->iUpdsSincePersist = 0;
}
- return iRet;
+ RETiRet;
}
/* destructor for the queue object */
rsRetVal queueDestruct(queue_t **ppThis)
{
- queue_t *pThis;
DEFiRet;
+ queue_t *pThis;
+ DEFVARS_mutexProtection;
assert(ppThis != NULL);
pThis = *ppThis;
@@ -2064,52 +1335,38 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
/* we do not need to take care of any messages left in queue if we are in enqueue only mode */
if(!pThis->bEnqOnly) {
/* in regular mode, need look at termination */
+ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
/* optimize parameters for shutdown of DA-enabled queues */
if(pThis->bIsDA && pThis->iQueueSize > 0) { // TODO: atomic iQueueSize!
dbgprintf("IsDA queue, modifying params for draining\n");
pThis->iHighWtrMrk = 1; /* make sure we drain */
pThis->iLowWtrMrk = 0; /* disable low water mark algo */
- if(pThis->qRunsDA == QRUNS_REGULAR) {
+ if(pThis->bRunsDA == 0) {
if(pThis->iQueueSize > 0) {
- queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* initiate DA mode */
+ queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
}
} else {
queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */
+ /* worker may have been waited on low water mark, reactivate */
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
}
if(pThis->bSaveOnShutdown) {
dbgprintf("bSaveOnShutdown set, eternal timeout set\n");
pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL;
}
- /* now we need to activate workers (read doc/dev_queue.html) */
+ END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
-
- /* wait until all pending workers are started up */
- qWrkrWaitAllWrkrStartup(pThis);
-
- // We need to startup a worker if we are in non-DA mode and the queue is not empty and not in enque-only mode */
- dbgprintf("Queue %p: queueDestruct probing if any regular workers need to be started, CurWrkr %d, qsize %d, qRunsDA %d\n",
- pThis, pThis->iCurNumWrkThrd, pThis->iQueueSize, pThis->qRunsDA);
- d_pthread_mutex_lock(pThis->mut);
- dbgprintf("queueDestruct mutex locked\n");
- if(pThis->iCurNumWrkThrd == 0 && pThis->iQueueSize > 0 && !pThis->bEnqOnly) {
- dbgprintf("Queue %p: queueDestruct must start regular workers!\n", pThis);
- // TODO check mutex call order - doies function aquire mutex?
- queueStrtNewWrkThrd(pThis);
- }
- d_pthread_mutex_unlock(pThis->mut);
- dbgprintf("queueDestruct mutex unlocked\n");
-
- /* wait again in case a new worker was started */
- qWrkrWaitAllWrkrStartup(pThis);
}
- /* terminate our own worker threads */
- if(pThis->pWrkThrds != NULL) {
+ /* at this point, the queue is either empty with all workers being idle (or deact) or the queue
+ * is full and all workers are running. We now need to wait for everyone to become idle.
+ */
+ if(pThis->qType != QUEUETYPE_DIRECT) {
queueShutdownWorkers(pThis);
}
- /* if still running DA, terminate disk queue */
- if(pThis->qRunsDA != QRUNS_REGULAR)
+ /* if still running DA, terminate disk queue (note that the DA queue is NULL if it was never used) */
+ if(pThis->bRunsDA && pThis->pqDA != NULL)
queueDestruct(&pThis->pqDA);
/* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) */
@@ -2118,9 +1375,10 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
}
/* ... then free resources */
- if(pThis->pWrkThrds != NULL) {
- free(pThis->pWrkThrds);
- pThis->pWrkThrds = NULL;
+ if(pThis->qType != QUEUETYPE_DIRECT) {
+ wtpDestruct(&pThis->pWtpReg);
+ if(pThis->pWtpDA != NULL)
+ wtpDestruct(&pThis->pWtpDA);
}
if(pThis->pqParent == NULL) {
@@ -2141,7 +1399,7 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove
free(pThis);
*ppThis = NULL;
- return iRet;
+ RETiRet;
}
@@ -2167,7 +1425,7 @@ queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
pThis->lenFilePrefix = iLenPrefix;
finalize_it:
- return iRet;
+ RETiRet;
}
/* set the queue's maximum file size
@@ -2187,7 +1445,7 @@ queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize)
pThis->iMaxFileSize = iMaxFileSize;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -2203,65 +1461,64 @@ queueEnqObj(queue_t *pThis, void *pUsr)
{
DEFiRet;
int iCancelStateSave;
+ int iMaxWorkers;
int i;
struct timespec t;
- int iSeverity = 8;
- rsRetVal iRetLocal;
ISOBJ_TYPE_assert(pThis, queue);
- /* process any pending thread requests */
- queueChkWrkThrdChanges(pThis);
-
+dbgprintf("Queue %p: EnqObj() 1\n", pThis);
/* Please note that this function is not cancel-safe and consequently
* sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
* during its execution. If that is not done, race conditions occur if the
* thread is canceled (most important use case is input module termination).
* rgerhards, 2008-01-08
*/
- if(pThis->pWrkThrds != NULL) {
+ if(pThis->qType != QUEUETYPE_DIRECT) {
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(pThis->mut);
}
- /* first check if we can discard anything */
- if(pThis->iDiscardMrk > 0 && pThis->iQueueSize >= pThis->iDiscardMrk) {
- iRetLocal = objGetSeverity(pUsr, &iSeverity);
- if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
- dbgprintf("Queue 0x%lx: queue nearly full (%d entries), discarded severity %d message\n",
- queueGetID(pThis), pThis->iQueueSize, iSeverity);
- objDestruct(pUsr);
- ABORT_FINALIZE(RS_RET_QUEUE_FULL);
- } else {
- dbgprintf("Queue 0x%lx: queue nearly full (%d entries), but could not drop msg "
- "(iRet: %d, severity %d)\n", queueGetID(pThis), pThis->iQueueSize,
- iRetLocal, iSeverity);
- }
- }
+ /* first check if we need to discard this message (which will cause CHKiRet() to exit) */
+ CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
+dbgprintf("Queue %p: EnqObj() 10\n", pThis);
/* then check if we need to add an assistance disk queue */
if(pThis->bIsDA)
CHKiRet(queueChkStrtDA(pThis));
- /* re-process any new pending thread requests and see if we need to start workers */
- queueChkAndStrtWrk(pThis);
+RUNLOG_VAR("%d", pThis->bIsDA);
+ /* make sure at least one worker is running. */
+ if(pThis->bRunsDA) {
+RUNLOG;
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
+ } else {
+ if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
+ iMaxWorkers = 1;
+ } else {
+ iMaxWorkers = pThis->iQueueSize / pThis->iMinMsgsPerWrkr + 1;
+ }
+RUNLOG;
+ wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
+ }
- /* and finally (try to) enqueue what is left over */
+ /* wait for the queue to be ready... */
while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) {
dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", queueGetID(pThis));
- queueTimeoutComp(&t, pThis->toEnq);
+ timeoutComp(&t, pThis->toEnq);
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", queueGetID(pThis));
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
}
+
+ /* and finally enqueue the message */
CHKiRet(queueAdd(pThis, pUsr));
queueChkPersist(pThis);
finalize_it:
- /* now awake sleeping worker threads */
- if(pThis->pWrkThrds != NULL) {
+ if(pThis->qType != QUEUETYPE_DIRECT) {
d_pthread_mutex_unlock(pThis->mut);
i = pthread_cond_signal(&pThis->notEmpty);
dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i);
@@ -2269,7 +1526,7 @@ finalize_it:
}
- return iRet;
+ RETiRet;
}
@@ -2307,7 +1564,8 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly)
/* this means we need to terminate all workers - that's it... */
dbgprintf("Queue 0x%lx: switching to enqueue-only mode, terminating all worker threads\n",
queueGetID(pThis));
- queueWrkThrdReqTrm(pThis, eWRKTHRD_SHUTDOWN_IMMEDIATE, 0);
+ wtpWakeupAllWrkr(pThis->pWtpDA);
+ wtpWakeupAllWrkr(pThis->pWtpReg);
} else {
/* switch back to regular mode */
ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */
@@ -2321,7 +1579,7 @@ finalize_it:
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
}
- return iRet;
+ RETiRet;
}
@@ -2360,19 +1618,18 @@ static rsRetVal queueSetProperty(queue_t *pThis, property_t *pProp)
}
finalize_it:
- return iRet;
+ RETiRet;
}
#undef isProp
-
-
/* Initialize the stream class. Must be called as the very first method
* before anything else is called inside this class.
* rgerhards, 2008-01-09
*/
BEGINObjClassInit(queue, 1)
- //OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize);
OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty);
//OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize);
+//fprintf(stdout, "queueChkStopWrkrReg: %p\n", queueChkStopWrkrReg);
+//fprintf(stdout, "queueChkStopWrkrDA: %p\n", queueChkStopWrkrDA);
ENDObjClassInit(queue)
/*
diff --git a/queue.h b/queue.h
index dca80ffd..ee2637da 100644
--- a/queue.h
+++ b/queue.h
@@ -25,27 +25,9 @@
#include <pthread.h>
#include "obj.h"
+#include "wtp.h"
#include "stream.h"
-/* some information about disk files used by the queue. In the long term, we may
- * export this settings to a separate file module - or not (if they are too
- * queue-specific. I just thought I mention it here so that everyone is aware
- * of this possibility. -- rgerhards, 2008-01-07
- */
-typedef struct {
- int fd; /* the file descriptor, -1 if closed */
- uchar *pszFileName; /* name of current file (if open) */
- int iCurrFileNum;/* current file number (NOT descriptor, but the number in the file name!) */
- size_t iCurrOffs;/* current offset */
- uchar *pIOBuf; /* io Buffer */
- int iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */
- int iBufPtr; /* pointer into current buffer */
- int iUngetC; /* char set via UngetChar() call or -1 if none set */
- int bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */
-} queueFileDescription_t;
-#define qFILE_IOBUF_SIZE 4096 /* size of the IO buffer */
-
-
/* queue types */
typedef enum {
QUEUETYPE_FIXED_ARRAY = 0,/* a simple queue made out of a fixed (initially malloced) array fast but memoryhog */
@@ -60,17 +42,6 @@ typedef struct qLinkedList_S {
void *pUsr;
} qLinkedList_t;
-/* commands and states for worker threads. */
-typedef enum {
- eWRKTHRD_STOPPED = 0, /* worker thread is not running (either actually never ran or was shut down) */
- eWRKTHRD_TERMINATING = 1,/* worker thread has shut down, but some finalzing is still needed */
- /* ALL active states MUST be numerically higher than eWRKTHRD_TERMINATED and NONE must be lower! */
- eWRKTHRD_RUN_CREATED = 2,/* worker thread has been created, but not yet begun initialization (prob. not yet scheduled) */
- eWRKTHRD_RUN_INIT = 3, /* worker thread is initializing, but not yet fully running */
- eWRKTHRD_RUNNING = 4, /* worker thread is up and running and shall continue to do so */
- eWRKTHRD_SHUTDOWN = 5, /* worker thread is running but shall terminate when queue is empty */
- eWRKTHRD_SHUTDOWN_IMMEDIATE = 6/* worker thread is running but shall terminate even if queue is full */
-} qWrkCmd_t;
typedef struct qWrkThrd_s {
pthread_t thrdID; /* thread ID */
@@ -95,7 +66,8 @@ typedef struct queue_s {
int iNumWorkerThreads;/* number of worker threads to use */
int iCurNumWrkThrd;/* current number of active worker threads */
int iMinMsgsPerWrkr;/* minimum nbr of msgs per worker thread, if more, a new worker is started until max wrkrs */
- qWrkThrd_t *pWrkThrds;/* array with control structure for the worker thread(s) associated with this queue */
+ wtp_t *pWtpDA;
+ wtp_t *pWtpReg;
int iUpdsSincePersist;/* nbr of queue updates since the last persist call */
int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */
int iHighWtrMrk; /* high water mark for disk-assisted memory queues */
@@ -138,11 +110,7 @@ typedef struct queue_s {
int iNumberFiles; /* how many files make up the queue? */
size_t iMaxFileSize; /* max size for a single queue file */
int bIsDA; /* is this queue disk assisted? */
- enum {
- QRUNS_REGULAR,
- QRUNS_DA_INIT,
- QRUNS_DA
- } qRunsDA; /* is this queue actually *running* disk assisted? if so, which mode? */
+ int bRunsDA; /* is this queue actually *running* disk assisted? */
pthread_mutex_t mutDA; /* mutex for low water mark algo */
pthread_cond_t condDA; /* and its matching condition */
struct queue_s *pqDA; /* queue for disk-assisted modes */
diff --git a/rsyslog.h b/rsyslog.h
index e37263e3..dd7b1c1e 100644
--- a/rsyslog.h
+++ b/rsyslog.h
@@ -112,6 +112,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_TIMED_OUT = -2041, /**< timeout occured (not necessarily an error) */
RS_RET_QSIZE_ZERO = -2042, /**< queue size is zero where this is not supported */
RS_RET_ALREADY_STARTING = -2043, /**< something (a thread?) is already starting - not necessarily an error */
+ RS_RET_NO_MORE_THREADS = -2044, /**< no more threads available, not necessarily an error */
RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */
RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */
RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */
@@ -128,11 +129,9 @@ typedef enum rsRetVal_ rsRetVal; /**< friendly type for global return value */
#define CHKiRet_Hdlr(code) if((iRet = code) != RS_RET_OK)
/* macro below is used in conjunction with CHKiRet_Hdlr, else use ABORT_FINALIZE */
#define FINALIZE goto finalize_it;
-#if 0 /* DEV debug: set to 1 to get a rough call trace -- rgerhards, 2008-01-13 */
-# define DEFiRet dbgprintf("Entering %s, line %d\n", __FILE__, __LINE__); rsRetVal iRet = RS_RET_OK
-#else
-# define DEFiRet rsRetVal iRet = RS_RET_OK
-#endif
+#define DEFiRet BEGINfunc rsRetVal iRet = RS_RET_OK
+#define RETiRet do{ ENDfunc return iRet; }while(0)
+
#define ABORT_FINALIZE(errCode) \
do { \
iRet = errCode; \
@@ -197,6 +196,8 @@ typedef unsigned char uchar;
/* The following prototype is convenient, even though it may not be the 100% correct place.. -- rgerhards 2008-01-07 */
void dbgprintf(char *, ...) __attribute__((format(printf, 1, 2)));
+#include "debug.h"
+
#endif /* multi-include protection */
/*
* vi:set ai:
diff --git a/srUtils.c b/srUtils.c
index 35b437bd..590e7eff 100755
--- a/srUtils.c
+++ b/srUtils.c
@@ -291,7 +291,7 @@ rsRetVal genFileName(uchar **ppName, uchar *pDirName, size_t lenDirName, uchar *
*ppName = pName;
finalize_it:
- return iRet;
+ RETiRet;
}
/* get the number of digits required to represent a given number. We use an
@@ -337,8 +337,10 @@ timeoutComp(struct timespec *pt, int iTimeout)
void
mutexCancelCleanup(void *arg)
{
+ BEGINfunc
assert(arg != NULL);
d_pthread_mutex_unlock((pthread_mutex_t*) arg);
+ ENDfunc
}
diff --git a/stream.c b/stream.c
index 15d9dcf6..cb6d5e29 100644
--- a/stream.c
+++ b/stream.c
@@ -107,7 +107,7 @@ static rsRetVal strmOpenFile(strm_t *pThis)
iFlags, pThis->fd);
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -138,7 +138,7 @@ static rsRetVal strmCloseFile(strm_t *pThis)
pThis->pszCurrFName = NULL;
}
- return iRet;
+ RETiRet;
}
@@ -165,7 +165,7 @@ strmNextFile(strm_t *pThis)
pThis->iCurrFNum = (pThis->iCurrFNum + 1) % pThis->iMaxFiles;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -228,7 +228,7 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC)
//dbgprintf("ReadChar: read %c, offset %d\n", *pC, pThis->iCurrOffs);
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -285,7 +285,7 @@ finalize_it:
if(iRet != RS_RET_OK && pCStr != NULL)
rsCStrDestruct(pCStr);
- return iRet;
+ RETiRet;
}
#endif /* #if 0 - saved code */
@@ -293,13 +293,13 @@ finalize_it:
/* Standard-Constructor for the strm object
*/
-BEGINobjConstruct(strm)
+BEGINobjConstruct(strm) /* be sure to specify the object type also in END macro! */
pThis->iCurrFNum = 1;
pThis->fd = -1;
pThis->iUngetC = -1;
pThis->sType = STREAMTYPE_FILE_SINGLE;
pThis->sIOBufSize = glblGetIOBufSize();
- pThis->tOpenMode = 0600;
+ pThis->tOpenMode = 0600; /* TODO: make configurable */
ENDobjConstruct(strm)
@@ -319,7 +319,7 @@ rsRetVal strmConstructFinalize(strm_t *pThis)
}
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -347,7 +347,7 @@ rsRetVal strmDestruct(strm_t **ppThis)
free(pThis);
*ppThis = NULL;
- return iRet;
+ RETiRet;
}
@@ -370,7 +370,7 @@ static rsRetVal strmCheckNextOutputFile(strm_t *pThis)
}
finalize_it:
- return iRet;
+ RETiRet;
}
/* write memory buffer to a stream object.
@@ -413,7 +413,7 @@ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
finalize_it:
pThis->iBufPtr = 0; /* see comment above */
- return iRet;
+ RETiRet;
}
@@ -432,7 +432,7 @@ rsRetVal strmFlush(strm_t *pThis)
iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr);
}
- return iRet;
+ RETiRet;
}
@@ -457,7 +457,7 @@ dbgprintf("seek(%d, %ld): %d\n", pThis->fd, offs, i);
pThis->iCurrOffs = offs; /* we are now at *this* offset */
pThis->iBufPtr = 0; /* buffer invalidated */
- return iRet;
+ RETiRet;
}
@@ -471,7 +471,7 @@ rsRetVal strmSeekCurrOffs(strm_t *pThis)
ISOBJ_TYPE_assert(pThis, strm);
iRet = strmSeek(pThis, pThis->iCurrOffs);
- return iRet;
+ RETiRet;
}
@@ -492,7 +492,7 @@ rsRetVal strmWriteChar(strm_t *pThis, uchar c)
pThis->iBufPtr++;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -508,7 +508,7 @@ rsRetVal strmWriteLong(strm_t *pThis, long i)
CHKiRet(strmWrite(pThis, szBuf, strlen((char*)szBuf)));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -551,7 +551,7 @@ rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
}
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -581,7 +581,7 @@ rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal)
pThis->iAddtlOpenFlags = iNewVal;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -608,7 +608,7 @@ strmSetFName(strm_t *pThis, uchar *pszName, size_t iLenName)
pThis->lenFName = iLenName;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -635,7 +635,7 @@ strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir)
pThis->lenDir = iLenDir;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -678,7 +678,7 @@ rsRetVal strmRecordEnd(strm_t *pThis)
pThis->bInRecord = 0;
iRet = strmCheckNextOutputFile(pThis); /* check if we need to switch files */
- return iRet;
+ RETiRet;
}
/* end stream record support functions */
@@ -723,7 +723,7 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
CHKiRet(objEndSerialize(pStrm));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -763,7 +763,7 @@ rsRetVal strmSetProperty(strm_t *pThis, property_t *pProp)
}
finalize_it:
- return iRet;
+ RETiRet;
}
#undef isProp
diff --git a/stringbuf.c b/stringbuf.c
index b777c40d..e89ce768 100755
--- a/stringbuf.c
+++ b/stringbuf.c
@@ -180,7 +180,7 @@ static rsRetVal rsCStrExtendBuf(rsCStrObj *pThis, size_t iMinNeeded)
pThis->pBuf = pNewBuf;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -191,7 +191,7 @@ finalize_it:
*/
rsRetVal rsCStrAppendStrWithLen(rsCStrObj *pThis, uchar* psz, size_t iStrLen)
{
- rsRetVal iRet;
+ DEFiRet;
rsCHECKVALIDOBJECT(pThis, OIDrsCStr);
assert(psz != NULL);
@@ -206,7 +206,7 @@ rsRetVal rsCStrAppendStrWithLen(rsCStrObj *pThis, uchar* psz, size_t iStrLen)
pThis->iStrLen += iStrLen;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -223,15 +223,16 @@ rsRetVal rsCStrAppendStr(rsCStrObj *pThis, uchar* psz)
rsRetVal rsCStrAppendInt(rsCStrObj *pThis, long i)
{
- rsRetVal iRet;
+ DEFiRet;
uchar szBuf[32];
rsCHECKVALIDOBJECT(pThis, OIDrsCStr);
- if((iRet = srUtilItoA((char*) szBuf, sizeof(szBuf), i)) != RS_RET_OK)
- return iRet;
+ CHKiRet(srUtilItoA((char*) szBuf, sizeof(szBuf), i));
- return rsCStrAppendStr(pThis, szBuf);
+ iRet = rsCStrAppendStr(pThis, szBuf);
+finalize_it:
+ RETiRet;
}
@@ -255,7 +256,7 @@ rsRetVal rsCStrAppendChar(rsCStrObj *pThis, uchar c)
}
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -420,7 +421,7 @@ finalize_it:
free(pThis->pBuf);
RSFREEOBJ(pThis);
- return(iRet);
+ RETiRet;
}
@@ -625,10 +626,12 @@ int rsCStrStartsWithSzStr(rsCStrObj *pCS1, uchar *psz, size_t iLenSz)
int rsCStrSzStrMatchRegex(rsCStrObj *pCS1, uchar *psz)
{
regex_t preq;
+ BEGINfunc
regcomp(&preq, (char*) rsCStrGetSzStr(pCS1), 0);
- int iRet = regexec(&preq, (char*) psz, 0, NULL, 0);
+ int ret = regexec(&preq, (char*) psz, 0, NULL, 0);
regfree(&preq);
- return iRet;
+ ENDfunc
+ return ret;
}
/* compare a rsCStr object with a classical sz string. This function
@@ -654,6 +657,7 @@ int rsCStrSzStrMatchRegex(rsCStrObj *pCS1, uchar *psz)
*/
int rsCStrOffsetSzStrCmp(rsCStrObj *pCS1, size_t iOffset, uchar *psz, size_t iLenSz)
{
+ BEGINfunc
rsCHECKVALIDOBJECT(pCS1, OIDrsCStr);
assert(iOffset < pCS1->iStrLen);
assert(psz != NULL);
@@ -662,9 +666,10 @@ int rsCStrOffsetSzStrCmp(rsCStrObj *pCS1, size_t iOffset, uchar *psz, size_t iLe
/* we are using iLenSz below, because the lengths
* are equal and iLenSz is faster to access
*/
- if(iLenSz == 0)
+ if(iLenSz == 0) {
return 0; /* zero-sized strings are equal ;) */
- else { /* we now have two non-empty strings of equal
+ ENDfunc
+ } else { /* we now have two non-empty strings of equal
* length, so we need to actually check if they
* are equal.
*/
@@ -675,10 +680,13 @@ int rsCStrOffsetSzStrCmp(rsCStrObj *pCS1, size_t iOffset, uchar *psz, size_t iLe
}
/* if we arrive here, the strings are equal */
return 0;
+ ENDfunc
}
}
- else
+ else {
return pCS1->iStrLen - iOffset - iLenSz;
+ ENDfunc
+ }
}
diff --git a/syslogd.c b/syslogd.c
index 7801d41f..09a246f9 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -178,6 +178,8 @@
#include "threads.h"
#include "queue.h"
#include "stream.h"
+#include "wti.h"
+#include "wtp.h"
/* We define our own set of syslog defintions so that we
* do not need to rely on (possibly different) implementations.
@@ -259,7 +261,6 @@ char ctty[] = _PATH_CONSOLE; /* this is read-only; used by omfile -- TODO: remov
static pid_t myPid; /* our pid for use in self-generated messages, e.g. on startup */
/* mypid is read-only after the initial fork() */
-static int debugging_on = 0; /* read-only, except on sig USR1 */
static int restart = 0; /* do restart (config read) - multithread safe */
int glblHadMemShortage = 0; /* indicates if we had memory shortage some time during the run */
@@ -368,7 +369,6 @@ static int bDropTrailingLF = 1; /* drop trailing LF's on reception? */
int iCompatibilityMode = 0; /* version we should be compatible with; 0 means sysklogd. It is
the default, so if no -c<n> option is given, we make ourselvs
as compatible to sysklogd as possible. */
-int Debug; /* debug flag - read-only after startup */
static int bDebugPrintTemplateList = 1;/* output template list in debug mode? */
static int bDebugPrintCfSysLineHandlerList = 1;/* output cfsyslinehandler list in debug mode? */
static int bDebugPrintModuleList = 1;/* output module list in debug mode? */
@@ -558,7 +558,6 @@ static void debug_switch();
static rsRetVal cfline(uchar *line, selector_t **pfCurr);
static int decode(uchar *name, struct code *codetab);
static void sighup_handler();
-//static void die(int sig);
static void freeSelectors(void);
static rsRetVal processConfFile(uchar *pConfFile);
static rsRetVal selectorAddList(selector_t *f);
@@ -1157,7 +1156,7 @@ finalize_it:
}
}
*ppThis = pThis;
- return iRet;
+ RETiRet;
}
@@ -1315,7 +1314,7 @@ rsRetVal printline(char *hname, char *msg, int bParseHost)
logmsg(pri, pMsg, SYNC_FILE);
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -1531,7 +1530,7 @@ logmsgInternal(int pri, char *msg, int flags)
logmsg(pri, pMsg, flags);
}
finalize_it:
- return iRet;
+ RETiRet;
}
/* This functions looks at the given message and checks if it matches the
@@ -1761,7 +1760,7 @@ finalize_it:
UnlockObj(pAction);
pthread_cleanup_pop(0); /* remove mutex cleanup handler */
pthread_setcancelstate(iCancelStateSave, NULL);
- return iRet;
+ RETiRet;
}
@@ -1798,7 +1797,7 @@ DEFFUNC_llExecFunc(processMsgDoActions)
}
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -1812,6 +1811,7 @@ processMsg(msg_t *pMsg)
int bContinue;
processMsgDoActions_t DoActData;
+ BEGINfunc
assert(pMsg != NULL);
/* log the message to the particular outputs */
@@ -1829,6 +1829,7 @@ processMsg(msg_t *pMsg)
if(llExecFunc(&f->llActList, processMsgDoActions, (void*)&DoActData) == RS_RET_DISCARDMSG)
bContinue = 0;
}
+ ENDfunc
}
@@ -1843,6 +1844,7 @@ processMsg(msg_t *pMsg)
static rsRetVal
msgConsumer(void *pUsr)
{
+ DEFiRet;
msg_t *pMsg = (msg_t*) pUsr;
assert(pMsg != NULL);
@@ -1850,7 +1852,7 @@ msgConsumer(void *pUsr)
processMsg(pMsg);
MsgDestruct(&pMsg);
- return RS_RET_OK;
+ RETiRet;
}
@@ -2251,6 +2253,7 @@ logmsg(int pri, msg_t *pMsg, int flags)
char *msg;
char PRItext[20];
+ BEGINfunc
assert(pMsg != NULL);
assert(pMsg->pszUxTradMsg != NULL);
msg = (char*) pMsg->pszUxTradMsg;
@@ -2320,6 +2323,7 @@ logmsg(int pri, msg_t *pMsg, int flags)
pMsg->msgFlags = flags;
MsgPrepareEnqueue(pMsg);
queueEnqObj(pMsgQueue, (void*) pMsg);
+ ENDfunc
}
@@ -2432,7 +2436,7 @@ finalize_it:
pAction->f_pMsg = pMsgSave; /* restore it */
}
- return iRet;
+ RETiRet;
}
@@ -2545,6 +2549,7 @@ void logerror(char *type)
char buf[1024];
char errStr[1024];
+ BEGINfunc
dbgprintf("Called logerr, msg: %s\n", type);
if (errno == 0)
@@ -2556,6 +2561,7 @@ void logerror(char *type)
buf[sizeof(buf)/sizeof(char) - 1] = '\0'; /* just to be on the safe side... */
errno = 0;
logmsgInternal(LOG_SYSLOG|LOG_ERR, buf, ADDDATE);
+ ENDfunc
return;
}
@@ -2569,7 +2575,12 @@ void logerror(char *type)
*/
static void doDie(int sig)
{
+ static int iRetries = 0; /* debug aid */
dbgprintf("DoDie called.\n");
+ if(iRetries++ == 4) {
+ dbgprintf("DoDie called 5 times - unconditional exit\n");
+ exit(1);
+ }
bFinished = sig;
}
@@ -2663,6 +2674,9 @@ die(int sig)
if(pModDir != NULL)
free(pModDir);
+ /* exit classes... */
+ dbgClassExit();
+
dbgprintf("Clean shutdown completed, bye.\n");
exit(0); /* "good" exit, this is the terminator function for rsyslog [die()] */
}
@@ -2747,7 +2761,7 @@ finalize_it:
if(pDir != NULL)
closedir(pDir);
- return iRet;
+ RETiRet;
}
@@ -2797,7 +2811,7 @@ static rsRetVal doIncludeLine(uchar **pp, __attribute__((unused)) void* pVal)
globfree(&cfgFiles);
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -2873,7 +2887,7 @@ static rsRetVal doModLoad(uchar **pp, __attribute__((unused)) void* pVal)
skipWhiteSpace(pp); /* skip over any whitespace */
finalize_it:
- return iRet;
+ RETiRet;
}
/* parse and interpret a $-config line that starts with
@@ -2933,7 +2947,7 @@ static rsRetVal doNameLine(uchar **pp, void* pVal)
*pp = p;
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -2995,7 +3009,7 @@ rsRetVal cfsysline(uchar *p)
}
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -3059,7 +3073,7 @@ DEFFUNC_llExecFunc(dbgPrintInitInfoAction)
iRet = actionDbgPrint((action_t*) pData);
printf("\n");
- return iRet;
+ RETiRet;
}
/* print debug information as part of init(). This pretty much
@@ -3235,7 +3249,7 @@ finalize_it:
dbgprintf("error %d processing config file '%s'; os error (if any): %s\n",
iRet, pConfFile, errStr);
}
- return iRet;
+ RETiRet;
}
@@ -3263,6 +3277,7 @@ startInputModules(void)
pMod = modGetNxtType(pMod, eMOD_IN);
}
+ ENDfunc
return RS_RET_OK; /* intentional: we do not care about module errors */
}
@@ -3448,6 +3463,7 @@ init(void)
sigaction(SIGHUP, &sigAct, NULL);
dbgprintf(" (re)started.\n");
+ ENDfunc
}
@@ -3510,7 +3526,7 @@ rsRetVal cflineParseTemplateName(uchar** pp, omodStringRequest_t *pOMSR, int iEn
finalize_it:
*pp = p;
- return iRet;
+ RETiRet;
}
/* Helper to cfline(). Parses a file name up until the first
@@ -3540,7 +3556,7 @@ rsRetVal cflineParseFileName(uchar* p, uchar *pFileName, omodStringRequest_t *pO
iRet = cflineParseTemplateName(&p, pOMSR, iEntry, iTplOpts, (uchar*) " TradFmt");
- return iRet;
+ RETiRet;
}
@@ -3999,7 +4015,7 @@ finalize_it:
actionDestruct(pAction);
}
- return iRet;
+ RETiRet;
}
@@ -4037,7 +4053,7 @@ static rsRetVal cflineDoFilter(uchar **pp, selector_t *f)
return(iRet);
}
- return iRet;
+ RETiRet;
}
@@ -4087,7 +4103,7 @@ static rsRetVal cflineDoAction(uchar **p, action_t **ppAction)
}
*ppAction = pAction;
- return iRet;
+ RETiRet;
}
@@ -4105,7 +4121,7 @@ DEFFUNC_llExecFunc(selectorAddListCheckActionsChecker)
Forwarding++;
}
- return iRet;
+ RETiRet;
}
/* loop through a list of actions and perform necessary checks and
@@ -4123,7 +4139,7 @@ static rsRetVal selectorAddListCheckActions(selector_t *f)
CHKiRet(llExecFunc(&f->llActList, selectorAddListCheckActionsChecker, NULL));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -4174,7 +4190,7 @@ static rsRetVal selectorAddList(selector_t *f)
}
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -4215,7 +4231,7 @@ static rsRetVal cflineClassic(uchar *p, selector_t **pfCurr)
finalize_it:
*pfCurr = fCurr;
- return iRet;
+ RETiRet;
}
@@ -4249,7 +4265,7 @@ static rsRetVal cfline(uchar *line, selector_t **pfCurr)
break;
}
- return iRet;
+ RETiRet;
}
@@ -4278,7 +4294,7 @@ static rsRetVal setMainMsgQueType(void __attribute__((unused)) *pVal, uchar *psz
}
free(pszType); /* no longer needed */
- return iRet;
+ RETiRet;
}
@@ -4312,49 +4328,6 @@ int decode(uchar *name, struct code *codetab)
return (-1);
}
-extern void dbgprintf(char *fmt, ...) __attribute__((format(printf,1, 2)));
-void
-dbgprintf(char *fmt, ...)
-{
- static pthread_t ptLastThrdID = 0;
- static int bWasNL = FALSE;
- va_list ap;
-
- if ( !(Debug && debugging_on) )
- return;
-
- /* The bWasNL handler does not really work. It works if no thread
- * switching occurs during non-NL messages. Else, things are messed
- * up. Anyhow, it works well enough to provide useful help during
- * getting this up and running. It is questionable if the extra effort
- * is worth fixing it, giving the limited appliability.
- * rgerhards, 2005-10-25
- * I have decided that it is not worth fixing it - especially as it works
- * pretty well.
- * rgerhards, 2007-06-15
- */
- if(ptLastThrdID != pthread_self()) {
- if(!bWasNL) {
- fprintf(stdout, "\n");
- bWasNL = 1;
- }
- ptLastThrdID = pthread_self();
- }
-
- if(bWasNL) {
- fprintf(stdout, "%8.8x: ", (unsigned int) pthread_self());
- //fprintf(stderr, "%8.8x: ", (unsigned int) pthread_self());
- }
- bWasNL = (*(fmt + strlen(fmt) - 1) == '\n') ? TRUE : FALSE;
- va_start(ap, fmt);
- vfprintf(stdout, fmt, ap);
- //vfprintf(stderr, fmt, ap);
- va_end(ap);
-
- //fflush(stderr);
- fflush(stdout);
- return;
-}
/*
@@ -4448,6 +4421,7 @@ mainloop(void)
{
struct timeval tvSelectTimeout;
+ BEGINfunc
while(!bFinished){
/* first check if we have any internal messages queued and spit them out */
/* TODO: do we need this any longer? I doubt it, but let's care about it
@@ -4493,6 +4467,7 @@ mainloop(void)
continue;
}
}
+ ENDfunc
}
/* If user is not root, prints warnings or even exits
@@ -4542,16 +4517,20 @@ static rsRetVal loadBuildInModules(void)
{
DEFiRet;
- if((iRet = doModInit(modInitFile, (uchar*) "builtin-file", NULL)) != RS_RET_OK)
- return iRet;
+ if((iRet = doModInit(modInitFile, (uchar*) "builtin-file", NULL)) != RS_RET_OK) {
+ RETiRet;
+ }
#ifdef SYSLOG_INET
- if((iRet = doModInit(modInitFwd, (uchar*) "builtin-fwd", NULL)) != RS_RET_OK)
- return iRet;
+ if((iRet = doModInit(modInitFwd, (uchar*) "builtin-fwd", NULL)) != RS_RET_OK) {
+ RETiRet;
+ }
#endif
- if((iRet = doModInit(modInitShell, (uchar*) "builtin-shell", NULL)) != RS_RET_OK)
- return iRet;
- if((iRet = doModInit(modInitDiscard, (uchar*) "builtin-discard", NULL)) != RS_RET_OK)
- return iRet;
+ if((iRet = doModInit(modInitShell, (uchar*) "builtin-shell", NULL)) != RS_RET_OK) {
+ RETiRet;
+ }
+ if((iRet = doModInit(modInitDiscard, (uchar*) "builtin-discard", NULL)) != RS_RET_OK) {
+ RETiRet;
+ }
/* dirty, but this must be for the time being: the usrmsg module must always be
* loaded as last module. This is because it processes any time of action selector.
@@ -4563,7 +4542,7 @@ static rsRetVal loadBuildInModules(void)
* [a-zA-Z0-9_.]
*/
if((iRet = doModInit(modInitUsrMsg, (uchar*) "builtin-usrmsg", NULL)) != RS_RET_OK)
- return iRet;
+ RETiRet;
/* ok, initialization of the command handler probably does not 100% belong right in
* this space here. However, with the current design, this is actually quite a good
@@ -4607,7 +4586,7 @@ static rsRetVal loadBuildInModules(void)
CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -4708,6 +4687,7 @@ static void mainThread()
*/
mainloop();
+ ENDfunc
}
@@ -4719,20 +4699,26 @@ static rsRetVal InitGlobalClasses(void)
DEFiRet;
CHKiRet(objClassInit()); /* *THIS* *MUST* always be the first class initilizere called! */
+fprintf(stdout, " calling dbgClassInit\n");
+ //CHKiRet(dbgClassInit());
CHKiRet(MsgClassInit());
CHKiRet(strmClassInit());
+ CHKiRet(wtiClassInit());
+ CHKiRet(wtpClassInit());
CHKiRet(queueClassInit());
finalize_it:
- return iRet;
+ RETiRet;
}
+
/* This is the main entry point into rsyslogd. Over time, we should try to
* modularize it a bit more...
*/
int main(int argc, char **argv)
{
+dbgClassInit();
DEFiRet;
register int i;
@@ -4987,6 +4973,8 @@ int main(int argc, char **argv)
memset(&sigAct, 0, sizeof (sigAct));
sigemptyset(&sigAct.sa_mask);
+ sigAct.sa_handler = sigsegvHdlr;
+ sigaction(SIGSEGV, &sigAct, NULL);
sigAct.sa_handler = doDie;
sigaction(SIGTERM, &sigAct, NULL);
sigAct.sa_handler = Debug ? doDie : SIG_IGN;
@@ -4995,7 +4983,8 @@ int main(int argc, char **argv)
sigAct.sa_handler = reapchild;
sigaction(SIGCHLD, &sigAct, NULL);
sigAct.sa_handler = Debug ? debug_switch : SIG_IGN;
- sigaction(SIGUSR1, &sigAct, NULL);
+// TODO: use signal 2
+ //sigaction(SIGUSR1, &sigAct, NULL);
sigAct.sa_handler = SIG_IGN;
sigaction(SIGPIPE, &sigAct, NULL);
sigaction(SIGXFSZ, &sigAct, NULL); /* do not abort if 2gig file limit is hit */
@@ -5012,6 +5001,7 @@ finalize_it:
if(iRet != RS_RET_OK)
fprintf(stderr, "rsyslogd run failed with error %d.\n", iRet);
+ ENDfunc
return 0;
}
diff --git a/tcpsyslog.c b/tcpsyslog.c
index 01655620..80f0d82a 100644
--- a/tcpsyslog.c
+++ b/tcpsyslog.c
@@ -1220,7 +1220,7 @@ static rsRetVal TCPSendBldFrame(TCPFRAMINGMODE rqdFraming, char **pmsg, size_t *
}
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -1282,7 +1282,7 @@ int TCPSend(void *pData, char *msg, size_t len, TCPFRAMINGMODE rqdFraming,
finalize_it:
if(bMsgMustBeFreed)
free(msg);
- return iRet;
+ RETiRet;
}
diff --git a/template.c b/template.c
index ac436081..12096c91 100644
--- a/template.c
+++ b/template.c
@@ -132,7 +132,7 @@ rsRetVal tplToString(struct template *pTpl, msg_t *pMsg, uchar** ppSz)
finalize_it:
*ppSz = (iRet == RS_RET_OK) ? pVal : NULL;
- return iRet;
+ RETiRet;
}
/* Helper to doSQLEscape. This is called if doSQLEscape
diff --git a/threads.c b/threads.c
index 8ce5515d..c7b5f4e4 100644
--- a/threads.c
+++ b/threads.c
@@ -143,6 +143,7 @@ static void* thrdStarter(void *arg)
iRet = pThis->pUsrThrdMain(pThis);
dbgprintf("thrdStarter: usrThrdMain 0x%lx returned with iRet %d, exiting now.\n", (unsigned long) pThis->thrdID, iRet);
+ ENDfunc
pthread_exit(0);
}
@@ -166,7 +167,7 @@ rsRetVal thrdCreate(rsRetVal (*thrdMain)(thrdInfo_t*), rsRetVal(*afterRun)(thrdI
CHKiRet(llAppend(&llThrds, NULL, pThis));
finalize_it:
- return iRet;
+ RETiRet;
}
@@ -195,7 +196,7 @@ rsRetVal thrdInit(void)
sigAct.sa_handler = sigusr2Dummy;
sigaction(SIGUSR2, &sigAct, NULL);
- return iRet;
+ RETiRet;
}
@@ -208,7 +209,7 @@ rsRetVal thrdExit(void)
iRet = llDestroy(&llThrds);
- return iRet;
+ RETiRet;
}
@@ -232,7 +233,7 @@ thrdSleep(thrdInfo_t *pThis, int iSeconds, int iuSeconds)
select(1, NULL, NULL, NULL, &tvSelectTimeout);
if(pThis->bShallStop)
iRet = RS_RET_TERMINATE_NOW;
- return iRet;
+ RETiRet;
}
diff --git a/wti.c b/wti.c
index 0e33b60f..045330c6 100644
--- a/wti.c
+++ b/wti.c
@@ -73,18 +73,20 @@ wtiGetDbgHdr(wti_t *pThis)
* bite in the long term... -- rgerhards, 2008-01-17
* TODO: may be performance optimized by atomic operations
*/
-static inline qWrkCmd_t
+qWrkCmd_t
wtiGetState(wti_t *pThis, int bLockMutex)
{
DEFVARS_mutexProtection;
qWrkCmd_t tCmd;
+ BEGINfunc
ISOBJ_TYPE_assert(pThis, wti);
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
tCmd = pThis->tCurrCmd;
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ ENDfunc
return tCmd;
}
@@ -95,19 +97,20 @@ wtiGetState(wti_t *pThis, int bLockMutex)
* in an active state. -- rgerhards, 2008-01-20
*/
rsRetVal
-wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly)
+wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex)
{
DEFiRet;
- DEFVARS_mutex_cancelsafeLock;
+ DEFVARS_mutexProtection;
ISOBJ_TYPE_assert(pThis, wti);
assert(tCmd <= eWRKTHRD_SHUTDOWN_IMMEDIATE);
- mutex_cancelsafe_lock(&pThis->mut);
+ BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
+RUNLOG_VAR("%d", bActiveOnly);
/* all worker states must be followed sequentially, only termination can be set in any state */
if( (bActiveOnly && (pThis->tCurrCmd < eWRKTHRD_RUN_CREATED))
- || (pThis->tCurrCmd > tCmd && tCmd != eWRKTHRD_TERMINATING)) {
+ || (pThis->tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) {
dbgprintf("%s: command %d can not be accepted in current %d processing state - ignored\n",
wtiGetDbgHdr(pThis), tCmd, pThis->tCurrCmd);
} else {
@@ -137,7 +140,7 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly)
pThis->tCurrCmd = tCmd; /* apply the new state */
}
- mutex_cancelsafe_unlock(&pThis->mut);
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
RETiRet;
}
@@ -188,15 +191,17 @@ ENDobjConstruct(wti)
rsRetVal
wtiConstructFinalize(wti_t *pThis)
{
+ DEFiRet;
+
ISOBJ_TYPE_assert(pThis, wti);
dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis));
/* initialize our thread instance descriptor */
- pThis->pUsr = NULL;
+ pThis->pUsrp = NULL;
pThis->tCurrCmd = eWRKTHRD_STOPPED;
- return RS_RET_OK;
+ RETiRet;
}
@@ -211,7 +216,9 @@ wtiJoinThrd(wti_t *pThis)
ISOBJ_TYPE_assert(pThis, wti);
dbgprintf("wti: waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd);
pthread_join(pThis->thrdID, NULL);
- wtiSetState(pThis, eWRKTHRD_STOPPED, 0); /* back to virgin... */
+RUNLOG;
+ wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* back to virgin... */
+RUNLOG;
pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
dbgprintf("wti: worker %s has stopped\n", wtiGetDbgHdr(pThis));
@@ -261,6 +268,7 @@ wtiWorkerCancelCleanup(void *arg)
wtp_t *pWtp;
int iCancelStateSave;
+ BEGINfunc
ISOBJ_TYPE_assert(pThis, wti);
pWtp = pThis->pWtp;
ISOBJ_TYPE_assert(pWtp, wtp);
@@ -268,17 +276,17 @@ wtiWorkerCancelCleanup(void *arg)
dbgprintf("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
/* call user supplied handler (that one e.g. requeues the element) */
- pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr);
+ pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pWtp->mut);
- wtiSetState(pThis, eWRKTHRD_TERMINATING, 0);
+ wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
// TODO: sync access!
pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
- pthread_cond_signal(&pWtp->condThrdTrm); /* activate anyone waiting on thread shutdown */
d_pthread_mutex_unlock(&pWtp->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
+ ENDfunc
}
@@ -318,8 +326,11 @@ wtiWorker(wti_t *pThis)
pWtp = pThis->pWtp; /* shortcut */
ISOBJ_TYPE_assert(pWtp, wtp);
+ dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
+ pWtp->pfOnWorkerStartup(pWtp->pUsr);
+
/* now we have our identity, on to real processing */
while(1) { /* loop will be broken below - need to do mutex locks */
dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd);
@@ -349,21 +360,22 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis),
if(pWtp->toWrkShutdown == -1) {
dbgprintf("worker never times out!\n"); // DEL
/* never shut down any started worker */
- pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
+ d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
} else {
timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
- if(pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
+ if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
dbgprintf("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
bInactivityTOOccured = 1; /* indicate we had a timeout */
}
}
- dbgprintf("%s: post condwait ->notEmpty\n", wtiGetDbgHdr(pThis)); // DEL
+ dbgprintf("%s: post condwait ->Busy or timeout\n", wtiGetDbgHdr(pThis)); // DEL
END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
continue; /* request next iteration */
}
/* if we reach this point, we have a non-empty queue (and are still protected by mutex) */
- pWtp->pfDoWork(pThis, iCancelStateSave);
+ dbgprintf("%s: calling consumer\n", wtiGetDbgHdr(pThis));
+ pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
/* TODO: move this above into one of the chck Term functions */
//if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0)
@@ -371,6 +383,8 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis),
// " %d messages to process.\n", wtiGetDbgHdr(pThis), pThis->iQueueSize);
}
+ pWtp->pfOnWorkerShutdown(pWtp->pUsr);
+
/* indicate termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
dbgprintf("%s: worker waiting for mutex\n", wtiGetDbgHdr(pThis));
@@ -391,11 +405,9 @@ dbgprintf("%s: setting termination state\n", wtiGetDbgHdr(pThis));
wtiSetState(pWrkrInst, eWRKTHRD_TERMINATING, 0);
}
#else
- wtiSetState(pThis, eWRKTHRD_TERMINATING, 0);
+ wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED);
#endif
- // TODO: call, mutex:
pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
- pthread_cond_signal(&pWtp->condThrdTrm); /* activate anyone waiting on thread shutdown */
d_pthread_mutex_unlock(&pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -404,6 +416,7 @@ dbgprintf("%s: setting termination state\n", wtiGetDbgHdr(pThis));
/* some simple object access methods */
+DEFpropSetMeth(wti, pWtp, wtp_t*);
/* set the debug header message
* The passed-in string is duplicated. So if the caller does not need
@@ -441,7 +454,7 @@ finalize_it:
* rgerhards, 2008-01-09
*/
BEGINObjClassInit(wti, 1) /* one is the object version (most important for persisting) */
-ENDObjClassInit(queue)
+ENDObjClassInit(wti)
/*
* vi:set ai:
diff --git a/wti.h b/wti.h
index 4b028f73..db782d2b 100644
--- a/wti.h
+++ b/wti.h
@@ -32,7 +32,7 @@ typedef struct wti_s {
BEGINobjInstance;
pthread_t thrdID; /* thread ID */
qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
- obj_t *pUsr; /* current user object being processed (or NULL if none) */
+ obj_t *pUsrp; /* pointer to an object meaningful for current user pointer (e.g. queue pUsr data elemt) */
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
pthread_cond_t condInitDone; /* signaled when the thread startup is done (once per thread existance) */
pthread_mutex_t mut;
@@ -47,8 +47,13 @@ rsRetVal wtiConstruct(wti_t **ppThis);
rsRetVal wtiConstructFinalize(wti_t *pThis);
rsRetVal wtiDestruct(wti_t **ppThis);
rsRetVal wtiWorker(wti_t *pThis);
+rsRetVal wtiProcessThrdChanges(wti_t *pThis, int bLockMutex);
+rsRetVal wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg);
+rsRetVal wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex);
+rsRetVal wtiJoinThrd(wti_t *pThis);
+qWrkCmd_t wtiGetState(wti_t *pThis, int bLockMutex);
PROTOTYPEObjClassInit(wti);
PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
-#define wtiGetID(pThis) ((unsigned long) pThis)
+PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*);
#endif /* #ifndef WTI_H_INCLUDED */
diff --git a/wtp.c b/wtp.c
index 4bc0cd4f..4c3ea921 100644
--- a/wtp.c
+++ b/wtp.c
@@ -71,52 +71,59 @@ wtpGetDbgHdr(wtp_t *pThis)
/* Not implemented dummy function for constructor */
-static rsRetVal NotImplementedDummy() { return RS_RET_NOT_IMPLEMENTED; }
+static rsRetVal NotImplementedDummy() { return RS_RET_OK; }
/* Standard-Constructor for the wtp object
*/
BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */
- int i;
- uchar pszBuf[64];
- size_t lenBuf;
- wti_t *pWti;
-
pthread_mutex_init(&pThis->mut, NULL);
pthread_cond_init(&pThis->condThrdTrm, NULL);
/* set all function pointers to "not implemented" dummy so that we can safely call them */
pThis->pfChkStopWrkr = NotImplementedDummy;
pThis->pfIsIdle = NotImplementedDummy;
pThis->pfDoWork = NotImplementedDummy;
- pThis->pfOnShutdownAdvise = NotImplementedDummy;
pThis->pfOnIdle = NotImplementedDummy;
pThis->pfOnWorkerCancel = NotImplementedDummy;
+ pThis->pfOnWorkerStartup = NotImplementedDummy;
+ pThis->pfOnWorkerShutdown = NotImplementedDummy;
+ENDobjConstruct(wtp)
+
+
+/* Construction finalizer
+ * rgerhards, 2008-01-17
+ */
+rsRetVal
+wtpConstructFinalize(wtp_t *pThis)
+{
+ DEFiRet;
+ int i;
+ uchar pszBuf[64];
+ size_t lenBuf;
+ wti_t *pWti;
- /* alloc and construct workers */
+ ISOBJ_TYPE_assert(pThis, wtp);
+
+ dbgprintf("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis));
+ /* alloc and construct workers - this can only be done in finalizer as we previously do
+ * not know the max number of workers
+ */
+RUNLOG;
if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+RUNLOG_VAR("%d", i);
+RUNLOG_VAR("%p", pThis->pWrkr[i]);
CHKiRet(wtiConstruct(&pThis->pWrkr[i]));
pWti = pThis->pWrkr[i];
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s/w%d", wtpGetDbgHdr(pThis), i);
CHKiRet(wtiSetDbgHdr(pWti, pszBuf, lenBuf));
+ CHKiRet(wtiSetpWtp(pWti, pThis));
CHKiRet(wtiConstructFinalize(pWti));
}
-finalize_it:
-ENDobjConstruct(wtp)
-
-/* Construction finalizer
- * rgerhards, 2008-01-17
- */
-rsRetVal
-wtpConstructFinalize(wtp_t __attribute__((unused)) *pThis)
-{
- ISOBJ_TYPE_assert(pThis, wtp);
-
- dbgprintf("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis));
-
- return RS_RET_OK;
+finalize_it:
+ RETiRet;
}
@@ -129,6 +136,8 @@ wtpDestruct(wtp_t **ppThis)
int iCancelStateSave;
int i;
+dbgPrintAllDebugInfo();
+RUNLOG;
assert(ppThis != NULL);
pThis = *ppThis;
ISOBJ_TYPE_assert(pThis, wtp);
@@ -161,17 +170,31 @@ wtpDestruct(wtp_t **ppThis)
}
-/* wake up all worker threads. Param bWithDAWrk tells if the DA worker
- * is to be awaken, too. It needs special handling because it waits on
- * two different conditions depending on processing state.
+/* wake up at least one worker thread.
+ * rgerhards, 2008-01-20
+ */
+rsRetVal
+wtpWakeupWrkr(wtp_t *pThis)
+{
+ DEFiRet;
+
+ // TODO; mutex?
+ ISOBJ_TYPE_assert(pThis, wtp);
+dbgprintf("wtpWakeupWrkr 1, cond %p\n", pThis->pcondBusy);
+ pthread_cond_signal(pThis->pcondBusy);
+dbgprintf("wtpWakeupWrkr 2\n");
+ RETiRet;
+}
+/* wake up all worker threads.
* rgerhards, 2008-01-16
*/
-static inline rsRetVal
-wtpWakeupWrks(wtp_t *pThis)
+rsRetVal
+wtpWakeupAllWrkr(wtp_t *pThis)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, wtp);
+ // TODO; mutex?
pthread_cond_broadcast(pThis->pcondBusy);
RETiRet;
}
@@ -189,8 +212,10 @@ wtpProcessThrdChanges(wtp_t *pThis)
ISOBJ_TYPE_assert(pThis, wtp);
+ RUNLOG;
if(pThis->bThrdStateChanged == 0)
FINALIZE;
+ RUNLOG;
/* go through all threads */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
@@ -198,6 +223,7 @@ wtpProcessThrdChanges(wtp_t *pThis)
}
finalize_it:
+ RUNLOG;
RETiRet;
}
@@ -218,39 +244,68 @@ wtpSetState(wtp_t *pThis, wtpState_t iNewState)
}
-#if 0
+/* check if the worker shall shutdown (1 = yes, 0 = no)
+ * TODO: check if we can use atomic operations to enhance performance
+ * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user"
+ * (e.g. the queue clas)
+ * rgerhards, 2008-01-21
+ */
+rsRetVal
+wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
+{
+ DEFiRet;
+ DEFVARS_mutexProtection;
+
+ BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
+ if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE)
+ || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex)))
+ iRet = RS_RET_TERMINATE_NOW;
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+
+ /* try customer handler if one was set and we do not yet have a definite result */
+ if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) {
+ iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex);
+ }
+
+ RETiRet;
+}
+
+
/* Send a shutdown command to all workers and see if they terminate.
* A timeout may be specified.
* rgerhards, 2008-01-14
*/
-static rsRetVal
-wtpWrkrShutdown(wtp_t *pThis)
+rsRetVal
+wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, long iTimeout)
{
DEFiRet;
int bTimedOut;
struct timespec t;
int iCancelStateSave;
- // TODO: implement
- ISOBJ_TYPE_assert(pThis);
- queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */
- queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
- /* race: must make sure all are running! */
- queueTimeoutComp(&t, iTimeout);/* get timeout */
+dbgPrintAllDebugInfo();
+RUNLOG_VAR("%p", pThis);
+RUNLOG_VAR("%ld", iTimeout);
+RUNLOG_VAR("%d", tShutdownCmd);
+ ISOBJ_TYPE_assert(pThis, wtp);
+
+ wtpSetState(pThis, tShutdownCmd);
+ wtpWakeupAllWrkr(pThis);
+ timeoutComp(&t, iTimeout);/* get timeout */
/* and wait for their termination */
-dbgprintf("Queue %p: waiting for mutex %p\n", pThis, &pThis->mutThrdMgmt);
+dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- d_pthread_mutex_lock(&pThis->mutThrdMgmt);
- pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt);
+ d_pthread_mutex_lock(&pThis->mut);
+ pthread_cleanup_push(mutexCancelCleanup, &pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
- dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n",
- queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd);
+ dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n",
+ wtpGetDbgHdr(pThis), iTimeout, pThis->iCurNumWrkThrd);
- if(pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutThrdMgmt, &t) != 0) {
- dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", queueGetID(pThis));
+ if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mut, &t) != 0) {
+ dbgprintf("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis));
bTimedOut = 1; /* we exit the loop on timeout */
}
}
@@ -259,122 +314,61 @@ dbgprintf("Queue %p: waiting for mutex %p\n", pThis, &pThis->mutThrdMgmt);
if(bTimedOut)
iRet = RS_RET_TIMED_OUT;
+dbgprintf("wtpShutdownAll exit");
RETiRet;
}
-/* Unconditionally cancel all running worker threads.
- * rgerhards, 2008-01-14
+/* indicate that a thread has terminated and awake anyone waiting on it
+ * rgerhards, 2008-01-23
*/
-static rsRetVal
-wtpWrkrCancel(wtp_t *pThis)
+rsRetVal wtpSignalWrkrTermination(wtp_t *pThis)
{
DEFiRet;
- int i;
- // TODO: we need to implement peek(), without it (today!) we lose one message upon
- // worker cancellation! -- rgerhards, 2008-01-14
-
- ISOB_TYPE_assert(pThis);
- /* process any pending thread requests so that we know who actually is still running */
- queueChkWrkThrdChanges(pThis);
-
- /* awake the workers one more time, just to be sure */
- queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
+ //TODO: mutex or not mutex, that's the question ;)DEFVARS_mutexProtection;
- /* first tell the workers our request */
- for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
- if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) {
- dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i);
- pthread_cancel(pThis->pWrkThrds[i].thrdID);
- }
- }
+ ISOBJ_TYPE_assert(pThis, wtp);
+ //BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);
+dbgprintf("signaling thread termination, cond %p\n", &pThis->condThrdTrm);
+ pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
+ //END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
RETiRet;
}
-/* Worker thread management function carried out when the main
- * worker is about to terminate.
+/* Unconditionally cancel all running worker threads.
+ * rgerhards, 2008-01-14
*/
-static rsRetVal
-wtpShutdownWorkers(wtp_t *pThis)
+rsRetVal
+wtpCancelAll(wtp_t *pThis)
{
DEFiRet;
int i;
+ // TODO: we need to implement peek(), without it (today!) we lose one message upon
+ // worker cancellation! -- rgerhards, 2008-01-14
- assert(pThis != NULL);
+ ISOBJ_TYPE_assert(pThis, wtp);
- dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", (unsigned long) pThis);
+ /* process any pending thread requests so that we know who actually is still running */
+ wtpProcessThrdChanges(pThis);
- ISOB_TYPE_assert(pThis);
- /* even if the timeout count is set to 0 (run endless), we still call the queueWrkThrdTrm(). This
- * is necessary so that all threads get sent the termination command. With a timeout of 0, however,
- * the function returns immediate with RS_RET_TIMED_OUT. We catch that state and accept it as
- * good.
- */
- iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN, pThis->toQShutdown);
- if(iRet == RS_RET_TIMED_OUT) {
- if(pThis->toQShutdown == 0) {
- iRet = RS_RET_OK;
- } else {
- /* OK, we now need to try force the shutdown */
- dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n",
- queueGetID(pThis));
- iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN_IMMEDIATE, pThis->toActShutdown);
- }
- }
+ /* awake the workers one more time, just to be sure */
+ wtpWakeupAllWrkr(pThis);
- if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */
- /* still didn't work out - so we now need to cancel the workers */
- dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis);
- iRet = queueWrkThrdCancel(pThis);
- }
-
- /* finally join the threads
- * In case of a cancellation, this may actually take some time. This is also
- * needed to clean up the thread descriptors, even with a regular termination.
- * And, most importantly, this is needed if we have an indifitite termination
- * time set (timeout == 0)! -- rgerhards, 2008-01-14
- */
+ /* first tell the workers our request */
for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
- if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRD_STOPPED) {
- queueJoinWrkThrd(pThis, i);
+ // TODO: mutex lock!
+ if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) {
+ dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i);
+ pthread_cancel(pThis->pWrkr[i]->thrdID);
}
}
- dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n",
- queueGetID(pThis), pThis->iQueueSize);
-
RETiRet;
}
-#endif
-
-
-
-/* check if the worker shall shutdown
- * TODO: check if we can use atomic operations to enhance performance
- * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user"
- * (e.g. the queue clas)
- * rgerhards, 2008-01-21
- */
-rsRetVal
-wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
-{
- DEFiRet;
- DEFVARS_mutexProtection;
- BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
- if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE)
- || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex)))
- iRet = RS_RET_TERMINATE_NOW;
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
- /* try customer handler if one was set and we do not yet have a definite result */
- if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL)
- iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex);
-
- RETiRet;
-}
/* Set the Inactivity Guard
* rgerhards, 2008-01-21
@@ -385,9 +379,13 @@ wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex)
DEFiRet;
DEFVARS_mutexProtection;
+RUNLOG;
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
+RUNLOG;
pThis->bInactivityGuard = bNewState;
+RUNLOG;
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+RUNLOG;
RETiRet;
}
@@ -404,6 +402,7 @@ wtpWrkrExecCancelCleanup(void *arg)
ISOBJ_TYPE_assert(pThis, wtp);
pThis->iCurNumWrkThrd--;
+ wtpSignalWrkrTermination(pThis);
dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd);
@@ -415,7 +414,7 @@ wtpWrkrExecCancelCleanup(void *arg)
* rgerhards, 2008-01-21
*/
static void *
-wtpWorker(void *arg)
+wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in wtp! */
{
DEFiRet;
DEFVARS_mutexProtection;
@@ -442,7 +441,7 @@ wtpWorker(void *arg)
* our init. That would be a bad race... -- rgerhards, 2008-01-16
*/
//if(qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT)
- wtiSetState(pWti, eWRKTHRD_RUNNING, MUTEX_ALREADY_LOCKED); /* we are running now! */
+ wtiSetState(pWti, eWRKTHRD_RUNNING, 0, MUTEX_ALREADY_LOCKED); /* we are running now! */
do {
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
@@ -459,12 +458,14 @@ wtpWorker(void *arg)
pthread_cleanup_pop(0);
pThis->iCurNumWrkThrd--;
+ wtpSignalWrkrTermination(pThis);
dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n",
wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd);
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ ENDfunc
pthread_exit(0);
}
@@ -492,7 +493,9 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
*/
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
// TODO: sync!
- if(pThis->pWrkr[i]->tCurrCmd == eWRKTHRD_STOPPED) {
+RUNLOG;
+dbgprintf("%s: i %d, wti_T* %p\n", wtpGetDbgHdr(pThis), i, pThis->pWrkr[i]);
+ if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) {
break;
}
}
@@ -502,11 +505,12 @@ dbgprintf("%s: after thrd search: i %d, max %d\n", wtpGetDbgHdr(pThis), i, pThis
ABORT_FINALIZE(RS_RET_NO_MORE_THREADS);
pWti = pThis->pWrkr[i];
- wtiSetState(pWti, eWRKTHRD_RUN_CREATED, LOCK_MUTEX); // TODO: thuink about mutex lock
+ wtiSetState(pWti, eWRKTHRD_RUN_CREATED, 0, LOCK_MUTEX);
iState = pthread_create(&(pWti->thrdID), NULL, wtpWorker, (void*) pWti);
dbgprintf("%s: started with state %d, num workers now %d\n",
wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd);
+RUNLOG;
/* we try to give the starting worker a little boost. It won't help much as we still
* hold the queue's mutex, but at least it has a chance to start on a single-CPU system.
*/
@@ -517,6 +521,7 @@ dbgprintf("%s: after thrd search: i %d, max %d\n", wtpGetDbgHdr(pThis), i, pThis
finalize_it:
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+RUNLOG;
RETiRet;
}
@@ -524,7 +529,9 @@ finalize_it:
/* set the number of worker threads that should be running. If less than currently running,
* a new worker may be started. Please note that there is no guarantee the number of workers
* said will be running after we exit this function. It is just a hint. If the number is
- * higher than one, the "busy" condition is also signaled to awake a worker.
+ * higher than one, and no worker is started, the "busy" condition is signaled to awake a worker.
+ * So the caller can assume that there is at least one worker re-checking if there is "work to do"
+ * after this function call.
* rgerhards, 2008-01-21
*/
rsRetVal
@@ -535,8 +542,10 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
int nMissing; /* number workers missing to run */
int i;
+ if(pThis == NULL) dbgPrintAllDebugInfo();
ISOBJ_TYPE_assert(pThis, wtp);
+dbgprintf("%s: wtpAdviseMaxWorker with %d called, currNum %d, max %d\n", wtpGetDbgHdr(pThis), nMaxWrkr, pThis->iCurNumWrkThrd, pThis->iNumWorkerThreads);
if(nMaxWrkr == 0)
FINALIZE;
@@ -548,15 +557,20 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
else if(nMissing < 0)
nMissing = 0;
- dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing);
- /* start the rqtd nbr of workers */
- for(i = 0 ; i < nMissing ; ++i) {
- CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED));
+ if(nMissing > 0) {
+ dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing);
+ /* start the rqtd nbr of workers */
+ for(i = 0 ; i < nMissing ; ++i) {
+ CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED));
+ }
+ } else {
+dbgprintf("wtpAdviseMaxWorkers signals busy\n");
+ wtpWakeupWrkr(pThis);
}
-
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+
finalize_it:
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
RETiRet;
}
@@ -564,6 +578,17 @@ finalize_it:
/* some simple object access methods */
DEFpropSetMeth(wtp, toWrkShutdown, long);
DEFpropSetMeth(wtp, wtpState, wtpState_t);
+DEFpropSetMeth(wtp, iNumWorkerThreads, int);
+DEFpropSetMeth(wtp, pUsr, void*);
+DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t);
+DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t);
+DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
+DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int));
+DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int));
+DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int));
+DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*));
+DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*));
+DEFpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*));
/* set the debug header message
@@ -572,11 +597,12 @@ DEFpropSetMeth(wtp, wtpState, wtpState_t);
* rgerhards, 2008-01-09
*/
rsRetVal
-wtpSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg)
+wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, wti);
+dbgprintf("objID: %d\n", pThis->pObjInfo->objID);
+ ISOBJ_TYPE_assert(pThis, wtp);
assert(pszMsg != NULL);
if(lenMsg < 1)
diff --git a/wtp.h b/wtp.h
index 45fa2b8d..58fc8a5f 100644
--- a/wtp.h
+++ b/wtp.h
@@ -67,12 +67,13 @@ typedef struct wtp_s {
void *pUsr; /* pointer to user object */
pthread_mutex_t *pmutUsr;
pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */
- rsRetVal (*pfChkStopWrkr)(void *, int);
- rsRetVal (*pfIsIdle)(void *, int);
- rsRetVal (*pfDoWork)(void *, int);
- rsRetVal (*pfOnShutdownAdvise)(void *, int);
- rsRetVal (*pfOnIdle)(void *, int);
- rsRetVal (*pfOnWorkerCancel)(void *);
+ rsRetVal (*pfChkStopWrkr)(void *pUsr, int);
+ rsRetVal (*pfIsIdle)(void *pUsr, int);
+ rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int);
+ rsRetVal (*pfOnIdle)(void *pUsr, int);
+ rsRetVal (*pfOnWorkerCancel)(void *pUsr, void*pWti);
+ rsRetVal (*pfOnWorkerStartup)(void *pUsr);
+ rsRetVal (*pfOnWorkerShutdown)(void *pUsr);
/* end user objects */
uchar *pszDbgHdr; /* header string for debug messages */
} wtp_t;
@@ -89,9 +90,25 @@ rsRetVal wtpProcessThrdChanges(wtp_t *pThis);
rsRetVal wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex);
rsRetVal wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex);
rsRetVal wtpSetState(wtp_t *pThis, wtpState_t iNewState);
+rsRetVal wtpWakeupWrkr(wtp_t *pThis);
+rsRetVal wtpWakeupAllWrkr(wtp_t *pThis);
+rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, long iTimeout);
+rsRetVal wtpCancelAll(wtp_t *pThis);
+rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg);
+rsRetVal wtpSignalWrkrTermination(wtp_t *pWtp);
PROTOTYPEObjClassInit(wtp);
+PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
+PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int));
+PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int));
+PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int));
+PROTOTYPEpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*,void*));
+PROTOTYPEpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*));
+PROTOTYPEpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*));
PROTOTYPEpropSetMeth(wtp, toWrkShutdown, long);
PROTOTYPEpropSetMeth(wtp, wtpState, wtpState_t);
-#define wtpGetID(pThis) ((unsigned long) pThis)
+PROTOTYPEpropSetMeth(wtp, iMaxWorkerThreads, int);
+PROTOTYPEpropSetMeth(wtp, pUsr, void*);
+PROTOTYPEpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t);
+PROTOTYPEpropSetMethPTR(wtp, pcondBusy, pthread_cond_t);
#endif /* #ifndef WTP_H_INCLUDED */