diff options
-rw-r--r-- | queue.c | 276 | ||||
-rw-r--r-- | queue.h | 11 | ||||
-rw-r--r-- | stream.c | 2 |
3 files changed, 163 insertions, 126 deletions
@@ -63,6 +63,7 @@ queueTellWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd) { DEFiRet; + dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx); ISOBJ_TYPE_assert(pThis, queue); assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads); @@ -87,7 +88,7 @@ queueJoinWrkThrd(queue_t *pThis, int iIdx) pThis->pWrkThrds[iIdx].tCurrCmd); pthread_join(pThis->pWrkThrds[iIdx].thrdID, NULL); pThis->pWrkThrds[iIdx].tCurrCmd = eWRKTHRDCMD_NEVER_RAN; /* back to virgin... */ - dbgprintf("Queue 0x%lx: thread %d state %d, exited\n", queueGetID(pThis), iIdx, + dbgprintf("Queue 0x%lx: thread %d state %d, has exited\n", queueGetID(pThis), iIdx, pThis->pWrkThrds[iIdx].tCurrCmd); return iRet; @@ -191,13 +192,15 @@ queueTurnOffDAMode(queue_t *pThis) DEFiRet; ISOBJ_TYPE_assert(pThis, queue); - assert(pThis->bRunsDA == 1); + assert(pThis->qRunsDA != QRUNS_REGULAR); - /* pull any data that we still need from the (child) disk queue... */ - pThis->pConsumer = pThis->pqDA->pConsumer; /* restore regular consumer */ + /* if we need to pull any data that we still need from the (child) disk queue, + * now would be the time to do so. At present, we do not need this, but I'd like to + * keep that comment if future need arises. + */ queueStrtAllWrkThrds(pThis); /* restore our regular worker threads */ - pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ + pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */ /* note: a disk queue alsways has a single worker and it alwas has the ID 1 */ queueTellWrkThrd(pThis->pqDA, 1, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* tell the DA worker to terminate... */ @@ -205,8 +208,12 @@ queueTurnOffDAMode(queue_t *pThis) queueJoinWrkThrd(pThis->pqDA, 1); /* ... and wait for the shutdown to happen */ queueDestruct(pThis->pqDA); /* and now we are ready to destruct the DA queue */ pThis->pqDA = NULL; - queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */ + /* now free the remaining resources */ + pthread_mutex_destroy(&pThis->mutDA); + pthread_cond_destroy(&pThis->condDA); + + queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */ dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n", queueGetID(pThis), iRet); @@ -233,9 +240,6 @@ queueChkWrkThrdChanges(queue_t *pThis) case eWRKTHRDCMD_TERMINATED: queueJoinWrkThrd(pThis, i); break; - case eWRKTHRDCMD_TURN_OFF_DA_MODE: - queueTurnOffDAMode(pThis); - break; /* these cases just to satisfy the compiler, we do not act an them: */ case eWRKTHRDCMD_NEVER_RAN: case eWRKTHRDCMD_RUN: @@ -292,14 +296,14 @@ queueDAConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr) ISOBJ_TYPE_assert(pThis, queue); ISOBJ_assert(pUsr); - assert(pThis->bRunsDA); + assert(pThis->qRunsDA != QRUNS_REGULAR); dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, pThis->iQueueSize); CHKiRet(queueEnqObj(pThis->pqDA, pUsr)); if(pThis->iQueueSize == pThis->iLowWtrMrk) { - dbgprintf("Queue 0x%lx: %d entries - passed low water mark in DA mode, sleeping\n", - queueGetID(pThis), pThis->iQueueSize); + dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n", + queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); dbgprintf("pre mutex lock (think about CLEANUP!)\n"); pthread_mutex_lock(&pThis->mutDA); @@ -311,8 +315,8 @@ dbgprintf("condition returned\n"); * non-DA mode */ if(pThis->pqDA->iQueueSize == 0) { - dbgprintf("Queue 0x%lx: %d entries - disk assisted child queue signaled it is empty\n", - queueGetID(pThis), pThis->iQueueSize); + dbgprintf("Queue 0x%lx/w%d: %d entries - disk assisted child queue signaled it is empty\n", + queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize); CHKiRet(queueTurnOffDAMode(pThis)); /* this also unlocks the mutex! */ } else { pthread_mutex_unlock(&pThis->mutDA); @@ -320,58 +324,29 @@ dbgprintf("condition returned\n"); dbgprintf("mutex unlocked (think about CLEANUP!)\n"); pthread_setcancelstate(iCancelStateSave, NULL); } -dbgprintf("DAConsumer returns\n"); finalize_it: +dbgprintf("DAConsumer returns with iRet %d\n", iRet); return iRet; } -/* check if we need to start disk assisted mode - * rgerhards, 2008-01-14 +/* Start disk-assisted queue mode. All internal settings are changed. This is supposed + * to be called from the DA worker, which must have been started before. The most important + * chore of this function is to create the DA queue object. If that function fails, + * the DA worker should return with an appropriate state, which in turn should lead to + * a re-set to non-DA mode in the Enq process. The queue mutex must be locked when this + * function is called, else a race on pThis->qRunsDA may happen. + * rgerhards, 2008-01-15 */ static rsRetVal -queueChkStrtDA(queue_t *pThis) +queueStrtDA(queue_t *pThis) { DEFiRet; - int iCancelStateSave; ISOBJ_TYPE_assert(pThis, queue); - if(pThis->bRunsDA) { - if(pThis->iQueueSize < pThis->iHighWtrMrk) - pThis->bWasBelowHighWtr = 1; - else if(pThis->iQueueSize == pThis->iHighWtrMrk && pThis->bWasBelowHighWtr) { - /* then we need to signal that we are at the high water mark again.*/ - dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n", - queueGetID(pThis), pThis->iQueueSize); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - pthread_mutex_lock(&pThis->mutDA); - pthread_cond_signal(&pThis->condDA); - pthread_mutex_unlock(&pThis->mutDA); - pthread_setcancelstate(iCancelStateSave, NULL); - queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */ - } - /* we need to re-check if we run disk-assisted, because that status may have changed - * in our high water mark processing. - */ - if(pThis->bRunsDA) - FINALIZE; - } - - - /* we run into this part if we are NOT currently running DA. - * TODO: split this function, I think that would make the code easier - * to read. -- rgerhards, 2008-10-15 - */ - /* if we do not hit the high water mark, we have nothing to do */ - if(pThis->iQueueSize != pThis->iHighWtrMrk) - ABORT_FINALIZE(RS_RET_OK); - - dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n", - queueGetID(pThis), pThis->iQueueSize); - - /* set up sync objects for low water mark algo */ + /* set up sync objects */ pthread_mutex_init(&pThis->mutDA, NULL); pthread_cond_init(&pThis->condDA, NULL); @@ -399,18 +374,14 @@ queueChkStrtDA(queue_t *pThis) if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) FINALIZE; /* something is wrong */ - /* if we reach this point, we have a working disk queue - * so we now need to change our consumer to utilize it. - */ - pThis->bRunsDA = 1; /* and that's all we need to do - the worker handles the rest ;) */ - pThis->bWasBelowHighWtr = 0;/* init to be sure */ - - /* now we must start our DA worker thread and shutdown all others */ - CHKiRet(queueStrtWrkThrd(pThis, 0)); + /* tell our fellow workers to shut down */ CHKiRet(queueTellWrkThrds(pThis, 1, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE)); + pThis->qRunsDA = QRUNS_DA; /* we are now in DA mode! */ + dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n", queueGetID(pThis), queueGetID(pThis->pqDA)); + finalize_it: if(iRet != RS_RET_OK) { if(pThis->pqDA != NULL) { @@ -426,6 +397,60 @@ finalize_it: } +/* check if we need to start disk assisted mode and send some signals to + * keep it running if we are already in it. + * rgerhards, 2008-01-14 + */ +static rsRetVal +queueChkStrtDA(queue_t *pThis) +{ + DEFiRet; + int iCancelStateSave; + + ISOBJ_TYPE_assert(pThis, queue); + + /* if we do not hit the high water mark, we have nothing to do */ + if(pThis->iQueueSize != pThis->iHighWtrMrk) + ABORT_FINALIZE(RS_RET_OK); + + if(pThis->qRunsDA != QRUNS_REGULAR) { + /* then we need to signal that we are at the high water mark again. If that happens + * on our way down the queue, that doesn't matter, because then nobody is waiting + * on the condition variable. + */ + dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n", + queueGetID(pThis), pThis->iQueueSize); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + pthread_mutex_lock(&pThis->mutDA); + pthread_cond_signal(&pThis->condDA); + pthread_mutex_unlock(&pThis->mutDA); + pthread_setcancelstate(iCancelStateSave, NULL); + queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */ + + /* we need to re-check if we run disk-assisted, because that status may have changed + * in our high water mark processing. + */ + if(pThis->qRunsDA != QRUNS_REGULAR) + FINALIZE; + } + + /* if we reach this point, we are NOT currently running in DA mode. + * TODO: split this function, I think that would make the code easier + * to read. -- rgerhards, 2008-10-15 + */ + dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n", + queueGetID(pThis), pThis->iQueueSize); + + pThis->qRunsDA = QRUNS_DA_INIT; /* indicate we now run in DA mode - this is reset by the DA worker if it fails */ + + /* now we must start our DA worker thread - it does the rest of the initialization */ + CHKiRet(queueStrtWrkThrd(pThis, 0)); + +finalize_it: + return iRet; +} + + /* --------------- end code for disk-assisted queue modes -------------------- */ @@ -865,7 +890,7 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) /* awake them... */ pthread_cond_broadcast(pThis->notEmpty); dbgprintf("queueWrkThrdTrm broadcasted notEmpty\n"); - if(pThis->bRunsDA) /* if running disk-assisted, workers may wait on that condition, too */ + if(pThis->qRunsDA != QRUNS_REGULAR) /* if running disk-assisted, workers may wait on that condition, too */ pthread_cond_broadcast(&pThis->condDA); /* get timeout */ @@ -905,7 +930,7 @@ queueWrkThrdCancel(queue_t *pThis) /* awake the workers one more time, just to be sure */ pthread_cond_broadcast(pThis->notEmpty); - if(pThis->bRunsDA) /* if running disk-assisted, workers may wait on that condition, too */ + if(pThis->qRunsDA != QRUNS_REGULAR) /* if running disk-assisted, workers may wait on that condition, too */ pthread_cond_broadcast(&pThis->condDA); /* first tell the workers our request */ @@ -983,18 +1008,45 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) /* This is a helper for queueWorker() it either calls the configured - * consumer or the DA-consumer (if in disk-assisted mode). It is NOT - * protected by the queue mutex. + * consumer or the DA-consumer (if in disk-assisted mode). It is + * protected by the queue mutex, but MUST release it as soon as possible. + * Most importantly, it must release it before the consumer is called. * rgerhards, 2008-01-14 */ static inline rsRetVal -queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr) +queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave) { DEFiRet; rsRetVal iRetLocal; int iSeverity; + void *pUsr; + int qRunsDA; + + + /* first check if we have still something to process */ + if(pThis->iQueueSize == 0) { + pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + FINALIZE; + } + + /* dequeue element (still protected from mutex) */ + iRet = queueDel(pThis, &pUsr); + queueChkPersist(pThis); // when we support peek(), we must do this down after the del! + qRunsDA = pThis->qRunsDA; /* do a local copy so that we prevent a race after mutex release */ + pthread_mutex_unlock(pThis->mut); + pthread_cond_signal(pThis->notFull); + pthread_setcancelstate(iCancelStateSave, NULL); + /* do actual processing (the lengthy part, runs in parallel) + * If we had a problem while dequeing, we do not call the consumer, + * but we otherwise ignore it. This is in the hopes that it will be + * self-healing. However, this is really not a good thing. + * rgerhards, 2008-01-03 + */ + if(iRet != RS_RET_OK) + FINALIZE; - if(pThis->bRunsDA) { + if(qRunsDA == QRUNS_DA) { queueDAConsumer(pThis, iMyThrdIndx, pUsr); } else { /* we are running in normal, non-disk-assisted mode */ @@ -1017,7 +1069,12 @@ queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr) } } -dbgprintf("CallConsumer returns\n"); +finalize_it: + if(iRet != RS_RET_OK) { + dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things " + "may happen\n", queueGetID(pThis), iMyThrdIndx, iRet); + } +dbgprintf("CallConsumer returns %d\n", iRet); return iRet; } @@ -1029,18 +1086,12 @@ dbgprintf("CallConsumer returns\n"); static void * queueWorker(void *arg) { - DEFiRet; queue_t *pThis = (queue_t*) arg; - void *pUsr; sigset_t sigSet; int iMyThrdIndx; /* index for this thread in queue thread table */ int iCancelStateSave; - int bInitialEmpty = 1; /* if running as a DA child, we do NOT need to signal the parent - * on the first occasion we are empty (because that happens on every - * startup. This var keeps track of the state. - */ - assert(pThis != NULL); + ISOBJ_TYPE_assert(pThis, queue); sigfillset(&sigSet); pthread_sigmask(SIG_BLOCK, &sigSet, NULL); @@ -1053,12 +1104,22 @@ queueWorker(void *arg) dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx); - /* tell the world there is one more worker */ + /* do some one-time initialization */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); - pThis->iCurNumWrkThrd++; + + pThis->iCurNumWrkThrd++; /* tell the world there is one more worker */ + + if(iMyThrdIndx == 0) { /* are we the DA worker? */ + if(queueStrtDA(pThis) != RS_RET_OK) { /* then fully initialize the DA queue! */ + /* if we could not init the DA queue, we have nothing to do, so shut down. */ + queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE); + } + } + pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); + /* end one-time stuff */ /* now we have our identity, on to real processing */ while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN @@ -1069,22 +1130,17 @@ queueWorker(void *arg) dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", queueGetID(pThis), iMyThrdIndx); if(pThis->bSignalOnEmpty) { - if(bInitialEmpty == 1) { - /* ignore */ - bInitialEmpty = 0; - } else { - /* we need to signal our parent queue that we are empty */ + /* we need to signal our parent queue that we are empty */ dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx); - pthread_mutex_lock(pThis->mutSignalOnEmpty); - pthread_cond_signal(pThis->condSignalOnEmpty); - pthread_mutex_unlock(pThis->mutSignalOnEmpty); + pthread_mutex_lock(pThis->mutSignalOnEmpty); + pthread_cond_signal(pThis->condSignalOnEmpty); + pthread_mutex_unlock(pThis->mutSignalOnEmpty); dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx); - /* we now need to re-check if we still shall continue to - * run. This is important because the parent may have changed our - * state. So we simply go back to the begin of the loop. - */ - continue; - } + /* we now need to re-check if we still shall continue to + * run. This is important because the parent may have changed our + * state. So we simply go back to the begin of the loop. + */ + continue; } /* If we arrive here, we have the regular case, where we can safely assume that * iQueueSize and tCmd have not changed since the while(). @@ -1093,35 +1149,13 @@ dbgprintf("Queue %p/w%d: pre condwait ->notEmpty\n", pThis, iMyThrdIndx); pthread_cond_wait(pThis->notEmpty, pThis->mut); dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx); } - if(pThis->iQueueSize > 0) { - /* dequeue element (still protected from mutex) */ - iRet = queueDel(pThis, &pUsr); - queueChkPersist(pThis); // when we support peek(), we must do this down after the del! - pthread_mutex_unlock(pThis->mut); - pthread_cond_signal(pThis->notFull); - pthread_setcancelstate(iCancelStateSave, NULL); - /* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is - * a cancellation point in itself. As we run most of the time without cancel enabled, I fear - * we may never get cancelled if we do not create a cancellation point ourselfs. - */ - pthread_testcancel(); - /* do actual processing (the lengthy part, runs in parallel) - * If we had a problem while dequeing, we do not call the consumer, - * but we otherwise ignore it. This is in the hopes that it will be - * self-healing. However, this is really not a good thing. - * rgerhards, 2008-01-03 - */ - if(iRet == RS_RET_OK) { - queueWorkerCallConsumer(pThis, iMyThrdIndx, pUsr); - } else { - dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things " - "may happen\n", queueGetID(pThis), iMyThrdIndx, iRet); - } - } else { /* the mutex must be unlocked in any case (important for termination) */ - pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); - } + queueWorkerCallConsumer(pThis, iMyThrdIndx, iCancelStateSave); + /* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is + * a cancellation point in itself. As we run most of the time without cancel enabled, I fear + * we may never get cancelled if we do not create a cancellation point ourselfs. + */ + pthread_testcancel(); /* We now yield to give the other threads a chance to obtain the mutex. If we do not * do that, this thread may very well aquire the mutex again before another thread * has even a chance to run. The reason is that mutex operations are free to be @@ -1404,7 +1438,7 @@ rsRetVal queueDestruct(queue_t *pThis) } /* if running DA, terminate disk queue */ - if(pThis->bRunsDA) + if(pThis->qRunsDA != QRUNS_REGULAR) queueDestruct(pThis->pqDA); /* persist the queue (we always do that - queuePersits() does cleanup it the queue is empty) */ @@ -1495,7 +1529,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) int iSeverity = 8; rsRetVal iRetLocal; - assert(pThis != NULL); + ISOBJ_TYPE_assert(pThis, queue); /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE @@ -66,8 +66,7 @@ typedef enum { /* ALL active states MUST be numerically higher than eWRKTHRDCMD_TERMINATED and NONE must be lower! */ eWRKTHRDCMD_RUN = 2, eWRKTHRDCMD_SHUTDOWN = 3, - eWRKTHRDCMD_SHUTDOWN_IMMEDIATE = 4, - eWRKTHRDCMD_TURN_OFF_DA_MODE = 5 + eWRKTHRDCMD_SHUTDOWN_IMMEDIATE = 4 } qWrkCmd_t; /* commands for queue worker threads */ typedef struct qWrkThrd_s { @@ -87,7 +86,7 @@ typedef struct queue_s { int iUpdsSincePersist;/* nbr of queue updates since the last persist call */ int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */ int iHighWtrMrk; /* high water mark for disk-assisted memory queues */ - int bWasBelowHighWtr;/* when running in DA mode: queue was below high wtr mrk at least once */ + //int bWasBelowHighWtr;/* when running in DA mode: queue was below high wtr mrk at least once */ int iLowWtrMrk; /* low water mark for disk-assisted memory queues */ int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */ int iDiscardSeverity;/* messages of this severity above are discarded on too-full queue */ @@ -123,7 +122,11 @@ typedef struct queue_s { int iNumberFiles; /* how many files make up the queue? */ size_t iMaxFileSize; /* max size for a single queue file */ int bIsDA; /* is this queue disk assisted? */ - int bRunsDA; /* is this queue actually *running* disk assisted? */ + enum { + QRUNS_REGULAR, + QRUNS_DA_INIT, + QRUNS_DA + } qRunsDA; /* is this queue actually *running* disk assisted? if so, which mode? */ pthread_mutex_t mutDA; /* mutex for low water mark algo */ pthread_cond_t condDA; /* and its matching condition */ struct queue_s *pqDA; /* queue for disk-assisted modes */ @@ -389,7 +389,7 @@ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) iWritten = write(pThis->fd, pBuf, lenBuf); dbgprintf("Stream 0x%lx: file %d write wrote %d bytes, errno: %d\n", (unsigned long) pThis, - iWritten, pThis->fd, errno); + pThis->fd, iWritten, errno); /* TODO: handle error case -- rgerhards, 2008-01-07 */ /* Now indicate buffer empty again. We do this in any case, because there |