diff options
-rw-r--r-- | Makefile.am | 6 | ||||
-rw-r--r-- | action.c | 10 | ||||
-rw-r--r-- | cfsysline.c | 38 | ||||
-rw-r--r-- | debug.c | 6 | ||||
-rw-r--r-- | doc/status.html | 4 | ||||
-rw-r--r-- | iminternal.c | 12 | ||||
-rw-r--r-- | linkedlist.c | 24 | ||||
-rw-r--r-- | module-template.h | 48 | ||||
-rw-r--r-- | modules.c | 12 | ||||
-rw-r--r-- | msg.c | 36 | ||||
-rw-r--r-- | net.c | 28 | ||||
-rw-r--r-- | obj-types.h | 38 | ||||
-rw-r--r-- | obj.c | 36 | ||||
-rw-r--r-- | obj.h | 14 | ||||
-rw-r--r-- | objomsr.c | 2 | ||||
-rw-r--r-- | omfile.c | 28 | ||||
-rw-r--r-- | omfwd.c | 8 | ||||
-rw-r--r-- | parse.c | 50 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 9 | ||||
-rw-r--r-- | queue.c | 1579 | ||||
-rw-r--r-- | queue.h | 40 | ||||
-rw-r--r-- | rsyslog.h | 11 | ||||
-rwxr-xr-x | srUtils.c | 4 | ||||
-rw-r--r-- | stream.c | 46 | ||||
-rwxr-xr-x | stringbuf.c | 36 | ||||
-rw-r--r-- | syslogd.c | 156 | ||||
-rw-r--r-- | tcpsyslog.c | 4 | ||||
-rw-r--r-- | template.c | 2 | ||||
-rw-r--r-- | threads.c | 9 | ||||
-rw-r--r-- | wti.c | 53 | ||||
-rw-r--r-- | wti.h | 9 | ||||
-rw-r--r-- | wtp.c | 312 | ||||
-rw-r--r-- | wtp.h | 31 |
33 files changed, 1012 insertions, 1689 deletions
diff --git a/Makefile.am b/Makefile.am index f15fe567..06602861 100644 --- a/Makefile.am +++ b/Makefile.am @@ -5,6 +5,8 @@ rfc3195d_SOURCES = rfc3195d.c rsyslog.h rsyslogd_SOURCES = \ syslogd.c \ syslogd.h \ + debug.c \ + debug.h \ glbl.h \ pidfile.c \ pidfile.h \ @@ -24,6 +26,10 @@ rsyslogd_SOURCES = \ threads.h \ stream.c \ stream.h \ + wtp.c \ + wtp.h \ + wti.c \ + wti.h \ queue.c \ queue.h \ sync.c \ @@ -84,7 +84,7 @@ rsRetVal actionConstruct(action_t **ppThis) finalize_it: *ppThis = pThis; - return iRet; + RETiRet; } @@ -97,7 +97,7 @@ static rsRetVal actionResume(action_t *pThis) assert(pThis != NULL); pThis->bSuspended = 0; - return iRet; + RETiRet; } @@ -121,7 +121,7 @@ rsRetVal actionSuspend(action_t *pThis) pThis->ttResumeRtry = time(NULL) + pThis->iResumeInterval; pThis->iNbrResRtry = 0; /* tell that we did not yet retry to resume */ - return iRet; + RETiRet; } /* try to resume an action -- rgerhards, 2007-08-02 @@ -162,7 +162,7 @@ rsRetVal actionTryResume(action_t *pThis) dbgprintf("actionTryResume: iRet: %d, next retry (if applicable): %u [now %u]\n", iRet, (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); - return iRet; + RETiRet; } @@ -187,7 +187,7 @@ rsRetVal actionDbgPrint(action_t *pThis) printf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); printf("\n"); - return iRet; + RETiRet; } diff --git a/cfsysline.c b/cfsysline.c index 337d362a..7cbd5884 100644 --- a/cfsysline.c +++ b/cfsysline.c @@ -74,7 +74,7 @@ static rsRetVal doGetChar(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void * } finalize_it: - return iRet; + RETiRet; } @@ -92,7 +92,7 @@ static rsRetVal doCustomHdlr(uchar **pp, rsRetVal (*pSetHdlr)(uchar**, void*), v CHKiRet(pSetHdlr(pp, pVal)); finalize_it: - return iRet; + RETiRet; } @@ -128,7 +128,7 @@ static rsRetVal parseIntVal(uchar **pp, size_t *pVal) *pp = p; finalize_it: - return iRet; + RETiRet; } @@ -158,7 +158,7 @@ static rsRetVal doGetInt(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *p *pp = p; finalize_it: - return iRet; + RETiRet; } @@ -209,7 +209,7 @@ static rsRetVal doGetSize(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void * } finalize_it: - return iRet; + RETiRet; } @@ -270,7 +270,7 @@ static rsRetVal doFileCreateMode(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), *pp = p; finalize_it: - return iRet; + RETiRet; } @@ -347,7 +347,7 @@ static rsRetVal doGetGID(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *p skipWhiteSpace(pp); /* skip over any whitespace */ finalize_it: - return iRet; + RETiRet; } @@ -389,7 +389,7 @@ static rsRetVal doGetUID(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *p skipWhiteSpace(pp); /* skip over any whitespace */ finalize_it: - return iRet; + RETiRet; } @@ -420,7 +420,7 @@ static rsRetVal doBinaryOptionLine(uchar **pp, rsRetVal (*pSetHdlr)(void*, int), skipWhiteSpace(pp); /* skip over any whitespace */ finalize_it: - return iRet; + RETiRet; } @@ -481,7 +481,7 @@ finalize_it: rsCStrDestruct(pStrB); } - return iRet; + RETiRet; } @@ -514,7 +514,7 @@ static rsRetVal cslchConstruct(cslCmdHdlr_t **ppThis) finalize_it: *ppThis = pThis; - return iRet; + RETiRet; } /* destructor for linked list keys. As we do not use any dynamic memory, @@ -606,7 +606,7 @@ static rsRetVal cslchCallHdlr(cslCmdHdlr_t *pThis, uchar **ppConfLine) CHKiRet(pHdlr(ppConfLine, pThis->cslCmdHdlr, pThis->pData)); finalize_it: - return iRet; + RETiRet; } @@ -655,7 +655,7 @@ static rsRetVal cslcConstruct(cslCmd_t **ppThis, int bChainingPermitted) finalize_it: *ppThis = pThis; - return iRet; + RETiRet; } @@ -678,7 +678,7 @@ finalize_it: cslchDestruct(pCmdHdlr); } - return iRet; + RETiRet; } @@ -692,7 +692,7 @@ rsRetVal cfsyslineInit(void) CHKiRet(llInit(&llCmdList, cslcDestruct, cslcKeyDestruct, strcasecmp)); finalize_it: - return iRet; + RETiRet; } @@ -740,7 +740,7 @@ rsRetVal regCfSysLineHdlr(uchar *pCmdName, int bChainingPermitted, ecslCmdHdrlTy } finalize_it: - return iRet; + RETiRet; } @@ -774,7 +774,7 @@ DEFFUNC_llExecFunc(unregHdlrsHeadExec) } finalize_it: - return iRet; + RETiRet; } /* unregister and destroy cfSysLineHandlers for a specific owner. This method is * most importantly used before unloading a loadable module providing some handlers. @@ -790,7 +790,7 @@ rsRetVal unregCfSysLineHdlrs4Owner(void *pOwnerCookie) */ iRet = llExecFunc(&llCmdList, unregHdlrsHeadExec, pOwnerCookie); - return iRet; + RETiRet; } @@ -844,7 +844,7 @@ rsRetVal processCfSysLineCommand(uchar *pCmdName, uchar **p) iRet = iRetLL; finalize_it: - return iRet; + RETiRet; } @@ -289,11 +289,6 @@ dbgMutLog_t *dbgMutLogAddEntry(pthread_mutex_t *pmut, short mutexOp, dbgFuncDB_t pLog->pFuncDB = pFuncDB; DLL_Add(MutLog, pLog); -//RUNLOG_VAR("%p", pLog); -//RUNLOG_VAR("%p", dbgMutLogListRoot); -//RUNLOG_VAR("%p", dbgMutLogListLast); -//RUNLOG_VAR("%p", pLog->pNext); -//RUNLOG_VAR("%p", pLog->pPrev); return pLog; } @@ -520,7 +515,6 @@ int dbgCondTimedWait(pthread_cond_t *cond, pthread_mutex_t *pmut, const struct t dbgprintf("%s:%d:%s: mutex %p waiting on condition %p (with timeout)\n", pFuncDB->file, pFuncDB->line, pFuncDB->func, (void*)pmut, (void*)cond); ret = pthread_cond_timedwait(cond, pmut, abstime); -RUNLOG; dbgMutexLockLog(pmut, pFuncDB, ln); return ret; } diff --git a/doc/status.html b/doc/status.html index 7950cfdb..226a2eb2 100644 --- a/doc/status.html +++ b/doc/status.html @@ -13,8 +13,8 @@ rsyslog v3 compatibility document!</a></font></b><br> Documentation for 3.x is currently sparse. If you need assistance, please <a href="http://www.rsyslog.com/PNphpBB2.phtml">post in the rsyslog forums</a>!</p> -<p><b>stable:</b> 2.0.0 - <a href="http://www.rsyslog.com/Article155.phtml">change log</a> - -<a href="http://www.rsyslog.com/Downloads-index-req-getit-lid-70.phtml">download</a></p> +<p><b>stable:</b> 2.0.1 - <a href="http://www.rsyslog.com/Article165.phtml">change log</a> - +<a href="http://www.rsyslog.com/Downloads-index-req-getit-lid-73.phtml">download</a></p> <p> (<a href="version_naming.html">How are versions named?</a>)</p> <h2>Platforms</h2> <p>Thankfully, a number of folks have begin to build packages and help port diff --git a/iminternal.c b/iminternal.c index 86d5097c..fb70d062 100644 --- a/iminternal.c +++ b/iminternal.c @@ -53,7 +53,7 @@ static rsRetVal iminternalDestruct(iminternal_t *pThis) free(pThis); - return iRet; + RETiRet; } @@ -78,7 +78,7 @@ finalize_it: *ppThis = pThis; - return iRet; + RETiRet; } @@ -111,7 +111,7 @@ finalize_it: iminternalDestruct(pThis); } - return iRet; + RETiRet; } @@ -142,7 +142,7 @@ rsRetVal iminternalRemoveMsg(int *pPri, msg_t **ppMsg, int *pFlags) } finalize_it: - return iRet; + RETiRet; } /* tell the caller if we have any messages ready for processing. @@ -166,7 +166,7 @@ rsRetVal modInitIminternal(void) iRet = llInit(&llMsgs, iminternalDestruct, NULL, NULL); - return iRet; + RETiRet; } @@ -182,7 +182,7 @@ rsRetVal modExitIminternal(void) iRet = llDestroy(&llMsgs); - return iRet; + RETiRet; } /* diff --git a/linkedlist.c b/linkedlist.c index ff1320e5..9adf40c4 100644 --- a/linkedlist.c +++ b/linkedlist.c @@ -79,7 +79,7 @@ static rsRetVal llDestroyElt(linkedList_t *pList, llElt_t *pElt) free(pElt); pList->iNumElts--; /* one less */ - return iRet; + RETiRet; } @@ -106,7 +106,7 @@ rsRetVal llDestroy(linkedList_t *pThis) pThis->pRoot = NULL; pThis->pLast = NULL; - return iRet; + RETiRet; } /* llDestroyRootElt - destroy the root element but otherwise @@ -134,7 +134,7 @@ rsRetVal llDestroyRootElt(linkedList_t *pThis) CHKiRet(llDestroyElt(pThis, pPrev)); finalize_it: - return iRet; + RETiRet; } @@ -166,7 +166,7 @@ rsRetVal llGetNextElt(linkedList_t *pThis, linkedListCookie_t *ppElt, void **ppU *ppElt = pElt; - return iRet; + RETiRet; } @@ -204,7 +204,7 @@ static rsRetVal llEltConstruct(llElt_t **ppThis, void *pKey, void *pData) finalize_it: *ppThis = pThis; - return iRet; + RETiRet; } @@ -227,7 +227,7 @@ rsRetVal llAppend(linkedList_t *pThis, void *pKey, void *pData) pThis->pLast = pElt; finalize_it: - return iRet; + RETiRet; } @@ -267,7 +267,7 @@ static rsRetVal llUnlinkAndDelteElt(linkedList_t *pThis, llElt_t *pElt, llElt_t CHKiRet(llDestroyElt(pThis, pElt)); finalize_it: - return iRet; + RETiRet; } /* find a user element based on the provided key - this is the @@ -305,7 +305,7 @@ static rsRetVal llFindElt(linkedList_t *pThis, void *pKey, llElt_t **ppElt, llEl } else iRet = RS_RET_NOT_FOUND; - return iRet; + RETiRet; } @@ -323,7 +323,7 @@ rsRetVal llFind(linkedList_t *pThis, void *pKey, void **ppData) *ppData = pElt->pData; finalize_it: - return iRet; + RETiRet; } @@ -345,7 +345,7 @@ rsRetVal llFindAndDelete(linkedList_t *pThis, void *pKey) CHKiRet(llUnlinkAndDelteElt(pThis, pElt, pEltPrev)); finalize_it: - return iRet; + RETiRet; } @@ -360,7 +360,7 @@ rsRetVal llGetNumElts(linkedList_t *pThis, int *piCnt) *piCnt = pThis->iNumElts; - return iRet; + RETiRet; } @@ -404,7 +404,7 @@ rsRetVal llExecFunc(linkedList_t *pThis, rsRetVal (*pFunc)(void*, void*), void* iRet = iRetLL; finalize_it: - return iRet; + RETiRet; } diff --git a/module-template.h b/module-template.h index ecd6fe6d..c92ec25a 100644 --- a/module-template.h +++ b/module-template.h @@ -100,12 +100,13 @@ static rsRetVal createInstance(instanceData **ppData)\ #define CODESTARTcreateInstance \ if((pData = calloc(1, sizeof(instanceData))) == NULL) {\ *ppData = NULL;\ + ENDfunc \ return RS_RET_OUT_OF_MEMORY;\ } #define ENDcreateInstance \ *ppData = pData;\ - return iRet;\ + RETiRet;\ } /* freeInstance() @@ -129,7 +130,7 @@ static rsRetVal freeInstance(void* pModData)\ #define ENDfreeInstance \ if(pData != NULL)\ free(pData); /* we need to free this in any case */\ - return iRet;\ + RETiRet;\ } /* isCompatibleWithFeature() @@ -137,12 +138,13 @@ static rsRetVal freeInstance(void* pModData)\ #define BEGINisCompatibleWithFeature \ static rsRetVal isCompatibleWithFeature(syslogFeature __attribute__((unused)) eFeat)\ {\ - rsRetVal iRet = RS_RET_INCOMPATIBLE; + rsRetVal iRet = RS_RET_INCOMPATIBLE; \ + BEGINfunc #define CODESTARTisCompatibleWithFeature #define ENDisCompatibleWithFeature \ - return iRet;\ + RETiRet;\ } /* doAction() @@ -156,7 +158,7 @@ static rsRetVal doAction(uchar __attribute__((unused)) **ppString, unsigned __at /* ppString may be NULL if the output module requested no strings */ #define ENDdoAction \ - return iRet;\ + RETiRet;\ } @@ -174,7 +176,7 @@ static rsRetVal dbgPrintInstInfo(void *pModData)\ pData = (instanceData*) pModData; #define ENDdbgPrintInstInfo \ - return iRet;\ + RETiRet;\ } @@ -189,13 +191,14 @@ static rsRetVal dbgPrintInstInfo(void *pModData)\ static rsRetVal needUDPSocket(void *pModData)\ {\ rsRetVal iRet = RS_RET_FALSE;\ - instanceData *pData = NULL; + instanceData *pData = NULL; \ + BEGINfunc #define CODESTARTneedUDPSocket \ pData = (instanceData*) pModData; #define ENDneedUDPSocket \ - return iRet;\ + RETiRet;\ } @@ -245,7 +248,7 @@ finalize_it:\ } #define ENDparseSelectorAct \ - return iRet;\ + RETiRet;\ } @@ -267,7 +270,7 @@ static rsRetVal tryResume(instanceData __attribute__((unused)) *pData)\ assert(pData != NULL); #define ENDtryResume \ - return iRet;\ + RETiRet;\ } @@ -281,14 +284,16 @@ static rsRetVal queryEtryPt(uchar *name, rsRetVal (**pEtryPoint)())\ DEFiRet; #define CODESTARTqueryEtryPt \ - if((name == NULL) || (pEtryPoint == NULL))\ + if((name == NULL) || (pEtryPoint == NULL)) {\ + ENDfunc \ return RS_RET_PARAM_ERROR;\ + } \ *pEtryPoint = NULL; #define ENDqueryEtryPt \ if(iRet == RS_RET_OK)\ iRet = (*pEtryPoint == NULL) ? RS_RET_NOT_FOUND : RS_RET_OK;\ - return iRet;\ + RETiRet;\ } /* the following definition is the standard block for queryEtryPt for all types @@ -372,13 +377,15 @@ rsRetVal modInit##uniqName(int iIFVersRequested __attribute__((unused)), int *ip #define CODESTARTmodInit \ assert(pHostQueryEtryPt != NULL);\ - if((pQueryEtryPt == NULL) || (ipIFVersProvided == NULL))\ - return RS_RET_PARAM_ERROR; + if((pQueryEtryPt == NULL) || (ipIFVersProvided == NULL)) {\ + ENDfunc \ + return RS_RET_PARAM_ERROR; \ + } #define ENDmodInit \ finalize_it:\ *pQueryEtryPt = queryEtryPt;\ - return iRet;\ + RETiRet;\ } @@ -408,7 +415,7 @@ static rsRetVal modExit(void)\ #define CODESTARTmodExit #define ENDmodExit \ - return iRet;\ + RETiRet;\ } @@ -423,10 +430,11 @@ static rsRetVal runInput(thrdInfo_t __attribute__((unused)) *pThrd)\ {\ DEFiRet; -#define CODESTARTrunInput +#define CODESTARTrunInput \ + dbgSetThrdName((uchar*)__FILE__); /* we need to provide something better later */ #define ENDrunInput \ - return iRet;\ + RETiRet;\ } @@ -446,7 +454,7 @@ static rsRetVal willRun(void)\ #define CODESTARTwillRun #define ENDwillRun \ - return iRet;\ + RETiRet;\ } @@ -465,7 +473,7 @@ static rsRetVal afterRun(void)\ #define CODESTARTafterRun #define ENDafterRun \ - return iRet;\ + RETiRet;\ } @@ -1,6 +1,6 @@ /* modules.c * This is the implementation of syslogd modules object. - * This object handles plug-ins and buil-in modules of all kind. + * This object handles plug-ins and build-in modules of all kind. * * File begun on 2007-07-22 by RGerhards * @@ -99,7 +99,7 @@ rsRetVal queryHostEtryPt(uchar *name, rsRetVal (**pEtryPoint)()) if(iRet == RS_RET_OK) iRet = (*pEtryPoint == NULL) ? RS_RET_NOT_FOUND : RS_RET_OK; - return iRet; + RETiRet; } @@ -204,7 +204,7 @@ static rsRetVal modPrepareUnload(modInfo_t *pThis) /* END DEVEL */ finalize_it: - return iRet; + RETiRet; } @@ -288,7 +288,7 @@ finalize_it: moduleDestruct(pNew); } - return iRet; + RETiRet; } /* Print loaded modules. This is more or less a @@ -348,7 +348,7 @@ rsRetVal modUnloadAndDestructAll(void) moduleDestruct(pModPrev); } - return iRet; + RETiRet; } @@ -374,7 +374,7 @@ rsRetVal modUnloadAndDestructDynamic(void) } } - return iRet; + RETiRet; } /* vi:set ai: */ @@ -225,7 +225,7 @@ rsRetVal MsgConstruct(msg_t **ppThis) *ppThis = pM; finalize_it: - return iRet; + RETiRet; } @@ -422,7 +422,7 @@ static rsRetVal MsgSerialize(msg_t *pThis, strm_t *pStrm) CHKiRet(objEndSerialize(pStrm)); finalize_it: - return iRet; + RETiRet; } @@ -457,7 +457,7 @@ msg_t *MsgAddRef(msg_t *pM) static rsRetVal aquirePROCIDFromTAG(msg_t *pM) { register int i; - int iRet; + DEFiRet; assert(pM != NULL); if(pM->pCSPROCID != NULL) @@ -480,8 +480,9 @@ static rsRetVal aquirePROCIDFromTAG(msg_t *pM) return RS_RET_OBJ_CREATION_FAILED; /* best we can do... */ rsCStrSetAllocIncrement(pM->pCSPROCID, 16); while((i < pM->iLenTAG) && (pM->pszTAG[i] != ']')) { - if((iRet = rsCStrAppendChar(pM->pCSPROCID, pM->pszTAG[i])) != RS_RET_OK) - return iRet; + if((iRet = rsCStrAppendChar(pM->pCSPROCID, pM->pszTAG[i])) != RS_RET_OK) { + RETiRet; + } ++i; } @@ -497,10 +498,11 @@ static rsRetVal aquirePROCIDFromTAG(msg_t *pM) } /* OK, finaally we could obtain a PROCID. So let's use it ;) */ - if((iRet = rsCStrFinish(pM->pCSPROCID)) != RS_RET_OK) - return iRet; + if((iRet = rsCStrFinish(pM->pCSPROCID)) != RS_RET_OK) { + RETiRet; + } - return RS_RET_OK; + RETiRet; } @@ -523,7 +525,7 @@ static rsRetVal aquirePROCIDFromTAG(msg_t *pM) static rsRetVal aquireProgramName(msg_t *pM) { register int i; - int iRet; + DEFiRet; assert(pM != NULL); if(pM->pCSProgName == NULL) { @@ -538,13 +540,15 @@ static rsRetVal aquireProgramName(msg_t *pM) && (pM->pszTAG[i] != '\0') && (pM->pszTAG[i] != ':') && (pM->pszTAG[i] != '[') && (pM->pszTAG[i] != '/') ; ++i) { - if((iRet = rsCStrAppendChar(pM->pCSProgName, pM->pszTAG[i])) != RS_RET_OK) - return iRet; + if((iRet = rsCStrAppendChar(pM->pCSProgName, pM->pszTAG[i])) != RS_RET_OK) { + RETiRet; + } + } + if((iRet = rsCStrFinish(pM->pCSProgName)) != RS_RET_OK) { + RETiRet; } - if((iRet = rsCStrFinish(pM->pCSProgName)) != RS_RET_OK) - return iRet; } - return RS_RET_OK; + RETiRet; } @@ -924,7 +928,7 @@ rsRetVal MsgSetAPPNAME(msg_t *pMsg, char* pszAPPNAME) /* if we reach this point, we have the object */ iRet = rsCStrSetSzStr(pMsg->pCSAPPNAME, (uchar*) pszAPPNAME); - return iRet; + RETiRet; } @@ -2128,7 +2132,7 @@ rsRetVal MsgSetProperty(msg_t *pThis, property_t *pProp) memcpy(&pThis->tTIMESTAMP, &pProp->val.vSyslogTime, sizeof(struct syslogTime)); } - return iRet; + RETiRet; } #undef isProp @@ -214,7 +214,7 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS */ logerrorInt("Internal error caused AllowedSender to be ignored, AF = %d", iAllow->addr.NetAddr->sa_family); - return RS_RET_ERR; + ABORT_FINALIZE(RS_RET_ERR); } /* OK, entry constructed, now lets add it to the ACL list */ iRet = AddAllowedSenderEntry(ppRoot, ppLast, iAllow, iSignificantBits); @@ -222,7 +222,7 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS /* we need to process a hostname ACL */ if (DisableDNS) { logerror ("Ignoring hostname based ACLs because DNS is disabled."); - return RS_RET_OK; + ABORT_FINALIZE(RS_RET_OK); } if (!strchr (iAllow->addr.HostWildcard, '*') && @@ -246,10 +246,11 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS if (ACLAddHostnameOnFail) { logerrorSz("Adding hostname \"%s\" to ACL as a wildcard entry.", iAllow->addr.HostWildcard); - return AddAllowedSenderEntry(ppRoot, ppLast, iAllow, iSignificantBits); + iRet = AddAllowedSenderEntry(ppRoot, ppLast, iAllow, iSignificantBits); + FINALIZE; } else { logerrorSz("Hostname \"%s\" WON\'T be added to ACL.", iAllow->addr.HostWildcard); - return RS_RET_NOENTRY; + ABORT_FINALIZE(RS_RET_NOENTRY); } } @@ -260,13 +261,13 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS allowIP.flags = 0; if((allowIP.addr.NetAddr = malloc(res->ai_addrlen)) == NULL) { glblHadMemShortage = 1; - return RS_RET_OUT_OF_MEMORY; + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } memcpy(allowIP.addr.NetAddr, res->ai_addr, res->ai_addrlen); if((iRet = AddAllowedSenderEntry(ppRoot, ppLast, &allowIP, iSignificantBits)) != RS_RET_OK) - return(iRet); + FINALIZE; break; case AF_INET6: /* IPv6 - but need to check if it is a v6-mapped IPv4 */ if(IN6_IS_ADDR_V4MAPPED (&SIN6(res->ai_addr)->sin6_addr)) { @@ -277,7 +278,7 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS if((allowIP.addr.NetAddr = malloc(sizeof(struct sockaddr_in))) == NULL) { glblHadMemShortage = 1; - return RS_RET_OUT_OF_MEMORY; + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } SIN(allowIP.addr.NetAddr)->sin_family = AF_INET; #ifdef HAVE_STRUCT_SOCKADDR_SA_LEN @@ -291,7 +292,7 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS if((iRet = AddAllowedSenderEntry(ppRoot, ppLast, &allowIP, iSignificantBits)) != RS_RET_OK) - return(iRet); + FINALIZE; } else { /* finally add IPv6 */ @@ -299,14 +300,14 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS allowIP.flags = 0; if((allowIP.addr.NetAddr = malloc(res->ai_addrlen)) == NULL) { glblHadMemShortage = 1; - return RS_RET_OUT_OF_MEMORY; + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } memcpy(allowIP.addr.NetAddr, res->ai_addr, res->ai_addrlen); if((iRet = AddAllowedSenderEntry(ppRoot, ppLast, &allowIP, iSignificantBits)) != RS_RET_OK) - return(iRet); + FINALIZE; } break; } @@ -321,7 +322,8 @@ static rsRetVal AddAllowedSender(struct AllowedSenders **ppRoot, struct AllowedS } } - return iRet; +finalize_it: + RETiRet; } @@ -700,7 +702,7 @@ rsRetVal gethname(struct sockaddr_storage *f, uchar *pszHostFQDN) } finalize_it: - return iRet; + RETiRet; } @@ -833,7 +835,7 @@ rsRetVal cvthname(struct sockaddr_storage *f, uchar *pszHost, uchar *pszHostFQDN } finalize_it: - return iRet; + RETiRet; } diff --git a/obj-types.h b/obj-types.h index 9d430b72..7eff503d 100644 --- a/obj-types.h +++ b/obj-types.h @@ -58,9 +58,11 @@ typedef enum { /* IDs of known object "types/classes" */ OBJNull = 0, /* no valid object (we do not start at zero so we can detect calloc()) */ OBJMsg = 1, OBJstrm = 2, - OBJqueue = 3 /* remeber to UPDATE OBJ_NUM_IDS (below) if you add one! */ + OBJwtp = 3, + OBJwti = 4, + OBJqueue = 5 /* remeber to UPDATE OBJ_NUM_IDS (below) if you add one! */ } objID_t; -#define OBJ_NUM_IDS 4 +#define OBJ_NUM_IDS 6 typedef enum { /* IDs of base methods supported by all objects - used for jump table, so * they must start at zero and be incremented. -- rgerahrds, 2008-01-04 @@ -114,6 +116,31 @@ typedef struct obj { /* the dummy struct that each derived class can be casted t # define ISOBJ_assert(pObj) #endif +#define DEFpropSetMethPTR(obj, prop, dataType)\ + rsRetVal obj##Set##prop(obj##_t *pThis, dataType *pVal)\ + { \ + /* DEV debug: dbgprintf("%sSet%s()\n", #obj, #prop); */\ + pThis->prop = pVal; \ + return RS_RET_OK; \ + } +#define PROTOTYPEpropSetMethPTR(obj, prop, dataType)\ + rsRetVal obj##Set##prop(obj##_t *pThis, dataType*) +#define DEFpropSetMeth(obj, prop, dataType)\ + rsRetVal obj##Set##prop(obj##_t *pThis, dataType pVal)\ + { \ + /* DEV debug: dbgprintf("%sSet%s()\n", #obj, #prop); */\ + pThis->prop = pVal; \ + return RS_RET_OK; \ + } +#define DEFpropSetMethFP(obj, prop, dataType)\ + rsRetVal obj##Set##prop(obj##_t *pThis, dataType)\ + { \ + /* DEV debug: dbgprintf("%sSet%s()\n", #obj, #prop); */\ + pThis->prop = pVal; \ + return RS_RET_OK; \ + } +#define PROTOTYPEpropSetMethFP(obj, prop, dataType)\ + rsRetVal obj##Set##prop(obj##_t *pThis, dataType) #define DEFpropSetMeth(obj, prop, dataType)\ rsRetVal obj##Set##prop(obj##_t *pThis, dataType pVal)\ { \ @@ -135,7 +162,7 @@ rsRetVal objName##ClassInit(void) \ #define ENDObjClassInit(objName) \ objRegisterObj(OBJ##objName, pObjInfoOBJ); \ finalize_it: \ - return iRet; \ + RETiRet; \ } /* this defines both the constructor and initializer @@ -148,7 +175,7 @@ finalize_it: \ #define ENDobjConstruct(obj) \ /* use finalize_it: before calling the macro (if you need it)! */ \ - return iRet; \ + RETiRet; \ } \ rsRetVal obj##Construct(obj##_t **ppThis) \ { \ @@ -166,8 +193,9 @@ finalize_it: \ \ finalize_it: \ OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP \ - return iRet; \ + RETiRet; \ } + #endif /* #ifndef OBJ_TYPES_H_INCLUDED */ @@ -100,7 +100,7 @@ rsRetVal objInfoConstruct(objInfo_t **ppThis, objID_t objID, uchar *pszName, int *ppThis = pThis; finalize_it: - return iRet; + RETiRet; } @@ -152,7 +152,7 @@ static rsRetVal objSerializeHeader(strm_t *pStrm, obj_t *pObj, uchar *pszRecType CHKiRet(strmWriteChar(pStrm, '\n')); finalize_it: - return iRet; + RETiRet; } @@ -171,7 +171,7 @@ dbgprintf("objBeginSerialize obj type: %x\n", objGetObjID(pStrm)); CHKiRet(objSerializeHeader(pStrm, pObj, (uchar*) "Obj")); finalize_it: - return iRet; + RETiRet; } @@ -195,7 +195,7 @@ rsRetVal objBeginSerializePropBag(strm_t *pStrm, obj_t *pObj) CHKiRet(objSerializeHeader(pStrm, pObj, (uchar*) "OPB")); finalize_it: - return iRet; + RETiRet; } @@ -286,7 +286,7 @@ rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propertyType_t prop CHKiRet(strmWriteChar(pStrm, '\n')); finalize_it: - return iRet; + RETiRet; } @@ -307,7 +307,7 @@ rsRetVal objEndSerialize(strm_t *pStrm) CHKiRet(strmRecordEnd(pStrm)); finalize_it: - return iRet; + RETiRet; } @@ -334,7 +334,7 @@ static rsRetVal objDeserializeLong(long *pInt, strm_t *pStrm) *pInt = i; finalize_it: - return iRet; + RETiRet; } @@ -368,7 +368,7 @@ finalize_it: if(iRet != RS_RET_OK && pCStr != NULL) rsCStrDestruct(pCStr); - return iRet; + RETiRet; } @@ -400,7 +400,7 @@ static rsRetVal objDeserializeSyslogTime(syslogTime_t *pTime, strm_t *pStrm) GETVAL(OffsetMinute); finalize_it: - return iRet; + RETiRet; } #undef GETVAL @@ -443,7 +443,7 @@ static rsRetVal objDeserializeHeader(uchar *pszRecType, objID_t *poID, int* poVe *poVers = oVers; finalize_it: - return iRet; + RETiRet; } @@ -515,7 +515,7 @@ static rsRetVal objDeserializeProperty(property_t *pProp, strm_t *pStrm) if(c != '\n') ABORT_FINALIZE(RS_RET_INVALID_PROPFRAME); finalize_it: - return iRet; + RETiRet; } @@ -538,7 +538,7 @@ static rsRetVal objDeserializeTrailer(strm_t *pStrm) NEXTC; if(c != '\n') ABORT_FINALIZE(RS_RET_INVALID_TRAILER); finalize_it: - return iRet; + RETiRet; } @@ -578,7 +578,7 @@ static rsRetVal objDeserializeTryRecover(strm_t *pStrm) finalize_it: dbgprintf("deserializer has possibly been able to re-sync and recover, state %d\n", iRet); - return iRet; + RETiRet; } @@ -608,7 +608,7 @@ static rsRetVal objDeserializeProperties(obj_t *pObj, objID_t oID, strm_t *pStrm CHKiRet(objDeserializeTrailer(pStrm)); /* do trailer checks */ finalize_it: - return iRet; + RETiRet; } @@ -668,7 +668,7 @@ finalize_it: if(iRet != RS_RET_OK && pObj != NULL) free(pObj); // TODO: check if we can call destructor 2008-01-13 rger - return iRet; + RETiRet; } @@ -711,7 +711,7 @@ dbgprintf("objDese...AsPropBag 2\n"); CHKiRet(objDeserializeProperties(pObj, oID, pStrm)); finalize_it: - return iRet; + RETiRet; } @@ -756,7 +756,7 @@ rsRetVal objDeserializePropBag(obj_t *pObj, strm_t *pStrm) CHKiRet(objDeserializeProperties(pObj, oID, pStrm)); finalize_it: - return iRet; + RETiRet; } #undef NEXTC /* undef helper macro */ @@ -780,7 +780,7 @@ rsRetVal objRegisterObj(objID_t oID, objInfo_t *pInfo) arrObjInfo[oID] = pInfo; finalize_it: - return iRet; + RETiRet; } @@ -85,20 +85,6 @@ #define OBJSetMethodHandler(methodID, pHdlr) \ CHKiRet(objInfoSetMethod(pObjInfoOBJ, methodID, (rsRetVal (*)(void*)) pHdlr)) -/* debug aides */ -#if 0 -#define d_pthread_mutex_lock(x) {dbgprintf("mutex %p lock %s, %s(), line %d\n", (void*)x, __FILE__, __func__, __LINE__); \ - pthread_mutex_lock(x); \ - if(1)dbgprintf("mutex %p lock aquired %s, %s(), line %d\n",(void*)x, __FILE__, __func__, __LINE__); \ - } -#define d_pthread_mutex_unlock(x) {dbgprintf("mutex %p UNlock %s, %s(), line %d\n", (void*)x ,__FILE__, __func__, __LINE__);\ - pthread_mutex_unlock(x); \ - if(1)dbgprintf("mutex %p UNlock done %s, %s(), line %d\n", (void*)x, __FILE__, __func__, __LINE__); \ - } -#else -#define d_pthread_mutex_lock(x) pthread_mutex_lock(x) -#define d_pthread_mutex_unlock(x) pthread_mutex_unlock(x) -#endif /* prototypes */ rsRetVal objInfoConstruct(objInfo_t **ppThis, objID_t objID, uchar *pszName, int iObjVers, rsRetVal (*pConstruct)(void *), rsRetVal (*pDestruct)(void *)); rsRetVal objInfoSetMethod(objInfo_t *pThis, objMethod_t objMethod, rsRetVal (*pHandler)(void*)); @@ -92,7 +92,7 @@ rsRetVal OMSRconstruct(omodStringRequest_t **ppThis, int iNumEntries) abort_it: *ppThis = pThis; - return iRet; + RETiRet; } /* set a template name and option to the object. Index must be given. The pTplName must be @@ -172,7 +172,7 @@ rsRetVal setDynaFileCacheSize(void __attribute__((unused)) *pVal, int iNewVal) iDynaFileCacheSize = iNewVal; dbgprintf("DynaFileCacheSize changed to %d.\n", iNewVal); - return iRet; + RETiRet; } @@ -218,7 +218,7 @@ static rsRetVal cflineParseOutchannel(instanceData *pData, uchar* p, omodStringR "outchannel '%s' not found - ignoring action line", szBuf); logerror(errMsg); - return RS_RET_NOT_FOUND; + ABORT_FINALIZE(RS_RET_NOT_FOUND); } /* check if there is a file name in the outchannel... */ @@ -229,7 +229,7 @@ static rsRetVal cflineParseOutchannel(instanceData *pData, uchar* p, omodStringR "outchannel '%s' has no file name template - ignoring action line", szBuf); logerror(errMsg); - return RS_RET_ERR; + ABORT_FINALIZE(RS_RET_ERR); } /* OK, we finally got a correct template. So let's use it... */ @@ -242,7 +242,8 @@ static rsRetVal cflineParseOutchannel(instanceData *pData, uchar* p, omodStringR iRet = cflineParseTemplateName(&p, pOMSR, iEntry, iTplOpts, (uchar*) " TradFmt"); - return(iRet); +finalize_it: + RETiRet; } @@ -516,7 +517,7 @@ static rsRetVal writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pDa */ if(pData->bDynamicName) { if(prepareDynFile(pData, ppString[1], iMsgOpts) != 0) - return RS_RET_ERR; + ABORT_FINALIZE(RS_RET_ERR); } /* create the message based on format specified */ @@ -541,7 +542,7 @@ again: pData->f_fname, (long long) pData->f_sizeLimit, (long long) actualFileSize); errno = 0; logerror(errMsg); - return RS_RET_DISABLE_ACTION; + ABORT_FINALIZE(RS_RET_DISABLE_ACTION); } else { snprintf(errMsg, sizeof(errMsg), "file %s had grown beyond configured file size of %lld bytes, actual size was %lld - configured command resolved situation", @@ -558,14 +559,14 @@ again: /* If a named pipe is full, just ignore it for now - mrn 24 May 96 */ if (pData->fileType == eTypePIPE && e == EAGAIN) - return RS_RET_OK; + ABORT_FINALIZE(RS_RET_OK); /* If the filesystem is filled up, just ignore * it for now and continue writing when possible * based on patch for sysklogd by Martin Schulze on 2007-05-24 */ if (pData->fileType == eTypeFILE && e == ENOSPC) - return RS_RET_OK; + ABORT_FINALIZE(RS_RET_OK); (void) close(pData->fd); /* @@ -593,7 +594,9 @@ again: } } else if (pData->bSyncFile) fsync(pData->fd); - return(iRet); + +finalize_it: + RETiRet; } @@ -638,12 +641,15 @@ CODESTARTparseSelectorAct * the code further changes. -- rgerhards, 2007-07-25 */ if(*p == '$' || *p == '?' || *p == '|' || *p == '/' || *p == '-') { - if((iRet = createInstance(&pData)) != RS_RET_OK) - return iRet; + if((iRet = createInstance(&pData)) != RS_RET_OK) { + ENDfunc + return iRet; /* this can not use RET_iRet! */ + } } else { /* this is not clean, but we need it for the time being * TODO: remove when cleaning up modularization */ + ENDfunc return RS_RET_CONFLINE_UNPROCESSED; } @@ -201,7 +201,7 @@ static rsRetVal UDPSend(instanceData *pData, char *msg, size_t len) } } - return iRet; + RETiRet; } /* CODE FOR SENDING TCP MESSAGES */ @@ -241,7 +241,7 @@ static rsRetVal TCPSendFrame(void *pvData, char *msg, size_t len) /* TODO: we need to revisit this code -- rgerhards, 2007-12-28 */ } - return iRet; + RETiRet; } @@ -274,7 +274,7 @@ static rsRetVal TCPSendInit(void *pvData) iRet = RS_RET_TCP_SOCKCREATE_ERR; } - return iRet; + RETiRet; } @@ -322,7 +322,7 @@ static rsRetVal doTryResume(instanceData *pData) break; } - return iRet; + RETiRet; } @@ -179,7 +179,7 @@ rsRetVal parsInt(rsParsObj *pThis, int* pInt) rsRetVal parsSkipAfterChar(rsParsObj *pThis, char c) { register unsigned char *pC; - rsRetVal iRet; + DEFiRet; rsCHECKVALIDOBJECT(pThis, OIDrsPars); @@ -203,7 +203,7 @@ rsRetVal parsSkipAfterChar(rsParsObj *pThis, char c) iRet = RS_RET_NOT_FOUND; } - return iRet; + RETiRet; } /* Skip whitespace. Often used to trim parsable entries. @@ -243,12 +243,12 @@ rsRetVal parsDelimCStr(rsParsObj *pThis, rsCStrObj **ppCStr, char cDelim, int bT { register unsigned char *pC; rsCStrObj *pCStr; - rsRetVal iRet; + DEFiRet; rsCHECKVALIDOBJECT(pThis, OIDrsPars); if((pCStr = rsCStrConstruct()) == NULL) - return RS_RET_OUT_OF_MEMORY; + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); if(bTrimLeading) parsSkipWhitespace(pThis); @@ -274,20 +274,21 @@ rsRetVal parsDelimCStr(rsParsObj *pThis, rsCStrObj **ppCStr, char cDelim, int bT */ if((iRet = rsCStrFinish(pCStr)) != RS_RET_OK) { rsCStrDestruct (pCStr); - return(iRet); + FINALIZE; } if(bTrimTrailing) { if((iRet = rsCStrTrimTrailingWhiteSpace(pCStr)) != RS_RET_OK) { rsCStrDestruct (pCStr); - return iRet; + FINALIZE; } } /* done! */ *ppCStr = pCStr; - return RS_RET_OK; +finalize_it: + RETiRet; } /* Parse a quoted string ("-some-data") from the given position. @@ -309,17 +310,17 @@ rsRetVal parsQuotedCStr(rsParsObj *pThis, rsCStrObj **ppCStr) { register unsigned char *pC; rsCStrObj *pCStr; - rsRetVal iRet; + DEFiRet; rsCHECKVALIDOBJECT(pThis, OIDrsPars); if((iRet = parsSkipAfterChar(pThis, '"')) != RS_RET_OK) - return iRet; + FINALIZE; pC = rsCStrGetBufBeg(pThis->pCStr) + pThis->iCurrPos; /* OK, we most probably can obtain a value... */ if((pCStr = rsCStrConstruct()) == NULL) - return RS_RET_OUT_OF_MEMORY; + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); while(pThis->iCurrPos < rsCStrLen(pThis->pCStr)) { if(*pC == '"') { @@ -334,13 +335,13 @@ rsRetVal parsQuotedCStr(rsParsObj *pThis, rsCStrObj **ppCStr) */ if((iRet = rsCStrAppendChar(pCStr, *pC)) != RS_RET_OK) { rsCStrDestruct (pCStr); - return(iRet); + FINALIZE; } } } else { /* regular character */ if((iRet = rsCStrAppendChar(pCStr, *pC)) != RS_RET_OK) { rsCStrDestruct (pCStr); - return(iRet); + FINALIZE; } } ++pThis->iCurrPos; @@ -352,18 +353,19 @@ rsRetVal parsQuotedCStr(rsParsObj *pThis, rsCStrObj **ppCStr) } else { /* error - improperly quoted string! */ rsCStrDestruct (pCStr); - return RS_RET_MISSING_TRAIL_QUOTE; + ABORT_FINALIZE(RS_RET_MISSING_TRAIL_QUOTE); } /* We got the string, let's finish it... */ if((iRet = rsCStrFinish(pCStr)) != RS_RET_OK) { rsCStrDestruct (pCStr); - return(iRet); + FINALIZE; } /* done! */ *ppCStr = pCStr; - return RS_RET_OK; +finalize_it: + RETiRet; } /* @@ -382,7 +384,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits) uchar *pszTmp; struct addrinfo hints, *res = NULL; rsCStrObj *pCStr; - rsRetVal iRet; + DEFiRet; rsCHECKVALIDOBJECT(pThis, OIDrsPars); assert(pIP != NULL); @@ -401,7 +403,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits) && *pC != '/' && *pC != ',' && !isspace((int)*pC)) { if((iRet = rsCStrAppendChar(pCStr, *pC)) != RS_RET_OK) { rsCStrDestruct (pCStr); - return(iRet); + FINALIZE; } ++pThis->iCurrPos; ++pC; @@ -410,7 +412,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits) /* We got the string, let's finish it... */ if((iRet = rsCStrFinish(pCStr)) != RS_RET_OK) { rsCStrDestruct (pCStr); - return(iRet); + FINALIZE; } /* now we have the string and must check/convert it to @@ -424,7 +426,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits) pszTmp = (uchar*)strchr ((char*)pszIP, ']'); if (pszTmp == NULL) { free (pszIP); - return RS_RET_INVALID_IP; + ABORT_FINALIZE(RS_RET_INVALID_IP); } *pszTmp = '\0'; @@ -449,7 +451,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits) default: free (pszIP); free (*pIP); - return RS_RET_ERR; + ABORT_FINALIZE(RS_RET_ERR); } if(*pC == '/') { @@ -458,7 +460,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits) if((iRet = parsInt(pThis, pBits)) != RS_RET_OK) { free (pszIP); free (*pIP); - return(iRet); + FINALIZE; } /* we need to refresh pointer (changed by parsInt()) */ pC = rsCStrGetBufBeg(pThis->pCStr) + pThis->iCurrPos; @@ -488,7 +490,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits) default: free (pszIP); free (*pIP); - return RS_RET_ERR; + ABORT_FINALIZE(RS_RET_ERR); } if(*pC == '/') { @@ -497,7 +499,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits) if((iRet = parsInt(pThis, pBits)) != RS_RET_OK) { free (pszIP); free (*pIP); - return(iRet); + FINALIZE; } /* we need to refresh pointer (changed by parsInt()) */ pC = rsCStrGetBufBeg(pThis->pCStr) + pThis->iCurrPos; @@ -518,7 +520,7 @@ rsRetVal parsAddrWithBits(rsParsObj *pThis, struct NetAddr **pIP, int *pBits) iRet = RS_RET_OK; finalize_it: - return iRet; + RETiRet; } #endif /* #ifdef SYSLOG_INET */ diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 863f0d50..530bb2d4 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -170,12 +170,11 @@ static rsRetVal readSocket(int fd, int bParseHost) } else if (iRcvd < 0 && errno != EINTR) { char errStr[1024]; strerror_r(errno, errStr, sizeof(errStr)); - dbgprintf("UNIX socket error: %d = %s.\n", \ - errno, errStr); + dbgprintf("UNIX socket error: %d = %s.\n", errno, errStr); logerror("recvfrom UNIX"); } - return iRet; + RETiRet; } @@ -228,7 +227,7 @@ CODESTARTrunInput } } - return iRet; + RETiRet; ENDrunInput @@ -247,7 +246,7 @@ CODESTARTwillRun dbgprintf("Opened UNIX socket '%s' (fd %d).\n", funixn[i], funix[i]); } - return RS_RET_OK; + RETiRet; ENDwillRun @@ -52,491 +52,23 @@ #include "stringbuf.h" #include "srUtils.h" #include "obj.h" +#include "wtp.h" +#include "wti.h" /* static data */ DEFobjStaticHelpers -/* debug aides */ -#if 0 -#define d_pthread_mutex_lock(x) {dbgprintf("mutex %p lock %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \ - pthread_mutex_lock(x); \ - if(1)dbgprintf("mutex %p lock aquired %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \ - } -#define d_pthread_mutex_unlock(x) {dbgprintf("mutex %p UNlock %s, %s(), line %d\n", x ,__FILE__, __func__, __LINE__);\ - pthread_mutex_unlock(x); \ - if(1)dbgprintf("mutex %p UNlock done %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \ - } -#else -#define d_pthread_mutex_lock(x) pthread_mutex_lock(x) -#define d_pthread_mutex_unlock(x) pthread_mutex_unlock(x) -#endif - - /* forward-definitions */ rsRetVal queueChkPersist(queue_t *pThis); -static void *queueWorker(void *arg); -static rsRetVal queueChkWrkThrdChanges(queue_t *pThis); static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly); +static int queueChkStopWrkrDA(queue_t *pThis); +static int queueIsIdleDA(queue_t *pThis); +static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave); +static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2); /* methods */ -/* cancellation cleanup handler - frees provided mutex - * rgerhards, 2008-01-14 - */ -static void queueMutexCleanup(void *arg) -{ - assert(arg != NULL); - d_pthread_mutex_unlock((pthread_mutex_t*) arg); -} - - -/* get the current worker state. For simplicity and speed, we have - * NOT used our regular calling interface this time. I hope that won't - * bite in the long term... -- rgerhards, 2008-01-17 - */ -static inline qWrkCmd_t -qWrkrGetState(qWrkThrd_t *pThis) -{ - assert(pThis != NULL); - return pThis->tCurrCmd; -} - - -/* indicate worker thread startup - * (it would be best if we could do this with an atomic operation) - * rgerhards, 2008-01-19 - */ -static void -queueWrkrThrdStartupIndication(queue_t *pThis) -{ - int iCancelStateSave; - - ISOBJ_TYPE_assert(pThis, queue); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mutThrdMgmt); - pThis->iCurNumWrkThrd++; - d_pthread_mutex_unlock(&pThis->mutThrdMgmt); - pthread_setcancelstate(iCancelStateSave, NULL); -} - - -/* indicate worker thread shutdown - * (it would be best if we could do this with an atomic operation) - * rgerhards, 2008-01-19 - */ -static void -queueWrkrThrdShutdownIndication(queue_t *pThis) -{ - int iCancelStateSave; - - ISOBJ_TYPE_assert(pThis, queue); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mutThrdMgmt); - pThis->iCurNumWrkThrd--; - pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ - d_pthread_mutex_unlock(&pThis->mutThrdMgmt); - pthread_setcancelstate(iCancelStateSave, NULL); -} - - -/* send a command to a specific thread - */ -static rsRetVal -qWrkrSetState(qWrkThrd_t *pThis, qWrkCmd_t tCmd) -{ - DEFiRet; - - assert(pThis != NULL); - -dbgprintf("Queue 0x%lx: trying to send command %d to thread %d\n", queueGetID(pThis->pQueue), tCmd, pThis->iThrd); - if(pThis->tCurrCmd == eWRKTHRD_SHUTDOWN_IMMEDIATE && tCmd != eWRKTHRD_TERMINATING) - FINALIZE; - - dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis->pQueue), tCmd, pThis->iThrd); - - /* change some admin structures */ - switch(tCmd) { - case eWRKTHRD_TERMINATING: - pthread_cond_destroy(&pThis->condInitDone); - pthread_mutex_destroy(&pThis->mut); - dbgprintf("Queue 0x%lx/w%d: thread terminating with %d entries left in queue, %d workers running.\n", - queueGetID(pThis->pQueue), pThis->iThrd, pThis->pQueue->iQueueSize, - pThis->pQueue->iCurNumWrkThrd); - break; - case eWRKTHRD_RUN_CREATED: - pthread_cond_init(&pThis->condInitDone, NULL); - pthread_mutex_init(&pThis->mut, NULL); - break; - case eWRKTHRD_RUN_INIT: - break; - case eWRKTHRD_RUNNING: - pthread_cond_signal(&pThis->condInitDone); - break; - /* these cases just to satisfy the compiler, we do (yet) not act an them: */ - case eWRKTHRD_STOPPED: - case eWRKTHRD_SHUTDOWN: - case eWRKTHRD_SHUTDOWN_IMMEDIATE: - /* DO NOTHING */ - break; - } - - pThis->tCurrCmd = tCmd; - -finalize_it: - return iRet; -} - -/* send a command to a specific active thread. If the thread is not - * active, the command is not sent. - */ -static inline rsRetVal -queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, queue); - assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads); - - if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRD_RUN_CREATED) { - qWrkrSetState(&pThis->pWrkThrds[iIdx], tCmd); - } else { - dbgprintf("Queue 0x%lx: command %d NOT sent to inactive thread %d\n", queueGetID(pThis), tCmd, iIdx); - } - - return iRet; -} - - -/* Finalize construction of a wWrkrThrd_t "object" - * rgerhards, 2008-01-17 - */ -static inline rsRetVal -qWrkrConstructFinalize(qWrkThrd_t *pThis, queue_t *pQueue, int i) -{ - assert(pThis != NULL); - ISOBJ_TYPE_assert(pQueue, queue); - - dbgprintf("Queue 0x%lx: finalizing construction of worker %d instance data\n", queueGetID(pQueue), i); - - /* initialize our thread instance descriptor */ - pThis = pQueue->pWrkThrds + i; - pThis->pQueue = pQueue; - pThis->iThrd = i; - pThis->pUsr = NULL; - - qWrkrSetState(pThis, eWRKTHRD_STOPPED); - - return RS_RET_OK; -} - - -/* Waits until the specified worker thread - * changed to full running state (aka have started up). This function - * MUST NOT be called while the queue mutex is locked as it does - * this itself. The wait is without timeout. - * rgerhards, 2008-01-17 - */ -static inline rsRetVal -qWrkrWaitStartup(qWrkThrd_t *pThis) -{ - int iCancelStateSave; - - assert(pThis != NULL); - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(pThis->pQueue->mut); - if((pThis->tCurrCmd == eWRKTHRD_RUN_CREATED) || (pThis->tCurrCmd == eWRKTHRD_RUN_CREATED)) { - dbgprintf("Queue 0x%lx: waiting on worker thread %d startup\n", queueGetID(pThis->pQueue), - pThis->iThrd); - pthread_cond_wait(&pThis->condInitDone, pThis->pQueue->mut); -dbgprintf("worker startup done!\n"); - } - d_pthread_mutex_unlock(pThis->pQueue->mut); - pthread_setcancelstate(iCancelStateSave, NULL); - - return RS_RET_OK; -} - - -/* waits until all worker threads that a currently initializing are fully started up - * rgerhards, 2008-01-18 - */ -static rsRetVal -qWrkrWaitAllWrkrStartup(queue_t *pThis) -{ - DEFiRet; - int i; - - for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - qWrkrWaitStartup(pThis->pWrkThrds + i); - } - - return iRet; -} - - -/* initialize the qWrkThrd_t structure - this MUST be called right after - * startup of a worker thread. -- rgerhards, 2008-01-17 - */ -static inline rsRetVal -qWrkrInit(qWrkThrd_t **ppThis, queue_t *pQueue) -{ - qWrkThrd_t *pThis; - int i; - - assert(ppThis != NULL); - ISOBJ_TYPE_assert(pQueue, queue); - - /* find myself in the queue's thread table */ - for(i = 0 ; i <= pQueue->iNumWorkerThreads ; ++i) - if(pQueue->pWrkThrds[i].thrdID == pthread_self()) - break; -dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pQueue, - (unsigned) pQueue->pWrkThrds[i].thrdID, i, (unsigned) pthread_self()); - assert(pQueue->pWrkThrds[i].thrdID == pthread_self()); - - /* initialize our thread instance descriptor */ - pThis = pQueue->pWrkThrds + i; - pThis->pQueue = pQueue; - pThis->iThrd = i; - pThis->pUsr = NULL; - - *ppThis = pThis; - qWrkrSetState(pThis, eWRKTHRD_RUN_INIT); - - return RS_RET_OK; -} - - -/* join a specific worker thread - */ -static inline rsRetVal -queueJoinWrkThrd(queue_t *pThis, int iIdx) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, queue); - assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads); - assert(pThis->pWrkThrds[iIdx].tCurrCmd != eWRKTHRD_STOPPED); - - dbgprintf("Queue 0x%lx: thread %d state %d, waiting for exit\n", queueGetID(pThis), iIdx, - pThis->pWrkThrds[iIdx].tCurrCmd); - pthread_join(pThis->pWrkThrds[iIdx].thrdID, NULL); - qWrkrSetState(&pThis->pWrkThrds[iIdx], eWRKTHRD_STOPPED); /* back to virgin... */ - pThis->pWrkThrds[iIdx].thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ - dbgprintf("Queue 0x%lx: thread %d state %d, has stopped\n", queueGetID(pThis), iIdx, - pThis->pWrkThrds[iIdx].tCurrCmd); - - return iRet; -} - - -/* Starts a worker thread (on a specific index [i]!) - */ -static inline rsRetVal -queueStrtWrkThrd(queue_t *pThis, int i) -{ - DEFiRet; - int iState; - - ISOBJ_TYPE_assert(pThis, queue); - assert(i >= 0 && i <= pThis->iNumWorkerThreads); - assert(pThis->pWrkThrds[i].tCurrCmd < eWRKTHRD_RUN_CREATED); - - qWrkrSetState(&pThis->pWrkThrds[i], eWRKTHRD_RUN_CREATED); - iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); - dbgprintf("Queue 0x%lx: starting Worker thread %x, index %d with state %d.\n", - (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState); - - return iRet; -} - - -/* start the DA worker thread (if not already running) - */ -static inline rsRetVal -queueStrtDAWrkr(queue_t *pThis) -{ - DEFiRet; - int iCancelStateSave; - - ISOBJ_TYPE_assert(pThis, queue); - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->pWrkThrds[0].mut); - if(pThis->pWrkThrds[0].tCurrCmd == eWRKTHRD_STOPPED) { - iRet = queueStrtWrkThrd(pThis, 0); - } - d_pthread_mutex_unlock(&pThis->pWrkThrds[0].mut); - pthread_setcancelstate(iCancelStateSave, NULL); - - return iRet; -} - -/* Starts a *new* worker thread. Function searches itself for a free index spot. It must only - * be called when we have less than max workers active. Pending wrkr thread requests MUST have - * been processed before calling this function. -- rgerhards, 2008-01-16 - */ -static inline rsRetVal -queueStrtNewWrkThrd(queue_t *pThis) -{ - DEFiRet; - int i; - int iStartingUp; - int iState; - - ISOBJ_TYPE_assert(pThis, queue); - - /* find free spot in thread table. If we find at least one worker that is in initializiation, - * we do NOT start a new one. Let's give the other one a chance, first. - */ - iStartingUp = -1; - for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) { -dbgprintf("Queue %p: search thrd tbl slot: i %d, CuccCmd %d\n", pThis, i, pThis->pWrkThrds[i].tCurrCmd); - if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_STOPPED) { - break; - } else if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_RUN_CREATED) { - iStartingUp = i; - break; - } - } - -dbgprintf("Queue %p: after thrd search: i %d, iStartingUp %d\n", pThis, i, iStartingUp); - if(iStartingUp > -1) - ABORT_FINALIZE(RS_RET_ALREADY_STARTING); - - assert(i <= pThis->iNumWorkerThreads); /* now there must be a free spot, else something is really wrong! */ - - qWrkrSetState(&pThis->pWrkThrds[i], eWRKTHRD_RUN_CREATED); - iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); - dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n", - (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState); - /* we try to give the starting worker a little boost. It won't help much as we still - * hold the queue's mutex, but at least it has a chance to start on a single-CPU system. - */ - pthread_yield(); - -finalize_it: - return iRet; -} - - -/* send a command to all active worker threads. A start index can be - * given. Usually, this is 0 or 1. Thread 0 is reserved to disk-assisted - * mode and this start index take care of the special handling it needs to - * receive. -- rgerhards, 2008-01-16 - */ -static inline rsRetVal -queueTellActWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd) -{ - DEFiRet; - int i; - - ISOBJ_TYPE_assert(pThis, queue); - assert(iStartIdx == 0 || iStartIdx == 1); - - /* tell the workers our request */ - for(i = iStartIdx ; i <= pThis->iNumWorkerThreads ; ++i) - if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) - queueTellActWrkThrd(pThis, i, tCmd); - - return iRet; -} - - -/* compute an absolute time timeout suitable for calls to pthread_cond_timedwait() - * rgerhards, 2008-01-14 - */ -static rsRetVal -queueTimeoutComp(struct timespec *pt, int iTimeout) -{ - assert(pt != NULL); - /* compute timeout */ - clock_gettime(CLOCK_REALTIME, pt); - pt->tv_nsec += (iTimeout % 1000) * 1000000; /* think INTEGER arithmetic! */ - if(pt->tv_nsec > 999999999) { /* overrun? */ - pt->tv_nsec -= 1000000000; - ++pt->tv_sec; - } - pt->tv_sec += iTimeout / 1000; - return RS_RET_OK; /* so far, this is static... */ -} - - -/* wake up all worker threads. Param bWithDAWrk tells if the DA worker - * is to be awaken, too. It needs special handling because it waits on - * two different conditions depending on processing state. - * rgerhards, 2008-01-16 - */ -static inline rsRetVal -queueWakeupWrkThrds(queue_t *pThis, int bWithDAWrk) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, queue); - - pthread_cond_broadcast(&pThis->notEmpty); - if(bWithDAWrk && pThis->qRunsDA != QRUNS_REGULAR) { - /* if running disk-assisted, workers may wait on that condition, too */ - pthread_cond_broadcast(&pThis->condDA); - } - - return iRet; -} - - -/* This function checks if (another) worker threads needs to be started. It - * must be called while the caller holds a lock on the queue mutex. So it must not - * do anything that either reaquires the mutex or forces somebody else to aquire - * it (that would lead to a deadlock). - * rgerhards, 2008-01-16 - */ -static inline rsRetVal -queueChkAndStrtWrk(queue_t *pThis) -{ - DEFiRet; - int iCancelStateSave; - - ISOBJ_TYPE_assert(pThis, queue); - - /* process any pending thread requests */ - queueChkWrkThrdChanges(pThis); - - if(pThis->bEnqOnly == 1) - FINALIZE; /* in enqueue-only mode we have no workers */ - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mutThrdMgmt); - pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt); - pthread_setcancelstate(iCancelStateSave, NULL); - /* check if we need to start up another worker */ - if(pThis->qRunsDA == QRUNS_REGULAR) { - if(pThis->iCurNumWrkThrd < pThis->iNumWorkerThreads) { -dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d, qRunsDA: %d\n", - pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA); - /* check if we satisfy the min nbr of messages per worker to start a new one */ - if(pThis->iCurNumWrkThrd == 0 || - pThis->iQueueSize / pThis->iCurNumWrkThrd > pThis->iMinMsgsPerWrkr) { - dbgprintf("Queue 0x%lx: high activity - starting additional worker thread.\n", - queueGetID(pThis)); - queueStrtNewWrkThrd(pThis); - } - } - } else { - if(pThis->iCurNumWrkThrd == 0 && pThis->bEnqOnly == 0) { -dbgprintf("Queue %p: DA worker is no longer running, restarting, qsize %d, workers %d, qRunsDA: %d\n", - pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA); - /* DA worker has timed out and needs to be restarted */ - iRet = queueStrtDAWrkr(pThis); - } - } - pthread_cleanup_pop(1); - -finalize_it: - return iRet; -} - /* --------------- code for disk-assisted (DA) queue modes -------------------- */ @@ -555,7 +87,7 @@ queueTurnOffDAMode(queue_t *pThis) DEFiRet; ISOBJ_TYPE_assert(pThis, queue); - assert(pThis->qRunsDA != QRUNS_REGULAR); + assert(pThis->bRunsDA); /* if we need to pull any data that we still need from the (child) disk queue, * now would be the time to do so. At present, we do not need this, but I'd like to @@ -572,62 +104,18 @@ queueTurnOffDAMode(queue_t *pThis) dbgprintf("Queue 0x%lx: disk-assistance being been turned off, bEnqOnly %d, bQueInDestr %d, NumWrkd %d\n", queueGetID(pThis), pThis->bEnqOnly,pThis->bQueueInDestruction,pThis->iCurNumWrkThrd); - // TODO: think about this code - there is a race - if(pThis->bEnqOnly == 0 && pThis->bQueueInDestruction == 0 && pThis->iCurNumWrkThrd < 2) - queueStrtNewWrkThrd(pThis); - pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */ + // TODO: mutex? + pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, * this will be quick. */ queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ - /* now free the remaining resources */ - pthread_mutex_destroy(&pThis->mutDA); - pthread_cond_destroy(&pThis->condDA); - - queueTellActWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */ dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n", queueGetID(pThis), iRet); - return iRet; -} - -/* check if we had any worker thread changes and, if so, act - * on them. At a minimum, terminated threads are harvested (joined). - * This function MUST NEVER block on the queue mutex! - */ -static rsRetVal -queueChkWrkThrdChanges(queue_t *pThis) -{ - DEFiRet; - int i; - - ISOBJ_TYPE_assert(pThis, queue); - - if(pThis->bThrdStateChanged == 0) - FINALIZE; - - /* go through all threads (including DA thread) */ - for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - switch(pThis->pWrkThrds[i].tCurrCmd) { - case eWRKTHRD_TERMINATING: - queueJoinWrkThrd(pThis, i); - break; - /* these cases just to satisfy the compiler, we do not act an them: */ - case eWRKTHRD_STOPPED: - case eWRKTHRD_RUN_CREATED: - case eWRKTHRD_RUN_INIT: - case eWRKTHRD_RUNNING: - case eWRKTHRD_SHUTDOWN: - case eWRKTHRD_SHUTDOWN_IMMEDIATE: - /* DO NOTHING */ - break; - } - } - -finalize_it: - return iRet; + RETiRet; } @@ -644,6 +132,7 @@ queueChkIsDA(queue_t *pThis) DEFiRet; ISOBJ_TYPE_assert(pThis, queue); +RUNLOG_VAR("%s", pThis->pszFilePrefix); if(pThis->pszFilePrefix != NULL) { pThis->bIsDA = 1; dbgprintf("Queue 0x%lx: is disk-assisted, disk will be used on demand\n", queueGetID(pThis)); @@ -651,7 +140,7 @@ queueChkIsDA(queue_t *pThis) dbgprintf("Queue 0x%lx: is NOT disk-assisted\n", queueGetID(pThis)); } - return iRet; + RETiRet; } @@ -660,11 +149,11 @@ queueChkIsDA(queue_t *pThis) * chore of this function is to create the DA queue object. If that function fails, * the DA worker should return with an appropriate state, which in turn should lead to * a re-set to non-DA mode in the Enq process. The queue mutex must be locked when this - * function is called, else a race on pThis->qRunsDA may happen. + * function is called, else a race on pThis->bRunsDA may happen. * rgerhards, 2008-01-15 */ static rsRetVal -queueStrtDA(queue_t *pThis) +queueStartDA(queue_t *pThis) { DEFiRet; @@ -682,30 +171,19 @@ dbgprintf("Queue %p: queueSTrtDA after child queue construct, q %p\n", pThis, pT /* as the created queue is the same object class, we take the * liberty to access its properties directly. */ - pThis->pqDA->condSignalOnEmpty = &pThis->condDA; - pThis->pqDA->mutSignalOnEmpty = &pThis->mutDA; - pThis->pqDA->condSignalOnEmpty2 = &pThis->notEmpty; - pThis->pqDA->bSignalOnEmpty = 2; pThis->pqDA->pqParent = pThis; -dbgprintf("Queue %p: queueSTrtDA after assign\n", pThis); CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq)); -dbgprintf("Queue %p: queueSTrtDA 10\n", pThis); CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly)); -dbgprintf("Queue %p: queueSTrtDA 15\n", pThis); CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0)); -dbgprintf("Queue %p: queueSTrtDA 20\n", pThis); CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0)); -dbgprintf("Queue %p: queueSTrtDA 25\n", pThis); if(pThis->toQShutdown == 0) { -dbgprintf("Queue %p: queueSTrtDA 30a\n", pThis); CHKiRet(queueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */ } else { -dbgprintf("Queue %p: queueSTrtDA 30b\n", pThis); /* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND * have an obviously large backlog, we can't finish it in any case. So there is no point * in holding shutdown longer than necessary. -- rgerhards, 2008-01-15 @@ -713,30 +191,21 @@ dbgprintf("Queue %p: queueSTrtDA 30b\n", pThis); CHKiRet(queueSettoQShutdown(pThis->pqDA, 1)); } -dbgprintf("Queue %p: queueSTrtDA pre start\n", pThis); +dbgprintf("Queue %p: queueStartDA pre start\n", pThis); iRet = queueStart(pThis->pqDA); /* file not found is expected, that means it is no previous QIF available */ if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) FINALIZE; /* something is wrong */ - /* tell our fellow workers to shut down - * NOTE: we do NOT join them by intension! If we did, we would hold draining - * the queue until some potentially long-running actions are finished. Having - * the ability to immediatly drain the queue was the primary intension of - * reserving worker thread 0 for DA queues. So if we would join the other - * workers, we would screw up and do against our design goal. - */ - CHKiRet(queueTellActWrkThrds(pThis, 1, eWRKTHRD_SHUTDOWN_IMMEDIATE)); - /* as we are right now starting DA mode because we are so busy, it is - * extremely unlikely that any worker is sleeping on empty queue. HOWEVER, + * extremely unlikely that any regular worker is sleeping on empty queue. HOWEVER, * we want to be on the safe side, and so we awake anyone that is waiting * on one. So even if the scheduler plays badly with us, things should be * quite well. -- rgerhards, 2008-01-15 */ - queueWakeupWrkThrds(pThis, 0); /* awake all workers, but not ourselves ;) */ + wtpWakeupWrkr(pThis->pWtpReg); /* awake all workers, but not ourselves ;) */ - pThis->qRunsDA = QRUNS_DA; /* we are now in DA mode! */ + pThis->bRunsDA = 1; /* we are now in DA mode! */ dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n", queueGetID(pThis), queueGetID(pThis->pqDA)); @@ -751,31 +220,63 @@ finalize_it: pThis->bIsDA = 0; } - return iRet; + RETiRet; } /* initiate DA mode * param bEnqOnly tells if the disk queue is to be run in enqueue-only mode. This may * be needed during shutdown of memory queues which need to be persisted to disk. + * If this function fails (should not happen), DA mode is not turned on. * rgerhards, 2008-01-16 */ static inline rsRetVal -queueInitDA(queue_t *pThis, int bEnqOnly) +queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) { DEFiRet; + DEFVARS_mutexProtection; + uchar pszBuf[64]; + size_t lenBuf; + + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex); + /* check if we already have a DA worker pool. If not, initiate one. Please note that the + * pool is created on first need but never again destructed (until the queue is). This + * is intentional. We assume that when we need it once, we may also need it on another + * occasion. Ressources used are quite minimal when no worker is running. + * rgerhards, 2008-01-24 + */ + if(pThis->pWtpDA == NULL) { + lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx/DA", (unsigned long) pThis); + CHKiRet(wtpConstruct (&pThis->pWtpDA)); + CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, queueChkStopWrkrDA)); + CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, queueIsIdleDA)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, queueConsumerDA)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, queueConsumerCancelCleanup)); + CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, queueStartDA)); + CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, queueTurnOffDAMode)); + CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); + CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); + CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); + CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis)); + CHKiRet(wtpConstructFinalize (pThis->pWtpDA)); + } + /* if we reach this point, we have a "good" DA worker pool */ /* indicate we now run in DA mode - this is reset by the DA worker if it fails */ - pThis->qRunsDA = QRUNS_DA_INIT; + pThis->bRunsDA = 1; pThis->bDAEnqOnly = bEnqOnly; - /* now we must start our DA worker thread - it does the rest of the initialization - * In enqueue-only mode, we do not start any workers. + /* now we must now adivse the wtp that we need one worker. If none is yet active, + * that will also start one up. If we forgot that step, everything would be stalled + * until the next enqueue request. */ if(pThis->bEnqOnly == 0) - iRet = queueStrtDAWrkr(pThis); + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* one worker only for disk queues! */ - return iRet; +finalize_it: + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + RETiRet; } @@ -783,7 +284,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly) * keep it running if we are already in it. * rgerhards, 2008-01-14 */ -static rsRetVal +static inline rsRetVal queueChkStrtDA(queue_t *pThis) { DEFiRet; @@ -794,35 +295,31 @@ queueChkStrtDA(queue_t *pThis) if(pThis->iQueueSize != pThis->iHighWtrMrk) ABORT_FINALIZE(RS_RET_OK); - if(pThis->qRunsDA != QRUNS_REGULAR) { +dbgprintf("Queue %p: chkStartDA\n", pThis); + if(pThis->bRunsDA) { /* then we need to signal that we are at the high water mark again. If that happens * on our way down the queue, that doesn't matter, because then nobody is waiting * on the condition variable. + * (Remember that a DA queue stops draining the queue once it has reached the low + * water mark and restarts it when the high water mark is reached again - this is + * what this code here is responsible for. Please note that all workers may have been + * terminated due to the inactivity timeout, thus we need to advise the pool that + * we need at least one). */ dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n", queueGetID(pThis), pThis->iQueueSize); - //pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - // TODO: mutex call order check! this must aquire the queue mutex - //d_pthread_mutex_lock(&pThis->mutDA); - pthread_cond_signal(&pThis->condDA); - //d_pthread_mutex_unlock(&pThis->mutDA); - //pthread_setcancelstate(iCancelStateSave, NULL); - queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */ - } - - /* we need to re-check if we run disk-assisted, because that status may have changed - * in our high water mark processing. - */ - if(pThis->qRunsDA == QRUNS_REGULAR) { - /* if we reach this point, we are NOT currently running in DA mode. */ + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* run again [see comment above] ;) */ + } else { + /* this is the case when we are currently not running in DA mode. So it is time + * to turn it back on. + */ dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n", queueGetID(pThis), pThis->iQueueSize); - - queueInitDA(pThis, QUEUE_MODE_ENQDEQ); /* initiate DA mode */ + queueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ } finalize_it: - return iRet; + RETiRet; } @@ -855,7 +352,7 @@ static rsRetVal qConstructFixedArray(queue_t *pThis) queueChkIsDA(pThis); finalize_it: - return iRet; + RETiRet; } @@ -868,7 +365,7 @@ static rsRetVal qDestructFixedArray(queue_t *pThis) if(pThis->tVars.farray.pBuf != NULL) free(pThis->tVars.farray.pBuf); - return iRet; + RETiRet; } static rsRetVal qAddFixedArray(queue_t *pThis, void* in) @@ -881,7 +378,7 @@ static rsRetVal qAddFixedArray(queue_t *pThis, void* in) if (pThis->tVars.farray.tail == pThis->iMaxQueueSize) pThis->tVars.farray.tail = 0; - return iRet; + RETiRet; } static rsRetVal qDelFixedArray(queue_t *pThis, void **out) @@ -895,7 +392,7 @@ static rsRetVal qDelFixedArray(queue_t *pThis, void **out) if (pThis->tVars.farray.head == pThis->iMaxQueueSize) pThis->tVars.farray.head = 0; - return iRet; + RETiRet; } @@ -911,7 +408,7 @@ static rsRetVal qConstructLinkedList(queue_t *pThis) queueChkIsDA(pThis); - return iRet; + RETiRet; } @@ -925,7 +422,7 @@ static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis) * dynamic left with the linked list. */ - return iRet; + RETiRet; } static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr) @@ -949,7 +446,7 @@ static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr) } finalize_it: - return iRet; + RETiRet; } static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) @@ -971,7 +468,7 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) } free(pEntry); - return iRet; + RETiRet; } @@ -986,7 +483,7 @@ queueLoadPersStrmInfoFixup(strm_t *pStrm, queue_t *pThis) ISOBJ_TYPE_assert(pThis, queue); CHKiRet(strmSetDir(pStrm, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); finalize_it: - return iRet; + RETiRet; } @@ -1024,7 +521,7 @@ queueHaveQIF(queue_t *pThis) /* If we reach this point, we have a .qi file */ finalize_it: - return iRet; + RETiRet; } @@ -1092,7 +589,7 @@ finalize_it: queueGetID(pThis), iRet); } - return iRet; + RETiRet; } @@ -1150,7 +647,7 @@ CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize)); CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize)); finalize_it: - return iRet; + RETiRet; } @@ -1166,7 +663,7 @@ static rsRetVal qDestructDisk(queue_t *pThis) if(pThis->pszSpoolDir != NULL) free(pThis->pszSpoolDir); - return iRet; + RETiRet; } static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) @@ -1179,7 +676,7 @@ static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) CHKiRet(strmFlush(pThis->tVars.disk.pWrite)); finalize_it: - return iRet; + RETiRet; } static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) @@ -1216,7 +713,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr) * queue anything ;) */ - return iRet; + RETiRet; } static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) @@ -1228,6 +725,7 @@ static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__ /* --------------- end type-specific handlers -------------------- */ + /* generic code to add a queue entry */ static rsRetVal queueAdd(queue_t *pThis, void *pUsr) @@ -1242,7 +740,7 @@ queueAdd(queue_t *pThis, void *pUsr) dbgprintf("Queue 0x%lx: entry added, size now %d entries\n", queueGetID(pThis), pThis->iQueueSize); finalize_it: - return iRet; + RETiRet; } @@ -1265,121 +763,31 @@ queueDel(queue_t *pThis, void *pUsr) dbgprintf("Queue 0x%lx: entry deleted, state %d, size now %d entries\n", queueGetID(pThis), iRet, pThis->iQueueSize); - return iRet; -} - - -/* Send a shutdown command to all workers and awake them. This function - * does NOT wait for them to terminate. Set bIncludeDAWRk to send the - * termination command to the DA worker, too (else this does not happen). - * rgerhards, 2008-01-16 - */ -static inline rsRetVal -queueWrkThrdReqTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int bIncludeDAWrk) -{ - DEFiRet; - - if(bIncludeDAWrk) { - queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */ - queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ - } else { - queueTellActWrkThrds(pThis, 1, tShutdownCmd);/* first tell the workers our request */ - queueWakeupWrkThrds(pThis, 0); /* awake all workers but not DA-worker */ - } - - return iRet; -} - - -/* Send a shutdown command to all workers and see if they terminate. - * A timeout may be specified. - * rgerhards, 2008-01-14 - */ -static rsRetVal -queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) -{ - DEFiRet; - int bTimedOut; - struct timespec t; - int iCancelStateSave; - - queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */ - queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ - /* race: must make sure all are running! */ - queueTimeoutComp(&t, iTimeout);/* get timeout */ - - /* and wait for their termination */ -dbgprintf("Queue %p: waiting for mutex %p\n", pThis, &pThis->mutThrdMgmt); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mutThrdMgmt); - pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt); - pthread_setcancelstate(iCancelStateSave, NULL); - bTimedOut = 0; - while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { - dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n", - queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd); - - if(pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutThrdMgmt, &t) != 0) { - dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", queueGetID(pThis)); - bTimedOut = 1; /* we exit the loop on timeout */ - } - } - pthread_cleanup_pop(1); - - if(bTimedOut) - iRet = RS_RET_TIMED_OUT; - - return iRet; + RETiRet; } -/* Unconditionally cancel all running worker threads. - * rgerhards, 2008-01-14 - */ -static rsRetVal -queueWrkThrdCancel(queue_t *pThis) -{ - DEFiRet; - int i; - // TODO: we need to implement peek(), without it (today!) we lose one message upon - // worker cancellation! -- rgerhards, 2008-01-14 - - /* process any pending thread requests so that we know who actually is still running */ - queueChkWrkThrdChanges(pThis); - - /* awake the workers one more time, just to be sure */ - queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ - - /* first tell the workers our request */ - for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) { - dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i); - pthread_cancel(pThis->pWrkThrds[i].thrdID); - } - } - - return iRet; -} - - -/* Worker thread management function carried out when the main - * worker is about to terminate. +/* This function shuts down all worker threads and waits until they + * have terminated. If they timeout, they are cancelled. Parameters have been set + * before this function is called so that DA queues will be fully persisted to + * disk (if configured to do so). + * rgerhards, 2008-01-24 */ static rsRetVal queueShutdownWorkers(queue_t *pThis) { DEFiRet; int i; - assert(pThis != NULL); + ISOBJ_TYPE_assert(pThis, queue); - dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", (unsigned long) pThis); + dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis)); /* even if the timeout count is set to 0 (run endless), we still call the queueWrkThrdTrm(). This * is necessary so that all threads get sent the termination command. With a timeout of 0, however, * the function returns immediate with RS_RET_TIMED_OUT. We catch that state and accept it as * good. */ - iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN, pThis->toQShutdown); + wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, pThis->toQShutdown); if(iRet == RS_RET_TIMED_OUT) { if(pThis->toQShutdown == 0) { iRet = RS_RET_OK; @@ -1387,418 +795,69 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) /* OK, we now need to try force the shutdown */ dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", queueGetID(pThis)); - iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN_IMMEDIATE, pThis->toActShutdown); + iRet = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, pThis->toActShutdown); } } if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */ /* still didn't work out - so we now need to cancel the workers */ dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis); - iRet = queueWrkThrdCancel(pThis); + iRet = wtpCancelAll(pThis->pWtpReg); + } + + // TODO: do it just once but right ;) + if(pThis->pWtpDA != NULL) { + wtpShutdownAll(pThis->pWtpDA, pThis->toQShutdown, pThis->toQShutdown); + if(iRet == RS_RET_TIMED_OUT) { + if(pThis->toQShutdown == 0) { + iRet = RS_RET_OK; + } else { + /* OK, we now need to try force the shutdown */ + dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", + queueGetID(pThis)); + iRet = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, pThis->toActShutdown); + } + } + + if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */ + /* still didn't work out - so we now need to cancel the workers */ + dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis); + iRet = wtpCancelAll(pThis->pWtpDA); + } } + /* finally join the threads * In case of a cancellation, this may actually take some time. This is also * needed to clean up the thread descriptors, even with a regular termination. * And, most importantly, this is needed if we have an indifitite termination * time set (timeout == 0)! -- rgerhards, 2008-01-14 */ +#if 0 // totally wrong, we must implement something along these lines in wtp! +RUNLOG; for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRD_STOPPED) { - queueJoinWrkThrd(pThis, i); + if(pThis->pWtpReg->pWrkr[i]->tCurrCmd != eWRKTHRD_STOPPED) { + wtiJoinThrd(pThis->pWtpReg->pWrkr[i]); } } - dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n", - queueGetID(pThis), pThis->iQueueSize); - - return iRet; -} - -/* This is a special consumer to feed the disk-queue in disk-assited mode. - * When active, our own queue more or less acts as a memory buffer to the disk. - * So this consumer just needs to drain the memory queue and submit entries - * to the disk queue. The disk queue will then call the actual consumer from - * the app point of view (we chain two queues here). - * rgerhards, 2008-01-14 - */ -static inline rsRetVal -queueDAConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst) -{ - DEFiRet; - int iCancelStateSave; - - ISOBJ_TYPE_assert(pThis, queue); - assert(pThis->qRunsDA != QRUNS_REGULAR); - ISOBJ_assert(pWrkrInst->pUsr); - -dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize);/* dirty iQueueSize! */ - CHKiRet(queueEnqObj(pThis->pqDA, pWrkrInst->pUsr)); - - /* We check if we reached the low water mark (but only if we are not in shutdown mode) - * Note that the child queue now in almost all cases is non-empty, because we just enqueued - * a message. Note that we need a quick check below to see if we are still in running state. - * If not, we do not go into the wait, because that's not a good thing to do. We do not - * do a full termination check, as this is done when we go back to the main worker loop. - * We need to re-aquire the queue mutex here, because we need to have a consistent - * access to the queue's admin data. - */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); -dbgprintf("pre mutex lock (think about CLEANUP!)\n"); - d_pthread_mutex_lock(pThis->mut); - pthread_cleanup_push(queueMutexCleanup, pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); -dbgprintf("mutex locked (think about CLEANUP!)\n"); - if(pThis->iQueueSize <= pThis->iLowWtrMrk && pWrkrInst->tCurrCmd == eWRKTHRD_RUNNING) { - dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n", - queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize); - /* wait for either passing the high water mark or the child disk queue drain */ - pthread_cond_wait(&pThis->condDA, pThis->mut); - } - pthread_cleanup_pop(1); /* release mutex in an atomic way via cleanup handler */ - -finalize_it: -dbgprintf("DAConsumer returns with iRet %d\n", iRet); - return iRet; -} - - -/* This is a helper for queueWorker () it either calls the configured - * consumer or the DA-consumer (if in disk-assisted mode). It is - * protected by the queue mutex, but MUST release it as soon as possible. - * Most importantly, it must release it before the consumer is called. - * rgerhards, 2008-01-14 - */ -static inline rsRetVal -queueWorkerChkAndCallConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst, int iCancelStateSave) -{ - DEFiRet; - rsRetVal iRetLocal; - int iSeverity; - int iQueueSize; - void *pUsr; - int qRunsDA; - int iMyThrdIndx; - - ISOBJ_TYPE_assert(pThis, queue); - assert(pWrkrInst != NULL); - - iMyThrdIndx = pWrkrInst->iThrd; - - /* dequeue element (still protected from mutex) */ - iRet = queueDel(pThis, &pUsr); - queueChkPersist(pThis); // when we support peek(), we must do this down after the del! - iQueueSize = pThis->iQueueSize; /* cache this for after mutex release */ - pWrkrInst->pUsr = pUsr; /* save it for the cancel cleanup handler */ - qRunsDA = pThis->qRunsDA; - d_pthread_mutex_unlock(pThis->mut); - pthread_cond_signal(&pThis->notFull); - pthread_setcancelstate(iCancelStateSave, NULL); - /* WE ARE NO LONGER PROTECTED FROM THE MUTEX */ - - /* do actual processing (the lengthy part, runs in parallel) - * If we had a problem while dequeing, we do not call the consumer, - * but we otherwise ignore it. This is in the hopes that it will be - * self-healing. However, this is really not a good thing. - * rgerhards, 2008-01-03 - */ - if(iRet != RS_RET_OK) - FINALIZE; - - /* call consumer depending on queue mode (in DA mode, we have just one thread, so it can not change) */ - if(qRunsDA == QRUNS_DA) { - queueDAConsumer(pThis, pWrkrInst); - } else { - /* we are running in normal, non-disk-assisted mode - * do a quick check if we need to drain the queue. It is OK to use the cached - * iQueueSize here, because it does not hurt if it is slightly wrong. - */ - if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) { - iRetLocal = objGetSeverity(pUsr, &iSeverity); - if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { - dbgprintf("Queue 0x%lx/w%d: dequeue/queue nearly full (%d entries), " - "discarded severity %d message\n", - queueGetID(pThis), iMyThrdIndx, iQueueSize, iSeverity); - objDestruct(pUsr); - } - } else { - dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n", - queueGetID(pThis), iMyThrdIndx); - iRetLocal = pThis->pConsumer(pUsr); - if(iRetLocal != RS_RET_OK) { - dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n", - queueGetID(pThis), iMyThrdIndx, iRetLocal); +RUNLOG; + if(pThis->pWtpDA != NULL) { + for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { + if(pThis->pWtpDA->pWrkr[i]->tCurrCmd != eWRKTHRD_STOPPED) { + wtiJoinThrd(pThis->pWtpDA->pWrkr[i]); } } } +#endif -finalize_it: - if(iRet != RS_RET_OK) { - dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things " - "may happen\n", queueGetID(pThis), iMyThrdIndx, iRet); - } -dbgprintf("CallConsumer returns %d\n", iRet); - return iRet; -} - - - -/* cancellation cleanup handler for queueWorker () - * Updates admin structure and frees ressources. - * rgerhards, 2008-01-16 - */ -static void queueWorkerCancelCleanup(void *arg) -{ - qWrkThrd_t *pWrkrInst = (qWrkThrd_t*) arg; - queue_t *pThis; - int iCancelStateSave; - - assert(pWrkrInst != NULL); - ISOBJ_TYPE_assert(pWrkrInst->pQueue, queue); - pThis = pWrkrInst->pQueue; - - dbgprintf("Queue 0x%lx/w%d: cancelation cleanup handler called (NOT FULLY IMPLEMENTED, one msg lost!)\n", - queueGetID(pThis), pWrkrInst->iThrd); - - /* TODO: re-enqueue the data element! */ - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mutThrdMgmt); - qWrkrSetState(&pThis->pWrkThrds[pWrkrInst->iThrd], eWRKTHRD_TERMINATING); - pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ - - dbgprintf("Queue 0x%lx/w%d: thread CANCELED with %d entries left in queue, %d workers running.\n", - queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize, pThis->iCurNumWrkThrd - 1); - - pThis->iCurNumWrkThrd--; - pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ - d_pthread_mutex_unlock(&pThis->mutThrdMgmt); - pthread_setcancelstate(iCancelStateSave, NULL); -} - - -/* This function is created to keep the code in queueWorker () short. Thus it - * also does not abide to the usual calling conventions used in rsyslog. It is more - * like a macro. Its sole purpose is to have a handy shortcut for the queue - * termination condition. For the same reason, the calling parameters are a bit - * more verbose than the need to be in theory. The reasoning is the Worker has - * everything handy and so we do not need to access it from memory (OK, the - * optimized would probably have created the same code, but why not do it - * optimal right away...). The function returns 0 if the worker should terminate - * and something else if it should continue to run. - * rgerhards, 2008-01-18 - */ -static inline int -queueWorkerRemainActive(queue_t *pThis, qWrkThrd_t *pWrkrInst) -{ - register int b; /* this is a boolean! */ - - /* first check the usual termination condition that applies to all workers */ - b = ( (qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING) - || ((qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && (pThis->iQueueSize > 0))); -dbgprintf("Queue %p/w%d: chk 1 pre empty queue, qsize %d (high wtr %d), cont run: %d, cmd %d, DA qsize %d\n", pThis, - pWrkrInst->iThrd, - pThis->iQueueSize, pThis->iHighWtrMrk, b, qWrkrGetState(pWrkrInst), - (pThis->pqDA == NULL) ? -1 : pThis->pqDA->iQueueSize); - if(b && pWrkrInst->iThrd == 0 && pThis->qRunsDA == QRUNS_DA) { - b = pThis->iQueueSize >= pThis->iHighWtrMrk || pThis->pqDA->iQueueSize != 0; - } + dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n", + queueGetID(pThis), pThis->iQueueSize); -dbgprintf("Queue %p/w%d: pre empty queue, qsize %d, cont run: %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize, b); - return b; + RETiRet; } -/* Each queue has at least one associated worker (consumer) thread. It will pull - * the message from the queue and pass it to a user-defined function. - * This function was provided on construction. It MUST be thread-safe. - * Worker thread 0 is always reserved for disk-assisted mode (if the queue - * is not DA, this worker will be dormant). All other workers are for - * regular operations mode. Workers are started and stopped as need arises. - * rgerhards, 2008-01-15 - */ -static void * -queueWorker(void *arg) -{ - queue_t *pThis = (queue_t*) arg; - sigset_t sigSet; - struct timespec t; - int iMyThrdIndx; /* index for this thread in queue thread table */ - int iCancelStateSave; - qWrkThrd_t *pWrkrInst; /* for cleanup handler */ - int bContinueRun; - - ISOBJ_TYPE_assert(pThis, queue); - - sigfillset(&sigSet); - pthread_sigmask(SIG_BLOCK, &sigSet, NULL); - - /* do some one-time thread initialization */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mutThrdMgmt); - - /* initialize our thread instance descriptor */ - qWrkrInit(&pWrkrInst, pThis); - - iMyThrdIndx = pWrkrInst->iThrd; - - dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx); - -dbgprintf("qRunsDA %d, check against %d\n", pThis->qRunsDA, QRUNS_DA); - if((iMyThrdIndx == 0) && (pThis->qRunsDA != QRUNS_DA)) { /* are we the DA worker? */ - if(queueStrtDA(pThis) != RS_RET_OK) { /* then fully initialize the DA queue! */ - /* if we could not init the DA queue, we have nothing to do, so shut down. */ - queueTellActWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE); - } - } - - /* finally change to RUNNING state. We need to check if we actually should still run, - * because someone may have requested us to shut down even before we got a chance to do - * our init. That would be a bad race... -- rgerhards, 2008-01-16 - */ - pThis->iCurNumWrkThrd++; - if(qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT) - qWrkrSetState(pWrkrInst, eWRKTHRD_RUNNING); /* we are running now! */ - - pthread_cleanup_push(queueWorkerCancelCleanup, pWrkrInst); - - d_pthread_mutex_unlock(&pThis->mutThrdMgmt); - pthread_setcancelstate(iCancelStateSave, NULL); - /* end one-time stuff */ - - /* now we have our identity, on to real processing */ - bContinueRun = 1; /* we need this variable, because we need to check the actual termination condition - * while protected by mutex */ - while(bContinueRun) { -dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n", - queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd); - /* process any pending thread requests */ - queueChkWrkThrdChanges(pThis); - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(pThis->mut); -dbgprintf("pthis 2: %p\n", pThis); - - if((bContinueRun = queueWorkerRemainActive(pThis, pWrkrInst)) == 0) { -dbgprintf("pthis 2a: %p\n", pThis); - d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); -dbgprintf("pthis 2b: %p\n", pThis); - continue; /* and break loop */ - } - - /* if we reach this point, we are still protected by the mutex */ -dbgprintf("pthis 3: %p\n", pThis); - - if(pThis->iQueueSize == 0) { - dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", - queueGetID(pThis), iMyThrdIndx); - /* check if the parent DA worker is running and, if not, initiate it. Thanks - * to queueStrtDAWrkr (), we do not actually need to check (that routines does - * that for us, but we need to aquire the parent queue's mutex to call it. - */ - if(pThis->pqParent != NULL) { - dbgprintf("Queue %p: pre start parent %p worker\n", pThis, pThis->pqParent); - queueStrtDAWrkr(pThis->pqParent); - } - - if(pThis->bSignalOnEmpty > 0) { - /* we need to signal our parent queue that we are empty */ - dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx); - pthread_cond_signal(pThis->condSignalOnEmpty); - dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx); - } - if(pThis->bSignalOnEmpty > 1) { - /* no mutex associated with this condition, it's just a try (but needed - * to wakeup a parent worker if e.g. the queue was restarted from disk) */ - pthread_cond_signal(pThis->condSignalOnEmpty2); - } - /* If we arrive here, we have the regular case, where we can safely assume that - * iQueueSize and tCmd have not changed since the while(). - */ - dbgprintf("Queue %p/w%d: pre condwait ->notEmpty, worker shutdown %d\n", pThis, iMyThrdIndx, pThis->toWrkShutdown); - /* DA worker and first worker never have an inactivity timeout */ - if(pWrkrInst->iThrd < 2 || pThis->toWrkShutdown == -1) { - //xxx if(pThis->toWrkShutdown == -1) { - dbgprintf("worker never times out!\n"); - /* never shut down any started worker */ - pthread_cond_wait(&pThis->notEmpty, pThis->mut); - } else { - queueTimeoutComp(&t, pThis->toWrkShutdown);/* get absolute timeout */ - if(pthread_cond_timedwait(&pThis->notEmpty, pThis->mut, &t) != 0) { - dbgprintf("Queue 0x%lx/w%d: inactivity timeout, worker terminating...\n", - queueGetID(pThis), iMyThrdIndx); - /* we use SHUTDOWN (and not SHUTDOWN_IMMEDIATE) so that the worker - * does not terminate if in the mean time a new message arrived. - */ - qWrkrSetState(pWrkrInst, eWRKTHRD_SHUTDOWN); - } - } - dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx); - d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); - pthread_testcancel(); /* see big comment below */ - pthread_yield(); /* see big comment below */ - continue; /* request next iteration */ - } - - /* if we reach this point, we have a non-empty queue (and are still protected by mutex) */ - queueWorkerChkAndCallConsumer(pThis, pWrkrInst, iCancelStateSave); - - /* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is - * a cancellation point in itself. As we run most of the time without cancel enabled, I fear - * we may never get cancelled if we do not create a cancellation point ourselfs. - */ - pthread_testcancel(); - /* We now yield to give the other threads a chance to obtain the mutex. If we do not - * do that, this thread may very well aquire the mutex again before another thread - * has even a chance to run. The reason is that mutex operations are free to be - * implemented in the quickest possible way (and they typically are!). That is, the - * mutex lock/unlock most probably just does an atomic memory swap and does not necessarily - * schedule other threads waiting on the same mutex. That can lead to the same thread - * aquiring the mutex ever and ever again while all others are starving for it. We - * have exactly seen this behaviour when we deliberately introduced a long-running - * test action which basically did a sleep. I understand that with real actions the - * likelihood of this starvation condition is very low - but it could still happen - * and would be very hard to debug. The yield() is a sure fix, its performance overhead - * should be well accepted given the above facts. -- rgerhards, 2008-01-10 - */ - pthread_yield(); - if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0) - dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has " - " %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize); - } - - /* indicate termination */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); -dbgprintf("Queue %p: worker waiting for mutex\n", pThis); - d_pthread_mutex_lock(&pThis->mutThrdMgmt); - /* check if we are the DA worker and, if so, switch back to regular mode */ - if(pWrkrInst->iThrd == 0) { - queueTurnOffDAMode(pThis); - } - pthread_cleanup_pop(0); /* remove cleanup handler */ - - pThis->iCurNumWrkThrd--; /* one less ;) */ - /* if we ever need finalize_it, here would be the place for it! */ - if(qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN || - qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN_IMMEDIATE || - qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT || - qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_CREATED) { - /* in shutdown case, we need to flag termination. All other commands - * have a meaning to the thread harvester, so we can not overwrite them - */ -dbgprintf("Queue 0x%lx/w%d: setting termination state\n", queueGetID(pThis), iMyThrdIndx); - qWrkrSetState(pWrkrInst, eWRKTHRD_TERMINATING); - } - pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ - pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ - d_pthread_mutex_unlock(&pThis->mutThrdMgmt); - pthread_setcancelstate(iCancelStateSave, NULL); - - pthread_exit(0); -} - /* Constructor for the queue object * This constructs the data structure, but does not yet start the queue. That @@ -1811,6 +870,8 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, DEFiRet; queue_t *pThis; +int *pBoom = NULL; +//*pBoom = 'A'; assert(ppThis != NULL); assert(pConsumer != NULL); assert(iWorkerThreads >= 0); @@ -1866,7 +927,205 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, finalize_it: OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP - return iRet; + RETiRet; +} + + +/* cancellation cleanup handler for queueWorker () + * Updates admin structure and frees ressources. + * rgerhards, 2008-01-16 + */ +static rsRetVal +queueConsumerCancelCleanup(void *arg1, void *arg2) +{ + queue_t *pThis = (queue_t*) arg1; + wti_t *pWti = (wti_t*) arg2; + + ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pWti, wti); + + dbgprintf("Queue 0x%lx: cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n", + queueGetID(pThis)); + + /* TODO: re-enqueue the data element! */ + + return RS_RET_OK; +} + + + +/* This function checks if the provided message shall be discarded and does so, if needed. + * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to + * provide real-time creation of spool files. + * Note: cached copies of iQueueSize and bRunsDA are provided so that no mutex locks are required. + * The caller must have obtained them while the mutex was locked. Of course, these values may no + * longer be current, but that is OK for the discard check. At worst, the message is either processed + * or discarded when it should not have been. As discarding is in itself somewhat racy and erratic, + * that is no problems for us. This function MUST NOT lock the queue mutex, it could result in + * deadlocks! + * If the message is discarded, it can no longer be processed by the caller. So be sure to check + * the return state! + * rgerhards, 2008-01-24 + */ +static int queueChkDiscardMsg(queue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr) +{ + DEFiRet; + rsRetVal iRetLocal; + int iSeverity; + + ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_assert(pUsr); + + if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) { + iRetLocal = objGetSeverity(pUsr, &iSeverity); + if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { + dbgprintf("Queue 0x%lx: queue nearly full (%d entries), discarded severity %d message\n", + queueGetID(pThis), iQueueSize, iSeverity); + objDestruct(pUsr); + ABORT_FINALIZE(RS_RET_QUEUE_FULL); + } else { + dbgprintf("Queue 0x%lx: queue nearly full (%d entries), but could not drop msg " + "(iRet: %d, severity %d)\n", queueGetID(pThis), iQueueSize, + iRetLocal, iSeverity); + } + } + +finalize_it: + RETiRet; +} + + +/* dequeue the queued object for the queue consumers. + * rgerhards, 2008-10-21 + */ +static rsRetVal +queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave) +{ + DEFiRet; + void *pUsr; + int iQueueSize; + int bRunsDA; /* cache for early mutex release */ + + /* dequeue element (still protected from mutex) */ + iRet = queueDel(pThis, &pUsr); + queueChkPersist(pThis); // when we support peek(), we must do this down after the del! + iQueueSize = pThis->iQueueSize; /* cache this for after mutex release */ + bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ + pWti->pUsrp = pUsr; /* save it for the cancel cleanup handler */ + d_pthread_mutex_unlock(pThis->mut); + pthread_cond_signal(&pThis->notFull); + pthread_setcancelstate(iCancelStateSave, NULL); + /* WE ARE NO LONGER PROTECTED BY THE MUTEX */ + + /* do actual processing (the lengthy part, runs in parallel) + * If we had a problem while dequeing, we do not call the consumer, + * but we otherwise ignore it. This is in the hopes that it will be + * self-healing. However, this is really not a good thing. + * rgerhards, 2008-01-03 + */ + if(iRet != RS_RET_OK) + FINALIZE; + + /* we are running in normal, non-disk-assisted mode do a quick check if we need to drain the queue. + * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to + * provide real-time creation of spool files. + * Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong. + */ + CHKiRet(queueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr)); + +finalize_it: + if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { + dbgprintf("Queue 0x%lx/w?: error %d dequeueing element - ignoring, but strange things " + "may happen\n", queueGetID(pThis), iRet); + } + RETiRet; +} + + +/* This is the queue consumer in the regular (non-DA) case. It is + * protected by the queue mutex, but MUST release it as soon as possible. + * rgerhards, 2008-01-21 + */ +static rsRetVal +queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pWti, wti); + + CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave)); + CHKiRet(pThis->pConsumer(pWti->pUsrp)); + +finalize_it: +dbgprintf("Queue %p: regular consumer returns %d\n", pThis, iRet); + RETiRet; +} + + +/* This is a special consumer to feed the disk-queue in disk-assited mode. + * When active, our own queue more or less acts as a memory buffer to the disk. + * So this consumer just needs to drain the memory queue and submit entries + * to the disk queue. The disk queue will then call the actual consumer from + * the app point of view (we chain two queues here). + * When this method is entered, the mutex is always locked and needs to be unlocked + * as part of the processing. + * rgerhards, 2008-01-14 + */ +static rsRetVal +queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pWti, wti); + +dbgprintf("Queue %p/w?: queueDAConsumer, queue size %d\n", pThis, pThis->iQueueSize);/* dirty iQueueSize! */ + CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave)); + CHKiRet(queueEnqObj(pThis->pqDA, pWti->pUsrp)); + +finalize_it: +dbgprintf("DAConsumer returns with iRet %d\n", iRet); + RETiRet; +} + + +/* must only be called when the queue mutex is locked, else results + * are not stable! + * Version when running in DA mode. + */ +static int +queueChkStopWrkrDA(queue_t *pThis) +{ + return pThis->bEnqOnly || !pThis->bRunsDA; +} + +/* must only be called when the queue mutex is locked, else results + * are not stable! + * Version when running in non-DA mode. + */ +static int +queueChkStopWrkrReg(queue_t *pThis) +{ + return pThis->bEnqOnly || pThis->bRunsDA; +} + + +/* must only be called when the queue mutex is locked, else results + * are not stable! DA version + */ +static int +queueIsIdleDA(queue_t *pThis) +{ + return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk)); +} +/* must only be called when the queue mutex is locked, else results + * are not stable! Regular version + */ +static int +queueIsIdleReg(queue_t *pThis) +{ + return (pThis->iQueueSize == 0); } @@ -1878,7 +1137,8 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ DEFiRet; rsRetVal iRetLocal; int bInitialized = 0; /* is queue already initialized? */ - int i; + uchar pszBuf[64]; + size_t lenBuf; assert(pThis != NULL); @@ -1888,7 +1148,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ if(pThis->pqParent == NULL) { dbgprintf("Queue %p: no parent, alloc mutex\n", pThis); pThis->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t)); - pthread_mutex_init (pThis->mut, NULL); + pthread_mutex_init(pThis->mut, NULL); } else { /* child queue, we need to use parent's mutex */ pThis->mut = pThis->pqParent->mut; @@ -1909,15 +1169,23 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ - /* initialize worker thread instances - * TODO: move to separate function + /* create worker thread pools for regular operation. The DA pool is created on an as-needed + * basis, which potentially means never under most circumstances. */ - if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads + 1, sizeof(qWrkThrd_t))) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - for(i = 0 ; i < pThis->iNumWorkerThreads + 1 ; ++i) { - qWrkrConstructFinalize(&pThis->pWrkThrds[i], pThis, i); - } - + lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx/Reg", (unsigned long) pThis); + CHKiRet(wtpConstruct (&pThis->pWtpReg)); + CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, queueChkStopWrkrReg)); + CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, queueIsIdleReg)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, queueConsumerReg)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, queueConsumerCancelCleanup)); + CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); + CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); + CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); + CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis)); + CHKiRet(wtpConstructFinalize (pThis->pWtpReg)); + + /* initialize worker thread instances */ if(pThis->bIsDA) { /* If we are disk-assisted, we need to check if there is a QIF file * which we need to load. -- rgerhards, 2008-01-15 @@ -1927,7 +1195,11 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n", queueGetID(pThis)); - queueInitDA(pThis, QUEUE_MODE_ENQDEQ); /* initiate DA mode */ + queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ + /* we need to start the DA worker thread so that messages will be processed. So + * we advise the worker pool there is at least one needed. The wtp does the rest... + */ + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); bInitialized = 1; /* we are done */ } else { // TODO: use logerror? -- rgerhards, 2008-01-16 @@ -1938,16 +1210,14 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); if(!bInitialized) { dbgprintf("Queue 0x%lx: queue starts up without (loading) any disk state\n", queueGetID(pThis)); - /* fire up the worker threads */ - if(pThis->bEnqOnly == 0) - queueStrtNewWrkThrd(pThis); - // TODO: preforked workers! queueStrtAllWrkThrds(pThis); + /* we do not fire up any worker threads here, this happens automatically when they are needed */ + // TODO: preforked workers? queueStrtAllWrkThrds(pThis); } pThis->bQueueStarted = 1; finalize_it: dbgprintf("queueStart() exit, iret %d\n", iRet); - return iRet; + RETiRet; } @@ -2023,7 +1293,7 @@ finalize_it: if(psQIF != NULL) strmDestruct(&psQIF); - return iRet; + RETiRet; } @@ -2043,15 +1313,16 @@ rsRetVal queueChkPersist(queue_t *pThis) pThis->iUpdsSincePersist = 0; } - return iRet; + RETiRet; } /* destructor for the queue object */ rsRetVal queueDestruct(queue_t **ppThis) { - queue_t *pThis; DEFiRet; + queue_t *pThis; + DEFVARS_mutexProtection; assert(ppThis != NULL); pThis = *ppThis; @@ -2064,52 +1335,38 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove /* we do not need to take care of any messages left in queue if we are in enqueue only mode */ if(!pThis->bEnqOnly) { /* in regular mode, need look at termination */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ /* optimize parameters for shutdown of DA-enabled queues */ if(pThis->bIsDA && pThis->iQueueSize > 0) { // TODO: atomic iQueueSize! dbgprintf("IsDA queue, modifying params for draining\n"); pThis->iHighWtrMrk = 1; /* make sure we drain */ pThis->iLowWtrMrk = 0; /* disable low water mark algo */ - if(pThis->qRunsDA == QRUNS_REGULAR) { + if(pThis->bRunsDA == 0) { if(pThis->iQueueSize > 0) { - queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* initiate DA mode */ + queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ } } else { queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */ + /* worker may have been waited on low water mark, reactivate */ + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); } if(pThis->bSaveOnShutdown) { dbgprintf("bSaveOnShutdown set, eternal timeout set\n"); pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL; } - /* now we need to activate workers (read doc/dev_queue.html) */ + END_MTX_PROTECTED_OPERATIONS(pThis->mut); } - - /* wait until all pending workers are started up */ - qWrkrWaitAllWrkrStartup(pThis); - - // We need to startup a worker if we are in non-DA mode and the queue is not empty and not in enque-only mode */ - dbgprintf("Queue %p: queueDestruct probing if any regular workers need to be started, CurWrkr %d, qsize %d, qRunsDA %d\n", - pThis, pThis->iCurNumWrkThrd, pThis->iQueueSize, pThis->qRunsDA); - d_pthread_mutex_lock(pThis->mut); - dbgprintf("queueDestruct mutex locked\n"); - if(pThis->iCurNumWrkThrd == 0 && pThis->iQueueSize > 0 && !pThis->bEnqOnly) { - dbgprintf("Queue %p: queueDestruct must start regular workers!\n", pThis); - // TODO check mutex call order - doies function aquire mutex? - queueStrtNewWrkThrd(pThis); - } - d_pthread_mutex_unlock(pThis->mut); - dbgprintf("queueDestruct mutex unlocked\n"); - - /* wait again in case a new worker was started */ - qWrkrWaitAllWrkrStartup(pThis); } - /* terminate our own worker threads */ - if(pThis->pWrkThrds != NULL) { + /* at this point, the queue is either empty with all workers being idle (or deact) or the queue + * is full and all workers are running. We now need to wait for everyone to become idle. + */ + if(pThis->qType != QUEUETYPE_DIRECT) { queueShutdownWorkers(pThis); } - /* if still running DA, terminate disk queue */ - if(pThis->qRunsDA != QRUNS_REGULAR) + /* if still running DA, terminate disk queue (note that the DA queue is NULL if it was never used) */ + if(pThis->bRunsDA && pThis->pqDA != NULL) queueDestruct(&pThis->pqDA); /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) */ @@ -2118,9 +1375,10 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove } /* ... then free resources */ - if(pThis->pWrkThrds != NULL) { - free(pThis->pWrkThrds); - pThis->pWrkThrds = NULL; + if(pThis->qType != QUEUETYPE_DIRECT) { + wtpDestruct(&pThis->pWtpReg); + if(pThis->pWtpDA != NULL) + wtpDestruct(&pThis->pWtpDA); } if(pThis->pqParent == NULL) { @@ -2141,7 +1399,7 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove free(pThis); *ppThis = NULL; - return iRet; + RETiRet; } @@ -2167,7 +1425,7 @@ queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) pThis->lenFilePrefix = iLenPrefix; finalize_it: - return iRet; + RETiRet; } /* set the queue's maximum file size @@ -2187,7 +1445,7 @@ queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize) pThis->iMaxFileSize = iMaxFileSize; finalize_it: - return iRet; + RETiRet; } @@ -2203,65 +1461,64 @@ queueEnqObj(queue_t *pThis, void *pUsr) { DEFiRet; int iCancelStateSave; + int iMaxWorkers; int i; struct timespec t; - int iSeverity = 8; - rsRetVal iRetLocal; ISOBJ_TYPE_assert(pThis, queue); - /* process any pending thread requests */ - queueChkWrkThrdChanges(pThis); - +dbgprintf("Queue %p: EnqObj() 1\n", pThis); /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE * during its execution. If that is not done, race conditions occur if the * thread is canceled (most important use case is input module termination). * rgerhards, 2008-01-08 */ - if(pThis->pWrkThrds != NULL) { + if(pThis->qType != QUEUETYPE_DIRECT) { pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(pThis->mut); } - /* first check if we can discard anything */ - if(pThis->iDiscardMrk > 0 && pThis->iQueueSize >= pThis->iDiscardMrk) { - iRetLocal = objGetSeverity(pUsr, &iSeverity); - if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { - dbgprintf("Queue 0x%lx: queue nearly full (%d entries), discarded severity %d message\n", - queueGetID(pThis), pThis->iQueueSize, iSeverity); - objDestruct(pUsr); - ABORT_FINALIZE(RS_RET_QUEUE_FULL); - } else { - dbgprintf("Queue 0x%lx: queue nearly full (%d entries), but could not drop msg " - "(iRet: %d, severity %d)\n", queueGetID(pThis), pThis->iQueueSize, - iRetLocal, iSeverity); - } - } + /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ + CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); +dbgprintf("Queue %p: EnqObj() 10\n", pThis); /* then check if we need to add an assistance disk queue */ if(pThis->bIsDA) CHKiRet(queueChkStrtDA(pThis)); - /* re-process any new pending thread requests and see if we need to start workers */ - queueChkAndStrtWrk(pThis); +RUNLOG_VAR("%d", pThis->bIsDA); + /* make sure at least one worker is running. */ + if(pThis->bRunsDA) { +RUNLOG; + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ + } else { + if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { + iMaxWorkers = 1; + } else { + iMaxWorkers = pThis->iQueueSize / pThis->iMinMsgsPerWrkr + 1; + } +RUNLOG; + wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); + } - /* and finally (try to) enqueue what is left over */ + /* wait for the queue to be ready... */ while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) { dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", queueGetID(pThis)); - queueTimeoutComp(&t, pThis->toEnq); + timeoutComp(&t, pThis->toEnq); if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", queueGetID(pThis)); objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } } + + /* and finally enqueue the message */ CHKiRet(queueAdd(pThis, pUsr)); queueChkPersist(pThis); finalize_it: - /* now awake sleeping worker threads */ - if(pThis->pWrkThrds != NULL) { + if(pThis->qType != QUEUETYPE_DIRECT) { d_pthread_mutex_unlock(pThis->mut); i = pthread_cond_signal(&pThis->notEmpty); dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i); @@ -2269,7 +1526,7 @@ finalize_it: } - return iRet; + RETiRet; } @@ -2307,7 +1564,8 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly) /* this means we need to terminate all workers - that's it... */ dbgprintf("Queue 0x%lx: switching to enqueue-only mode, terminating all worker threads\n", queueGetID(pThis)); - queueWrkThrdReqTrm(pThis, eWRKTHRD_SHUTDOWN_IMMEDIATE, 0); + wtpWakeupAllWrkr(pThis->pWtpDA); + wtpWakeupAllWrkr(pThis->pWtpReg); } else { /* switch back to regular mode */ ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */ @@ -2321,7 +1579,7 @@ finalize_it: d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); } - return iRet; + RETiRet; } @@ -2360,19 +1618,18 @@ static rsRetVal queueSetProperty(queue_t *pThis, property_t *pProp) } finalize_it: - return iRet; + RETiRet; } #undef isProp - - /* Initialize the stream class. Must be called as the very first method * before anything else is called inside this class. * rgerhards, 2008-01-09 */ BEGINObjClassInit(queue, 1) - //OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize); OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty); //OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize); +//fprintf(stdout, "queueChkStopWrkrReg: %p\n", queueChkStopWrkrReg); +//fprintf(stdout, "queueChkStopWrkrDA: %p\n", queueChkStopWrkrDA); ENDObjClassInit(queue) /* @@ -25,27 +25,9 @@ #include <pthread.h> #include "obj.h" +#include "wtp.h" #include "stream.h" -/* some information about disk files used by the queue. In the long term, we may - * export this settings to a separate file module - or not (if they are too - * queue-specific. I just thought I mention it here so that everyone is aware - * of this possibility. -- rgerhards, 2008-01-07 - */ -typedef struct { - int fd; /* the file descriptor, -1 if closed */ - uchar *pszFileName; /* name of current file (if open) */ - int iCurrFileNum;/* current file number (NOT descriptor, but the number in the file name!) */ - size_t iCurrOffs;/* current offset */ - uchar *pIOBuf; /* io Buffer */ - int iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */ - int iBufPtr; /* pointer into current buffer */ - int iUngetC; /* char set via UngetChar() call or -1 if none set */ - int bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */ -} queueFileDescription_t; -#define qFILE_IOBUF_SIZE 4096 /* size of the IO buffer */ - - /* queue types */ typedef enum { QUEUETYPE_FIXED_ARRAY = 0,/* a simple queue made out of a fixed (initially malloced) array fast but memoryhog */ @@ -60,17 +42,6 @@ typedef struct qLinkedList_S { void *pUsr; } qLinkedList_t; -/* commands and states for worker threads. */ -typedef enum { - eWRKTHRD_STOPPED = 0, /* worker thread is not running (either actually never ran or was shut down) */ - eWRKTHRD_TERMINATING = 1,/* worker thread has shut down, but some finalzing is still needed */ - /* ALL active states MUST be numerically higher than eWRKTHRD_TERMINATED and NONE must be lower! */ - eWRKTHRD_RUN_CREATED = 2,/* worker thread has been created, but not yet begun initialization (prob. not yet scheduled) */ - eWRKTHRD_RUN_INIT = 3, /* worker thread is initializing, but not yet fully running */ - eWRKTHRD_RUNNING = 4, /* worker thread is up and running and shall continue to do so */ - eWRKTHRD_SHUTDOWN = 5, /* worker thread is running but shall terminate when queue is empty */ - eWRKTHRD_SHUTDOWN_IMMEDIATE = 6/* worker thread is running but shall terminate even if queue is full */ -} qWrkCmd_t; typedef struct qWrkThrd_s { pthread_t thrdID; /* thread ID */ @@ -95,7 +66,8 @@ typedef struct queue_s { int iNumWorkerThreads;/* number of worker threads to use */ int iCurNumWrkThrd;/* current number of active worker threads */ int iMinMsgsPerWrkr;/* minimum nbr of msgs per worker thread, if more, a new worker is started until max wrkrs */ - qWrkThrd_t *pWrkThrds;/* array with control structure for the worker thread(s) associated with this queue */ + wtp_t *pWtpDA; + wtp_t *pWtpReg; int iUpdsSincePersist;/* nbr of queue updates since the last persist call */ int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */ int iHighWtrMrk; /* high water mark for disk-assisted memory queues */ @@ -138,11 +110,7 @@ typedef struct queue_s { int iNumberFiles; /* how many files make up the queue? */ size_t iMaxFileSize; /* max size for a single queue file */ int bIsDA; /* is this queue disk assisted? */ - enum { - QRUNS_REGULAR, - QRUNS_DA_INIT, - QRUNS_DA - } qRunsDA; /* is this queue actually *running* disk assisted? if so, which mode? */ + int bRunsDA; /* is this queue actually *running* disk assisted? */ pthread_mutex_t mutDA; /* mutex for low water mark algo */ pthread_cond_t condDA; /* and its matching condition */ struct queue_s *pqDA; /* queue for disk-assisted modes */ @@ -112,6 +112,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_TIMED_OUT = -2041, /**< timeout occured (not necessarily an error) */ RS_RET_QSIZE_ZERO = -2042, /**< queue size is zero where this is not supported */ RS_RET_ALREADY_STARTING = -2043, /**< something (a thread?) is already starting - not necessarily an error */ + RS_RET_NO_MORE_THREADS = -2044, /**< no more threads available, not necessarily an error */ RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */ RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */ RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */ @@ -128,11 +129,9 @@ typedef enum rsRetVal_ rsRetVal; /**< friendly type for global return value */ #define CHKiRet_Hdlr(code) if((iRet = code) != RS_RET_OK) /* macro below is used in conjunction with CHKiRet_Hdlr, else use ABORT_FINALIZE */ #define FINALIZE goto finalize_it; -#if 0 /* DEV debug: set to 1 to get a rough call trace -- rgerhards, 2008-01-13 */ -# define DEFiRet dbgprintf("Entering %s, line %d\n", __FILE__, __LINE__); rsRetVal iRet = RS_RET_OK -#else -# define DEFiRet rsRetVal iRet = RS_RET_OK -#endif +#define DEFiRet BEGINfunc rsRetVal iRet = RS_RET_OK +#define RETiRet do{ ENDfunc return iRet; }while(0) + #define ABORT_FINALIZE(errCode) \ do { \ iRet = errCode; \ @@ -197,6 +196,8 @@ typedef unsigned char uchar; /* The following prototype is convenient, even though it may not be the 100% correct place.. -- rgerhards 2008-01-07 */ void dbgprintf(char *, ...) __attribute__((format(printf, 1, 2))); +#include "debug.h" + #endif /* multi-include protection */ /* * vi:set ai: @@ -291,7 +291,7 @@ rsRetVal genFileName(uchar **ppName, uchar *pDirName, size_t lenDirName, uchar * *ppName = pName; finalize_it: - return iRet; + RETiRet; } /* get the number of digits required to represent a given number. We use an @@ -337,8 +337,10 @@ timeoutComp(struct timespec *pt, int iTimeout) void mutexCancelCleanup(void *arg) { + BEGINfunc assert(arg != NULL); d_pthread_mutex_unlock((pthread_mutex_t*) arg); + ENDfunc } @@ -107,7 +107,7 @@ static rsRetVal strmOpenFile(strm_t *pThis) iFlags, pThis->fd); finalize_it: - return iRet; + RETiRet; } @@ -138,7 +138,7 @@ static rsRetVal strmCloseFile(strm_t *pThis) pThis->pszCurrFName = NULL; } - return iRet; + RETiRet; } @@ -165,7 +165,7 @@ strmNextFile(strm_t *pThis) pThis->iCurrFNum = (pThis->iCurrFNum + 1) % pThis->iMaxFiles; finalize_it: - return iRet; + RETiRet; } @@ -228,7 +228,7 @@ rsRetVal strmReadChar(strm_t *pThis, uchar *pC) //dbgprintf("ReadChar: read %c, offset %d\n", *pC, pThis->iCurrOffs); finalize_it: - return iRet; + RETiRet; } @@ -285,7 +285,7 @@ finalize_it: if(iRet != RS_RET_OK && pCStr != NULL) rsCStrDestruct(pCStr); - return iRet; + RETiRet; } #endif /* #if 0 - saved code */ @@ -293,13 +293,13 @@ finalize_it: /* Standard-Constructor for the strm object */ -BEGINobjConstruct(strm) +BEGINobjConstruct(strm) /* be sure to specify the object type also in END macro! */ pThis->iCurrFNum = 1; pThis->fd = -1; pThis->iUngetC = -1; pThis->sType = STREAMTYPE_FILE_SINGLE; pThis->sIOBufSize = glblGetIOBufSize(); - pThis->tOpenMode = 0600; + pThis->tOpenMode = 0600; /* TODO: make configurable */ ENDobjConstruct(strm) @@ -319,7 +319,7 @@ rsRetVal strmConstructFinalize(strm_t *pThis) } finalize_it: - return iRet; + RETiRet; } @@ -347,7 +347,7 @@ rsRetVal strmDestruct(strm_t **ppThis) free(pThis); *ppThis = NULL; - return iRet; + RETiRet; } @@ -370,7 +370,7 @@ static rsRetVal strmCheckNextOutputFile(strm_t *pThis) } finalize_it: - return iRet; + RETiRet; } /* write memory buffer to a stream object. @@ -413,7 +413,7 @@ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) finalize_it: pThis->iBufPtr = 0; /* see comment above */ - return iRet; + RETiRet; } @@ -432,7 +432,7 @@ rsRetVal strmFlush(strm_t *pThis) iRet = strmWriteInternal(pThis, pThis->pIOBuf, pThis->iBufPtr); } - return iRet; + RETiRet; } @@ -457,7 +457,7 @@ dbgprintf("seek(%d, %ld): %d\n", pThis->fd, offs, i); pThis->iCurrOffs = offs; /* we are now at *this* offset */ pThis->iBufPtr = 0; /* buffer invalidated */ - return iRet; + RETiRet; } @@ -471,7 +471,7 @@ rsRetVal strmSeekCurrOffs(strm_t *pThis) ISOBJ_TYPE_assert(pThis, strm); iRet = strmSeek(pThis, pThis->iCurrOffs); - return iRet; + RETiRet; } @@ -492,7 +492,7 @@ rsRetVal strmWriteChar(strm_t *pThis, uchar c) pThis->iBufPtr++; finalize_it: - return iRet; + RETiRet; } @@ -508,7 +508,7 @@ rsRetVal strmWriteLong(strm_t *pThis, long i) CHKiRet(strmWrite(pThis, szBuf, strlen((char*)szBuf))); finalize_it: - return iRet; + RETiRet; } @@ -551,7 +551,7 @@ rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) } finalize_it: - return iRet; + RETiRet; } @@ -581,7 +581,7 @@ rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal) pThis->iAddtlOpenFlags = iNewVal; finalize_it: - return iRet; + RETiRet; } @@ -608,7 +608,7 @@ strmSetFName(strm_t *pThis, uchar *pszName, size_t iLenName) pThis->lenFName = iLenName; finalize_it: - return iRet; + RETiRet; } @@ -635,7 +635,7 @@ strmSetDir(strm_t *pThis, uchar *pszDir, size_t iLenDir) pThis->lenDir = iLenDir; finalize_it: - return iRet; + RETiRet; } @@ -678,7 +678,7 @@ rsRetVal strmRecordEnd(strm_t *pThis) pThis->bInRecord = 0; iRet = strmCheckNextOutputFile(pThis); /* check if we need to switch files */ - return iRet; + RETiRet; } /* end stream record support functions */ @@ -723,7 +723,7 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) CHKiRet(objEndSerialize(pStrm)); finalize_it: - return iRet; + RETiRet; } @@ -763,7 +763,7 @@ rsRetVal strmSetProperty(strm_t *pThis, property_t *pProp) } finalize_it: - return iRet; + RETiRet; } #undef isProp diff --git a/stringbuf.c b/stringbuf.c index b777c40d..e89ce768 100755 --- a/stringbuf.c +++ b/stringbuf.c @@ -180,7 +180,7 @@ static rsRetVal rsCStrExtendBuf(rsCStrObj *pThis, size_t iMinNeeded) pThis->pBuf = pNewBuf; finalize_it: - return iRet; + RETiRet; } @@ -191,7 +191,7 @@ finalize_it: */ rsRetVal rsCStrAppendStrWithLen(rsCStrObj *pThis, uchar* psz, size_t iStrLen) { - rsRetVal iRet; + DEFiRet; rsCHECKVALIDOBJECT(pThis, OIDrsCStr); assert(psz != NULL); @@ -206,7 +206,7 @@ rsRetVal rsCStrAppendStrWithLen(rsCStrObj *pThis, uchar* psz, size_t iStrLen) pThis->iStrLen += iStrLen; finalize_it: - return iRet; + RETiRet; } @@ -223,15 +223,16 @@ rsRetVal rsCStrAppendStr(rsCStrObj *pThis, uchar* psz) rsRetVal rsCStrAppendInt(rsCStrObj *pThis, long i) { - rsRetVal iRet; + DEFiRet; uchar szBuf[32]; rsCHECKVALIDOBJECT(pThis, OIDrsCStr); - if((iRet = srUtilItoA((char*) szBuf, sizeof(szBuf), i)) != RS_RET_OK) - return iRet; + CHKiRet(srUtilItoA((char*) szBuf, sizeof(szBuf), i)); - return rsCStrAppendStr(pThis, szBuf); + iRet = rsCStrAppendStr(pThis, szBuf); +finalize_it: + RETiRet; } @@ -255,7 +256,7 @@ rsRetVal rsCStrAppendChar(rsCStrObj *pThis, uchar c) } finalize_it: - return iRet; + RETiRet; } @@ -420,7 +421,7 @@ finalize_it: free(pThis->pBuf); RSFREEOBJ(pThis); - return(iRet); + RETiRet; } @@ -625,10 +626,12 @@ int rsCStrStartsWithSzStr(rsCStrObj *pCS1, uchar *psz, size_t iLenSz) int rsCStrSzStrMatchRegex(rsCStrObj *pCS1, uchar *psz) { regex_t preq; + BEGINfunc regcomp(&preq, (char*) rsCStrGetSzStr(pCS1), 0); - int iRet = regexec(&preq, (char*) psz, 0, NULL, 0); + int ret = regexec(&preq, (char*) psz, 0, NULL, 0); regfree(&preq); - return iRet; + ENDfunc + return ret; } /* compare a rsCStr object with a classical sz string. This function @@ -654,6 +657,7 @@ int rsCStrSzStrMatchRegex(rsCStrObj *pCS1, uchar *psz) */ int rsCStrOffsetSzStrCmp(rsCStrObj *pCS1, size_t iOffset, uchar *psz, size_t iLenSz) { + BEGINfunc rsCHECKVALIDOBJECT(pCS1, OIDrsCStr); assert(iOffset < pCS1->iStrLen); assert(psz != NULL); @@ -662,9 +666,10 @@ int rsCStrOffsetSzStrCmp(rsCStrObj *pCS1, size_t iOffset, uchar *psz, size_t iLe /* we are using iLenSz below, because the lengths * are equal and iLenSz is faster to access */ - if(iLenSz == 0) + if(iLenSz == 0) { return 0; /* zero-sized strings are equal ;) */ - else { /* we now have two non-empty strings of equal + ENDfunc + } else { /* we now have two non-empty strings of equal * length, so we need to actually check if they * are equal. */ @@ -675,10 +680,13 @@ int rsCStrOffsetSzStrCmp(rsCStrObj *pCS1, size_t iOffset, uchar *psz, size_t iLe } /* if we arrive here, the strings are equal */ return 0; + ENDfunc } } - else + else { return pCS1->iStrLen - iOffset - iLenSz; + ENDfunc + } } @@ -178,6 +178,8 @@ #include "threads.h" #include "queue.h" #include "stream.h" +#include "wti.h" +#include "wtp.h" /* We define our own set of syslog defintions so that we * do not need to rely on (possibly different) implementations. @@ -259,7 +261,6 @@ char ctty[] = _PATH_CONSOLE; /* this is read-only; used by omfile -- TODO: remov static pid_t myPid; /* our pid for use in self-generated messages, e.g. on startup */ /* mypid is read-only after the initial fork() */ -static int debugging_on = 0; /* read-only, except on sig USR1 */ static int restart = 0; /* do restart (config read) - multithread safe */ int glblHadMemShortage = 0; /* indicates if we had memory shortage some time during the run */ @@ -368,7 +369,6 @@ static int bDropTrailingLF = 1; /* drop trailing LF's on reception? */ int iCompatibilityMode = 0; /* version we should be compatible with; 0 means sysklogd. It is the default, so if no -c<n> option is given, we make ourselvs as compatible to sysklogd as possible. */ -int Debug; /* debug flag - read-only after startup */ static int bDebugPrintTemplateList = 1;/* output template list in debug mode? */ static int bDebugPrintCfSysLineHandlerList = 1;/* output cfsyslinehandler list in debug mode? */ static int bDebugPrintModuleList = 1;/* output module list in debug mode? */ @@ -558,7 +558,6 @@ static void debug_switch(); static rsRetVal cfline(uchar *line, selector_t **pfCurr); static int decode(uchar *name, struct code *codetab); static void sighup_handler(); -//static void die(int sig); static void freeSelectors(void); static rsRetVal processConfFile(uchar *pConfFile); static rsRetVal selectorAddList(selector_t *f); @@ -1157,7 +1156,7 @@ finalize_it: } } *ppThis = pThis; - return iRet; + RETiRet; } @@ -1315,7 +1314,7 @@ rsRetVal printline(char *hname, char *msg, int bParseHost) logmsg(pri, pMsg, SYNC_FILE); finalize_it: - return iRet; + RETiRet; } @@ -1531,7 +1530,7 @@ logmsgInternal(int pri, char *msg, int flags) logmsg(pri, pMsg, flags); } finalize_it: - return iRet; + RETiRet; } /* This functions looks at the given message and checks if it matches the @@ -1761,7 +1760,7 @@ finalize_it: UnlockObj(pAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ pthread_setcancelstate(iCancelStateSave, NULL); - return iRet; + RETiRet; } @@ -1798,7 +1797,7 @@ DEFFUNC_llExecFunc(processMsgDoActions) } finalize_it: - return iRet; + RETiRet; } @@ -1812,6 +1811,7 @@ processMsg(msg_t *pMsg) int bContinue; processMsgDoActions_t DoActData; + BEGINfunc assert(pMsg != NULL); /* log the message to the particular outputs */ @@ -1829,6 +1829,7 @@ processMsg(msg_t *pMsg) if(llExecFunc(&f->llActList, processMsgDoActions, (void*)&DoActData) == RS_RET_DISCARDMSG) bContinue = 0; } + ENDfunc } @@ -1843,6 +1844,7 @@ processMsg(msg_t *pMsg) static rsRetVal msgConsumer(void *pUsr) { + DEFiRet; msg_t *pMsg = (msg_t*) pUsr; assert(pMsg != NULL); @@ -1850,7 +1852,7 @@ msgConsumer(void *pUsr) processMsg(pMsg); MsgDestruct(&pMsg); - return RS_RET_OK; + RETiRet; } @@ -2251,6 +2253,7 @@ logmsg(int pri, msg_t *pMsg, int flags) char *msg; char PRItext[20]; + BEGINfunc assert(pMsg != NULL); assert(pMsg->pszUxTradMsg != NULL); msg = (char*) pMsg->pszUxTradMsg; @@ -2320,6 +2323,7 @@ logmsg(int pri, msg_t *pMsg, int flags) pMsg->msgFlags = flags; MsgPrepareEnqueue(pMsg); queueEnqObj(pMsgQueue, (void*) pMsg); + ENDfunc } @@ -2432,7 +2436,7 @@ finalize_it: pAction->f_pMsg = pMsgSave; /* restore it */ } - return iRet; + RETiRet; } @@ -2545,6 +2549,7 @@ void logerror(char *type) char buf[1024]; char errStr[1024]; + BEGINfunc dbgprintf("Called logerr, msg: %s\n", type); if (errno == 0) @@ -2556,6 +2561,7 @@ void logerror(char *type) buf[sizeof(buf)/sizeof(char) - 1] = '\0'; /* just to be on the safe side... */ errno = 0; logmsgInternal(LOG_SYSLOG|LOG_ERR, buf, ADDDATE); + ENDfunc return; } @@ -2569,7 +2575,12 @@ void logerror(char *type) */ static void doDie(int sig) { + static int iRetries = 0; /* debug aid */ dbgprintf("DoDie called.\n"); + if(iRetries++ == 4) { + dbgprintf("DoDie called 5 times - unconditional exit\n"); + exit(1); + } bFinished = sig; } @@ -2663,6 +2674,9 @@ die(int sig) if(pModDir != NULL) free(pModDir); + /* exit classes... */ + dbgClassExit(); + dbgprintf("Clean shutdown completed, bye.\n"); exit(0); /* "good" exit, this is the terminator function for rsyslog [die()] */ } @@ -2747,7 +2761,7 @@ finalize_it: if(pDir != NULL) closedir(pDir); - return iRet; + RETiRet; } @@ -2797,7 +2811,7 @@ static rsRetVal doIncludeLine(uchar **pp, __attribute__((unused)) void* pVal) globfree(&cfgFiles); finalize_it: - return iRet; + RETiRet; } @@ -2873,7 +2887,7 @@ static rsRetVal doModLoad(uchar **pp, __attribute__((unused)) void* pVal) skipWhiteSpace(pp); /* skip over any whitespace */ finalize_it: - return iRet; + RETiRet; } /* parse and interpret a $-config line that starts with @@ -2933,7 +2947,7 @@ static rsRetVal doNameLine(uchar **pp, void* pVal) *pp = p; finalize_it: - return iRet; + RETiRet; } @@ -2995,7 +3009,7 @@ rsRetVal cfsysline(uchar *p) } finalize_it: - return iRet; + RETiRet; } @@ -3059,7 +3073,7 @@ DEFFUNC_llExecFunc(dbgPrintInitInfoAction) iRet = actionDbgPrint((action_t*) pData); printf("\n"); - return iRet; + RETiRet; } /* print debug information as part of init(). This pretty much @@ -3235,7 +3249,7 @@ finalize_it: dbgprintf("error %d processing config file '%s'; os error (if any): %s\n", iRet, pConfFile, errStr); } - return iRet; + RETiRet; } @@ -3263,6 +3277,7 @@ startInputModules(void) pMod = modGetNxtType(pMod, eMOD_IN); } + ENDfunc return RS_RET_OK; /* intentional: we do not care about module errors */ } @@ -3448,6 +3463,7 @@ init(void) sigaction(SIGHUP, &sigAct, NULL); dbgprintf(" (re)started.\n"); + ENDfunc } @@ -3510,7 +3526,7 @@ rsRetVal cflineParseTemplateName(uchar** pp, omodStringRequest_t *pOMSR, int iEn finalize_it: *pp = p; - return iRet; + RETiRet; } /* Helper to cfline(). Parses a file name up until the first @@ -3540,7 +3556,7 @@ rsRetVal cflineParseFileName(uchar* p, uchar *pFileName, omodStringRequest_t *pO iRet = cflineParseTemplateName(&p, pOMSR, iEntry, iTplOpts, (uchar*) " TradFmt"); - return iRet; + RETiRet; } @@ -3999,7 +4015,7 @@ finalize_it: actionDestruct(pAction); } - return iRet; + RETiRet; } @@ -4037,7 +4053,7 @@ static rsRetVal cflineDoFilter(uchar **pp, selector_t *f) return(iRet); } - return iRet; + RETiRet; } @@ -4087,7 +4103,7 @@ static rsRetVal cflineDoAction(uchar **p, action_t **ppAction) } *ppAction = pAction; - return iRet; + RETiRet; } @@ -4105,7 +4121,7 @@ DEFFUNC_llExecFunc(selectorAddListCheckActionsChecker) Forwarding++; } - return iRet; + RETiRet; } /* loop through a list of actions and perform necessary checks and @@ -4123,7 +4139,7 @@ static rsRetVal selectorAddListCheckActions(selector_t *f) CHKiRet(llExecFunc(&f->llActList, selectorAddListCheckActionsChecker, NULL)); finalize_it: - return iRet; + RETiRet; } @@ -4174,7 +4190,7 @@ static rsRetVal selectorAddList(selector_t *f) } finalize_it: - return iRet; + RETiRet; } @@ -4215,7 +4231,7 @@ static rsRetVal cflineClassic(uchar *p, selector_t **pfCurr) finalize_it: *pfCurr = fCurr; - return iRet; + RETiRet; } @@ -4249,7 +4265,7 @@ static rsRetVal cfline(uchar *line, selector_t **pfCurr) break; } - return iRet; + RETiRet; } @@ -4278,7 +4294,7 @@ static rsRetVal setMainMsgQueType(void __attribute__((unused)) *pVal, uchar *psz } free(pszType); /* no longer needed */ - return iRet; + RETiRet; } @@ -4312,49 +4328,6 @@ int decode(uchar *name, struct code *codetab) return (-1); } -extern void dbgprintf(char *fmt, ...) __attribute__((format(printf,1, 2))); -void -dbgprintf(char *fmt, ...) -{ - static pthread_t ptLastThrdID = 0; - static int bWasNL = FALSE; - va_list ap; - - if ( !(Debug && debugging_on) ) - return; - - /* The bWasNL handler does not really work. It works if no thread - * switching occurs during non-NL messages. Else, things are messed - * up. Anyhow, it works well enough to provide useful help during - * getting this up and running. It is questionable if the extra effort - * is worth fixing it, giving the limited appliability. - * rgerhards, 2005-10-25 - * I have decided that it is not worth fixing it - especially as it works - * pretty well. - * rgerhards, 2007-06-15 - */ - if(ptLastThrdID != pthread_self()) { - if(!bWasNL) { - fprintf(stdout, "\n"); - bWasNL = 1; - } - ptLastThrdID = pthread_self(); - } - - if(bWasNL) { - fprintf(stdout, "%8.8x: ", (unsigned int) pthread_self()); - //fprintf(stderr, "%8.8x: ", (unsigned int) pthread_self()); - } - bWasNL = (*(fmt + strlen(fmt) - 1) == '\n') ? TRUE : FALSE; - va_start(ap, fmt); - vfprintf(stdout, fmt, ap); - //vfprintf(stderr, fmt, ap); - va_end(ap); - - //fflush(stderr); - fflush(stdout); - return; -} /* @@ -4448,6 +4421,7 @@ mainloop(void) { struct timeval tvSelectTimeout; + BEGINfunc while(!bFinished){ /* first check if we have any internal messages queued and spit them out */ /* TODO: do we need this any longer? I doubt it, but let's care about it @@ -4493,6 +4467,7 @@ mainloop(void) continue; } } + ENDfunc } /* If user is not root, prints warnings or even exits @@ -4542,16 +4517,20 @@ static rsRetVal loadBuildInModules(void) { DEFiRet; - if((iRet = doModInit(modInitFile, (uchar*) "builtin-file", NULL)) != RS_RET_OK) - return iRet; + if((iRet = doModInit(modInitFile, (uchar*) "builtin-file", NULL)) != RS_RET_OK) { + RETiRet; + } #ifdef SYSLOG_INET - if((iRet = doModInit(modInitFwd, (uchar*) "builtin-fwd", NULL)) != RS_RET_OK) - return iRet; + if((iRet = doModInit(modInitFwd, (uchar*) "builtin-fwd", NULL)) != RS_RET_OK) { + RETiRet; + } #endif - if((iRet = doModInit(modInitShell, (uchar*) "builtin-shell", NULL)) != RS_RET_OK) - return iRet; - if((iRet = doModInit(modInitDiscard, (uchar*) "builtin-discard", NULL)) != RS_RET_OK) - return iRet; + if((iRet = doModInit(modInitShell, (uchar*) "builtin-shell", NULL)) != RS_RET_OK) { + RETiRet; + } + if((iRet = doModInit(modInitDiscard, (uchar*) "builtin-discard", NULL)) != RS_RET_OK) { + RETiRet; + } /* dirty, but this must be for the time being: the usrmsg module must always be * loaded as last module. This is because it processes any time of action selector. @@ -4563,7 +4542,7 @@ static rsRetVal loadBuildInModules(void) * [a-zA-Z0-9_.] */ if((iRet = doModInit(modInitUsrMsg, (uchar*) "builtin-usrmsg", NULL)) != RS_RET_OK) - return iRet; + RETiRet; /* ok, initialization of the command handler probably does not 100% belong right in * this space here. However, with the current design, this is actually quite a good @@ -4607,7 +4586,7 @@ static rsRetVal loadBuildInModules(void) CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL)); finalize_it: - return iRet; + RETiRet; } @@ -4708,6 +4687,7 @@ static void mainThread() */ mainloop(); + ENDfunc } @@ -4719,20 +4699,26 @@ static rsRetVal InitGlobalClasses(void) DEFiRet; CHKiRet(objClassInit()); /* *THIS* *MUST* always be the first class initilizere called! */ +fprintf(stdout, " calling dbgClassInit\n"); + //CHKiRet(dbgClassInit()); CHKiRet(MsgClassInit()); CHKiRet(strmClassInit()); + CHKiRet(wtiClassInit()); + CHKiRet(wtpClassInit()); CHKiRet(queueClassInit()); finalize_it: - return iRet; + RETiRet; } + /* This is the main entry point into rsyslogd. Over time, we should try to * modularize it a bit more... */ int main(int argc, char **argv) { +dbgClassInit(); DEFiRet; register int i; @@ -4987,6 +4973,8 @@ int main(int argc, char **argv) memset(&sigAct, 0, sizeof (sigAct)); sigemptyset(&sigAct.sa_mask); + sigAct.sa_handler = sigsegvHdlr; + sigaction(SIGSEGV, &sigAct, NULL); sigAct.sa_handler = doDie; sigaction(SIGTERM, &sigAct, NULL); sigAct.sa_handler = Debug ? doDie : SIG_IGN; @@ -4995,7 +4983,8 @@ int main(int argc, char **argv) sigAct.sa_handler = reapchild; sigaction(SIGCHLD, &sigAct, NULL); sigAct.sa_handler = Debug ? debug_switch : SIG_IGN; - sigaction(SIGUSR1, &sigAct, NULL); +// TODO: use signal 2 + //sigaction(SIGUSR1, &sigAct, NULL); sigAct.sa_handler = SIG_IGN; sigaction(SIGPIPE, &sigAct, NULL); sigaction(SIGXFSZ, &sigAct, NULL); /* do not abort if 2gig file limit is hit */ @@ -5012,6 +5001,7 @@ finalize_it: if(iRet != RS_RET_OK) fprintf(stderr, "rsyslogd run failed with error %d.\n", iRet); + ENDfunc return 0; } diff --git a/tcpsyslog.c b/tcpsyslog.c index 01655620..80f0d82a 100644 --- a/tcpsyslog.c +++ b/tcpsyslog.c @@ -1220,7 +1220,7 @@ static rsRetVal TCPSendBldFrame(TCPFRAMINGMODE rqdFraming, char **pmsg, size_t * } finalize_it: - return iRet; + RETiRet; } @@ -1282,7 +1282,7 @@ int TCPSend(void *pData, char *msg, size_t len, TCPFRAMINGMODE rqdFraming, finalize_it: if(bMsgMustBeFreed) free(msg); - return iRet; + RETiRet; } @@ -132,7 +132,7 @@ rsRetVal tplToString(struct template *pTpl, msg_t *pMsg, uchar** ppSz) finalize_it: *ppSz = (iRet == RS_RET_OK) ? pVal : NULL; - return iRet; + RETiRet; } /* Helper to doSQLEscape. This is called if doSQLEscape @@ -143,6 +143,7 @@ static void* thrdStarter(void *arg) iRet = pThis->pUsrThrdMain(pThis); dbgprintf("thrdStarter: usrThrdMain 0x%lx returned with iRet %d, exiting now.\n", (unsigned long) pThis->thrdID, iRet); + ENDfunc pthread_exit(0); } @@ -166,7 +167,7 @@ rsRetVal thrdCreate(rsRetVal (*thrdMain)(thrdInfo_t*), rsRetVal(*afterRun)(thrdI CHKiRet(llAppend(&llThrds, NULL, pThis)); finalize_it: - return iRet; + RETiRet; } @@ -195,7 +196,7 @@ rsRetVal thrdInit(void) sigAct.sa_handler = sigusr2Dummy; sigaction(SIGUSR2, &sigAct, NULL); - return iRet; + RETiRet; } @@ -208,7 +209,7 @@ rsRetVal thrdExit(void) iRet = llDestroy(&llThrds); - return iRet; + RETiRet; } @@ -232,7 +233,7 @@ thrdSleep(thrdInfo_t *pThis, int iSeconds, int iuSeconds) select(1, NULL, NULL, NULL, &tvSelectTimeout); if(pThis->bShallStop) iRet = RS_RET_TERMINATE_NOW; - return iRet; + RETiRet; } @@ -73,18 +73,20 @@ wtiGetDbgHdr(wti_t *pThis) * bite in the long term... -- rgerhards, 2008-01-17 * TODO: may be performance optimized by atomic operations */ -static inline qWrkCmd_t +qWrkCmd_t wtiGetState(wti_t *pThis, int bLockMutex) { DEFVARS_mutexProtection; qWrkCmd_t tCmd; + BEGINfunc ISOBJ_TYPE_assert(pThis, wti); BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); tCmd = pThis->tCurrCmd; END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + ENDfunc return tCmd; } @@ -95,19 +97,20 @@ wtiGetState(wti_t *pThis, int bLockMutex) * in an active state. -- rgerhards, 2008-01-20 */ rsRetVal -wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly) +wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex) { DEFiRet; - DEFVARS_mutex_cancelsafeLock; + DEFVARS_mutexProtection; ISOBJ_TYPE_assert(pThis, wti); assert(tCmd <= eWRKTHRD_SHUTDOWN_IMMEDIATE); - mutex_cancelsafe_lock(&pThis->mut); + BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); +RUNLOG_VAR("%d", bActiveOnly); /* all worker states must be followed sequentially, only termination can be set in any state */ if( (bActiveOnly && (pThis->tCurrCmd < eWRKTHRD_RUN_CREATED)) - || (pThis->tCurrCmd > tCmd && tCmd != eWRKTHRD_TERMINATING)) { + || (pThis->tCurrCmd > tCmd && !(tCmd == eWRKTHRD_TERMINATING || tCmd == eWRKTHRD_STOPPED))) { dbgprintf("%s: command %d can not be accepted in current %d processing state - ignored\n", wtiGetDbgHdr(pThis), tCmd, pThis->tCurrCmd); } else { @@ -137,7 +140,7 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly) pThis->tCurrCmd = tCmd; /* apply the new state */ } - mutex_cancelsafe_unlock(&pThis->mut); + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); RETiRet; } @@ -188,15 +191,17 @@ ENDobjConstruct(wti) rsRetVal wtiConstructFinalize(wti_t *pThis) { + DEFiRet; + ISOBJ_TYPE_assert(pThis, wti); dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); /* initialize our thread instance descriptor */ - pThis->pUsr = NULL; + pThis->pUsrp = NULL; pThis->tCurrCmd = eWRKTHRD_STOPPED; - return RS_RET_OK; + RETiRet; } @@ -211,7 +216,9 @@ wtiJoinThrd(wti_t *pThis) ISOBJ_TYPE_assert(pThis, wti); dbgprintf("wti: waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd); pthread_join(pThis->thrdID, NULL); - wtiSetState(pThis, eWRKTHRD_STOPPED, 0); /* back to virgin... */ +RUNLOG; + wtiSetState(pThis, eWRKTHRD_STOPPED, 0, MUTEX_ALREADY_LOCKED); /* back to virgin... */ +RUNLOG; pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ dbgprintf("wti: worker %s has stopped\n", wtiGetDbgHdr(pThis)); @@ -261,6 +268,7 @@ wtiWorkerCancelCleanup(void *arg) wtp_t *pWtp; int iCancelStateSave; + BEGINfunc ISOBJ_TYPE_assert(pThis, wti); pWtp = pThis->pWtp; ISOBJ_TYPE_assert(pWtp, wtp); @@ -268,17 +276,17 @@ wtiWorkerCancelCleanup(void *arg) dbgprintf("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis)); /* call user supplied handler (that one e.g. requeues the element) */ - pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr); + pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pWtp->mut); - wtiSetState(pThis, eWRKTHRD_TERMINATING, 0); + wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); // TODO: sync access! pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ - pthread_cond_signal(&pWtp->condThrdTrm); /* activate anyone waiting on thread shutdown */ d_pthread_mutex_unlock(&pWtp->mut); pthread_setcancelstate(iCancelStateSave, NULL); + ENDfunc } @@ -318,8 +326,11 @@ wtiWorker(wti_t *pThis) pWtp = pThis->pWtp; /* shortcut */ ISOBJ_TYPE_assert(pWtp, wtp); + dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); + pWtp->pfOnWorkerStartup(pWtp->pUsr); + /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd); @@ -349,21 +360,22 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis), if(pWtp->toWrkShutdown == -1) { dbgprintf("worker never times out!\n"); // DEL /* never shut down any started worker */ - pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); + d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); } else { timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ - if(pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) { + if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) { dbgprintf("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis)); bInactivityTOOccured = 1; /* indicate we had a timeout */ } } - dbgprintf("%s: post condwait ->notEmpty\n", wtiGetDbgHdr(pThis)); // DEL + dbgprintf("%s: post condwait ->Busy or timeout\n", wtiGetDbgHdr(pThis)); // DEL END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr); continue; /* request next iteration */ } /* if we reach this point, we have a non-empty queue (and are still protected by mutex) */ - pWtp->pfDoWork(pThis, iCancelStateSave); + dbgprintf("%s: calling consumer\n", wtiGetDbgHdr(pThis)); + pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave); /* TODO: move this above into one of the chck Term functions */ //if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0) @@ -371,6 +383,8 @@ dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis), // " %d messages to process.\n", wtiGetDbgHdr(pThis), pThis->iQueueSize); } + pWtp->pfOnWorkerShutdown(pWtp->pUsr); + /* indicate termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); dbgprintf("%s: worker waiting for mutex\n", wtiGetDbgHdr(pThis)); @@ -391,11 +405,9 @@ dbgprintf("%s: setting termination state\n", wtiGetDbgHdr(pThis)); wtiSetState(pWrkrInst, eWRKTHRD_TERMINATING, 0); } #else - wtiSetState(pThis, eWRKTHRD_TERMINATING, 0); + wtiSetState(pThis, eWRKTHRD_TERMINATING, 0, MUTEX_ALREADY_LOCKED); #endif - // TODO: call, mutex: pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ - pthread_cond_signal(&pWtp->condThrdTrm); /* activate anyone waiting on thread shutdown */ d_pthread_mutex_unlock(&pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); @@ -404,6 +416,7 @@ dbgprintf("%s: setting termination state\n", wtiGetDbgHdr(pThis)); /* some simple object access methods */ +DEFpropSetMeth(wti, pWtp, wtp_t*); /* set the debug header message * The passed-in string is duplicated. So if the caller does not need @@ -441,7 +454,7 @@ finalize_it: * rgerhards, 2008-01-09 */ BEGINObjClassInit(wti, 1) /* one is the object version (most important for persisting) */ -ENDObjClassInit(queue) +ENDObjClassInit(wti) /* * vi:set ai: @@ -32,7 +32,7 @@ typedef struct wti_s { BEGINobjInstance; pthread_t thrdID; /* thread ID */ qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */ - obj_t *pUsr; /* current user object being processed (or NULL if none) */ + obj_t *pUsrp; /* pointer to an object meaningful for current user pointer (e.g. queue pUsr data elemt) */ wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ pthread_cond_t condInitDone; /* signaled when the thread startup is done (once per thread existance) */ pthread_mutex_t mut; @@ -47,8 +47,13 @@ rsRetVal wtiConstruct(wti_t **ppThis); rsRetVal wtiConstructFinalize(wti_t *pThis); rsRetVal wtiDestruct(wti_t **ppThis); rsRetVal wtiWorker(wti_t *pThis); +rsRetVal wtiProcessThrdChanges(wti_t *pThis, int bLockMutex); +rsRetVal wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg); +rsRetVal wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly, int bLockMutex); +rsRetVal wtiJoinThrd(wti_t *pThis); +qWrkCmd_t wtiGetState(wti_t *pThis, int bLockMutex); PROTOTYPEObjClassInit(wti); PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*); -#define wtiGetID(pThis) ((unsigned long) pThis) +PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*); #endif /* #ifndef WTI_H_INCLUDED */ @@ -71,52 +71,59 @@ wtpGetDbgHdr(wtp_t *pThis) /* Not implemented dummy function for constructor */ -static rsRetVal NotImplementedDummy() { return RS_RET_NOT_IMPLEMENTED; } +static rsRetVal NotImplementedDummy() { return RS_RET_OK; } /* Standard-Constructor for the wtp object */ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */ - int i; - uchar pszBuf[64]; - size_t lenBuf; - wti_t *pWti; - pthread_mutex_init(&pThis->mut, NULL); pthread_cond_init(&pThis->condThrdTrm, NULL); /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; pThis->pfIsIdle = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; - pThis->pfOnShutdownAdvise = NotImplementedDummy; pThis->pfOnIdle = NotImplementedDummy; pThis->pfOnWorkerCancel = NotImplementedDummy; + pThis->pfOnWorkerStartup = NotImplementedDummy; + pThis->pfOnWorkerShutdown = NotImplementedDummy; +ENDobjConstruct(wtp) + + +/* Construction finalizer + * rgerhards, 2008-01-17 + */ +rsRetVal +wtpConstructFinalize(wtp_t *pThis) +{ + DEFiRet; + int i; + uchar pszBuf[64]; + size_t lenBuf; + wti_t *pWti; - /* alloc and construct workers */ + ISOBJ_TYPE_assert(pThis, wtp); + + dbgprintf("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis)); + /* alloc and construct workers - this can only be done in finalizer as we previously do + * not know the max number of workers + */ +RUNLOG; if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { +RUNLOG_VAR("%d", i); +RUNLOG_VAR("%p", pThis->pWrkr[i]); CHKiRet(wtiConstruct(&pThis->pWrkr[i])); pWti = pThis->pWrkr[i]; lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s/w%d", wtpGetDbgHdr(pThis), i); CHKiRet(wtiSetDbgHdr(pWti, pszBuf, lenBuf)); + CHKiRet(wtiSetpWtp(pWti, pThis)); CHKiRet(wtiConstructFinalize(pWti)); } -finalize_it: -ENDobjConstruct(wtp) - -/* Construction finalizer - * rgerhards, 2008-01-17 - */ -rsRetVal -wtpConstructFinalize(wtp_t __attribute__((unused)) *pThis) -{ - ISOBJ_TYPE_assert(pThis, wtp); - - dbgprintf("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis)); - - return RS_RET_OK; +finalize_it: + RETiRet; } @@ -129,6 +136,8 @@ wtpDestruct(wtp_t **ppThis) int iCancelStateSave; int i; +dbgPrintAllDebugInfo(); +RUNLOG; assert(ppThis != NULL); pThis = *ppThis; ISOBJ_TYPE_assert(pThis, wtp); @@ -161,17 +170,31 @@ wtpDestruct(wtp_t **ppThis) } -/* wake up all worker threads. Param bWithDAWrk tells if the DA worker - * is to be awaken, too. It needs special handling because it waits on - * two different conditions depending on processing state. +/* wake up at least one worker thread. + * rgerhards, 2008-01-20 + */ +rsRetVal +wtpWakeupWrkr(wtp_t *pThis) +{ + DEFiRet; + + // TODO; mutex? + ISOBJ_TYPE_assert(pThis, wtp); +dbgprintf("wtpWakeupWrkr 1, cond %p\n", pThis->pcondBusy); + pthread_cond_signal(pThis->pcondBusy); +dbgprintf("wtpWakeupWrkr 2\n"); + RETiRet; +} +/* wake up all worker threads. * rgerhards, 2008-01-16 */ -static inline rsRetVal -wtpWakeupWrks(wtp_t *pThis) +rsRetVal +wtpWakeupAllWrkr(wtp_t *pThis) { DEFiRet; ISOBJ_TYPE_assert(pThis, wtp); + // TODO; mutex? pthread_cond_broadcast(pThis->pcondBusy); RETiRet; } @@ -189,8 +212,10 @@ wtpProcessThrdChanges(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); + RUNLOG; if(pThis->bThrdStateChanged == 0) FINALIZE; + RUNLOG; /* go through all threads */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { @@ -198,6 +223,7 @@ wtpProcessThrdChanges(wtp_t *pThis) } finalize_it: + RUNLOG; RETiRet; } @@ -218,39 +244,68 @@ wtpSetState(wtp_t *pThis, wtpState_t iNewState) } -#if 0 +/* check if the worker shall shutdown (1 = yes, 0 = no) + * TODO: check if we can use atomic operations to enhance performance + * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user" + * (e.g. the queue clas) + * rgerhards, 2008-01-21 + */ +rsRetVal +wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) +{ + DEFiRet; + DEFVARS_mutexProtection; + + BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); + if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) + || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex))) + iRet = RS_RET_TERMINATE_NOW; + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + + /* try customer handler if one was set and we do not yet have a definite result */ + if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) { + iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex); + } + + RETiRet; +} + + /* Send a shutdown command to all workers and see if they terminate. * A timeout may be specified. * rgerhards, 2008-01-14 */ -static rsRetVal -wtpWrkrShutdown(wtp_t *pThis) +rsRetVal +wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, long iTimeout) { DEFiRet; int bTimedOut; struct timespec t; int iCancelStateSave; - // TODO: implement - ISOBJ_TYPE_assert(pThis); - queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */ - queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ - /* race: must make sure all are running! */ - queueTimeoutComp(&t, iTimeout);/* get timeout */ +dbgPrintAllDebugInfo(); +RUNLOG_VAR("%p", pThis); +RUNLOG_VAR("%ld", iTimeout); +RUNLOG_VAR("%d", tShutdownCmd); + ISOBJ_TYPE_assert(pThis, wtp); + + wtpSetState(pThis, tShutdownCmd); + wtpWakeupAllWrkr(pThis); + timeoutComp(&t, iTimeout);/* get timeout */ /* and wait for their termination */ -dbgprintf("Queue %p: waiting for mutex %p\n", pThis, &pThis->mutThrdMgmt); +dbgprintf("%s: waiting for mutex %p\n", wtpGetDbgHdr(pThis), &pThis->mut); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mutThrdMgmt); - pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt); + d_pthread_mutex_lock(&pThis->mut); + pthread_cleanup_push(mutexCancelCleanup, &pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { - dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n", - queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd); + dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n", + wtpGetDbgHdr(pThis), iTimeout, pThis->iCurNumWrkThrd); - if(pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutThrdMgmt, &t) != 0) { - dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", queueGetID(pThis)); + if(d_pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mut, &t) != 0) { + dbgprintf("%s: timeout waiting on worker thread termination\n", wtpGetDbgHdr(pThis)); bTimedOut = 1; /* we exit the loop on timeout */ } } @@ -259,122 +314,61 @@ dbgprintf("Queue %p: waiting for mutex %p\n", pThis, &pThis->mutThrdMgmt); if(bTimedOut) iRet = RS_RET_TIMED_OUT; +dbgprintf("wtpShutdownAll exit"); RETiRet; } -/* Unconditionally cancel all running worker threads. - * rgerhards, 2008-01-14 +/* indicate that a thread has terminated and awake anyone waiting on it + * rgerhards, 2008-01-23 */ -static rsRetVal -wtpWrkrCancel(wtp_t *pThis) +rsRetVal wtpSignalWrkrTermination(wtp_t *pThis) { DEFiRet; - int i; - // TODO: we need to implement peek(), without it (today!) we lose one message upon - // worker cancellation! -- rgerhards, 2008-01-14 - - ISOB_TYPE_assert(pThis); - /* process any pending thread requests so that we know who actually is still running */ - queueChkWrkThrdChanges(pThis); - - /* awake the workers one more time, just to be sure */ - queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ + //TODO: mutex or not mutex, that's the question ;)DEFVARS_mutexProtection; - /* first tell the workers our request */ - for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) { - dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i); - pthread_cancel(pThis->pWrkThrds[i].thrdID); - } - } + ISOBJ_TYPE_assert(pThis, wtp); + //BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX); +dbgprintf("signaling thread termination, cond %p\n", &pThis->condThrdTrm); + pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ + //END_MTX_PROTECTED_OPERATIONS(&pThis->mut); RETiRet; } -/* Worker thread management function carried out when the main - * worker is about to terminate. +/* Unconditionally cancel all running worker threads. + * rgerhards, 2008-01-14 */ -static rsRetVal -wtpShutdownWorkers(wtp_t *pThis) +rsRetVal +wtpCancelAll(wtp_t *pThis) { DEFiRet; int i; + // TODO: we need to implement peek(), without it (today!) we lose one message upon + // worker cancellation! -- rgerhards, 2008-01-14 - assert(pThis != NULL); + ISOBJ_TYPE_assert(pThis, wtp); - dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", (unsigned long) pThis); + /* process any pending thread requests so that we know who actually is still running */ + wtpProcessThrdChanges(pThis); - ISOB_TYPE_assert(pThis); - /* even if the timeout count is set to 0 (run endless), we still call the queueWrkThrdTrm(). This - * is necessary so that all threads get sent the termination command. With a timeout of 0, however, - * the function returns immediate with RS_RET_TIMED_OUT. We catch that state and accept it as - * good. - */ - iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN, pThis->toQShutdown); - if(iRet == RS_RET_TIMED_OUT) { - if(pThis->toQShutdown == 0) { - iRet = RS_RET_OK; - } else { - /* OK, we now need to try force the shutdown */ - dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", - queueGetID(pThis)); - iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN_IMMEDIATE, pThis->toActShutdown); - } - } + /* awake the workers one more time, just to be sure */ + wtpWakeupAllWrkr(pThis); - if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */ - /* still didn't work out - so we now need to cancel the workers */ - dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis); - iRet = queueWrkThrdCancel(pThis); - } - - /* finally join the threads - * In case of a cancellation, this may actually take some time. This is also - * needed to clean up the thread descriptors, even with a regular termination. - * And, most importantly, this is needed if we have an indifitite termination - * time set (timeout == 0)! -- rgerhards, 2008-01-14 - */ + /* first tell the workers our request */ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRD_STOPPED) { - queueJoinWrkThrd(pThis, i); + // TODO: mutex lock! + if(pThis->pWrkr[i]->tCurrCmd >= eWRKTHRD_TERMINATING) { + dbgprintf("%s: canceling worker thread %d\n", wtpGetDbgHdr(pThis), i); + pthread_cancel(pThis->pWrkr[i]->thrdID); } } - dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n", - queueGetID(pThis), pThis->iQueueSize); - RETiRet; } -#endif - - - -/* check if the worker shall shutdown - * TODO: check if we can use atomic operations to enhance performance - * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user" - * (e.g. the queue clas) - * rgerhards, 2008-01-21 - */ -rsRetVal -wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) -{ - DEFiRet; - DEFVARS_mutexProtection; - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) - || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex))) - iRet = RS_RET_TERMINATE_NOW; - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); - /* try customer handler if one was set and we do not yet have a definite result */ - if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) - iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex); - - RETiRet; -} /* Set the Inactivity Guard * rgerhards, 2008-01-21 @@ -385,9 +379,13 @@ wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex) DEFiRet; DEFVARS_mutexProtection; +RUNLOG; BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); +RUNLOG; pThis->bInactivityGuard = bNewState; +RUNLOG; END_MTX_PROTECTED_OPERATIONS(&pThis->mut); +RUNLOG; RETiRet; } @@ -404,6 +402,7 @@ wtpWrkrExecCancelCleanup(void *arg) ISOBJ_TYPE_assert(pThis, wtp); pThis->iCurNumWrkThrd--; + wtpSignalWrkrTermination(pThis); dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd); @@ -415,7 +414,7 @@ wtpWrkrExecCancelCleanup(void *arg) * rgerhards, 2008-01-21 */ static void * -wtpWorker(void *arg) +wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in wtp! */ { DEFiRet; DEFVARS_mutexProtection; @@ -442,7 +441,7 @@ wtpWorker(void *arg) * our init. That would be a bad race... -- rgerhards, 2008-01-16 */ //if(qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT) - wtiSetState(pWti, eWRKTHRD_RUNNING, MUTEX_ALREADY_LOCKED); /* we are running now! */ + wtiSetState(pWti, eWRKTHRD_RUNNING, 0, MUTEX_ALREADY_LOCKED); /* we are running now! */ do { END_MTX_PROTECTED_OPERATIONS(&pThis->mut); @@ -459,12 +458,14 @@ wtpWorker(void *arg) pthread_cleanup_pop(0); pThis->iCurNumWrkThrd--; + wtpSignalWrkrTermination(pThis); dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n", wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd); END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + ENDfunc pthread_exit(0); } @@ -492,7 +493,9 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { // TODO: sync! - if(pThis->pWrkr[i]->tCurrCmd == eWRKTHRD_STOPPED) { +RUNLOG; +dbgprintf("%s: i %d, wti_T* %p\n", wtpGetDbgHdr(pThis), i, pThis->pWrkr[i]); + if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) { break; } } @@ -502,11 +505,12 @@ dbgprintf("%s: after thrd search: i %d, max %d\n", wtpGetDbgHdr(pThis), i, pThis ABORT_FINALIZE(RS_RET_NO_MORE_THREADS); pWti = pThis->pWrkr[i]; - wtiSetState(pWti, eWRKTHRD_RUN_CREATED, LOCK_MUTEX); // TODO: thuink about mutex lock + wtiSetState(pWti, eWRKTHRD_RUN_CREATED, 0, LOCK_MUTEX); iState = pthread_create(&(pWti->thrdID), NULL, wtpWorker, (void*) pWti); dbgprintf("%s: started with state %d, num workers now %d\n", wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd); +RUNLOG; /* we try to give the starting worker a little boost. It won't help much as we still * hold the queue's mutex, but at least it has a chance to start on a single-CPU system. */ @@ -517,6 +521,7 @@ dbgprintf("%s: after thrd search: i %d, max %d\n", wtpGetDbgHdr(pThis), i, pThis finalize_it: END_MTX_PROTECTED_OPERATIONS(&pThis->mut); +RUNLOG; RETiRet; } @@ -524,7 +529,9 @@ finalize_it: /* set the number of worker threads that should be running. If less than currently running, * a new worker may be started. Please note that there is no guarantee the number of workers * said will be running after we exit this function. It is just a hint. If the number is - * higher than one, the "busy" condition is also signaled to awake a worker. + * higher than one, and no worker is started, the "busy" condition is signaled to awake a worker. + * So the caller can assume that there is at least one worker re-checking if there is "work to do" + * after this function call. * rgerhards, 2008-01-21 */ rsRetVal @@ -535,8 +542,10 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) int nMissing; /* number workers missing to run */ int i; + if(pThis == NULL) dbgPrintAllDebugInfo(); ISOBJ_TYPE_assert(pThis, wtp); +dbgprintf("%s: wtpAdviseMaxWorker with %d called, currNum %d, max %d\n", wtpGetDbgHdr(pThis), nMaxWrkr, pThis->iCurNumWrkThrd, pThis->iNumWorkerThreads); if(nMaxWrkr == 0) FINALIZE; @@ -548,15 +557,20 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) else if(nMissing < 0) nMissing = 0; - dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); - /* start the rqtd nbr of workers */ - for(i = 0 ; i < nMissing ; ++i) { - CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED)); + if(nMissing > 0) { + dbgprintf("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); + /* start the rqtd nbr of workers */ + for(i = 0 ; i < nMissing ; ++i) { + CHKiRet(wtpStartWrkr(pThis, MUTEX_ALREADY_LOCKED)); + } + } else { +dbgprintf("wtpAdviseMaxWorkers signals busy\n"); + wtpWakeupWrkr(pThis); } - - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + finalize_it: + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); RETiRet; } @@ -564,6 +578,17 @@ finalize_it: /* some simple object access methods */ DEFpropSetMeth(wtp, toWrkShutdown, long); DEFpropSetMeth(wtp, wtpState, wtpState_t); +DEFpropSetMeth(wtp, iNumWorkerThreads, int); +DEFpropSetMeth(wtp, pUsr, void*); +DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t); +DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t); +DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)); +DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)); +DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)); +DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)); +DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*)); +DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*)); +DEFpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*)); /* set the debug header message @@ -572,11 +597,12 @@ DEFpropSetMeth(wtp, wtpState, wtpState_t); * rgerhards, 2008-01-09 */ rsRetVal -wtpSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg) +wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg) { DEFiRet; - ISOBJ_TYPE_assert(pThis, wti); +dbgprintf("objID: %d\n", pThis->pObjInfo->objID); + ISOBJ_TYPE_assert(pThis, wtp); assert(pszMsg != NULL); if(lenMsg < 1) @@ -67,12 +67,13 @@ typedef struct wtp_s { void *pUsr; /* pointer to user object */ pthread_mutex_t *pmutUsr; pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */ - rsRetVal (*pfChkStopWrkr)(void *, int); - rsRetVal (*pfIsIdle)(void *, int); - rsRetVal (*pfDoWork)(void *, int); - rsRetVal (*pfOnShutdownAdvise)(void *, int); - rsRetVal (*pfOnIdle)(void *, int); - rsRetVal (*pfOnWorkerCancel)(void *); + rsRetVal (*pfChkStopWrkr)(void *pUsr, int); + rsRetVal (*pfIsIdle)(void *pUsr, int); + rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int); + rsRetVal (*pfOnIdle)(void *pUsr, int); + rsRetVal (*pfOnWorkerCancel)(void *pUsr, void*pWti); + rsRetVal (*pfOnWorkerStartup)(void *pUsr); + rsRetVal (*pfOnWorkerShutdown)(void *pUsr); /* end user objects */ uchar *pszDbgHdr; /* header string for debug messages */ } wtp_t; @@ -89,9 +90,25 @@ rsRetVal wtpProcessThrdChanges(wtp_t *pThis); rsRetVal wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex); rsRetVal wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex); rsRetVal wtpSetState(wtp_t *pThis, wtpState_t iNewState); +rsRetVal wtpWakeupWrkr(wtp_t *pThis); +rsRetVal wtpWakeupAllWrkr(wtp_t *pThis); +rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, long iTimeout); +rsRetVal wtpCancelAll(wtp_t *pThis); +rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg); +rsRetVal wtpSignalWrkrTermination(wtp_t *pWtp); PROTOTYPEObjClassInit(wtp); +PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)); +PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)); +PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)); +PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)); +PROTOTYPEpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*,void*)); +PROTOTYPEpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*)); +PROTOTYPEpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*)); PROTOTYPEpropSetMeth(wtp, toWrkShutdown, long); PROTOTYPEpropSetMeth(wtp, wtpState, wtpState_t); -#define wtpGetID(pThis) ((unsigned long) pThis) +PROTOTYPEpropSetMeth(wtp, iMaxWorkerThreads, int); +PROTOTYPEpropSetMeth(wtp, pUsr, void*); +PROTOTYPEpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t); +PROTOTYPEpropSetMethPTR(wtp, pcondBusy, pthread_cond_t); #endif /* #ifndef WTP_H_INCLUDED */ |