summaryrefslogtreecommitdiffstats
path: root/runtime/wtp.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/wtp.c')
-rw-r--r--runtime/wtp.c41
1 files changed, 26 insertions, 15 deletions
diff --git a/runtime/wtp.c b/runtime/wtp.c
index ba1f94b3..59553984 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -42,10 +42,10 @@
#include <atomic.h>
#include <sys/prctl.h>
-#ifdef OS_SOLARIS
-# include <sched.h>
-# define pthread_yield() sched_yield()
-#endif
+/// TODO: check on solaris if this is any longer needed - I don't think so - rgerhards, 2009-09-20
+//#ifdef OS_SOLARIS
+//# include <sched.h>
+//#endif
#include "rsyslog.h"
#include "stringbuf.h"
@@ -81,7 +81,7 @@ wtpGetDbgHdr(wtp_t *pThis)
/* Not implemented dummy function for constructor */
-static rsRetVal NotImplementedDummy() { return RS_RET_OK; }
+static rsRetVal NotImplementedDummy() { return RS_RET_NOT_IMPLEMENTED; }
/* Standard-Constructor for the wtp object
*/
BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */
@@ -90,12 +90,15 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pthread_cond_init(&pThis->condThrdTrm, NULL);
/* set all function pointers to "not implemented" dummy so that we can safely call them */
pThis->pfChkStopWrkr = NotImplementedDummy;
+ pThis->pfGetDeqBatchSize = NotImplementedDummy;
pThis->pfIsIdle = NotImplementedDummy;
pThis->pfDoWork = NotImplementedDummy;
+ pThis->pfObjProcessed = NotImplementedDummy;
pThis->pfOnIdle = NotImplementedDummy;
pThis->pfOnWorkerCancel = NotImplementedDummy;
pThis->pfOnWorkerStartup = NotImplementedDummy;
pThis->pfOnWorkerShutdown = NotImplementedDummy;
+dbgprintf("XXX: wtpConstruct: %d\n", pThis->wtpState);
ENDobjConstruct(wtp)
@@ -119,7 +122,7 @@ wtpConstructFinalize(wtp_t *pThis)
*/
if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
-
+
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
CHKiRet(wtiConstruct(&pThis->pWrkr[i]));
pWti = pThis->pWrkr[i];
@@ -249,7 +252,6 @@ wtpSetState(wtp_t *pThis, wtpState_t iNewState)
/* check if the worker shall shutdown (1 = yes, 0 = no)
- * TODO: check if we can use atomic operations to enhance performance
* Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user"
* (e.g. the queue clas)
* rgerhards, 2008-01-21
@@ -263,16 +265,19 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
ISOBJ_TYPE_assert(pThis, wtp);
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
- if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE)
- || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex)))
- iRet = RS_RET_TERMINATE_NOW;
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ if(pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) {
+ ABORT_FINALIZE(RS_RET_TERMINATE_NOW);
+ } else if(pThis->wtpState == wtpState_SHUTDOWN) {
+ ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE);
+ }
/* try customer handler if one was set and we do not yet have a definite result */
- if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) {
+ if(pThis->pfChkStopWrkr != NULL) {
iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex);
}
+finalize_it:
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
RETiRet;
}
@@ -488,11 +493,12 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
static rsRetVal
wtpStartWrkr(wtp_t *pThis, int bLockMutex)
{
- DEFiRet;
DEFVARS_mutexProtection;
wti_t *pWti;
int i;
int iState;
+ pthread_attr_t attr;
+ DEFiRet;
ISOBJ_TYPE_assert(pThis, wtp);
@@ -516,7 +522,10 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
pWti = pThis->pWrkr[i];
wtiSetState(pWti, eWRKTHRD_RUN_CREATED, 0, LOCK_MUTEX);
- iState = pthread_create(&(pWti->thrdID), NULL, wtpWorker, (void*) pWti);
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ iState = pthread_create(&(pWti->thrdID), &attr, wtpWorker, (void*) pWti);
+ pthread_attr_destroy(&attr); /* TODO: we could globally reuse such an attribute 2009-07-08 */
dbgprintf("%s: started with state %d, num workers now %d\n",
wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd);
@@ -586,8 +595,10 @@ DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t)
DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t)
DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*))
-DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int))
+DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*))
+DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*))
DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int))
+DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*))
DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*))
DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*))