summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c452
1 files changed, 235 insertions, 217 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 93b33967..c4a0fad2 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -49,20 +49,26 @@
#include "obj.h"
#include "wtp.h"
#include "wti.h"
+#include "atomic.h"
+
+#ifdef OS_SOLARIS
+# include <sched.h>
+# define pthread_yield() sched_yield()
+#endif
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(glbl)
/* forward-definitions */
-rsRetVal queueChkPersist(queue_t *pThis);
-static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex);
-static rsRetVal queueRateLimiter(queue_t *pThis);
-static int queueChkStopWrkrDA(queue_t *pThis);
-static int queueIsIdleDA(queue_t *pThis);
-static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave);
-static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2);
-static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex);
+rsRetVal qqueueChkPersist(qqueue_t *pThis);
+static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
+static rsRetVal qqueueRateLimiter(qqueue_t *pThis);
+static int qqueueChkStopWrkrDA(qqueue_t *pThis);
+static int qqueueIsIdleDA(qqueue_t *pThis);
+static rsRetVal qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
+static rsRetVal qqueueConsumerCancelCleanup(void *arg1, void *arg2);
+static rsRetVal qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -76,7 +82,7 @@ static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex);
* rgerhards, 2008-01-29
*/
static inline int
-queueGetOverallQueueSize(queue_t *pThis)
+qqueueGetOverallQueueSize(qqueue_t *pThis)
{
#if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */
BEGINfunc
@@ -95,7 +101,7 @@ ENDfunc
* This function returns void, as it makes no sense to communicate an error back, even if
* it happens.
*/
-static inline void queueDrain(queue_t *pThis)
+static inline void queueDrain(qqueue_t *pThis)
{
void *pUsr;
@@ -118,26 +124,26 @@ static inline void queueDrain(queue_t *pThis)
* this point in time. The mutex must be locked when
* ths function is called. -- rgerhards, 2008-01-25
*/
-static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
+static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
{
DEFiRet;
int iMaxWorkers;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(!pThis->bEnqOnly) {
if(pThis->bRunsDA) {
/* if we have not yet reached the high water mark, there is no need to start a
* worker. -- rgerhards, 2008-01-26
*/
- if(queueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
+ if(qqueueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
}
} else {
if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
iMaxWorkers = 1;
} else {
- iMaxWorkers = queueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
+ iMaxWorkers = qqueueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
}
wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
}
@@ -152,11 +158,11 @@ static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
* rgerhards, 2008-02-27
*/
static rsRetVal
-queueWaitDAModeInitialized(queue_t *pThis)
+qqueueWaitDAModeInitialized(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->bRunsDA);
while(pThis->bRunsDA != 2) {
@@ -178,17 +184,17 @@ queueWaitDAModeInitialized(queue_t *pThis)
* rgerhards, 2008-01-15
*/
static rsRetVal
-queueTurnOffDAMode(queue_t *pThis)
+qqueueTurnOffDAMode(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->bRunsDA);
/* at this point, we need a fully initialized DA queue. So if it isn't, we finally need
* to wait for its startup... -- rgerhards, 2008-01-25
*/
- queueWaitDAModeInitialized(pThis);
+ qqueueWaitDAModeInitialized(pThis);
/* if we need to pull any data that we still need from the (child) disk queue,
* now would be the time to do so. At present, we do not need this, but I'd like to
@@ -207,15 +213,15 @@ queueTurnOffDAMode(queue_t *pThis)
/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
* this will be quick.
*/
- queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
+ qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
iRet);
/* now we need to check if the regular queue has some messages. This may be the case
* when it is waiting that the high water mark is reached again. If so, we need to start up
* a regular worker. -- rgerhards, 2008-01-26
*/
- if(queueGetOverallQueueSize(pThis) > 0) {
- queueAdviseMaxWorkers(pThis);
+ if(qqueueGetOverallQueueSize(pThis) > 0) {
+ qqueueAdviseMaxWorkers(pThis);
}
}
@@ -231,11 +237,11 @@ queueTurnOffDAMode(queue_t *pThis)
* rgerhards, 2008-01-14
*/
static rsRetVal
-queueChkIsDA(queue_t *pThis)
+qqueueChkIsDA(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->pszFilePrefix != NULL) {
pThis->bIsDA = 1;
dbgoprint((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n");
@@ -259,18 +265,18 @@ queueChkIsDA(queue_t *pThis)
* rgerhards, 2008-01-15
*/
static rsRetVal
-queueStartDA(queue_t *pThis)
+qqueueStartDA(qqueue_t *pThis)
{
DEFiRet;
uchar pszDAQName[128];
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */
FINALIZE; /* ... then we are already done! */
/* create message queue */
- CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
+ CHKiRet(qqueueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
/* give it a name */
snprintf((char*) pszDAQName, sizeof(pszDAQName)/sizeof(uchar), "%s[DA]", obj.GetName((obj_t*) pThis));
@@ -281,30 +287,30 @@ queueStartDA(queue_t *pThis)
*/
pThis->pqDA->pqParent = pThis;
- CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr));
- CHKiRet(queueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax));
- CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
- CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
- CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
- CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
- CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
- CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
- CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
- CHKiRet(queueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
- CHKiRet(queueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
- CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
- CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
+ CHKiRet(qqueueSetpUsr(pThis->pqDA, pThis->pUsr));
+ CHKiRet(qqueueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax));
+ CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
+ CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
+ CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
+ CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
+ CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq));
+ CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
+ CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
+ CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
+ CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0));
+ CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0));
if(pThis->toQShutdown == 0) {
- CHKiRet(queueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
+ CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
} else {
/* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND
* have an obviously large backlog, we can't finish it in any case. So there is no point
* in holding shutdown longer than necessary. -- rgerhards, 2008-01-15
*/
- CHKiRet(queueSettoQShutdown(pThis->pqDA, 1));
+ CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 1));
}
- iRet = queueStart(pThis->pqDA);
+ iRet = qqueueStart(pThis->pqDA);
/* file not found is expected, that means it is no previous QIF available */
if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
FINALIZE; /* something is wrong */
@@ -322,12 +328,12 @@ queueStartDA(queue_t *pThis)
pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n",
- queueGetID(pThis->pqDA));
+ qqueueGetID(pThis->pqDA));
finalize_it:
if(iRet != RS_RET_OK) {
if(pThis->pqDA != NULL) {
- queueDestruct(&pThis->pqDA);
+ qqueueDestruct(&pThis->pqDA);
}
dbgoprint((obj_t*) pThis, "error %d creating disk queue - giving up.\n", iRet);
pThis->bIsDA = 0;
@@ -344,7 +350,7 @@ finalize_it:
* rgerhards, 2008-01-16
*/
static inline rsRetVal
-queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
+qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
@@ -362,12 +368,12 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) queueConsumerCancelCleanup));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueStartDA));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueTurnOffDAMode));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerDA));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) qqueueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
@@ -400,14 +406,14 @@ finalize_it:
* rgerhards, 2008-01-14
*/
static inline rsRetVal
-queueChkStrtDA(queue_t *pThis)
+qqueueChkStrtDA(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
/* if we do not hit the high water mark, we have nothing to do */
- if(queueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk)
+ if(qqueueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk)
ABORT_FINALIZE(RS_RET_OK);
if(pThis->bRunsDA) {
@@ -421,15 +427,15 @@ queueChkStrtDA(queue_t *pThis)
* we need at least one).
*/
dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n",
- queueGetOverallQueueSize(pThis));
- queueAdviseMaxWorkers(pThis);
+ qqueueGetOverallQueueSize(pThis));
+ qqueueAdviseMaxWorkers(pThis);
} else {
/* this is the case when we are currently not running in DA mode. So it is time
* to turn it back on.
*/
dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n",
- queueGetOverallQueueSize(pThis));
- queueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
+ qqueueGetOverallQueueSize(pThis));
+ qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
}
finalize_it:
@@ -447,7 +453,7 @@ finalize_it:
*/
/* -------------------- fixed array -------------------- */
-static rsRetVal qConstructFixedArray(queue_t *pThis)
+static rsRetVal qConstructFixedArray(qqueue_t *pThis)
{
DEFiRet;
@@ -463,14 +469,14 @@ static rsRetVal qConstructFixedArray(queue_t *pThis)
pThis->tVars.farray.head = 0;
pThis->tVars.farray.tail = 0;
- queueChkIsDA(pThis);
+ qqueueChkIsDA(pThis);
finalize_it:
RETiRet;
}
-static rsRetVal qDestructFixedArray(queue_t *pThis)
+static rsRetVal qDestructFixedArray(qqueue_t *pThis)
{
DEFiRet;
@@ -485,7 +491,7 @@ static rsRetVal qDestructFixedArray(queue_t *pThis)
}
-static rsRetVal qAddFixedArray(queue_t *pThis, void* in)
+static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in)
{
DEFiRet;
@@ -498,7 +504,7 @@ static rsRetVal qAddFixedArray(queue_t *pThis, void* in)
RETiRet;
}
-static rsRetVal qDelFixedArray(queue_t *pThis, void **out)
+static rsRetVal qDelFixedArray(qqueue_t *pThis, void **out)
{
DEFiRet;
@@ -517,7 +523,7 @@ static rsRetVal qDelFixedArray(queue_t *pThis, void **out)
/* first some generic functions which are also used for the unget linked list */
-static inline rsRetVal queueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr)
+static inline rsRetVal qqueueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr)
{
DEFiRet;
qLinkedList_t *pEntry;
@@ -543,7 +549,7 @@ finalize_it:
RETiRet;
}
-static inline rsRetVal queueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr)
+static inline rsRetVal qqueueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr)
{
DEFiRet;
qLinkedList_t *pEntry;
@@ -570,7 +576,7 @@ static inline rsRetVal queueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t
/* end generic functions which are also used for the unget linked list */
-static rsRetVal qConstructLinkedList(queue_t *pThis)
+static rsRetVal qConstructLinkedList(qqueue_t *pThis)
{
DEFiRet;
@@ -579,13 +585,13 @@ static rsRetVal qConstructLinkedList(queue_t *pThis)
pThis->tVars.linklist.pRoot = 0;
pThis->tVars.linklist.pLast = 0;
- queueChkIsDA(pThis);
+ qqueueChkIsDA(pThis);
RETiRet;
}
-static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis)
+static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis)
{
DEFiRet;
@@ -598,11 +604,11 @@ static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis)
RETiRet;
}
-static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr)
+static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
{
DEFiRet;
- iRet = queueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr);
+ iRet = qqueueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr);
#if 0
qLinkedList_t *pEntry;
@@ -626,10 +632,10 @@ finalize_it:
RETiRet;
}
-static rsRetVal qDelLinkedList(queue_t *pThis, obj_t **ppUsr)
+static rsRetVal qDelLinkedList(qqueue_t *pThis, obj_t **ppUsr)
{
DEFiRet;
- iRet = queueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr);
+ iRet = qqueueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr);
#if 0
qLinkedList_t *pEntry;
@@ -656,11 +662,11 @@ static rsRetVal qDelLinkedList(queue_t *pThis, obj_t **ppUsr)
static rsRetVal
-queueLoadPersStrmInfoFixup(strm_t *pStrm, queue_t __attribute__((unused)) *pThis)
+qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pThis)
{
DEFiRet;
ISOBJ_TYPE_assert(pStrm, strm);
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
CHKiRet(strmSetDir(pStrm, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
finalize_it:
RETiRet;
@@ -672,14 +678,14 @@ finalize_it:
* rgerhards, 2008-01-15
*/
static rsRetVal
-queueHaveQIF(queue_t *pThis)
+qqueueHaveQIF(qqueue_t *pThis)
{
DEFiRet;
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
struct stat stat_buf;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->pszFilePrefix == NULL)
ABORT_FINALIZE(RS_RET_NO_FILEPREFIX);
@@ -709,7 +715,7 @@ finalize_it:
* rgerhards, 2008-01-11
*/
static rsRetVal
-queueTryLoadPersistedInfo(queue_t *pThis)
+qqueueTryLoadPersistedInfo(qqueue_t *pThis)
{
DEFiRet;
strm_t *psQIF = NULL;
@@ -719,7 +725,7 @@ queueTryLoadPersistedInfo(queue_t *pThis)
int iUngottenObjs;
obj_t *pUsr;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
@@ -754,15 +760,15 @@ queueTryLoadPersistedInfo(queue_t *pThis)
while(iUngottenObjs > 0) {
/* fill the queue from disk */
CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL));
- queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
+ qqueueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
--iUngottenObjs; /* one less */
}
/* and now the stream objects (some order as when persisted!) */
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF,
- (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
+ (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pRead, (uchar*) "strm", psQIF,
- (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis));
+ (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite));
CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead));
@@ -792,7 +798,7 @@ finalize_it:
* allowed file size at this point - that should be a config setting...
* rgerhards, 2008-01-10
*/
-static rsRetVal qConstructDisk(queue_t *pThis)
+static rsRetVal qConstructDisk(qqueue_t *pThis)
{
DEFiRet;
int bRestarted = 0;
@@ -800,7 +806,7 @@ static rsRetVal qConstructDisk(queue_t *pThis)
ASSERT(pThis != NULL);
/* and now check if there is some persistent information that needs to be read in */
- iRet = queueTryLoadPersistedInfo(pThis);
+ iRet = qqueueTryLoadPersistedInfo(pThis);
if(iRet == RS_RET_OK)
bRestarted = 1;
else if(iRet != RS_RET_FILE_NOT_FOUND)
@@ -842,7 +848,7 @@ finalize_it:
}
-static rsRetVal qDestructDisk(queue_t *pThis)
+static rsRetVal qDestructDisk(qqueue_t *pThis)
{
DEFiRet;
@@ -854,7 +860,7 @@ static rsRetVal qDestructDisk(queue_t *pThis)
RETiRet;
}
-static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
+static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr)
{
DEFiRet;
number_t nWriteCount;
@@ -881,7 +887,7 @@ finalize_it:
RETiRet;
}
-static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
+static rsRetVal qDelDisk(qqueue_t *pThis, void **ppUsr)
{
DEFiRet;
@@ -912,18 +918,18 @@ finalize_it:
}
/* -------------------- direct (no queueing) -------------------- */
-static rsRetVal qConstructDirect(queue_t __attribute__((unused)) *pThis)
+static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis)
{
return RS_RET_OK;
}
-static rsRetVal qDestructDirect(queue_t __attribute__((unused)) *pThis)
+static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
{
return RS_RET_OK;
}
-static rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
+static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
DEFiRet;
@@ -940,7 +946,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
RETiRet;
}
-static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
+static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
{
return RS_RET_OK;
}
@@ -955,12 +961,12 @@ static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__
* rgerhards, 2008-01-20
*/
static rsRetVal
-queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex)
+qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_assert(pUsr); /* TODO: we aborted right at this place at least 3 times -- race? 2008-02-28, -03-10, -03-15
The second time I noticed it the queue was in destruction with NO worker threads
running. The pUsr ptr was totally off and provided no clue what it may be pointing
@@ -969,7 +975,7 @@ queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex)
dbgoprint((obj_t*) pThis, "ungetting user object %s\n", obj.GetName(pUsr));
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
- iRet = queueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr);
+ iRet = qqueueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr);
++pThis->iUngottenObjs; /* indicate one more */
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -985,14 +991,14 @@ queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex)
* rgerhards, 2008-01-29
*/
static rsRetVal
-queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr)
+qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(ppUsr != NULL);
- iRet = queueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
+ iRet = qqueueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
--pThis->iUngottenObjs; /* indicate one less */
dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", obj.GetName(*ppUsr));
@@ -1006,7 +1012,7 @@ queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr)
* things truely different. -- rgerhards, 2008-02-12
*/
static rsRetVal
-queueAdd(queue_t *pThis, void *pUsr)
+qqueueAdd(qqueue_t *pThis, void *pUsr)
{
DEFiRet;
@@ -1015,7 +1021,7 @@ queueAdd(queue_t *pThis, void *pUsr)
CHKiRet(pThis->qAdd(pThis, pUsr));
if(pThis->qType != QUEUETYPE_DIRECT) {
- ++pThis->iQueueSize;
+ ATOMIC_INC(pThis->iQueueSize);
dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize);
}
@@ -1029,7 +1035,7 @@ finalize_it:
* ungotten list and, if so, dequeue it first.
*/
static rsRetVal
-queueDel(queue_t *pThis, void *pUsr)
+qqueueDel(qqueue_t *pThis, void *pUsr)
{
DEFiRet;
@@ -1041,10 +1047,10 @@ queueDel(queue_t *pThis, void *pUsr)
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
if(pThis->iUngottenObjs > 0) {
- iRet = queueGetUngottenObj(pThis, (obj_t**) pUsr);
+ iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr);
} else {
iRet = pThis->qDel(pThis, pUsr);
- --pThis->iQueueSize;
+ ATOMIC_DEC(pThis->iQueueSize);
}
dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
@@ -1065,14 +1071,14 @@ queueDel(queue_t *pThis, void *pUsr)
* complex) if each would have its own shutdown. The function does not self check
* this condition - the caller must make sure it is not called with a parent.
*/
-static rsRetVal queueShutdownWorkers(queue_t *pThis)
+static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
{
DEFiRet;
DEFVARS_mutexProtection;
struct timespec tTimeout;
rsRetVal iRetLocal;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
@@ -1086,7 +1092,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* first try to shutdown the queue within the regular shutdown period */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- if(queueGetOverallQueueSize(pThis) > 0) {
+ if(qqueueGetOverallQueueSize(pThis) > 0) {
if(pThis->bRunsDA) {
/* We may have waited on the low water mark. As it may have changed, we
* see if we reactivate the worker.
@@ -1124,7 +1130,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
if(pThis->bRunsDA) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n",
- queueGetID(pThis->pqDA));
+ qqueueGetID(pThis->pqDA));
/* we use the same absolute timeout as above, so we do not use more than the configured
* timeout interval!
*/
@@ -1153,19 +1159,19 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
if(pThis->bRunsDA)
- queueWaitDAModeInitialized(pThis);
+ qqueueWaitDAModeInitialized(pThis);
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
/* optimize parameters for shutdown of DA-enabled queues */
- if(pThis->bIsDA && queueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
+ if(pThis->bIsDA && qqueueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
/* switch to enqueue-only mode so that no more actions happen */
if(pThis->bRunsDA == 0) {
- queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
+ qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
} else {
/* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1)
* but is not yet running (state 2). In this case, pThis->pqDA is NULL! rgerhards, 2008-02-27
*/
- queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
+ qqueueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
}
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
/* make sure we do not timeout before we are done */
@@ -1187,7 +1193,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
* they will automatically terminate as there no longer is any message left to process.
*/
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- if(queueGetOverallQueueSize(pThis) > 0) {
+ if(qqueueGetOverallQueueSize(pThis) > 0) {
timeoutComp(&tTimeout, pThis->toActShutdown);
if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
@@ -1256,7 +1262,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
* may still be running.
*/
- dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", queueGetOverallQueueSize(pThis));
+ dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", qqueueGetOverallQueueSize(pThis));
RETiRet;
}
@@ -1268,22 +1274,23 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
* is done by queueStart(). The reason is that we want to give the caller a chance
* to modify some parameters before the queue is actually started.
*/
-rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
+rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*))
{
DEFiRet;
- queue_t *pThis;
+ qqueue_t *pThis;
ASSERT(ppThis != NULL);
ASSERT(pConsumer != NULL);
ASSERT(iWorkerThreads >= 0);
- if((pThis = (queue_t *)calloc(1, sizeof(queue_t))) == NULL) {
+ if((pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))) == NULL) {
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
/* we have an object, so let's fill the properties */
objConstructSetObjInfo(pThis);
+ pThis->bOptimizeUniProc = glbl.GetOptimizeUniProc();
if((pThis->pszSpoolDir = (uchar*) strdup((char*)glbl.GetWorkDir())) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -1314,7 +1321,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
pThis->qConstruct = qConstructLinkedList;
pThis->qDestruct = qDestructLinkedList;
pThis->qAdd = qAddLinkedList;
- pThis->qDel = (rsRetVal (*)(queue_t*,void**)) qDelLinkedList;
+ pThis->qDel = (rsRetVal (*)(qqueue_t*,void**)) qDelLinkedList;
break;
case QUEUETYPE_DISK:
pThis->qConstruct = qConstructDisk;
@@ -1341,25 +1348,25 @@ finalize_it:
/* cancellation cleanup handler for queueWorker ()
* Updates admin structure and frees ressources.
* Params:
- * arg1 - user pointer (in this case a queue_t)
+ * arg1 - user pointer (in this case a qqueue_t)
* arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!])
* Note that arg2 may be NULL, in which case no dequeued but unprocessed pUsr exists!
* rgerhards, 2008-01-16
*/
static rsRetVal
-queueConsumerCancelCleanup(void *arg1, void *arg2)
+qqueueConsumerCancelCleanup(void *arg1, void *arg2)
{
DEFiRet;
- queue_t *pThis = (queue_t*) arg1;
+ qqueue_t *pThis = (qqueue_t*) arg1;
obj_t *pUsr = (obj_t*) arg2;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pUsr != NULL) {
/* make sure the data element is not lost */
dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
- CHKiRet(queueUngetObj(pThis, pUsr, LOCK_MUTEX));
+ CHKiRet(qqueueUngetObj(pThis, pUsr, LOCK_MUTEX));
}
finalize_it:
@@ -1381,13 +1388,13 @@ finalize_it:
* the return state!
* rgerhards, 2008-01-24
*/
-static int queueChkDiscardMsg(queue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr)
+static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr)
{
DEFiRet;
rsRetVal iRetLocal;
int iSeverity;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_assert(pUsr);
if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) {
@@ -1412,7 +1419,7 @@ finalize_it:
* rgerhards, 2008-10-21
*/
static rsRetVal
-queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
+qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
void *pUsr;
@@ -1420,9 +1427,9 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
int bRunsDA; /* cache for early mutex release */
/* dequeue element (still protected from mutex) */
- iRet = queueDel(pThis, &pUsr);
- queueChkPersist(pThis);
- iQueueSize = queueGetOverallQueueSize(pThis); /* cache this for after mutex release */
+ iRet = qqueueDel(pThis, &pUsr);
+ qqueueChkPersist(pThis);
+ iQueueSize = qqueueGetOverallQueueSize(pThis); /* cache this for after mutex release */
bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
/* We now need to save the user pointer for the cancel cleanup handler, BUT ONLY
@@ -1473,7 +1480,7 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
* provide real-time creation of spool files.
* Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong.
*/
- CHKiRet(queueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr));
+ CHKiRet(qqueueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr));
finalize_it:
if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
@@ -1522,7 +1529,7 @@ finalize_it:
* but you get the idea from the code above.
*/
static rsRetVal
-queueRateLimiter(queue_t *pThis)
+qqueueRateLimiter(qqueue_t *pThis)
{
DEFiRet;
int iDelay;
@@ -1530,9 +1537,7 @@ queueRateLimiter(queue_t *pThis)
time_t tCurr;
struct tm m;
- ISOBJ_TYPE_assert(pThis, queue);
-
- dbgoprint((obj_t*) pThis, "entering rate limiter\n");
+ ISOBJ_TYPE_assert(pThis, qqueue);
iDelay = 0;
if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */
@@ -1587,14 +1592,14 @@ queueRateLimiter(queue_t *pThis)
* rgerhards, 2008-01-21
*/
static rsRetVal
-queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
+qqueueConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp));
/* we now need to check if we should deliberately delay processing a bit
@@ -1621,15 +1626,15 @@ finalize_it:
* rgerhards, 2008-01-14
*/
static rsRetVal
-queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
+qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(queueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
+ CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
finalize_it:
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
@@ -1645,7 +1650,7 @@ finalize_it:
* the DA queue
*/
static int
-queueChkStopWrkrDA(queue_t *pThis)
+qqueueChkStopWrkrDA(qqueue_t *pThis)
{
/* if our queue is in destruction, we drain to the DA queue and so we shall not terminate
* until we have done so.
@@ -1664,7 +1669,7 @@ queueChkStopWrkrDA(queue_t *pThis)
&& pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
/* this queue can never grow, so we can give up... */
bStopWrkr = 1;
- } else if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
+ } else if(qqueueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
bStopWrkr = 1;
} else {
bStopWrkr = 0;
@@ -1687,9 +1692,9 @@ queueChkStopWrkrDA(queue_t *pThis)
* the DA queue
*/
static int
-queueChkStopWrkrReg(queue_t *pThis)
+qqueueChkStopWrkrReg(qqueue_t *pThis)
{
- return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && queueGetOverallQueueSize(pThis) == 0);
+ return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0);
}
@@ -1697,26 +1702,26 @@ queueChkStopWrkrReg(queue_t *pThis)
* are not stable! DA queue version
*/
static int
-queueIsIdleDA(queue_t *pThis)
+qqueueIsIdleDA(qqueue_t *pThis)
{
/* remember: iQueueSize is the DA queue size, not the main queue! */
/* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */
- return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
+ return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
}
/* must only be called when the queue mutex is locked, else results
* are not stable! Regular queue version
*/
static int
-queueIsIdleReg(queue_t *pThis)
+qqueueIsIdleReg(qqueue_t *pThis)
{
#if 0 /* enable for performance testing */
int ret;
- ret = queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk);
+ ret = qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk);
if(ret) fprintf(stderr, "queue is idle\n");
return ret;
#else
/* regular code! */
- return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
+ return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk));
#endif
}
@@ -1735,11 +1740,11 @@ queueIsIdleReg(queue_t *pThis)
* I am telling this, because I, too, always get confused by those...
*/
static rsRetVal
-queueRegOnWrkrShutdown(queue_t *pThis)
+qqueueRegOnWrkrShutdown(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->pqParent != NULL) {
pThis->pqParent->bChildIsDone = 1; /* indicate we are done */
@@ -1756,11 +1761,11 @@ queueRegOnWrkrShutdown(queue_t *pThis)
* hook to indicate in the parent queue (if we are a child) that we are not done yet.
*/
static rsRetVal
-queueRegOnWrkrStartup(queue_t *pThis)
+qqueueRegOnWrkrStartup(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->pqParent != NULL) {
pThis->pqParent->bChildIsDone = 0;
@@ -1773,7 +1778,7 @@ queueRegOnWrkrStartup(queue_t *pThis)
/* start up the queue - it must have been constructed and parameters defined
* before.
*/
-rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
+rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
{
DEFiRet;
rsRetVal iRetLocal;
@@ -1811,7 +1816,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d starting\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
- queueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1);
+ qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1);
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
@@ -1822,13 +1827,13 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
- CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRateLimiter));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))queueConsumerCancelCleanup));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrStartup));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrShutdown));
+ CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRateLimiter));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrReg));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleReg));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerReg));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))qqueueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrStartup));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
@@ -1841,10 +1846,10 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
/* If we are disk-assisted, we need to check if there is a QIF file
* which we need to load. -- rgerhards, 2008-01-15
*/
- iRetLocal = queueHaveQIF(pThis);
+ iRetLocal = qqueueHaveQIF(pThis);
if(iRetLocal == RS_RET_OK) {
dbgoprint((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n");
- queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
+ qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
bInitialized = 1; /* we are done */
} else {
/* TODO: use logerror? -- rgerhards, 2008-01-16 */
@@ -1861,7 +1866,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
/* if the queue already contains data, we need to start the correct number of worker threads. This can be
* the case when a disk queue has been loaded. If we did not start it here, it would never start.
*/
- queueAdviseMaxWorkers(pThis);
+ qqueueAdviseMaxWorkers(pThis);
pThis->bQueueStarted = 1;
finalize_it:
@@ -1876,7 +1881,7 @@ finalize_it:
* and 0 otherwise.
* rgerhards, 2008-01-10
*/
-static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
+static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
{
DEFiRet;
strm_t *psQIF = NULL; /* Queue Info File */
@@ -1887,7 +1892,7 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
ASSERT(pThis != NULL);
if(pThis->qType != QUEUETYPE_DISK) {
- if(queueGetOverallQueueSize(pThis) > 0) {
+ if(qqueueGetOverallQueueSize(pThis) > 0) {
/* This error code is OK, but we will probably not implement this any time
* The reason is that persistence happens via DA queues. But I would like to
* leave the code as is, as we so have a hook in case we need one.
@@ -1898,13 +1903,13 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
FINALIZE; /* if the queue is empty, we are happy and done... */
}
- dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", queueGetOverallQueueSize(pThis));
+ dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", qqueueGetOverallQueueSize(pThis));
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
(char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
- if((bIsCheckpoint != QUEUE_CHECKPOINT) && (queueGetOverallQueueSize(pThis) == 0)) {
+ if((bIsCheckpoint != QUEUE_CHECKPOINT) && (qqueueGetOverallQueueSize(pThis) == 0)) {
if(pThis->bNeedDelQIF) {
unlink((char*)pszQIFNam);
pThis->bNeedDelQIF = 0;
@@ -1938,7 +1943,7 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
* to the regular files. -- rgerhards, 2008-01-29
*/
while(pThis->iUngottenObjs > 0) {
- CHKiRet(queueGetUngottenObj(pThis, &pUsr));
+ CHKiRet(qqueueGetUngottenObj(pThis, &pUsr));
CHKiRet((objSerialize(pUsr))(pUsr, psQIF));
objDestruct(pUsr);
}
@@ -1972,14 +1977,14 @@ finalize_it:
* abide to our regular call interface)...
* rgerhards, 2008-01-13
*/
-rsRetVal queueChkPersist(queue_t *pThis)
+rsRetVal qqueueChkPersist(qqueue_t *pThis)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
- queuePersist(pThis, QUEUE_CHECKPOINT);
+ qqueuePersist(pThis, QUEUE_CHECKPOINT);
pThis->iUpdsSincePersist = 0;
}
@@ -1988,8 +1993,8 @@ rsRetVal queueChkPersist(queue_t *pThis)
/* destructor for the queue object */
-BEGINobjDestruct(queue) /* be sure to specify the object type also in END and CODESTART macros! */
-CODESTARTobjDestruct(queue)
+BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */
+CODESTARTobjDestruct(qqueue)
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
/* shut down all workers (handles *all* of the persistence logic)
@@ -1999,7 +2004,7 @@ CODESTARTobjDestruct(queue)
* with a child! -- rgerhards, 2008-01-28
*/
if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL)
- queueShutdownWorkers(pThis);
+ qqueueShutdownWorkers(pThis);
/* finally destruct our (regular) worker thread pool
* Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen,
@@ -2024,7 +2029,7 @@ CODESTARTobjDestruct(queue)
wtpDestruct(&pThis->pWtpDA);
}
if(pThis->pqDA != NULL) {
- queueDestruct(&pThis->pqDA);
+ qqueueDestruct(&pThis->pqDA);
}
/* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty)
@@ -2034,7 +2039,7 @@ CODESTARTobjDestruct(queue)
* disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here
* if need arises (what I doubt...) -- rgerhards, 2008-01-25
*/
- CHKiRet_Hdlr(queuePersist(pThis, QUEUE_NO_CHECKPOINT)) {
+ CHKiRet_Hdlr(qqueuePersist(pThis, QUEUE_NO_CHECKPOINT)) {
dbgoprint((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet);
}
@@ -2059,7 +2064,7 @@ CODESTARTobjDestruct(queue)
if(pThis->pszSpoolDir != NULL)
free(pThis->pszSpoolDir);
-ENDobjDestruct(queue)
+ENDobjDestruct(qqueue)
/* set the queue's file prefix
@@ -2068,7 +2073,7 @@ ENDobjDestruct(queue)
* rgerhards, 2008-01-09
*/
rsRetVal
-queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
+qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
{
DEFiRet;
@@ -2091,11 +2096,11 @@ finalize_it:
* rgerhards, 2008-01-09
*/
rsRetVal
-queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize)
+qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
if(iMaxFileSize < 1024) {
ABORT_FINALIZE(RS_RET_VALUE_TOO_LOW);
@@ -2112,13 +2117,22 @@ finalize_it:
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
-queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
+qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
{
DEFiRet;
int iCancelStateSave;
struct timespec t;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
+
+ /* first check if we need to discard this message (which will cause CHKiRet() to exit)
+ * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The iQueueSize
+ * and bRunsDA parameters may not reflect the correct settings here, but they are
+ * "good enough" in the sense that they can be used to drive the decision. Valgrind's
+ * threading tools may point this access to be an error, but this is done
+ * intentional. I do not see this causes problems to us.
+ */
+ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
/* Please note that this function is not cancel-safe and consequently
* sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
@@ -2131,12 +2145,9 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
d_pthread_mutex_lock(pThis->mut);
}
- /* first check if we need to discard this message (which will cause CHKiRet() to exit) */
- CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
-
/* then check if we need to add an assistance disk queue */
if(pThis->bIsDA)
- CHKiRet(queueChkStrtDA(pThis));
+ CHKiRet(qqueueChkStrtDA(pThis));
/* handle flow control
* There are two different flow control mechanisms: basic and advanced flow control.
@@ -2189,17 +2200,24 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
}
/* and finally enqueue the message */
- CHKiRet(queueAdd(pThis, pUsr));
- queueChkPersist(pThis);
+ CHKiRet(qqueueAdd(pThis, pUsr));
+ qqueueChkPersist(pThis);
finalize_it:
if(pThis->qType != QUEUETYPE_DIRECT) {
/* make sure at least one worker is running. */
- queueAdviseMaxWorkers(pThis);
- dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
+ qqueueAdviseMaxWorkers(pThis);
/* and release the mutex */
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
+ dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
+ /* the following pthread_yield is experimental, but brought us performance
+ * benefit. For details, please see http://kb.monitorware.com/post14216.html#p14216
+ * rgerhards, 2008-10-09
+ * but this is only true for uniprocessors, so we guard it with an optimize flag -- rgerhards, 2008-10-22
+ */
+ if(pThis->bOptimizeUniProc)
+ pthread_yield();
}
RETiRet;
@@ -2215,12 +2233,12 @@ finalize_it:
* rgerhards, 2008-01-16
*/
static rsRetVal
-queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex)
+qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
/* for simplicity, we do one big mutex lock. This method is extremely seldom
* called, so that doesn't matter... -- rgerhards, 2008-01-16
@@ -2259,24 +2277,24 @@ finalize_it:
/* some simple object access methods */
-DEFpropSetMeth(queue, iPersistUpdCnt, int)
-DEFpropSetMeth(queue, iDeqtWinFromHr, int)
-DEFpropSetMeth(queue, iDeqtWinToHr, int)
-DEFpropSetMeth(queue, toQShutdown, long)
-DEFpropSetMeth(queue, toActShutdown, long)
-DEFpropSetMeth(queue, toWrkShutdown, long)
-DEFpropSetMeth(queue, toEnq, long)
-DEFpropSetMeth(queue, iHighWtrMrk, int)
-DEFpropSetMeth(queue, iLowWtrMrk, int)
-DEFpropSetMeth(queue, iDiscardMrk, int)
-DEFpropSetMeth(queue, iFullDlyMrk, int)
-DEFpropSetMeth(queue, iDiscardSeverity, int)
-DEFpropSetMeth(queue, bIsDA, int)
-DEFpropSetMeth(queue, iMinMsgsPerWrkr, int)
-DEFpropSetMeth(queue, bSaveOnShutdown, int)
-DEFpropSetMeth(queue, pUsr, void*)
-DEFpropSetMeth(queue, iDeqSlowdown, int)
-DEFpropSetMeth(queue, sizeOnDiskMax, int64)
+DEFpropSetMeth(qqueue, iPersistUpdCnt, int)
+DEFpropSetMeth(qqueue, iDeqtWinFromHr, int)
+DEFpropSetMeth(qqueue, iDeqtWinToHr, int)
+DEFpropSetMeth(qqueue, toQShutdown, long)
+DEFpropSetMeth(qqueue, toActShutdown, long)
+DEFpropSetMeth(qqueue, toWrkShutdown, long)
+DEFpropSetMeth(qqueue, toEnq, long)
+DEFpropSetMeth(qqueue, iHighWtrMrk, int)
+DEFpropSetMeth(qqueue, iLowWtrMrk, int)
+DEFpropSetMeth(qqueue, iDiscardMrk, int)
+DEFpropSetMeth(qqueue, iFullDlyMrk, int)
+DEFpropSetMeth(qqueue, iDiscardSeverity, int)
+DEFpropSetMeth(qqueue, bIsDA, int)
+DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int)
+DEFpropSetMeth(qqueue, bSaveOnShutdown, int)
+DEFpropSetMeth(qqueue, pUsr, void*)
+DEFpropSetMeth(qqueue, iDeqSlowdown, int)
+DEFpropSetMeth(qqueue, sizeOnDiskMax, int64)
/* This function can be used as a generic way to set properties. Only the subset
@@ -2285,11 +2303,11 @@ DEFpropSetMeth(queue, sizeOnDiskMax, int64)
* rgerhards, 2008-01-11
*/
#define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, (uchar*) name, sizeof(name) - 1)
-static rsRetVal queueSetProperty(queue_t *pThis, var_t *pProp)
+static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp)
{
DEFiRet;
- ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pProp != NULL);
if(isProp("iQueueSize")) {
@@ -2311,19 +2329,19 @@ finalize_it:
#undef isProp
/* dummy */
-rsRetVal queueQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
+rsRetVal qqueueQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
/* Initialize the stream class. Must be called as the very first method
* before anything else is called inside this class.
* rgerhards, 2008-01-09
*/
-BEGINObjClassInit(queue, 1, OBJ_IS_CORE_MODULE)
+BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE)
/* request objects we use */
CHKiRet(objUse(glbl, CORE_COMPONENT));
/* now set our own handlers */
- OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty);
-ENDObjClassInit(queue)
+ OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty);
+ENDObjClassInit(qqueue)
/* vi:set ai:
*/