From cdecd7e524a5114ccff4f2909b32738bdb33c6ea Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 7 Oct 2008 11:46:46 +0200 Subject: slightly improved lock contention situation by moving out of the critical section what could so with acceptable consequences --- runtime/queue.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 76c2f10f..f5f770b3 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2101,6 +2101,15 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) ISOBJ_TYPE_assert(pThis, queue); + /* first check if we need to discard this message (which will cause CHKiRet() to exit) + * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The iQueueSize + * and bRunsDA parameters may not reflect the correct settings here, but they are + * "good enough" in the sense that they can be used to drive the decision. Valgrind's + * threading tools may point this access to be an error, but this is done + * intentional. I do not see this causes problems to us. + */ + CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); + /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE * during its execution. If that is not done, race conditions occur if the @@ -2112,9 +2121,6 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) d_pthread_mutex_lock(pThis->mut); } - /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ - CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); - /* then check if we need to add an assistance disk queue */ if(pThis->bIsDA) CHKiRet(queueChkStrtDA(pThis)); -- cgit From ace4f2f75202aec39449dac11b9eb1deca7428d7 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 8 Oct 2008 18:55:11 +0200 Subject: reordered imudp processing. Message parsing is now done as part of main message queue worker processing (was part of the input thread) This should also improve performance, as potentially more work is done in parallel. --- runtime/queue.c | 2 -- 1 file changed, 2 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index f5f770b3..25c0bd5f 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1513,8 +1513,6 @@ queueRateLimiter(queue_t *pThis) ISOBJ_TYPE_assert(pThis, queue); - dbgoprint((obj_t*) pThis, "entering rate limiter\n"); - iDelay = 0; if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */ /* time calls are expensive, so only do them when needed */ -- cgit From af3c944563b8651c14b2b9087ea76f7eeacfc065 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 9 Oct 2008 11:21:24 +0200 Subject: added experimental pthread_yield() which so far seems to increase performance. There is also reason for it to do so, see http://kb.monitorware.com/post14216.html#p14216 --- runtime/queue.c | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 25c0bd5f..3ef6d5a0 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2185,6 +2185,11 @@ finalize_it: /* and release the mutex */ d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); + /* the following pthread_yield is experimental, but brought us performance + * benefit. For details, please see http://kb.monitorware.com/post14216.html#p14216 + * rgerhards, 2008-10-09 + */ + pthread_yield(); } RETiRet; -- cgit From 1229143ca3d194bb0daaa5252809d59ee0c3a0bf Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 9 Oct 2008 12:56:28 +0200 Subject: minor: reorder to slightly reduce size of critical section --- runtime/queue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 3ef6d5a0..2d1bf7e3 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2181,10 +2181,10 @@ finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { /* make sure at least one worker is running. */ queueAdviseMaxWorkers(pThis); - dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n"); /* and release the mutex */ d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); + dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n"); /* the following pthread_yield is experimental, but brought us performance * benefit. For details, please see http://kb.monitorware.com/post14216.html#p14216 * rgerhards, 2008-10-09 -- cgit From 6c6e9a0f3f7d454ba9553a750b195d7f99c7299a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 9 Oct 2008 13:45:56 +0200 Subject: moved bParseHostname and bIsParsed to msgFlags This enables us to use more efficient calling conventions and also helps us keep the on-disk structure of a msg object more consistent in future releases. --- runtime/queue.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index b0043ef5..42b8137d 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -49,6 +49,7 @@ #include "obj.h" #include "wtp.h" #include "wti.h" +#include "atomic.h" /* static data */ DEFobjStaticHelpers @@ -996,7 +997,7 @@ queueAdd(queue_t *pThis, void *pUsr) CHKiRet(pThis->qAdd(pThis, pUsr)); if(pThis->qType != QUEUETYPE_DIRECT) { - ++pThis->iQueueSize; + ATOMIC_INC(pThis->iQueueSize); dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize); } @@ -1025,7 +1026,7 @@ queueDel(queue_t *pThis, void *pUsr) iRet = queueGetUngottenObj(pThis, (obj_t**) pUsr); } else { iRet = pThis->qDel(pThis, pUsr); - --pThis->iQueueSize; + ATOMIC_DEC(pThis->iQueueSize); } dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n", -- cgit From a27e249e445deecb1d9ef57fbbb203d71bf061dd Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 21 Oct 2008 10:55:33 +0200 Subject: bugfix: (potentially big) memory leak on HUP This occured if queues could not be drained before timeout. Thanks to David Lang for pointing this out. --- runtime/queue.c | 45 ++++++++++++++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 13 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 42b8137d..7fa2df91 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -88,6 +88,30 @@ ENDfunc return pThis->iQueueSize + pThis->iUngottenObjs; } + +/* This function drains the queue in cases where this needs to be done. The most probable + * reason is a HUP which needs to discard data (because the queue is configured to be lossy). + * During a shutdown, this is typically not needed, as the OS frees up ressources and does + * this much quicker than when we clean up ourselvs. -- rgerhards, 2008-10-21 + * This function returns void, as it makes no sense to communicate an error back, even if + * it happens. + */ +static inline void queueDrain(queue_t *pThis) +{ + void *pUsr; + + ASSERT(pThis != NULL); + + /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ + while(pThis->iQueueSize-- > 0) { + pThis->qDel(pThis, &pUsr); + if(pUsr != NULL) { + objDestruct(pUsr); + } + } +} + + /* --------------- code for disk-assisted (DA) queue modes -------------------- */ @@ -196,14 +220,6 @@ queueTurnOffDAMode(queue_t *pThis) } } - /* TODO: we have a *really biiiiig* memory leak here: if the queue could not be persisted, all of - * its data elements are still in memory. That doesn't really matter if we are terminated, but on - * HUP this memory leaks. We MUST add a loop of destructor calls here. However, this takes time - * (possibly a lot), so it is probably best to have a config variable for that. - * Something for 3.11.1! - * rgerhards, 2008-01-30 - */ - RETiRet; } @@ -461,12 +477,15 @@ static rsRetVal qDestructFixedArray(queue_t *pThis) ASSERT(pThis != NULL); + queueDrain(pThis); /* discard any remaining queue entries */ + if(pThis->tVars.farray.pBuf != NULL) free(pThis->tVars.farray.pBuf); RETiRet; } + static rsRetVal qAddFixedArray(queue_t *pThis, void* in) { DEFiRet; @@ -570,11 +589,11 @@ static rsRetVal qConstructLinkedList(queue_t *pThis) static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis) { DEFiRet; - - /* with the linked list type, there is nothing to do here. The - * reason is that the Destructor is only called after all entries - * have bene taken off the queue. In this case, there is nothing - * dynamic left with the linked list. + + queueDrain(pThis); /* discard any remaining queue entries */ + + /* with the linked list type, there is nothing left to do here. The + * reason is that there are no dynamic elements for the list itself. */ RETiRet; -- cgit From cf38fc81759b01af5125b1a05e0d6fe8e2e1bc21 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 22 Oct 2008 13:54:40 +0200 Subject: added a setting "$OptimizeForUniprocessor" ...to enable users to turn off pthread_yield calls which are counter-productive on multiprocessor machines (but have been shown to be useful on uniprocessors) --- runtime/queue.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 7fa2df91..a3e29a96 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1285,6 +1285,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, /* we have an object, so let's fill the properties */ objConstructSetObjInfo(pThis); + pThis->bOptimizeUniProc = glbl.GetOptimizeUniProc(); if((pThis->pszSpoolDir = (uchar*) strdup((char*)glbl.GetWorkDir())) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); @@ -2208,8 +2209,10 @@ finalize_it: /* the following pthread_yield is experimental, but brought us performance * benefit. For details, please see http://kb.monitorware.com/post14216.html#p14216 * rgerhards, 2008-10-09 + * but this is only true for uniprocessors, so we guard it with an optimize flag -- rgerhards, 2008-10-22 */ - pthread_yield(); + if(pThis->bOptimizeUniProc) + pthread_yield(); } RETiRet; -- cgit From 2e388db9ac91eae35ac836b329c8bcadd319a409 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 5 Mar 2009 11:10:43 +0100 Subject: integrated various patches for solaris Unfortunatley, I do not have the full list of contributors available. The patch set was compiled by Ben Taylor, and I made some further changes to adopt it to the news rsyslog branch. Others provided much of the base work, but I can not find the names of the original authors. If you happen to be one of them, please let me know so that I can give proper credits. --- runtime/queue.c | 420 ++++++++++++++++++++++++++++---------------------------- 1 file changed, 210 insertions(+), 210 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index a3e29a96..7d78460c 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -56,14 +56,14 @@ DEFobjStaticHelpers DEFobjCurrIf(glbl) /* forward-definitions */ -rsRetVal queueChkPersist(queue_t *pThis); -static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex); -static rsRetVal queueRateLimiter(queue_t *pThis); -static int queueChkStopWrkrDA(queue_t *pThis); -static int queueIsIdleDA(queue_t *pThis); -static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave); -static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2); -static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex); +rsRetVal qqueueChkPersist(qqueue_t *pThis); +static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); +static rsRetVal qqueueRateLimiter(qqueue_t *pThis); +static int qqueueChkStopWrkrDA(qqueue_t *pThis); +static int qqueueIsIdleDA(qqueue_t *pThis); +static rsRetVal qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); +static rsRetVal qqueueConsumerCancelCleanup(void *arg1, void *arg2); +static rsRetVal qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -77,7 +77,7 @@ static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex); * rgerhards, 2008-01-29 */ static inline int -queueGetOverallQueueSize(queue_t *pThis) +qqueueGetOverallQueueSize(qqueue_t *pThis) { #if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */ BEGINfunc @@ -96,7 +96,7 @@ ENDfunc * This function returns void, as it makes no sense to communicate an error back, even if * it happens. */ -static inline void queueDrain(queue_t *pThis) +static inline void queueDrain(qqueue_t *pThis) { void *pUsr; @@ -119,26 +119,26 @@ static inline void queueDrain(queue_t *pThis) * this point in time. The mutex must be locked when * ths function is called. -- rgerhards, 2008-01-25 */ -static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis) +static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) { DEFiRet; int iMaxWorkers; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(!pThis->bEnqOnly) { if(pThis->bRunsDA) { /* if we have not yet reached the high water mark, there is no need to start a * worker. -- rgerhards, 2008-01-26 */ - if(queueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { + if(qqueueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ } } else { if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { iMaxWorkers = 1; } else { - iMaxWorkers = queueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; + iMaxWorkers = qqueueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; } wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ } @@ -153,11 +153,11 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis) * rgerhards, 2008-02-27 */ static rsRetVal -queueWaitDAModeInitialized(queue_t *pThis) +qqueueWaitDAModeInitialized(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->bRunsDA); while(pThis->bRunsDA != 2) { @@ -179,17 +179,17 @@ queueWaitDAModeInitialized(queue_t *pThis) * rgerhards, 2008-01-15 */ static rsRetVal -queueTurnOffDAMode(queue_t *pThis) +qqueueTurnOffDAMode(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->bRunsDA); /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need * to wait for its startup... -- rgerhards, 2008-01-25 */ - queueWaitDAModeInitialized(pThis); + qqueueWaitDAModeInitialized(pThis); /* if we need to pull any data that we still need from the (child) disk queue, * now would be the time to do so. At present, we do not need this, but I'd like to @@ -208,15 +208,15 @@ queueTurnOffDAMode(queue_t *pThis) /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, * this will be quick. */ - queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ + qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n", iRet); /* now we need to check if the regular queue has some messages. This may be the case * when it is waiting that the high water mark is reached again. If so, we need to start up * a regular worker. -- rgerhards, 2008-01-26 */ - if(queueGetOverallQueueSize(pThis) > 0) { - queueAdviseMaxWorkers(pThis); + if(qqueueGetOverallQueueSize(pThis) > 0) { + qqueueAdviseMaxWorkers(pThis); } } @@ -232,11 +232,11 @@ queueTurnOffDAMode(queue_t *pThis) * rgerhards, 2008-01-14 */ static rsRetVal -queueChkIsDA(queue_t *pThis) +qqueueChkIsDA(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->pszFilePrefix != NULL) { pThis->bIsDA = 1; dbgoprint((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n"); @@ -260,18 +260,18 @@ queueChkIsDA(queue_t *pThis) * rgerhards, 2008-01-15 */ static rsRetVal -queueStartDA(queue_t *pThis) +qqueueStartDA(qqueue_t *pThis) { DEFiRet; uchar pszDAQName[128]; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */ FINALIZE; /* ... then we are already done! */ /* create message queue */ - CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer)); + CHKiRet(qqueueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer)); /* give it a name */ snprintf((char*) pszDAQName, sizeof(pszDAQName)/sizeof(uchar), "%s[DA]", obj.GetName((obj_t*) pThis)); @@ -282,30 +282,30 @@ queueStartDA(queue_t *pThis) */ pThis->pqDA->pqParent = pThis; - CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr)); - CHKiRet(queueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax)); - CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown)); - CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); - CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); - CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); - CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); - CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq)); - CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); - CHKiRet(queueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr)); - CHKiRet(queueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); - CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0)); - CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0)); + CHKiRet(qqueueSetpUsr(pThis->pqDA, pThis->pUsr)); + CHKiRet(qqueueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax)); + CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown)); + CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); + CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); + CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); + CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq)); + CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); + CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr)); + CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); + CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0)); + CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0)); if(pThis->toQShutdown == 0) { - CHKiRet(queueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */ + CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */ } else { /* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND * have an obviously large backlog, we can't finish it in any case. So there is no point * in holding shutdown longer than necessary. -- rgerhards, 2008-01-15 */ - CHKiRet(queueSettoQShutdown(pThis->pqDA, 1)); + CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 1)); } - iRet = queueStart(pThis->pqDA); + iRet = qqueueStart(pThis->pqDA); /* file not found is expected, that means it is no previous QIF available */ if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) FINALIZE; /* something is wrong */ @@ -323,12 +323,12 @@ queueStartDA(queue_t *pThis) pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n", - queueGetID(pThis->pqDA)); + qqueueGetID(pThis->pqDA)); finalize_it: if(iRet != RS_RET_OK) { if(pThis->pqDA != NULL) { - queueDestruct(&pThis->pqDA); + qqueueDestruct(&pThis->pqDA); } dbgoprint((obj_t*) pThis, "error %d creating disk queue - giving up.\n", iRet); pThis->bIsDA = 0; @@ -345,7 +345,7 @@ finalize_it: * rgerhards, 2008-01-16 */ static inline rsRetVal -queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) +qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; @@ -363,12 +363,12 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); - CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) queueConsumerCancelCleanup)); - CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueStartDA)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueTurnOffDAMode)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); + CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerDA)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) qqueueConsumerCancelCleanup)); + CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA)); + CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); @@ -401,14 +401,14 @@ finalize_it: * rgerhards, 2008-01-14 */ static inline rsRetVal -queueChkStrtDA(queue_t *pThis) +qqueueChkStrtDA(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); /* if we do not hit the high water mark, we have nothing to do */ - if(queueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk) + if(qqueueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk) ABORT_FINALIZE(RS_RET_OK); if(pThis->bRunsDA) { @@ -422,15 +422,15 @@ queueChkStrtDA(queue_t *pThis) * we need at least one). */ dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n", - queueGetOverallQueueSize(pThis)); - queueAdviseMaxWorkers(pThis); + qqueueGetOverallQueueSize(pThis)); + qqueueAdviseMaxWorkers(pThis); } else { /* this is the case when we are currently not running in DA mode. So it is time * to turn it back on. */ dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n", - queueGetOverallQueueSize(pThis)); - queueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ + qqueueGetOverallQueueSize(pThis)); + qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ } finalize_it: @@ -448,7 +448,7 @@ finalize_it: */ /* -------------------- fixed array -------------------- */ -static rsRetVal qConstructFixedArray(queue_t *pThis) +static rsRetVal qConstructFixedArray(qqueue_t *pThis) { DEFiRet; @@ -464,14 +464,14 @@ static rsRetVal qConstructFixedArray(queue_t *pThis) pThis->tVars.farray.head = 0; pThis->tVars.farray.tail = 0; - queueChkIsDA(pThis); + qqueueChkIsDA(pThis); finalize_it: RETiRet; } -static rsRetVal qDestructFixedArray(queue_t *pThis) +static rsRetVal qDestructFixedArray(qqueue_t *pThis) { DEFiRet; @@ -486,7 +486,7 @@ static rsRetVal qDestructFixedArray(queue_t *pThis) } -static rsRetVal qAddFixedArray(queue_t *pThis, void* in) +static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in) { DEFiRet; @@ -499,7 +499,7 @@ static rsRetVal qAddFixedArray(queue_t *pThis, void* in) RETiRet; } -static rsRetVal qDelFixedArray(queue_t *pThis, void **out) +static rsRetVal qDelFixedArray(qqueue_t *pThis, void **out) { DEFiRet; @@ -518,7 +518,7 @@ static rsRetVal qDelFixedArray(queue_t *pThis, void **out) /* first some generic functions which are also used for the unget linked list */ -static inline rsRetVal queueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr) +static inline rsRetVal qqueueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr) { DEFiRet; qLinkedList_t *pEntry; @@ -544,7 +544,7 @@ finalize_it: RETiRet; } -static inline rsRetVal queueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr) +static inline rsRetVal qqueueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr) { DEFiRet; qLinkedList_t *pEntry; @@ -571,7 +571,7 @@ static inline rsRetVal queueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t /* end generic functions which are also used for the unget linked list */ -static rsRetVal qConstructLinkedList(queue_t *pThis) +static rsRetVal qConstructLinkedList(qqueue_t *pThis) { DEFiRet; @@ -580,13 +580,13 @@ static rsRetVal qConstructLinkedList(queue_t *pThis) pThis->tVars.linklist.pRoot = 0; pThis->tVars.linklist.pLast = 0; - queueChkIsDA(pThis); + qqueueChkIsDA(pThis); RETiRet; } -static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis) +static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis) { DEFiRet; @@ -599,11 +599,11 @@ static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis) RETiRet; } -static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr) +static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) { DEFiRet; - iRet = queueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr); + iRet = qqueueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr); #if 0 qLinkedList_t *pEntry; @@ -627,10 +627,10 @@ finalize_it: RETiRet; } -static rsRetVal qDelLinkedList(queue_t *pThis, obj_t **ppUsr) +static rsRetVal qDelLinkedList(qqueue_t *pThis, obj_t **ppUsr) { DEFiRet; - iRet = queueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr); + iRet = qqueueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr); #if 0 qLinkedList_t *pEntry; @@ -657,11 +657,11 @@ static rsRetVal qDelLinkedList(queue_t *pThis, obj_t **ppUsr) static rsRetVal -queueLoadPersStrmInfoFixup(strm_t *pStrm, queue_t __attribute__((unused)) *pThis) +qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pThis) { DEFiRet; ISOBJ_TYPE_assert(pStrm, strm); - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); CHKiRet(strmSetDir(pStrm, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); finalize_it: RETiRet; @@ -673,14 +673,14 @@ finalize_it: * rgerhards, 2008-01-15 */ static rsRetVal -queueHaveQIF(queue_t *pThis) +qqueueHaveQIF(qqueue_t *pThis) { DEFiRet; uchar pszQIFNam[MAXFNAME]; size_t lenQIFNam; struct stat stat_buf; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->pszFilePrefix == NULL) ABORT_FINALIZE(RS_RET_NO_FILEPREFIX); @@ -710,7 +710,7 @@ finalize_it: * rgerhards, 2008-01-11 */ static rsRetVal -queueTryLoadPersistedInfo(queue_t *pThis) +qqueueTryLoadPersistedInfo(qqueue_t *pThis) { DEFiRet; strm_t *psQIF = NULL; @@ -720,7 +720,7 @@ queueTryLoadPersistedInfo(queue_t *pThis) int iUngottenObjs; obj_t *pUsr; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); /* Construct file name */ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", @@ -755,15 +755,15 @@ queueTryLoadPersistedInfo(queue_t *pThis) while(iUngottenObjs > 0) { /* fill the queue from disk */ CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL)); - queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); + qqueueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); --iUngottenObjs; /* one less */ } /* and now the stream objects (some order as when persisted!) */ CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF, - (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis)); + (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); CHKiRet(obj.Deserialize(&pThis->tVars.disk.pRead, (uchar*) "strm", psQIF, - (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis)); + (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite)); CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead)); @@ -793,7 +793,7 @@ finalize_it: * allowed file size at this point - that should be a config setting... * rgerhards, 2008-01-10 */ -static rsRetVal qConstructDisk(queue_t *pThis) +static rsRetVal qConstructDisk(qqueue_t *pThis) { DEFiRet; int bRestarted = 0; @@ -801,7 +801,7 @@ static rsRetVal qConstructDisk(queue_t *pThis) ASSERT(pThis != NULL); /* and now check if there is some persistent information that needs to be read in */ - iRet = queueTryLoadPersistedInfo(pThis); + iRet = qqueueTryLoadPersistedInfo(pThis); if(iRet == RS_RET_OK) bRestarted = 1; else if(iRet != RS_RET_FILE_NOT_FOUND) @@ -843,7 +843,7 @@ finalize_it: } -static rsRetVal qDestructDisk(queue_t *pThis) +static rsRetVal qDestructDisk(qqueue_t *pThis) { DEFiRet; @@ -855,7 +855,7 @@ static rsRetVal qDestructDisk(queue_t *pThis) RETiRet; } -static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) +static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr) { DEFiRet; number_t nWriteCount; @@ -882,7 +882,7 @@ finalize_it: RETiRet; } -static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) +static rsRetVal qDelDisk(qqueue_t *pThis, void **ppUsr) { DEFiRet; @@ -913,18 +913,18 @@ finalize_it: } /* -------------------- direct (no queueing) -------------------- */ -static rsRetVal qConstructDirect(queue_t __attribute__((unused)) *pThis) +static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis) { return RS_RET_OK; } -static rsRetVal qDestructDirect(queue_t __attribute__((unused)) *pThis) +static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) { return RS_RET_OK; } -static rsRetVal qAddDirect(queue_t *pThis, void* pUsr) +static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) { DEFiRet; @@ -941,7 +941,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr) RETiRet; } -static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) +static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) { return RS_RET_OK; } @@ -956,12 +956,12 @@ static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__ * rgerhards, 2008-01-20 */ static rsRetVal -queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex) +qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_assert(pUsr); /* TODO: we aborted right at this place at least 3 times -- race? 2008-02-28, -03-10, -03-15 The second time I noticed it the queue was in destruction with NO worker threads running. The pUsr ptr was totally off and provided no clue what it may be pointing @@ -970,7 +970,7 @@ queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex) dbgoprint((obj_t*) pThis, "ungetting user object %s\n", obj.GetName(pUsr)); BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex); - iRet = queueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr); + iRet = qqueueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr); ++pThis->iUngottenObjs; /* indicate one more */ END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -986,14 +986,14 @@ queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex) * rgerhards, 2008-01-29 */ static rsRetVal -queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr) +qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(ppUsr != NULL); - iRet = queueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr); + iRet = qqueueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr); --pThis->iUngottenObjs; /* indicate one less */ dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", obj.GetName(*ppUsr)); @@ -1007,7 +1007,7 @@ queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr) * things truely different. -- rgerhards, 2008-02-12 */ static rsRetVal -queueAdd(queue_t *pThis, void *pUsr) +qqueueAdd(qqueue_t *pThis, void *pUsr) { DEFiRet; @@ -1030,7 +1030,7 @@ finalize_it: * ungotten list and, if so, dequeue it first. */ static rsRetVal -queueDel(queue_t *pThis, void *pUsr) +qqueueDel(qqueue_t *pThis, void *pUsr) { DEFiRet; @@ -1042,7 +1042,7 @@ queueDel(queue_t *pThis, void *pUsr) * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ if(pThis->iUngottenObjs > 0) { - iRet = queueGetUngottenObj(pThis, (obj_t**) pUsr); + iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr); } else { iRet = pThis->qDel(pThis, pUsr); ATOMIC_DEC(pThis->iQueueSize); @@ -1066,14 +1066,14 @@ queueDel(queue_t *pThis, void *pUsr) * complex) if each would have its own shutdown. The function does not self check * this condition - the caller must make sure it is not called with a parent. */ -static rsRetVal queueShutdownWorkers(queue_t *pThis) +static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) { DEFiRet; DEFVARS_mutexProtection; struct timespec tTimeout; rsRetVal iRetLocal; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n"); @@ -1087,7 +1087,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) /* first try to shutdown the queue within the regular shutdown period */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(queueGetOverallQueueSize(pThis) > 0) { + if(qqueueGetOverallQueueSize(pThis) > 0) { if(pThis->bRunsDA) { /* We may have waited on the low water mark. As it may have changed, we * see if we reactivate the worker. @@ -1125,7 +1125,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) if(pThis->bRunsDA) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n", - queueGetID(pThis->pqDA)); + qqueueGetID(pThis->pqDA)); /* we use the same absolute timeout as above, so we do not use more than the configured * timeout interval! */ @@ -1154,19 +1154,19 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ if(pThis->bRunsDA) - queueWaitDAModeInitialized(pThis); + qqueueWaitDAModeInitialized(pThis); BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ /* optimize parameters for shutdown of DA-enabled queues */ - if(pThis->bIsDA && queueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { + if(pThis->bIsDA && qqueueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { /* switch to enqueue-only mode so that no more actions happen */ if(pThis->bRunsDA == 0) { - queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ + qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ } else { /* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1) * but is not yet running (state 2). In this case, pThis->pqDA is NULL! rgerhards, 2008-02-27 */ - queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */ + qqueueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */ } END_MTX_PROTECTED_OPERATIONS(pThis->mut); /* make sure we do not timeout before we are done */ @@ -1188,7 +1188,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) * they will automatically terminate as there no longer is any message left to process. */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(queueGetOverallQueueSize(pThis) > 0) { + if(qqueueGetOverallQueueSize(pThis) > 0) { timeoutComp(&tTimeout, pThis->toActShutdown); if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -1257,7 +1257,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) * Well, more precisely, they *are in termination*. Some cancel cleanup handlers * may still be running. */ - dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", queueGetOverallQueueSize(pThis)); + dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", qqueueGetOverallQueueSize(pThis)); RETiRet; } @@ -1269,17 +1269,17 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) * is done by queueStart(). The reason is that we want to give the caller a chance * to modify some parameters before the queue is actually started. */ -rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, +rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)) { DEFiRet; - queue_t *pThis; + qqueue_t *pThis; ASSERT(ppThis != NULL); ASSERT(pConsumer != NULL); ASSERT(iWorkerThreads >= 0); - if((pThis = (queue_t *)calloc(1, sizeof(queue_t))) == NULL) { + if((pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))) == NULL) { ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } @@ -1316,7 +1316,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, pThis->qConstruct = qConstructLinkedList; pThis->qDestruct = qDestructLinkedList; pThis->qAdd = qAddLinkedList; - pThis->qDel = (rsRetVal (*)(queue_t*,void**)) qDelLinkedList; + pThis->qDel = (rsRetVal (*)(qqueue_t*,void**)) qDelLinkedList; break; case QUEUETYPE_DISK: pThis->qConstruct = qConstructDisk; @@ -1343,25 +1343,25 @@ finalize_it: /* cancellation cleanup handler for queueWorker () * Updates admin structure and frees ressources. * Params: - * arg1 - user pointer (in this case a queue_t) + * arg1 - user pointer (in this case a qqueue_t) * arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!]) * Note that arg2 may be NULL, in which case no dequeued but unprocessed pUsr exists! * rgerhards, 2008-01-16 */ static rsRetVal -queueConsumerCancelCleanup(void *arg1, void *arg2) +qqueueConsumerCancelCleanup(void *arg1, void *arg2) { DEFiRet; - queue_t *pThis = (queue_t*) arg1; + qqueue_t *pThis = (qqueue_t*) arg1; obj_t *pUsr = (obj_t*) arg2; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pUsr != NULL) { /* make sure the data element is not lost */ dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n"); - CHKiRet(queueUngetObj(pThis, pUsr, LOCK_MUTEX)); + CHKiRet(qqueueUngetObj(pThis, pUsr, LOCK_MUTEX)); } finalize_it: @@ -1383,13 +1383,13 @@ finalize_it: * the return state! * rgerhards, 2008-01-24 */ -static int queueChkDiscardMsg(queue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr) +static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr) { DEFiRet; rsRetVal iRetLocal; int iSeverity; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_assert(pUsr); if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) { @@ -1414,7 +1414,7 @@ finalize_it: * rgerhards, 2008-10-21 */ static rsRetVal -queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave) +qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { DEFiRet; void *pUsr; @@ -1422,9 +1422,9 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave) int bRunsDA; /* cache for early mutex release */ /* dequeue element (still protected from mutex) */ - iRet = queueDel(pThis, &pUsr); - queueChkPersist(pThis); - iQueueSize = queueGetOverallQueueSize(pThis); /* cache this for after mutex release */ + iRet = qqueueDel(pThis, &pUsr); + qqueueChkPersist(pThis); + iQueueSize = qqueueGetOverallQueueSize(pThis); /* cache this for after mutex release */ bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ /* We now need to save the user pointer for the cancel cleanup handler, BUT ONLY @@ -1475,7 +1475,7 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave) * provide real-time creation of spool files. * Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong. */ - CHKiRet(queueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr)); + CHKiRet(qqueueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr)); finalize_it: if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { @@ -1524,7 +1524,7 @@ finalize_it: * but you get the idea from the code above. */ static rsRetVal -queueRateLimiter(queue_t *pThis) +qqueueRateLimiter(qqueue_t *pThis) { DEFiRet; int iDelay; @@ -1532,7 +1532,7 @@ queueRateLimiter(queue_t *pThis) time_t tCurr; struct tm m; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); iDelay = 0; if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */ @@ -1587,14 +1587,14 @@ queueRateLimiter(queue_t *pThis) * rgerhards, 2008-01-21 */ static rsRetVal -queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave) +qqueueConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave)); + CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave)); CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp)); /* we now need to check if we should deliberately delay processing a bit @@ -1621,15 +1621,15 @@ finalize_it: * rgerhards, 2008-01-14 */ static rsRetVal -queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave) +qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave)); - CHKiRet(queueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp)); + CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave)); + CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp)); finalize_it: dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); @@ -1645,7 +1645,7 @@ finalize_it: * the DA queue */ static int -queueChkStopWrkrDA(queue_t *pThis) +qqueueChkStopWrkrDA(qqueue_t *pThis) { /* if our queue is in destruction, we drain to the DA queue and so we shall not terminate * until we have done so. @@ -1664,7 +1664,7 @@ queueChkStopWrkrDA(queue_t *pThis) && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { /* this queue can never grow, so we can give up... */ bStopWrkr = 1; - } else if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { + } else if(qqueueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { bStopWrkr = 1; } else { bStopWrkr = 0; @@ -1687,9 +1687,9 @@ queueChkStopWrkrDA(queue_t *pThis) * the DA queue */ static int -queueChkStopWrkrReg(queue_t *pThis) +qqueueChkStopWrkrReg(qqueue_t *pThis) { - return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && queueGetOverallQueueSize(pThis) == 0); + return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0); } @@ -1697,26 +1697,26 @@ queueChkStopWrkrReg(queue_t *pThis) * are not stable! DA queue version */ static int -queueIsIdleDA(queue_t *pThis) +qqueueIsIdleDA(qqueue_t *pThis) { /* remember: iQueueSize is the DA queue size, not the main queue! */ /* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */ - return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); + return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); } /* must only be called when the queue mutex is locked, else results * are not stable! Regular queue version */ static int -queueIsIdleReg(queue_t *pThis) +qqueueIsIdleReg(qqueue_t *pThis) { #if 0 /* enable for performance testing */ int ret; - ret = queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk); + ret = qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk); if(ret) fprintf(stderr, "queue is idle\n"); return ret; #else /* regular code! */ - return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); + return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); #endif } @@ -1735,11 +1735,11 @@ queueIsIdleReg(queue_t *pThis) * I am telling this, because I, too, always get confused by those... */ static rsRetVal -queueRegOnWrkrShutdown(queue_t *pThis) +qqueueRegOnWrkrShutdown(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->pqParent != NULL) { pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ @@ -1756,11 +1756,11 @@ queueRegOnWrkrShutdown(queue_t *pThis) * hook to indicate in the parent queue (if we are a child) that we are not done yet. */ static rsRetVal -queueRegOnWrkrStartup(queue_t *pThis) +qqueueRegOnWrkrStartup(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->pqParent != NULL) { pThis->pqParent->bChildIsDone = 0; @@ -1773,7 +1773,7 @@ queueRegOnWrkrStartup(queue_t *pThis) /* start up the queue - it must have been constructed and parameters defined * before. */ -rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ +rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; rsRetVal iRetLocal; @@ -1811,7 +1811,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d starting\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, - queueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1); + qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ @@ -1822,13 +1822,13 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpReg)); CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); - CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRateLimiter)); - CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))queueConsumerCancelCleanup)); - CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrStartup)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrShutdown)); + CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRateLimiter)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrReg)); + CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleReg)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerReg)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))qqueueConsumerCancelCleanup)); + CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrStartup)); + CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); @@ -1841,10 +1841,10 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ /* If we are disk-assisted, we need to check if there is a QIF file * which we need to load. -- rgerhards, 2008-01-15 */ - iRetLocal = queueHaveQIF(pThis); + iRetLocal = qqueueHaveQIF(pThis); if(iRetLocal == RS_RET_OK) { dbgoprint((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n"); - queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ + qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ bInitialized = 1; /* we are done */ } else { /* TODO: use logerror? -- rgerhards, 2008-01-16 */ @@ -1861,7 +1861,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ /* if the queue already contains data, we need to start the correct number of worker threads. This can be * the case when a disk queue has been loaded. If we did not start it here, it would never start. */ - queueAdviseMaxWorkers(pThis); + qqueueAdviseMaxWorkers(pThis); pThis->bQueueStarted = 1; finalize_it: @@ -1876,7 +1876,7 @@ finalize_it: * and 0 otherwise. * rgerhards, 2008-01-10 */ -static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint) +static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) { DEFiRet; strm_t *psQIF = NULL; /* Queue Info File */ @@ -1887,7 +1887,7 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint) ASSERT(pThis != NULL); if(pThis->qType != QUEUETYPE_DISK) { - if(queueGetOverallQueueSize(pThis) > 0) { + if(qqueueGetOverallQueueSize(pThis) > 0) { /* This error code is OK, but we will probably not implement this any time * The reason is that persistence happens via DA queues. But I would like to * leave the code as is, as we so have a hook in case we need one. @@ -1898,13 +1898,13 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint) FINALIZE; /* if the queue is empty, we are happy and done... */ } - dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", queueGetOverallQueueSize(pThis)); + dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", qqueueGetOverallQueueSize(pThis)); /* Construct file name */ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); - if((bIsCheckpoint != QUEUE_CHECKPOINT) && (queueGetOverallQueueSize(pThis) == 0)) { + if((bIsCheckpoint != QUEUE_CHECKPOINT) && (qqueueGetOverallQueueSize(pThis) == 0)) { if(pThis->bNeedDelQIF) { unlink((char*)pszQIFNam); pThis->bNeedDelQIF = 0; @@ -1938,7 +1938,7 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint) * to the regular files. -- rgerhards, 2008-01-29 */ while(pThis->iUngottenObjs > 0) { - CHKiRet(queueGetUngottenObj(pThis, &pUsr)); + CHKiRet(qqueueGetUngottenObj(pThis, &pUsr)); CHKiRet((objSerialize(pUsr))(pUsr, psQIF)); objDestruct(pUsr); } @@ -1972,14 +1972,14 @@ finalize_it: * abide to our regular call interface)... * rgerhards, 2008-01-13 */ -rsRetVal queueChkPersist(queue_t *pThis) +rsRetVal qqueueChkPersist(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) { - queuePersist(pThis, QUEUE_CHECKPOINT); + qqueuePersist(pThis, QUEUE_CHECKPOINT); pThis->iUpdsSincePersist = 0; } @@ -1988,8 +1988,8 @@ rsRetVal queueChkPersist(queue_t *pThis) /* destructor for the queue object */ -BEGINobjDestruct(queue) /* be sure to specify the object type also in END and CODESTART macros! */ -CODESTARTobjDestruct(queue) +BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */ +CODESTARTobjDestruct(qqueue) pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ /* shut down all workers (handles *all* of the persistence logic) @@ -1999,7 +1999,7 @@ CODESTARTobjDestruct(queue) * with a child! -- rgerhards, 2008-01-28 */ if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL) - queueShutdownWorkers(pThis); + qqueueShutdownWorkers(pThis); /* finally destruct our (regular) worker thread pool * Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen, @@ -2024,7 +2024,7 @@ CODESTARTobjDestruct(queue) wtpDestruct(&pThis->pWtpDA); } if(pThis->pqDA != NULL) { - queueDestruct(&pThis->pqDA); + qqueueDestruct(&pThis->pqDA); } /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) @@ -2034,7 +2034,7 @@ CODESTARTobjDestruct(queue) * disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here * if need arises (what I doubt...) -- rgerhards, 2008-01-25 */ - CHKiRet_Hdlr(queuePersist(pThis, QUEUE_NO_CHECKPOINT)) { + CHKiRet_Hdlr(qqueuePersist(pThis, QUEUE_NO_CHECKPOINT)) { dbgoprint((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet); } @@ -2059,7 +2059,7 @@ CODESTARTobjDestruct(queue) if(pThis->pszSpoolDir != NULL) free(pThis->pszSpoolDir); -ENDobjDestruct(queue) +ENDobjDestruct(qqueue) /* set the queue's file prefix @@ -2068,7 +2068,7 @@ ENDobjDestruct(queue) * rgerhards, 2008-01-09 */ rsRetVal -queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) +qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) { DEFiRet; @@ -2091,11 +2091,11 @@ finalize_it: * rgerhards, 2008-01-09 */ rsRetVal -queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize) +qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(iMaxFileSize < 1024) { ABORT_FINALIZE(RS_RET_VALUE_TOO_LOW); @@ -2112,13 +2112,13 @@ finalize_it: * Enqueues the new element and awakes worker thread. */ rsRetVal -queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) +qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) { DEFiRet; int iCancelStateSave; struct timespec t; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); /* first check if we need to discard this message (which will cause CHKiRet() to exit) * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The iQueueSize @@ -2127,7 +2127,7 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) * threading tools may point this access to be an error, but this is done * intentional. I do not see this causes problems to us. */ - CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); + CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE @@ -2142,7 +2142,7 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* then check if we need to add an assistance disk queue */ if(pThis->bIsDA) - CHKiRet(queueChkStrtDA(pThis)); + CHKiRet(qqueueChkStrtDA(pThis)); /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. @@ -2195,13 +2195,13 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) } /* and finally enqueue the message */ - CHKiRet(queueAdd(pThis, pUsr)); - queueChkPersist(pThis); + CHKiRet(qqueueAdd(pThis, pUsr)); + qqueueChkPersist(pThis); finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { /* make sure at least one worker is running. */ - queueAdviseMaxWorkers(pThis); + qqueueAdviseMaxWorkers(pThis); /* and release the mutex */ d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); @@ -2228,12 +2228,12 @@ finalize_it: * rgerhards, 2008-01-16 */ static rsRetVal -queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex) +qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); /* for simplicity, we do one big mutex lock. This method is extremely seldom * called, so that doesn't matter... -- rgerhards, 2008-01-16 @@ -2272,24 +2272,24 @@ finalize_it: /* some simple object access methods */ -DEFpropSetMeth(queue, iPersistUpdCnt, int) -DEFpropSetMeth(queue, iDeqtWinFromHr, int) -DEFpropSetMeth(queue, iDeqtWinToHr, int) -DEFpropSetMeth(queue, toQShutdown, long) -DEFpropSetMeth(queue, toActShutdown, long) -DEFpropSetMeth(queue, toWrkShutdown, long) -DEFpropSetMeth(queue, toEnq, long) -DEFpropSetMeth(queue, iHighWtrMrk, int) -DEFpropSetMeth(queue, iLowWtrMrk, int) -DEFpropSetMeth(queue, iDiscardMrk, int) -DEFpropSetMeth(queue, iFullDlyMrk, int) -DEFpropSetMeth(queue, iDiscardSeverity, int) -DEFpropSetMeth(queue, bIsDA, int) -DEFpropSetMeth(queue, iMinMsgsPerWrkr, int) -DEFpropSetMeth(queue, bSaveOnShutdown, int) -DEFpropSetMeth(queue, pUsr, void*) -DEFpropSetMeth(queue, iDeqSlowdown, int) -DEFpropSetMeth(queue, sizeOnDiskMax, int64) +DEFpropSetMeth(qqueue, iPersistUpdCnt, int) +DEFpropSetMeth(qqueue, iDeqtWinFromHr, int) +DEFpropSetMeth(qqueue, iDeqtWinToHr, int) +DEFpropSetMeth(qqueue, toQShutdown, long) +DEFpropSetMeth(qqueue, toActShutdown, long) +DEFpropSetMeth(qqueue, toWrkShutdown, long) +DEFpropSetMeth(qqueue, toEnq, long) +DEFpropSetMeth(qqueue, iHighWtrMrk, int) +DEFpropSetMeth(qqueue, iLowWtrMrk, int) +DEFpropSetMeth(qqueue, iDiscardMrk, int) +DEFpropSetMeth(qqueue, iFullDlyMrk, int) +DEFpropSetMeth(qqueue, iDiscardSeverity, int) +DEFpropSetMeth(qqueue, bIsDA, int) +DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int) +DEFpropSetMeth(qqueue, bSaveOnShutdown, int) +DEFpropSetMeth(qqueue, pUsr, void*) +DEFpropSetMeth(qqueue, iDeqSlowdown, int) +DEFpropSetMeth(qqueue, sizeOnDiskMax, int64) /* This function can be used as a generic way to set properties. Only the subset @@ -2298,11 +2298,11 @@ DEFpropSetMeth(queue, sizeOnDiskMax, int64) * rgerhards, 2008-01-11 */ #define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, (uchar*) name, sizeof(name) - 1) -static rsRetVal queueSetProperty(queue_t *pThis, var_t *pProp) +static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pProp != NULL); if(isProp("iQueueSize")) { @@ -2324,19 +2324,19 @@ finalize_it: #undef isProp /* dummy */ -rsRetVal queueQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; } +rsRetVal qqueueQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; } /* Initialize the stream class. Must be called as the very first method * before anything else is called inside this class. * rgerhards, 2008-01-09 */ -BEGINObjClassInit(queue, 1, OBJ_IS_CORE_MODULE) +BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE) /* request objects we use */ CHKiRet(objUse(glbl, CORE_COMPONENT)); /* now set our own handlers */ - OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty); -ENDObjClassInit(queue) + OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty); +ENDObjClassInit(qqueue) /* vi:set ai: */ -- cgit From f67cf99ee5cd88bda499aa52d6008bb7d4afe483 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 6 Mar 2009 14:27:15 +0100 Subject: fixed a platform issue the prevented building on solaris --- runtime/queue.c | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 7d78460c..c4a0fad2 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -51,6 +51,11 @@ #include "wti.h" #include "atomic.h" +#ifdef OS_SOLARIS +# include +# define pthread_yield() sched_yield() +#endif + /* static data */ DEFobjStaticHelpers DEFobjCurrIf(glbl) -- cgit