summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--queue.c276
-rw-r--r--queue.h11
-rw-r--r--stream.c2
3 files changed, 163 insertions, 126 deletions
diff --git a/queue.c b/queue.c
index 9bc39464..a33c677a 100644
--- a/queue.c
+++ b/queue.c
@@ -63,6 +63,7 @@ queueTellWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd)
{
DEFiRet;
+ dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx);
ISOBJ_TYPE_assert(pThis, queue);
assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads);
@@ -87,7 +88,7 @@ queueJoinWrkThrd(queue_t *pThis, int iIdx)
pThis->pWrkThrds[iIdx].tCurrCmd);
pthread_join(pThis->pWrkThrds[iIdx].thrdID, NULL);
pThis->pWrkThrds[iIdx].tCurrCmd = eWRKTHRDCMD_NEVER_RAN; /* back to virgin... */
- dbgprintf("Queue 0x%lx: thread %d state %d, exited\n", queueGetID(pThis), iIdx,
+ dbgprintf("Queue 0x%lx: thread %d state %d, has exited\n", queueGetID(pThis), iIdx,
pThis->pWrkThrds[iIdx].tCurrCmd);
return iRet;
@@ -191,13 +192,15 @@ queueTurnOffDAMode(queue_t *pThis)
DEFiRet;
ISOBJ_TYPE_assert(pThis, queue);
- assert(pThis->bRunsDA == 1);
+ assert(pThis->qRunsDA != QRUNS_REGULAR);
- /* pull any data that we still need from the (child) disk queue... */
- pThis->pConsumer = pThis->pqDA->pConsumer; /* restore regular consumer */
+ /* if we need to pull any data that we still need from the (child) disk queue,
+ * now would be the time to do so. At present, we do not need this, but I'd like to
+ * keep that comment if future need arises.
+ */
queueStrtAllWrkThrds(pThis); /* restore our regular worker threads */
- pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
+ pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */
/* note: a disk queue alsways has a single worker and it alwas has the ID 1 */
queueTellWrkThrd(pThis->pqDA, 1, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* tell the DA worker to terminate... */
@@ -205,8 +208,12 @@ queueTurnOffDAMode(queue_t *pThis)
queueJoinWrkThrd(pThis->pqDA, 1); /* ... and wait for the shutdown to happen */
queueDestruct(pThis->pqDA); /* and now we are ready to destruct the DA queue */
pThis->pqDA = NULL;
- queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */
+ /* now free the remaining resources */
+ pthread_mutex_destroy(&pThis->mutDA);
+ pthread_cond_destroy(&pThis->condDA);
+
+ queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */
dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
queueGetID(pThis), iRet);
@@ -233,9 +240,6 @@ queueChkWrkThrdChanges(queue_t *pThis)
case eWRKTHRDCMD_TERMINATED:
queueJoinWrkThrd(pThis, i);
break;
- case eWRKTHRDCMD_TURN_OFF_DA_MODE:
- queueTurnOffDAMode(pThis);
- break;
/* these cases just to satisfy the compiler, we do not act an them: */
case eWRKTHRDCMD_NEVER_RAN:
case eWRKTHRDCMD_RUN:
@@ -292,14 +296,14 @@ queueDAConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr)
ISOBJ_TYPE_assert(pThis, queue);
ISOBJ_assert(pUsr);
- assert(pThis->bRunsDA);
+ assert(pThis->qRunsDA != QRUNS_REGULAR);
dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, pThis->iQueueSize);
CHKiRet(queueEnqObj(pThis->pqDA, pUsr));
if(pThis->iQueueSize == pThis->iLowWtrMrk) {
- dbgprintf("Queue 0x%lx: %d entries - passed low water mark in DA mode, sleeping\n",
- queueGetID(pThis), pThis->iQueueSize);
+ dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n",
+ queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
dbgprintf("pre mutex lock (think about CLEANUP!)\n");
pthread_mutex_lock(&pThis->mutDA);
@@ -311,8 +315,8 @@ dbgprintf("condition returned\n");
* non-DA mode
*/
if(pThis->pqDA->iQueueSize == 0) {
- dbgprintf("Queue 0x%lx: %d entries - disk assisted child queue signaled it is empty\n",
- queueGetID(pThis), pThis->iQueueSize);
+ dbgprintf("Queue 0x%lx/w%d: %d entries - disk assisted child queue signaled it is empty\n",
+ queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize);
CHKiRet(queueTurnOffDAMode(pThis)); /* this also unlocks the mutex! */
} else {
pthread_mutex_unlock(&pThis->mutDA);
@@ -320,58 +324,29 @@ dbgprintf("condition returned\n");
dbgprintf("mutex unlocked (think about CLEANUP!)\n");
pthread_setcancelstate(iCancelStateSave, NULL);
}
-dbgprintf("DAConsumer returns\n");
finalize_it:
+dbgprintf("DAConsumer returns with iRet %d\n", iRet);
return iRet;
}
-/* check if we need to start disk assisted mode
- * rgerhards, 2008-01-14
+/* Start disk-assisted queue mode. All internal settings are changed. This is supposed
+ * to be called from the DA worker, which must have been started before. The most important
+ * chore of this function is to create the DA queue object. If that function fails,
+ * the DA worker should return with an appropriate state, which in turn should lead to
+ * a re-set to non-DA mode in the Enq process. The queue mutex must be locked when this
+ * function is called, else a race on pThis->qRunsDA may happen.
+ * rgerhards, 2008-01-15
*/
static rsRetVal
-queueChkStrtDA(queue_t *pThis)
+queueStrtDA(queue_t *pThis)
{
DEFiRet;
- int iCancelStateSave;
ISOBJ_TYPE_assert(pThis, queue);
- if(pThis->bRunsDA) {
- if(pThis->iQueueSize < pThis->iHighWtrMrk)
- pThis->bWasBelowHighWtr = 1;
- else if(pThis->iQueueSize == pThis->iHighWtrMrk && pThis->bWasBelowHighWtr) {
- /* then we need to signal that we are at the high water mark again.*/
- dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n",
- queueGetID(pThis), pThis->iQueueSize);
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- pthread_mutex_lock(&pThis->mutDA);
- pthread_cond_signal(&pThis->condDA);
- pthread_mutex_unlock(&pThis->mutDA);
- pthread_setcancelstate(iCancelStateSave, NULL);
- queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */
- }
- /* we need to re-check if we run disk-assisted, because that status may have changed
- * in our high water mark processing.
- */
- if(pThis->bRunsDA)
- FINALIZE;
- }
-
-
- /* we run into this part if we are NOT currently running DA.
- * TODO: split this function, I think that would make the code easier
- * to read. -- rgerhards, 2008-10-15
- */
- /* if we do not hit the high water mark, we have nothing to do */
- if(pThis->iQueueSize != pThis->iHighWtrMrk)
- ABORT_FINALIZE(RS_RET_OK);
-
- dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n",
- queueGetID(pThis), pThis->iQueueSize);
-
- /* set up sync objects for low water mark algo */
+ /* set up sync objects */
pthread_mutex_init(&pThis->mutDA, NULL);
pthread_cond_init(&pThis->condDA, NULL);
@@ -399,18 +374,14 @@ queueChkStrtDA(queue_t *pThis)
if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
FINALIZE; /* something is wrong */
- /* if we reach this point, we have a working disk queue
- * so we now need to change our consumer to utilize it.
- */
- pThis->bRunsDA = 1; /* and that's all we need to do - the worker handles the rest ;) */
- pThis->bWasBelowHighWtr = 0;/* init to be sure */
-
- /* now we must start our DA worker thread and shutdown all others */
- CHKiRet(queueStrtWrkThrd(pThis, 0));
+ /* tell our fellow workers to shut down */
CHKiRet(queueTellWrkThrds(pThis, 1, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE));
+ pThis->qRunsDA = QRUNS_DA; /* we are now in DA mode! */
+
dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n",
queueGetID(pThis), queueGetID(pThis->pqDA));
+
finalize_it:
if(iRet != RS_RET_OK) {
if(pThis->pqDA != NULL) {
@@ -426,6 +397,60 @@ finalize_it:
}
+/* check if we need to start disk assisted mode and send some signals to
+ * keep it running if we are already in it.
+ * rgerhards, 2008-01-14
+ */
+static rsRetVal
+queueChkStrtDA(queue_t *pThis)
+{
+ DEFiRet;
+ int iCancelStateSave;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+
+ /* if we do not hit the high water mark, we have nothing to do */
+ if(pThis->iQueueSize != pThis->iHighWtrMrk)
+ ABORT_FINALIZE(RS_RET_OK);
+
+ if(pThis->qRunsDA != QRUNS_REGULAR) {
+ /* then we need to signal that we are at the high water mark again. If that happens
+ * on our way down the queue, that doesn't matter, because then nobody is waiting
+ * on the condition variable.
+ */
+ dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n",
+ queueGetID(pThis), pThis->iQueueSize);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ pthread_mutex_lock(&pThis->mutDA);
+ pthread_cond_signal(&pThis->condDA);
+ pthread_mutex_unlock(&pThis->mutDA);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */
+
+ /* we need to re-check if we run disk-assisted, because that status may have changed
+ * in our high water mark processing.
+ */
+ if(pThis->qRunsDA != QRUNS_REGULAR)
+ FINALIZE;
+ }
+
+ /* if we reach this point, we are NOT currently running in DA mode.
+ * TODO: split this function, I think that would make the code easier
+ * to read. -- rgerhards, 2008-10-15
+ */
+ dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n",
+ queueGetID(pThis), pThis->iQueueSize);
+
+ pThis->qRunsDA = QRUNS_DA_INIT; /* indicate we now run in DA mode - this is reset by the DA worker if it fails */
+
+ /* now we must start our DA worker thread - it does the rest of the initialization */
+ CHKiRet(queueStrtWrkThrd(pThis, 0));
+
+finalize_it:
+ return iRet;
+}
+
+
/* --------------- end code for disk-assisted queue modes -------------------- */
@@ -865,7 +890,7 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout)
/* awake them... */
pthread_cond_broadcast(pThis->notEmpty);
dbgprintf("queueWrkThrdTrm broadcasted notEmpty\n");
- if(pThis->bRunsDA) /* if running disk-assisted, workers may wait on that condition, too */
+ if(pThis->qRunsDA != QRUNS_REGULAR) /* if running disk-assisted, workers may wait on that condition, too */
pthread_cond_broadcast(&pThis->condDA);
/* get timeout */
@@ -905,7 +930,7 @@ queueWrkThrdCancel(queue_t *pThis)
/* awake the workers one more time, just to be sure */
pthread_cond_broadcast(pThis->notEmpty);
- if(pThis->bRunsDA) /* if running disk-assisted, workers may wait on that condition, too */
+ if(pThis->qRunsDA != QRUNS_REGULAR) /* if running disk-assisted, workers may wait on that condition, too */
pthread_cond_broadcast(&pThis->condDA);
/* first tell the workers our request */
@@ -983,18 +1008,45 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* This is a helper for queueWorker() it either calls the configured
- * consumer or the DA-consumer (if in disk-assisted mode). It is NOT
- * protected by the queue mutex.
+ * consumer or the DA-consumer (if in disk-assisted mode). It is
+ * protected by the queue mutex, but MUST release it as soon as possible.
+ * Most importantly, it must release it before the consumer is called.
* rgerhards, 2008-01-14
*/
static inline rsRetVal
-queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr)
+queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave)
{
DEFiRet;
rsRetVal iRetLocal;
int iSeverity;
+ void *pUsr;
+ int qRunsDA;
+
+
+ /* first check if we have still something to process */
+ if(pThis->iQueueSize == 0) {
+ pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ FINALIZE;
+ }
+
+ /* dequeue element (still protected from mutex) */
+ iRet = queueDel(pThis, &pUsr);
+ queueChkPersist(pThis); // when we support peek(), we must do this down after the del!
+ qRunsDA = pThis->qRunsDA; /* do a local copy so that we prevent a race after mutex release */
+ pthread_mutex_unlock(pThis->mut);
+ pthread_cond_signal(pThis->notFull);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ /* do actual processing (the lengthy part, runs in parallel)
+ * If we had a problem while dequeing, we do not call the consumer,
+ * but we otherwise ignore it. This is in the hopes that it will be
+ * self-healing. However, this is really not a good thing.
+ * rgerhards, 2008-01-03
+ */
+ if(iRet != RS_RET_OK)
+ FINALIZE;
- if(pThis->bRunsDA) {
+ if(qRunsDA == QRUNS_DA) {
queueDAConsumer(pThis, iMyThrdIndx, pUsr);
} else {
/* we are running in normal, non-disk-assisted mode */
@@ -1017,7 +1069,12 @@ queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr)
}
}
-dbgprintf("CallConsumer returns\n");
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things "
+ "may happen\n", queueGetID(pThis), iMyThrdIndx, iRet);
+ }
+dbgprintf("CallConsumer returns %d\n", iRet);
return iRet;
}
@@ -1029,18 +1086,12 @@ dbgprintf("CallConsumer returns\n");
static void *
queueWorker(void *arg)
{
- DEFiRet;
queue_t *pThis = (queue_t*) arg;
- void *pUsr;
sigset_t sigSet;
int iMyThrdIndx; /* index for this thread in queue thread table */
int iCancelStateSave;
- int bInitialEmpty = 1; /* if running as a DA child, we do NOT need to signal the parent
- * on the first occasion we are empty (because that happens on every
- * startup. This var keeps track of the state.
- */
- assert(pThis != NULL);
+ ISOBJ_TYPE_assert(pThis, queue);
sigfillset(&sigSet);
pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
@@ -1053,12 +1104,22 @@ queueWorker(void *arg)
dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx);
- /* tell the world there is one more worker */
+ /* do some one-time initialization */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
- pThis->iCurNumWrkThrd++;
+
+ pThis->iCurNumWrkThrd++; /* tell the world there is one more worker */
+
+ if(iMyThrdIndx == 0) { /* are we the DA worker? */
+ if(queueStrtDA(pThis) != RS_RET_OK) { /* then fully initialize the DA queue! */
+ /* if we could not init the DA queue, we have nothing to do, so shut down. */
+ queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);
+ }
+ }
+
pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
+ /* end one-time stuff */
/* now we have our identity, on to real processing */
while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN
@@ -1069,22 +1130,17 @@ queueWorker(void *arg)
dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
queueGetID(pThis), iMyThrdIndx);
if(pThis->bSignalOnEmpty) {
- if(bInitialEmpty == 1) {
- /* ignore */
- bInitialEmpty = 0;
- } else {
- /* we need to signal our parent queue that we are empty */
+ /* we need to signal our parent queue that we are empty */
dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx);
- pthread_mutex_lock(pThis->mutSignalOnEmpty);
- pthread_cond_signal(pThis->condSignalOnEmpty);
- pthread_mutex_unlock(pThis->mutSignalOnEmpty);
+ pthread_mutex_lock(pThis->mutSignalOnEmpty);
+ pthread_cond_signal(pThis->condSignalOnEmpty);
+ pthread_mutex_unlock(pThis->mutSignalOnEmpty);
dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx);
- /* we now need to re-check if we still shall continue to
- * run. This is important because the parent may have changed our
- * state. So we simply go back to the begin of the loop.
- */
- continue;
- }
+ /* we now need to re-check if we still shall continue to
+ * run. This is important because the parent may have changed our
+ * state. So we simply go back to the begin of the loop.
+ */
+ continue;
}
/* If we arrive here, we have the regular case, where we can safely assume that
* iQueueSize and tCmd have not changed since the while().
@@ -1093,35 +1149,13 @@ dbgprintf("Queue %p/w%d: pre condwait ->notEmpty\n", pThis, iMyThrdIndx);
pthread_cond_wait(pThis->notEmpty, pThis->mut);
dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx);
}
- if(pThis->iQueueSize > 0) {
- /* dequeue element (still protected from mutex) */
- iRet = queueDel(pThis, &pUsr);
- queueChkPersist(pThis); // when we support peek(), we must do this down after the del!
- pthread_mutex_unlock(pThis->mut);
- pthread_cond_signal(pThis->notFull);
- pthread_setcancelstate(iCancelStateSave, NULL);
- /* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
- * a cancellation point in itself. As we run most of the time without cancel enabled, I fear
- * we may never get cancelled if we do not create a cancellation point ourselfs.
- */
- pthread_testcancel();
- /* do actual processing (the lengthy part, runs in parallel)
- * If we had a problem while dequeing, we do not call the consumer,
- * but we otherwise ignore it. This is in the hopes that it will be
- * self-healing. However, this is really not a good thing.
- * rgerhards, 2008-01-03
- */
- if(iRet == RS_RET_OK) {
- queueWorkerCallConsumer(pThis, iMyThrdIndx, pUsr);
- } else {
- dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things "
- "may happen\n", queueGetID(pThis), iMyThrdIndx, iRet);
- }
- } else { /* the mutex must be unlocked in any case (important for termination) */
- pthread_mutex_unlock(pThis->mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
- }
+ queueWorkerCallConsumer(pThis, iMyThrdIndx, iCancelStateSave);
+ /* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
+ * a cancellation point in itself. As we run most of the time without cancel enabled, I fear
+ * we may never get cancelled if we do not create a cancellation point ourselfs.
+ */
+ pthread_testcancel();
/* We now yield to give the other threads a chance to obtain the mutex. If we do not
* do that, this thread may very well aquire the mutex again before another thread
* has even a chance to run. The reason is that mutex operations are free to be
@@ -1404,7 +1438,7 @@ rsRetVal queueDestruct(queue_t *pThis)
}
/* if running DA, terminate disk queue */
- if(pThis->bRunsDA)
+ if(pThis->qRunsDA != QRUNS_REGULAR)
queueDestruct(pThis->pqDA);
/* persist the queue (we always do that - queuePersits() does cleanup it the queue is empty) */
@@ -1495,7 +1529,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
int iSeverity = 8;
rsRetVal iRetLocal;
- assert(pThis != NULL);
+ ISOBJ_TYPE_assert(pThis, queue);
/* Please note that this function is not cancel-safe and consequently
* sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE
diff --git a/queue.h b/queue.h
index e29cf7da..503edd38 100644
--- a/queue.h
+++ b/queue.h
@@ -66,8 +66,7 @@ typedef enum {
/* ALL active states MUST be numerically higher than eWRKTHRDCMD_TERMINATED and NONE must be lower! */
eWRKTHRDCMD_RUN = 2,
eWRKTHRDCMD_SHUTDOWN = 3,
- eWRKTHRDCMD_SHUTDOWN_IMMEDIATE = 4,
- eWRKTHRDCMD_TURN_OFF_DA_MODE = 5
+ eWRKTHRDCMD_SHUTDOWN_IMMEDIATE = 4
} qWrkCmd_t; /* commands for queue worker threads */
typedef struct qWrkThrd_s {
@@ -87,7 +86,7 @@ typedef struct queue_s {
int iUpdsSincePersist;/* nbr of queue updates since the last persist call */
int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */
int iHighWtrMrk; /* high water mark for disk-assisted memory queues */
- int bWasBelowHighWtr;/* when running in DA mode: queue was below high wtr mrk at least once */
+ //int bWasBelowHighWtr;/* when running in DA mode: queue was below high wtr mrk at least once */
int iLowWtrMrk; /* low water mark for disk-assisted memory queues */
int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */
int iDiscardSeverity;/* messages of this severity above are discarded on too-full queue */
@@ -123,7 +122,11 @@ typedef struct queue_s {
int iNumberFiles; /* how many files make up the queue? */
size_t iMaxFileSize; /* max size for a single queue file */
int bIsDA; /* is this queue disk assisted? */
- int bRunsDA; /* is this queue actually *running* disk assisted? */
+ enum {
+ QRUNS_REGULAR,
+ QRUNS_DA_INIT,
+ QRUNS_DA
+ } qRunsDA; /* is this queue actually *running* disk assisted? if so, which mode? */
pthread_mutex_t mutDA; /* mutex for low water mark algo */
pthread_cond_t condDA; /* and its matching condition */
struct queue_s *pqDA; /* queue for disk-assisted modes */
diff --git a/stream.c b/stream.c
index 8ff66217..27fc8a41 100644
--- a/stream.c
+++ b/stream.c
@@ -389,7 +389,7 @@ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
iWritten = write(pThis->fd, pBuf, lenBuf);
dbgprintf("Stream 0x%lx: file %d write wrote %d bytes, errno: %d\n", (unsigned long) pThis,
- iWritten, pThis->fd, errno);
+ pThis->fd, iWritten, errno);
/* TODO: handle error case -- rgerhards, 2008-01-07 */
/* Now indicate buffer empty again. We do this in any case, because there