summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c29
1 files changed, 21 insertions, 8 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 93b33967..a3e29a96 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -49,6 +49,7 @@
#include "obj.h"
#include "wtp.h"
#include "wti.h"
+#include "atomic.h"
/* static data */
DEFobjStaticHelpers
@@ -1015,7 +1016,7 @@ queueAdd(queue_t *pThis, void *pUsr)
CHKiRet(pThis->qAdd(pThis, pUsr));
if(pThis->qType != QUEUETYPE_DIRECT) {
- ++pThis->iQueueSize;
+ ATOMIC_INC(pThis->iQueueSize);
dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize);
}
@@ -1044,7 +1045,7 @@ queueDel(queue_t *pThis, void *pUsr)
iRet = queueGetUngottenObj(pThis, (obj_t**) pUsr);
} else {
iRet = pThis->qDel(pThis, pUsr);
- --pThis->iQueueSize;
+ ATOMIC_DEC(pThis->iQueueSize);
}
dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
@@ -1284,6 +1285,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
/* we have an object, so let's fill the properties */
objConstructSetObjInfo(pThis);
+ pThis->bOptimizeUniProc = glbl.GetOptimizeUniProc();
if((pThis->pszSpoolDir = (uchar*) strdup((char*)glbl.GetWorkDir())) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -1532,8 +1534,6 @@ queueRateLimiter(queue_t *pThis)
ISOBJ_TYPE_assert(pThis, queue);
- dbgoprint((obj_t*) pThis, "entering rate limiter\n");
-
iDelay = 0;
if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */
/* time calls are expensive, so only do them when needed */
@@ -2120,6 +2120,15 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
ISOBJ_TYPE_assert(pThis, queue);
+ /* first check if we need to discard this message (which will cause CHKiRet() to exit)
+ * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The iQueueSize
+ * and bRunsDA parameters may not reflect the correct settings here, but they are
+ * "good enough" in the sense that they can be used to drive the decision. Valgrind's
+ * threading tools may point this access to be an error, but this is done
+ * intentional. I do not see this causes problems to us.
+ */
+ CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
+
/* Please note that this function is not cancel-safe and consequently
* sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
* during its execution. If that is not done, race conditions occur if the
@@ -2131,9 +2140,6 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
d_pthread_mutex_lock(pThis->mut);
}
- /* first check if we need to discard this message (which will cause CHKiRet() to exit) */
- CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
-
/* then check if we need to add an assistance disk queue */
if(pThis->bIsDA)
CHKiRet(queueChkStrtDA(pThis));
@@ -2196,10 +2202,17 @@ finalize_it:
if(pThis->qType != QUEUETYPE_DIRECT) {
/* make sure at least one worker is running. */
queueAdviseMaxWorkers(pThis);
- dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
/* and release the mutex */
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
+ dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n");
+ /* the following pthread_yield is experimental, but brought us performance
+ * benefit. For details, please see http://kb.monitorware.com/post14216.html#p14216
+ * rgerhards, 2008-10-09
+ * but this is only true for uniprocessors, so we guard it with an optimize flag -- rgerhards, 2008-10-22
+ */
+ if(pThis->bOptimizeUniProc)
+ pthread_yield();
}
RETiRet;