summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--queue.c48
1 files changed, 13 insertions, 35 deletions
diff --git a/queue.c b/queue.c
index 4291c509..9003b344 100644
--- a/queue.c
+++ b/queue.c
@@ -1,3 +1,4 @@
+ // DA-input only
// TODO: "preforked" worker threads
// TODO: do an if(debug) in dbgrintf - performanc ein release build!
// TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in
@@ -332,7 +333,7 @@ queueTurnOffDAMode(queue_t *pThis)
pthread_mutex_destroy(&pThis->mutDA);
pthread_cond_destroy(&pThis->condDA);
- queueTellWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */
+ queueTellActWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */
dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
queueGetID(pThis), iRet);
@@ -559,6 +560,7 @@ queueInitDA(queue_t *pThis)
pThis->qRunsDA = QRUNS_DA_INIT;
/* now we must start our DA worker thread - it does the rest of the initialization */
+ // DA-input only mode!
iRet = queueStrtWrkThrd(pThis, 0);
return iRet;
@@ -764,22 +766,6 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr)
/* -------------------- disk -------------------- */
-/* This method checks if there is any persistent information on the
- * queue.
- */
-#if 0
-static rsRetVal
-queueTryLoadPersistedInfo(queue_t *pThis)
-{
- DEFiRet;
- strm_t *psQIF = NULL;
- uchar pszQIFNam[MAXFNAME];
- size_t lenQIFNam;
- struct stat stat_buf;
-}
-#endif
-
-
static rsRetVal
queueLoadPersStrmInfoFixup(strm_t *pStrm, queue_t *pThis)
{
@@ -850,7 +836,6 @@ queueTryLoadPersistedInfo(queue_t *pThis)
(char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
/* check if the file exists */
-dbgprintf("stat '%s'\n", pszQIFNam);
if(stat((char*) pszQIFNam, &stat_buf) == -1) {
if(errno == ENOENT) {
dbgprintf("Queue 0x%lx: clean startup, no .qi file found\n", queueGetID(pThis));
@@ -892,7 +877,7 @@ finalize_it:
strmDestruct(psQIF);
if(iRet != RS_RET_OK) {
- dbgprintf("Queue 0x%lx: error %d reading .qi file - can not start queue\n",
+ dbgprintf("Queue 0x%lx: error %d reading .qi file - can not read persisted info (if any)\n",
queueGetID(pThis), iRet);
}
@@ -1010,7 +995,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
assert(pThis != NULL);
- /* TODO: calling the consumer should go into its own function! -- rgerhards, 2008-01-05*/
+ /* calling the consumer is quite different here than it is from a worker thread */
iRetLocal = pThis->pConsumer(pUsr);
if(iRetLocal != RS_RET_OK)
dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n",
@@ -1083,6 +1068,7 @@ queueWrkThrdReqTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int bIncludeDAWrk)
{
DEFiRet;
+ // DA-input only
if(bIncludeDAWrk) {
queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */
queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
@@ -1203,15 +1189,6 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
}
}
- /* as we may have cancelled a thread, clean up our internal structure. All are
- * terminated now. For simplicity, we simply overwrite the states.
- */
- for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
- if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRD_STOPPED) {
- pThis->pWrkThrds[i].tCurrCmd = eWRKTHRD_TERMINATED;
- }
- }
-
dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n",
queueGetID(pThis), pThis->iQueueSize);
@@ -1319,6 +1296,10 @@ queueWorker(void *arg)
sigfillset(&sigSet);
pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
+ /* do some one-time initialization */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ pthread_mutex_lock(pThis->mut);
+
/* first find myself in the queue's thread table */
for(iMyThrdIndx = 0 ; iMyThrdIndx <= pThis->iNumWorkerThreads ; ++iMyThrdIndx)
if(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self())
@@ -1329,16 +1310,12 @@ dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pThis,
dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx);
- /* do some one-time initialization */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- pthread_mutex_lock(pThis->mut);
-
pThis->iCurNumWrkThrd++; /* tell the world there is one more worker */
if(iMyThrdIndx == 0) { /* are we the DA worker? */
if(queueStrtDA(pThis) != RS_RET_OK) { /* then fully initialize the DA queue! */
/* if we could not init the DA queue, we have nothing to do, so shut down. */
- queueTellWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE);
+ queueTellActWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE);
}
}
@@ -1698,6 +1675,7 @@ rsRetVal queueDestruct(queue_t *pThis)
*/
if(pThis->qRunsDA != QRUNS_REGULAR)
queueWrkThrdReqTrm(pThis->pqDA, eWRKTHRD_SHUTDOWN_IMMEDIATE, 0);
+ // DA-input only
/* then, terminate our own worker threads */
if(pThis->pWrkThrds != NULL) {
@@ -1855,7 +1833,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
queueChkPersist(pThis);
finalize_it:
- /* now activate the worker thread */
+ /* now awake sleeping worker threads */
if(pThis->pWrkThrds != NULL) {
pthread_mutex_unlock(pThis->mut);
i = pthread_cond_signal(pThis->notEmpty);