summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--msg.c15
-rw-r--r--obj.h1
-rw-r--r--queue.c21
3 files changed, 35 insertions, 2 deletions
diff --git a/msg.c b/msg.c
index 9825ea77..3ffaa6bb 100644
--- a/msg.c
+++ b/msg.c
@@ -2142,6 +2142,20 @@ static rsRetVal MsgConstructFinalizer(msg_t *pThis)
}
+/* get the severity - this is an entry point that
+ * satisfies the base object class getSeverity semantics.
+ * rgerhards, 2008-01-14
+ */
+static rsRetVal
+MsgGetSeverity(obj_t *pThis, int *piSeverity)
+{
+ ISOBJ_TYPE_assert(pThis, Msg);
+ assert(piSeverity != NULL);
+ *piSeverity = ((msg_t*) pThis)->iSeverity;
+ return RS_RET_OK;
+}
+
+
/* Initialize the message class. Must be called as the very first method
* before anything else is called inside this class.
* rgerhards, 2008-01-04
@@ -2150,6 +2164,7 @@ BEGINObjClassInit(Msg, 1)
OBJSetMethodHandler(objMethod_SERIALIZE, MsgSerialize);
OBJSetMethodHandler(objMethod_SETPROPERTY, MsgSetProperty);
OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, MsgConstructFinalizer);
+ OBJSetMethodHandler(objMethod_GETSEVERITY, MsgGetSeverity);
/* initially, we have no need to lock message objects */
funcLock = MsgLockingDummy;
funcUnlock = MsgLockingDummy;
diff --git a/obj.h b/obj.h
index acdbd24e..f4a9aee6 100644
--- a/obj.h
+++ b/obj.h
@@ -80,6 +80,7 @@
#endif
#define objDestruct(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_DESTRUCT])(pThis)
#define objSerialize(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_SERIALIZE])
+#define objGetSeverity(pThis, piSever) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_GETSEVERITY])(pThis, piSever)
#define OBJSetMethodHandler(methodID, pHdlr) \
CHKiRet(objInfoSetMethod(pObjInfoOBJ, methodID, (rsRetVal (*)(void*)) pHdlr))
diff --git a/queue.c b/queue.c
index 8010a45d..3559835b 100644
--- a/queue.c
+++ b/queue.c
@@ -1051,6 +1051,8 @@ queueEnqObj(queue_t *pThis, void *pUsr)
int iCancelStateSave;
int i;
struct timespec t;
+ int iSeverity = 8;
+ rsRetVal iRetLocal;
assert(pThis != NULL);
@@ -1064,13 +1066,28 @@ queueEnqObj(queue_t *pThis, void *pUsr)
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
}
+
+ if(pThis->iQueueSize >= pThis->iDiscardMrk) {
+ iRetLocal = objGetSeverity(pUsr, &iSeverity);
+ if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
+ dbgprintf("Queue 0x%lx: queue nearly full (%d entries), discarded severity %d message\n",
+ queueGetID(pThis), pThis->iQueueSize, iSeverity);
+ objDestruct(pUsr);
+ ABORT_FINALIZE(RS_RET_QUEUE_FULL);
+ } else {
+ dbgprintf("Queue 0x%lx: queue nearly full (%d entries), but could not drop msg "
+ "(iRet: %d, severity %d)\n", queueGetID(pThis), pThis->iQueueSize,
+ iRetLocal, iSeverity);
+ }
+ }
+
while(pThis->iQueueSize >= pThis->iMaxQueueSize) {
- dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", (unsigned long) pThis);
+ dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", queueGetID(pThis));
queueTimeoutComp(&t, pThis->toEnq);
if(pthread_cond_timedwait (pThis->notFull,
pThis->mut, &t) != 0) {
- dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", (unsigned long) pThis);
+ dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", queueGetID(pThis));
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}