diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 29 |
1 files changed, 8 insertions, 21 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 24fcee10..00f811a0 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -815,8 +815,8 @@ static rsRetVal qConstructDisk(queue_t *pThis) * for example file name generation must not be changed as that would break the * ability to read existing queue files. -- rgerhards, 2008-01-12 */ -CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize)); -CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize)); + CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize)); + CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize)); finalize_it: RETiRet; @@ -849,6 +849,12 @@ static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) pThis->tVars.disk.sizeOnDisk += nWriteCount; + /* we have enqueued the user element to disk. So we now need to destruct + * the in-memory representation. The instance will be re-created upon + * dequeue. -- rgerhards, 2008-07-09 + */ + objDestruct(pUsr); + dbgoprint((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n", nWriteCount, pThis->tVars.disk.sizeOnDisk); @@ -2080,10 +2086,6 @@ finalize_it: /* enqueue a new user data element * Enqueues the new element and awakes worker thread. - * TODO: this code still uses the "discard if queue full" approach from - * the main queue. This needs to be reconsidered or, better, done via a - * caller-selectable parameter mode. For the time being, I leave it in. - * rgerhards, 2008-01-03 */ rsRetVal queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) @@ -2164,21 +2166,6 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) } } -#if 0 // previous code, remove when done with advanced flow control - /* wait for the queue to be ready... */ - while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) - || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0 - && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { - dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); - timeoutComp(&t, pThis->toEnq); - if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { - dbgoprint((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); - objDestruct(pUsr); - ABORT_FINALIZE(RS_RET_QUEUE_FULL); - } - } -#endif - /* and finally enqueue the message */ CHKiRet(queueAdd(pThis, pUsr)); queueChkPersist(pThis); |