diff options
-rw-r--r-- | msg.c | 15 | ||||
-rw-r--r-- | obj.h | 1 | ||||
-rw-r--r-- | queue.c | 21 |
3 files changed, 35 insertions, 2 deletions
@@ -2142,6 +2142,20 @@ static rsRetVal MsgConstructFinalizer(msg_t *pThis) } +/* get the severity - this is an entry point that + * satisfies the base object class getSeverity semantics. + * rgerhards, 2008-01-14 + */ +static rsRetVal +MsgGetSeverity(obj_t *pThis, int *piSeverity) +{ + ISOBJ_TYPE_assert(pThis, Msg); + assert(piSeverity != NULL); + *piSeverity = ((msg_t*) pThis)->iSeverity; + return RS_RET_OK; +} + + /* Initialize the message class. Must be called as the very first method * before anything else is called inside this class. * rgerhards, 2008-01-04 @@ -2150,6 +2164,7 @@ BEGINObjClassInit(Msg, 1) OBJSetMethodHandler(objMethod_SERIALIZE, MsgSerialize); OBJSetMethodHandler(objMethod_SETPROPERTY, MsgSetProperty); OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, MsgConstructFinalizer); + OBJSetMethodHandler(objMethod_GETSEVERITY, MsgGetSeverity); /* initially, we have no need to lock message objects */ funcLock = MsgLockingDummy; funcUnlock = MsgLockingDummy; @@ -80,6 +80,7 @@ #endif #define objDestruct(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_DESTRUCT])(pThis) #define objSerialize(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_SERIALIZE]) +#define objGetSeverity(pThis, piSever) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_GETSEVERITY])(pThis, piSever) #define OBJSetMethodHandler(methodID, pHdlr) \ CHKiRet(objInfoSetMethod(pObjInfoOBJ, methodID, (rsRetVal (*)(void*)) pHdlr)) @@ -1051,6 +1051,8 @@ queueEnqObj(queue_t *pThis, void *pUsr) int iCancelStateSave; int i; struct timespec t; + int iSeverity = 8; + rsRetVal iRetLocal; assert(pThis != NULL); @@ -1064,13 +1066,28 @@ queueEnqObj(queue_t *pThis, void *pUsr) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); } + + if(pThis->iQueueSize >= pThis->iDiscardMrk) { + iRetLocal = objGetSeverity(pUsr, &iSeverity); + if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { + dbgprintf("Queue 0x%lx: queue nearly full (%d entries), discarded severity %d message\n", + queueGetID(pThis), pThis->iQueueSize, iSeverity); + objDestruct(pUsr); + ABORT_FINALIZE(RS_RET_QUEUE_FULL); + } else { + dbgprintf("Queue 0x%lx: queue nearly full (%d entries), but could not drop msg " + "(iRet: %d, severity %d)\n", queueGetID(pThis), pThis->iQueueSize, + iRetLocal, iSeverity); + } + } + while(pThis->iQueueSize >= pThis->iMaxQueueSize) { - dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", (unsigned long) pThis); + dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", queueGetID(pThis)); queueTimeoutComp(&t, pThis->toEnq); if(pthread_cond_timedwait (pThis->notFull, pThis->mut, &t) != 0) { - dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", (unsigned long) pThis); + dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", queueGetID(pThis)); objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } |