From ed0363210c34002e5cfbab553506573f5b8a13a5 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 17 Jan 2008 12:45:10 +0000 Subject: worked on threading --- queue.c | 381 +++++++++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 294 insertions(+), 87 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index 9003b344..2b241d82 100644 --- a/queue.c +++ b/queue.c @@ -1,6 +1,6 @@ - // DA-input only +// TODO: start up the correct num of workers when switching to non-DA mode // TODO: "preforked" worker threads -// TODO: do an if(debug) in dbgrintf - performanc ein release build! +// TODO: do an if(debug) in dbgrintf - performance in release build! // TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in // call consumer state. Facilitates retaining messages in queue until action could // be called! @@ -56,9 +56,61 @@ rsRetVal queueChkPersist(queue_t *pThis); static void *queueWorker(void *arg); static rsRetVal queueGetQueueSize(queue_t *pThis, int *piQueueSize); static rsRetVal queueChkWrkThrdChanges(queue_t *pThis); +static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly); /* methods */ +/* get the current worker state. For simplicity and speed, we have + * NOT used our regular calling interface this time. I hope that won't + * bite in the long term... -- rgerhards, 2008-01-17 + */ +static inline qWrkCmd_t +qWrkrGetState(qWrkThrd_t *pThis) +{ + assert(pThis != NULL); + return pThis->tCurrCmd; +} + + +/* send a command to a specific thread + */ +static rsRetVal +qWrkrSetState(qWrkThrd_t *pThis, qWrkCmd_t tCmd) +{ + DEFiRet; + + assert(pThis != NULL); + dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis->pQueue), tCmd, pThis->iThrd); + + /* change some admin structures */ + switch(tCmd) { + case eWRKTHRD_TERMINATING: + pthread_cond_destroy(&pThis->condInitDone); + dbgprintf("Queue 0x%lx/w%d: thread terminating with %d entries left in queue, %d workers running.\n", + queueGetID(pThis->pQueue), pThis->iThrd, pThis->pQueue->iQueueSize, + pThis->pQueue->iCurNumWrkThrd); + break; + case eWRKTHRD_RUN_CREATED: + pthread_cond_init(&pThis->condInitDone, NULL); + break; + case eWRKTHRD_RUN_INIT: + break; + case eWRKTHRD_RUNNING: + pthread_cond_signal(&pThis->condInitDone); + break; + /* these cases just to satisfy the compiler, we do (yet) not act an them: */ + case eWRKTHRD_STOPPED: + case eWRKTHRD_SHUTDOWN: + case eWRKTHRD_SHUTDOWN_IMMEDIATE: + /* DO NOTHING */ + break; + } + + pThis->tCurrCmd = tCmd; + + return iRet; +} + /* send a command to a specific active thread. If the thread is not * active, the command is not sent. */ @@ -70,9 +122,9 @@ queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd) ISOBJ_TYPE_assert(pThis, queue); assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads); - if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRD_RUN_INIT) { + if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRD_RUN_CREATED) { dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx); - pThis->pWrkThrds[iIdx].tCurrCmd = tCmd; + qWrkrSetState(&pThis->pWrkThrds[iIdx], tCmd); } else { dbgprintf("Queue 0x%lx: command %d NOT sent to inactive thread %d\n", queueGetID(pThis), tCmd, iIdx); } @@ -80,21 +132,60 @@ queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd) return iRet; } -/* send a command to a specific thread - * TODO: check if we can run into trouble with inactive threads + +/* Finalize construction of a wWrkrThrd_t "object" + * rgerhards, 2008-01-17 */ static inline rsRetVal -queueTellWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd) +qWrkrConstructFinalize(qWrkThrd_t *pThis, queue_t *pQueue, int i) { - DEFiRet; + assert(pThis != NULL); + ISOBJ_TYPE_assert(pQueue, queue); - dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx); - ISOBJ_TYPE_assert(pThis, queue); - assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads); + dbgprintf("Queue 0x%lx: finalizing construction of worker %d instance data\n", queueGetID(pQueue), i); - pThis->pWrkThrds[iIdx].tCurrCmd = tCmd; + /* initialize our thread instance descriptor */ + pThis = pQueue->pWrkThrds + i; + pThis->pQueue = pQueue; + pThis->iThrd = i; + pThis->pUsr = NULL; - return iRet; + qWrkrSetState(pThis, eWRKTHRD_STOPPED); + + return RS_RET_OK; +} + + +/* initialize the qWrkThrd_t structure - this MUST be called right after + * startup of a worker thread. -- rgerhards, 2008-01-17 + */ +static inline rsRetVal +qWrkrInit(qWrkThrd_t **ppThis, queue_t *pQueue) +{ + qWrkThrd_t *pThis; + int i; + + assert(ppThis != NULL); + ISOBJ_TYPE_assert(pQueue, queue); + + /* find myself in the queue's thread table */ + for(i = 0 ; i <= pQueue->iNumWorkerThreads ; ++i) + if(pQueue->pWrkThrds[i].thrdID == pthread_self()) + break; +dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pQueue, + (unsigned) pQueue->pWrkThrds[i].thrdID, i, (unsigned) pthread_self()); + assert(pQueue->pWrkThrds[i].thrdID == pthread_self()); + + /* initialize our thread instance descriptor */ + pThis = pQueue->pWrkThrds + i; + pThis->pQueue = pQueue; + pThis->iThrd = i; + pThis->pUsr = NULL; + + *ppThis = pThis; + qWrkrSetState(pThis, eWRKTHRD_RUN_INIT); + + return RS_RET_OK; } @@ -112,9 +203,9 @@ queueJoinWrkThrd(queue_t *pThis, int iIdx) dbgprintf("Queue 0x%lx: thread %d state %d, waiting for exit\n", queueGetID(pThis), iIdx, pThis->pWrkThrds[iIdx].tCurrCmd); pthread_join(pThis->pWrkThrds[iIdx].thrdID, NULL); - pThis->pWrkThrds[iIdx].tCurrCmd = eWRKTHRD_STOPPED; /* back to virgin... */ + qWrkrSetState(&pThis->pWrkThrds[iIdx], eWRKTHRD_STOPPED); /* back to virgin... */ pThis->pWrkThrds[iIdx].thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ - dbgprintf("Queue 0x%lx: thread %d state %d, has exited\n", queueGetID(pThis), iIdx, + dbgprintf("Queue 0x%lx: thread %d state %d, has stopped\n", queueGetID(pThis), iIdx, pThis->pWrkThrds[iIdx].tCurrCmd); return iRet; @@ -131,9 +222,9 @@ queueStrtWrkThrd(queue_t *pThis, int i) ISOBJ_TYPE_assert(pThis, queue); assert(i >= 0 && i <= pThis->iNumWorkerThreads); - assert(pThis->pWrkThrds[i].tCurrCmd < eWRKTHRD_RUN_INIT); + assert(pThis->pWrkThrds[i].tCurrCmd < eWRKTHRD_RUN_CREATED); - queueTellWrkThrd(pThis, i, eWRKTHRD_RUN_INIT); + qWrkrSetState(&pThis->pWrkThrds[i], eWRKTHRD_RUN_CREATED); iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); dbgprintf("Queue 0x%lx: starting Worker thread %x, index %d with state %d.\n", (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState); @@ -164,7 +255,7 @@ queueStrtNewWrkThrd(queue_t *pThis) dbgprintf("Queue %p: search thrd tbl slot: i %d, CuccCmd %d\n", pThis, i, pThis->pWrkThrds[i].tCurrCmd); if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_STOPPED) { break; - } else if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_RUN_INIT) { + } else if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_RUN_CREATED) { iStartingUp = i; break; } @@ -176,7 +267,7 @@ dbgprintf("Queue %p: after thrd search: i %d, iStartingUp %d\n", pThis, i, iStar assert(i <= pThis->iNumWorkerThreads); /* now there must be a free spot, else something is really wrong! */ - queueTellWrkThrd(pThis, i, eWRKTHRD_RUN_INIT); + qWrkrSetState(&pThis->pWrkThrds[i], eWRKTHRD_RUN_CREATED); iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n", (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState); @@ -206,7 +297,7 @@ queueTellActWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd) /* tell the workers our request */ for(i = iStartIdx ; i <= pThis->iNumWorkerThreads ; ++i) - if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATED) + if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) queueTellActWrkThrd(pThis, i, tCmd); return iRet; @@ -271,7 +362,7 @@ queueChkAndStrtWrk(queue_t *pThis) queueChkWrkThrdChanges(pThis); /* check if we need to start up another worker (only in regular mode) */ - if(pThis->qRunsDA == QRUNS_REGULAR) { + if(pThis->qRunsDA == QRUNS_REGULAR && pThis->bEnqOnly == 0) { if(pThis->iCurNumWrkThrd < pThis->iNumWorkerThreads) { dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d, qRunsDA: %d\n", pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA); @@ -320,14 +411,14 @@ queueTurnOffDAMode(queue_t *pThis) * messages come into the queue, we may be well off with a single worker. * rgerhards, 2008-01-16 */ - queueStrtNewWrkThrd(pThis); + if(pThis->bEnqOnly == 0) + queueStrtNewWrkThrd(pThis); pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, * this will be quick. */ - queueDestruct(pThis->pqDA); /* and now we are ready to destruct the DA queue */ - pThis->pqDA = NULL; + queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ /* now free the remaining resources */ pthread_mutex_destroy(&pThis->mutDA); @@ -358,11 +449,12 @@ queueChkWrkThrdChanges(queue_t *pThis) /* go through all threads (including DA thread) */ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { switch(pThis->pWrkThrds[i].tCurrCmd) { - case eWRKTHRD_TERMINATED: + case eWRKTHRD_TERMINATING: queueJoinWrkThrd(pThis, i); break; /* these cases just to satisfy the compiler, we do not act an them: */ case eWRKTHRD_STOPPED: + case eWRKTHRD_RUN_CREATED: case eWRKTHRD_RUN_INIT: case eWRKTHRD_RUNNING: case eWRKTHRD_SHUTDOWN: @@ -428,7 +520,7 @@ dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, * Note that the child queue now in almost all cases is non-empty, because we just enqueued * a message. */ - if(iQueueSize <= pThis->iLowWtrMrk && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) { + if(iQueueSize <= pThis->iLowWtrMrk && iQueueSize != 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) { dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n", queueGetID(pThis), iMyThrdIndx, iQueueSize); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); @@ -494,6 +586,7 @@ queueStrtDA(queue_t *pThis) CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq)); + CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly)); CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0)); CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0)); if(pThis->toQShutdown == 0) { @@ -536,8 +629,7 @@ queueStrtDA(queue_t *pThis) finalize_it: if(iRet != RS_RET_OK) { if(pThis->pqDA != NULL) { - queueDestruct(pThis->pqDA); - pThis->pqDA = NULL; + queueDestruct(&pThis->pqDA); } dbgprintf("Queue 0x%lx: error %d creating disk queue - giving up.\n", queueGetID(pThis), iRet); @@ -549,19 +641,24 @@ finalize_it: /* initiate DA mode + * param bEnqOnly tells if the disk queue is to be run in enqueue-only mode. This may + * be needed during shutdown of memory queues which need to be persisted to disk. * rgerhards, 2008-01-16 */ static inline rsRetVal -queueInitDA(queue_t *pThis) +queueInitDA(queue_t *pThis, int bEnqOnly) { DEFiRet; /* indicate we now run in DA mode - this is reset by the DA worker if it fails */ pThis->qRunsDA = QRUNS_DA_INIT; + pThis->bDAEnqOnly = bEnqOnly; - /* now we must start our DA worker thread - it does the rest of the initialization */ - // DA-input only mode! - iRet = queueStrtWrkThrd(pThis, 0); + /* now we must start our DA worker thread - it does the rest of the initialization + * In enqueue-only mode, we do not start any workers. + */ + if(pThis->bEnqOnly == 0) + iRet = queueStrtWrkThrd(pThis, 0); return iRet; } @@ -606,7 +703,7 @@ queueChkStrtDA(queue_t *pThis) dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n", queueGetID(pThis), pThis->iQueueSize); - queueInitDA(pThis); /* initiate DA mode */ + queueInitDA(pThis, QUEUE_MODE_ENQDEQ); /* initiate DA mode */ } finalize_it: @@ -800,7 +897,6 @@ queueHaveQIF(queue_t *pThis) (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix); /* check if the file exists */ -dbgprintf("stat HaveQIF '%s'\n", pszQIFNam); if(stat((char*) pszQIFNam, &stat_buf) == -1) { if(errno == ENOENT) { dbgprintf("Queue 0x%lx: no .qi file found\n", queueGetID(pThis)); @@ -874,7 +970,7 @@ queueTryLoadPersistedInfo(queue_t *pThis) finalize_it: if(psQIF != NULL) - strmDestruct(psQIF); + strmDestruct(&psQIF); if(iRet != RS_RET_OK) { dbgprintf("Queue 0x%lx: error %d reading .qi file - can not read persisted info (if any)\n", @@ -949,8 +1045,8 @@ static rsRetVal qDestructDisk(queue_t *pThis) assert(pThis != NULL); - strmDestruct(pThis->tVars.disk.pWrite); - strmDestruct(pThis->tVars.disk.pRead); + strmDestruct(&pThis->tVars.disk.pWrite); + strmDestruct(&pThis->tVars.disk.pRead); if(pThis->pszSpoolDir != NULL) free(pThis->pszSpoolDir); @@ -1068,7 +1164,6 @@ queueWrkThrdReqTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int bIncludeDAWrk) { DEFiRet; - // DA-input only if(bIncludeDAWrk) { queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */ queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ @@ -1093,13 +1188,19 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) struct timespec t; queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */ +dbgprintf("WrkThrdTrm 0\n"); queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ + /* race: must make sure all are running! */ +dbgprintf("WrkThrdTrm 1\n"); queueTimeoutComp(&t, iTimeout);/* get timeout */ +dbgprintf("WrkThrdTrm 2\n"); /* and wait for their termination */ pthread_mutex_lock(pThis->mut); bTimedOut = 0; +dbgprintf("WrkThrdTrm 3, thrds: %d\n", pThis->iCurNumWrkThrd); while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { +dbgprintf("WrkThrdTrm 4 to %d\n", bTimedOut); dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n", queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd); @@ -1128,12 +1229,15 @@ queueWrkThrdCancel(queue_t *pThis) // TODO: we need to implement peek(), without it (today!) we lose one message upon // worker cancellation! -- rgerhards, 2008-01-14 + /* process any pending thread requests so that we know who actually is still running */ + queueChkWrkThrdChanges(pThis); + /* awake the workers one more time, just to be sure */ queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ /* first tell the workers our request */ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) - if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATED) { + if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) { dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i); pthread_cancel(pThis->pWrkThrds[i].thrdID); } @@ -1196,14 +1300,14 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) } -/* This is a helper for queueWorker() it either calls the configured +/* This is a helper for queueWorker () it either calls the configured * consumer or the DA-consumer (if in disk-assisted mode). It is * protected by the queue mutex, but MUST release it as soon as possible. * Most importantly, it must release it before the consumer is called. * rgerhards, 2008-01-14 */ static inline rsRetVal -queueWorkerChkAndCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave) +queueWorkerChkAndCallConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst, int iCancelStateSave) { DEFiRet; rsRetVal iRetLocal; @@ -1211,7 +1315,12 @@ queueWorkerChkAndCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateS int iQueueSize; void *pUsr; int qRunsDA; + int iMyThrdIndx; + ISOBJ_TYPE_assert(pThis, queue); + assert(pWrkrInst != NULL); + + iMyThrdIndx = pWrkrInst->iThrd; /* first check if we have still something to process */ if(pThis->iQueueSize == 0 || @@ -1228,6 +1337,7 @@ queueWorkerChkAndCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateS queueChkPersist(pThis); // when we support peek(), we must do this down after the del! qRunsDA = pThis->qRunsDA; /* do a local copy so that we prevent a race after mutex release */ iQueueSize = pThis->iQueueSize; /* ... and the same for this property */ + pWrkrInst->pUsr = pUsr; /* save it for the cancel cleanup handler */ pthread_mutex_unlock(pThis->mut); pthread_cond_signal(pThis->notFull); pthread_setcancelstate(iCancelStateSave, NULL); @@ -1274,6 +1384,34 @@ dbgprintf("CallConsumer returns %d\n", iRet); } + +/* cancellation cleanup handler for queueWorker () + * Updates admin structure and frees ressources. + * rgerhards, 2008-01-16 + */ +static void queueWorkerCancelCleanup(void *arg) +{ + qWrkThrd_t *pWrkrInst = (qWrkThrd_t*) arg; + queue_t *pThis; + + assert(pWrkrInst != NULL); + ISOBJ_TYPE_assert(pWrkrInst->pQueue, queue); + pThis = pWrkrInst->pQueue; + + dbgprintf("Queue 0x%lx/w%d: cancelation cleanup handler called (NOT FULLY IMPLEMENTED, one msgs lost!)\n", + queueGetID(pThis), pWrkrInst->iThrd); + + pThis->iCurNumWrkThrd--; /* one worker less... */ + pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */ + qWrkrSetState(&pThis->pWrkThrds[pWrkrInst->iThrd], eWRKTHRD_TERMINATING); + pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ + + /* TODO: re-enqueue the data element! */ + dbgprintf("Queue 0x%lx/w%d: thread CANCELED with %d entries left in queue, %d workers running.\n", + queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize, pThis->iCurNumWrkThrd); +} + + /* Each queue has at least one associated worker (consumer) thread. It will pull * the message from the queue and pass it to a user-defined function. * This function was provided on construction. It MUST be thread-safe. @@ -1290,6 +1428,7 @@ queueWorker(void *arg) struct timespec t; int iMyThrdIndx; /* index for this thread in queue thread table */ int iCancelStateSave; + qWrkThrd_t *pWrkrInst; /* for cleanup handler */ ISOBJ_TYPE_assert(pThis, queue); @@ -1300,17 +1439,13 @@ queueWorker(void *arg) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); - /* first find myself in the queue's thread table */ - for(iMyThrdIndx = 0 ; iMyThrdIndx <= pThis->iNumWorkerThreads ; ++iMyThrdIndx) - if(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self()) - break; -dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pThis, - (unsigned) pThis->pWrkThrds[iMyThrdIndx].thrdID, iMyThrdIndx, (unsigned) pthread_self()); - assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self()); + /* initialize our thread instance descriptor */ + qWrkrInit(&pWrkrInst, pThis); - dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx); + iMyThrdIndx = pWrkrInst->iThrd; pThis->iCurNumWrkThrd++; /* tell the world there is one more worker */ + dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx); if(iMyThrdIndx == 0) { /* are we the DA worker? */ if(queueStrtDA(pThis) != RS_RET_OK) { /* then fully initialize the DA queue! */ @@ -1323,16 +1458,18 @@ dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pThis, * because someone may have requested us to shut down even before we got a chance to do * our init. That would be a bad race... -- rgerhards, 2008-01-16 */ - if(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUN_INIT) - pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_RUNNING; /* we are running now! */ + if(qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT) + qWrkrSetState(pWrkrInst, eWRKTHRD_RUNNING); /* we are running now! */ + + pthread_cleanup_push(queueWorkerCancelCleanup, pWrkrInst); pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); /* end one-time stuff */ /* now we have our identity, on to real processing */ - while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING - || (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN && pThis->iQueueSize > 0)) { + while( (qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING) + || (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN && pThis->iQueueSize > 0)) { pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); @@ -1340,7 +1477,7 @@ dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pThis, queueChkWrkThrdChanges(pThis); dbgprintf("Queue %p/w%d: pre empty queue, qsize %d\n", pThis, iMyThrdIndx, pThis->iQueueSize); - while(pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) { + while(pThis->iQueueSize == 0 && qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING) { dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", queueGetID(pThis), iMyThrdIndx); if(pThis->bSignalOnEmpty > 0) { @@ -1377,13 +1514,13 @@ dbgprintf("worker never times out!\n"); /* we use SHUTDOWN (and not SHUTDOWN_IMMEDIATE) so that the worker * does not terminate if in the mean time a new message arrived. */ - pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_SHUTDOWN; + qWrkrSetState(pWrkrInst, eWRKTHRD_SHUTDOWN); } } dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx); } - queueWorkerChkAndCallConsumer(pThis, iMyThrdIndx, iCancelStateSave); + queueWorkerChkAndCallConsumer(pThis, pWrkrInst, iCancelStateSave); /* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is * a cancellation point in itself. As we run most of the time without cancel enabled, I fear @@ -1406,7 +1543,7 @@ dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx); pthread_yield(); dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n", queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd); - if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0) + if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0) dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has " " %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize); } @@ -1415,17 +1552,21 @@ dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n", pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); pThis->iCurNumWrkThrd--; - if(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN || - pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN_IMMEDIATE) { + pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */ + pthread_cleanup_pop(0); /* remove cleanup handler */ + + /* if we ever need finalize_it, here would be the place for it! */ + if(qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN || + qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN_IMMEDIATE || + qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT || + qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_CREATED) { /* in shutdown case, we need to flag termination. All other commands * have a meaning to the thread harvester, so we can not overwrite them */ - pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_TERMINATED; +dbgprintf("Queue 0x%lx/w%d: setting termination state\n", queueGetID(pThis), iMyThrdIndx); + qWrkrSetState(pWrkrInst, eWRKTHRD_TERMINATING); } pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ - pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */ - dbgprintf("Queue 0x%lx/w%d: thread terminates with %d entries left in queue, %d workers running.\n", - queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd); pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); @@ -1517,20 +1658,27 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ DEFiRet; rsRetVal iRetLocal; int bInitialized = 0; /* is queue already initialized? */ + int i; assert(pThis != NULL); /* call type-specific constructor */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ - dbgprintf("Queue 0x%lx: type %d, disk assisted %d, maxFileSz %ld starting\n", queueGetID(pThis), pThis->qType, - pThis->bIsDA, pThis->iMaxFileSize); + dbgprintf("Queue 0x%lx: type %d, enq-only %d, disk assisted %d, maxFileSz %ld starting\n", queueGetID(pThis), + pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ + /* initialize worker thread instances + * TODO: move to separate function + */ if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads + 1, sizeof(qWrkThrd_t))) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + for(i = 0 ; i < pThis->iNumWorkerThreads + 1 ; ++i) { + qWrkrConstructFinalize(&pThis->pWrkThrds[i], pThis, i); + } if(pThis->bIsDA) { /* If we are disk-assisted, we need to check if there is a QIF file @@ -1541,7 +1689,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n", queueGetID(pThis)); - queueInitDA(pThis); /* initiate DA mode */ + queueInitDA(pThis, QUEUE_MODE_ENQDEQ); /* initiate DA mode */ bInitialized = 1; /* we are done */ } else { // TODO: use logerror? -- rgerhards, 2008-01-16 @@ -1552,12 +1700,10 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ if(!bInitialized) { dbgprintf("Queue 0x%lx: queue starts up without (loading) any disk state\n", queueGetID(pThis)); - /* worker 0 is reserved for disk-assisted mode, so do not start */ - queueTellWrkThrd(pThis, 0, eWRKTHRD_STOPPED); - /* fire up the worker threads */ // TODO: preforked workers! queueStrtAllWrkThrds(pThis); } + pThis->bQueueStarted = 1; finalize_it: return iRet; @@ -1634,7 +1780,7 @@ static rsRetVal queuePersist(queue_t *pThis) finalize_it: if(psQIF != NULL) - strmDestruct(psQIF); + strmDestruct(&psQIF); return iRet; } @@ -1661,44 +1807,60 @@ rsRetVal queueChkPersist(queue_t *pThis) /* destructor for the queue object */ -rsRetVal queueDestruct(queue_t *pThis) +rsRetVal queueDestruct(queue_t **ppThis) { + queue_t *pThis; DEFiRet; - assert(pThis != NULL); + assert(ppThis != NULL); + pThis = *ppThis; + ISOBJ_TYPE_assert(pThis, queue); - /* if running DA, tell the DA workers to shut down. This saves us some CPU cycles which - * we can use to persist the remaining in-memory data to disk quicker. -- rgerhads, 2008-01-16 - * TODO: we actually need to change the queue to an "input-only" mode, that also prevents - * startup of the thread again further down in the process. None of that really hurts, so we - * leave it for the time being. -- rgerhards, 2008-01-16 +pThis->bSaveOnShutdown = 1; // TODO: Test remove + /* if running DA, switch the DA queue to enqueue-only mode. That saves us some CPU cycles as + * its workers do no longer need to run. It also prevents longer-running actions to spring into + * existence while we are draining the main (memory) queue. -- rgerhads, 2008-01-16 */ - if(pThis->qRunsDA != QRUNS_REGULAR) - queueWrkThrdReqTrm(pThis->pqDA, eWRKTHRD_SHUTDOWN_IMMEDIATE, 0); - // DA-input only + if(pThis->qRunsDA != QRUNS_REGULAR) { + queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */ + if(pThis->bSaveOnShutdown) + pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL; + } /* then, terminate our own worker threads */ if(pThis->pWrkThrds != NULL) { queueShutdownWorkers(pThis); - free(pThis->pWrkThrds); - pThis->pWrkThrds = NULL; } - /* of we have now data left in in-memory queues, this data will be lost if we do not - * persist it to a disk queue. - * TODO: implement code rgerhards, 2008-01-16 + /* If we currently run in DA mode, the in-memory queue is already persisted to disk. + * If we are not in DA mode, we may have data left in in-memory queues, this data will + * be lost if we do not persist it to a disk queue. So, if configured to do so, we will + * now start DA mode just to drain our queue. -- rgerhards, 2008-01-16 + * TODO: move to persist function! */ + if(pThis->iQueueSize > 0 && pThis->bSaveOnShutdown && pThis->bIsDA) { + dbgprintf("Queue 0x%lx: in-memory queue contains %d entries after worker shutdown - using DA to save to disk\n", + queueGetID(pThis), pThis->iQueueSize); + pThis->iLowWtrMrk = 0; /* disable low water mark algo */ + queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* start DA queue in enqueue-only mode */ + pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL; + queueShutdownWorkers(pThis); + } /* if running DA, terminate disk queue */ if(pThis->qRunsDA != QRUNS_REGULAR) - queueDestruct(pThis->pqDA); + queueDestruct(&pThis->pqDA); - /* persist the queue (we always do that - queuePersits() does cleanup it the queue is empty) */ + /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) */ CHKiRet_Hdlr(queuePersist(pThis)) { dbgprintf("Queue 0x%lx: error %d persisting queue - data lost!\n", (unsigned long) pThis, iRet); } /* ... then free resources */ + if(pThis->pWrkThrds != NULL) { + free(pThis->pWrkThrds); + pThis->pWrkThrds = NULL; + } pthread_mutex_destroy(pThis->mut); free(pThis->mut); pthread_cond_destroy(pThis->notFull); @@ -1713,6 +1875,7 @@ rsRetVal queueDestruct(queue_t *pThis) /* and finally delete the queue objet itself */ free(pThis); + *ppThis = NULL; return iRet; } @@ -1845,6 +2008,50 @@ finalize_it: return iRet; } + +/* set queue mode to enqueue only or not + * rgerhards, 2008-01-16 + */ +static rsRetVal +queueSetEnqOnly(queue_t *pThis, int bEnqOnly) +{ + DEFiRet; + int iCancelStateSave; + + ISOBJ_TYPE_assert(pThis, queue); + + /* for simplicity, we do one big mutex lock. This method is extremely seldom + * called, so that doesn't matter... -- rgerhards, 2008-01-16 + */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + pthread_mutex_lock(pThis->mut); + + if(bEnqOnly == pThis->bEnqOnly) + FINALIZE; /* no change, nothing to do */ + + if(pThis->bQueueStarted) { + /* we need to adjust queue operation only if we are not during initial param setup */ + if(bEnqOnly == 1) { + /* switch to enqueue-only mode */ + /* this means we need to terminate all workers - that's it... */ + dbgprintf("Queue 0x%lx: switching to enqueue-only mode, terminating all worker threads\n", + queueGetID(pThis)); + queueWrkThrdReqTrm(pThis, eWRKTHRD_SHUTDOWN_IMMEDIATE, 0); + } else { + /* switch back to regular mode */ + ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */ + } + } + + pThis->bEnqOnly = bEnqOnly; + +finalize_it: + pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + return iRet; +} + + /* some simple object access methods */ DEFpropSetMeth(queue, iPersistUpdCnt, int); DEFpropSetMeth(queue, toQShutdown, long); -- cgit