summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c29
1 files changed, 8 insertions, 21 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 24fcee10..00f811a0 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -815,8 +815,8 @@ static rsRetVal qConstructDisk(queue_t *pThis)
* for example file name generation must not be changed as that would break the
* ability to read existing queue files. -- rgerhards, 2008-01-12
*/
-CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize));
-CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize));
+ CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize));
+ CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize));
finalize_it:
RETiRet;
@@ -849,6 +849,12 @@ static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
pThis->tVars.disk.sizeOnDisk += nWriteCount;
+ /* we have enqueued the user element to disk. So we now need to destruct
+ * the in-memory representation. The instance will be re-created upon
+ * dequeue. -- rgerhards, 2008-07-09
+ */
+ objDestruct(pUsr);
+
dbgoprint((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n",
nWriteCount, pThis->tVars.disk.sizeOnDisk);
@@ -2080,10 +2086,6 @@ finalize_it:
/* enqueue a new user data element
* Enqueues the new element and awakes worker thread.
- * TODO: this code still uses the "discard if queue full" approach from
- * the main queue. This needs to be reconsidered or, better, done via a
- * caller-selectable parameter mode. For the time being, I leave it in.
- * rgerhards, 2008-01-03
*/
rsRetVal
queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
@@ -2164,21 +2166,6 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr)
}
}
-#if 0 // previous code, remove when done with advanced flow control
- /* wait for the queue to be ready... */
- while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize)
- || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0
- && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
- timeoutComp(&t, pThis->toEnq);
- if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
- dbgoprint((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
- objDestruct(pUsr);
- ABORT_FINALIZE(RS_RET_QUEUE_FULL);
- }
- }
-#endif
-
/* and finally enqueue the message */
CHKiRet(queueAdd(pThis, pUsr));
queueChkPersist(pThis);