diff options
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 1579 |
1 files changed, 418 insertions, 1161 deletions
@@ -52,491 +52,23 @@ #include "stringbuf.h" #include "srUtils.h" #include "obj.h" +#include "wtp.h" +#include "wti.h" /* static data */ DEFobjStaticHelpers -/* debug aides */ -#if 0 -#define d_pthread_mutex_lock(x) {dbgprintf("mutex %p lock %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \ - pthread_mutex_lock(x); \ - if(1)dbgprintf("mutex %p lock aquired %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \ - } -#define d_pthread_mutex_unlock(x) {dbgprintf("mutex %p UNlock %s, %s(), line %d\n", x ,__FILE__, __func__, __LINE__);\ - pthread_mutex_unlock(x); \ - if(1)dbgprintf("mutex %p UNlock done %s, %s(), line %d\n", x, __FILE__, __func__, __LINE__); \ - } -#else -#define d_pthread_mutex_lock(x) pthread_mutex_lock(x) -#define d_pthread_mutex_unlock(x) pthread_mutex_unlock(x) -#endif - - /* forward-definitions */ rsRetVal queueChkPersist(queue_t *pThis); -static void *queueWorker(void *arg); -static rsRetVal queueChkWrkThrdChanges(queue_t *pThis); static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly); +static int queueChkStopWrkrDA(queue_t *pThis); +static int queueIsIdleDA(queue_t *pThis); +static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave); +static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2); /* methods */ -/* cancellation cleanup handler - frees provided mutex - * rgerhards, 2008-01-14 - */ -static void queueMutexCleanup(void *arg) -{ - assert(arg != NULL); - d_pthread_mutex_unlock((pthread_mutex_t*) arg); -} - - -/* get the current worker state. For simplicity and speed, we have - * NOT used our regular calling interface this time. I hope that won't - * bite in the long term... -- rgerhards, 2008-01-17 - */ -static inline qWrkCmd_t -qWrkrGetState(qWrkThrd_t *pThis) -{ - assert(pThis != NULL); - return pThis->tCurrCmd; -} - - -/* indicate worker thread startup - * (it would be best if we could do this with an atomic operation) - * rgerhards, 2008-01-19 - */ -static void -queueWrkrThrdStartupIndication(queue_t *pThis) -{ - int iCancelStateSave; - - ISOBJ_TYPE_assert(pThis, queue); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mutThrdMgmt); - pThis->iCurNumWrkThrd++; - d_pthread_mutex_unlock(&pThis->mutThrdMgmt); - pthread_setcancelstate(iCancelStateSave, NULL); -} - - -/* indicate worker thread shutdown - * (it would be best if we could do this with an atomic operation) - * rgerhards, 2008-01-19 - */ -static void -queueWrkrThrdShutdownIndication(queue_t *pThis) -{ - int iCancelStateSave; - - ISOBJ_TYPE_assert(pThis, queue); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mutThrdMgmt); - pThis->iCurNumWrkThrd--; - pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ - d_pthread_mutex_unlock(&pThis->mutThrdMgmt); - pthread_setcancelstate(iCancelStateSave, NULL); -} - - -/* send a command to a specific thread - */ -static rsRetVal -qWrkrSetState(qWrkThrd_t *pThis, qWrkCmd_t tCmd) -{ - DEFiRet; - - assert(pThis != NULL); - -dbgprintf("Queue 0x%lx: trying to send command %d to thread %d\n", queueGetID(pThis->pQueue), tCmd, pThis->iThrd); - if(pThis->tCurrCmd == eWRKTHRD_SHUTDOWN_IMMEDIATE && tCmd != eWRKTHRD_TERMINATING) - FINALIZE; - - dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis->pQueue), tCmd, pThis->iThrd); - - /* change some admin structures */ - switch(tCmd) { - case eWRKTHRD_TERMINATING: - pthread_cond_destroy(&pThis->condInitDone); - pthread_mutex_destroy(&pThis->mut); - dbgprintf("Queue 0x%lx/w%d: thread terminating with %d entries left in queue, %d workers running.\n", - queueGetID(pThis->pQueue), pThis->iThrd, pThis->pQueue->iQueueSize, - pThis->pQueue->iCurNumWrkThrd); - break; - case eWRKTHRD_RUN_CREATED: - pthread_cond_init(&pThis->condInitDone, NULL); - pthread_mutex_init(&pThis->mut, NULL); - break; - case eWRKTHRD_RUN_INIT: - break; - case eWRKTHRD_RUNNING: - pthread_cond_signal(&pThis->condInitDone); - break; - /* these cases just to satisfy the compiler, we do (yet) not act an them: */ - case eWRKTHRD_STOPPED: - case eWRKTHRD_SHUTDOWN: - case eWRKTHRD_SHUTDOWN_IMMEDIATE: - /* DO NOTHING */ - break; - } - - pThis->tCurrCmd = tCmd; - -finalize_it: - return iRet; -} - -/* send a command to a specific active thread. If the thread is not - * active, the command is not sent. - */ -static inline rsRetVal -queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, queue); - assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads); - - if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRD_RUN_CREATED) { - qWrkrSetState(&pThis->pWrkThrds[iIdx], tCmd); - } else { - dbgprintf("Queue 0x%lx: command %d NOT sent to inactive thread %d\n", queueGetID(pThis), tCmd, iIdx); - } - - return iRet; -} - - -/* Finalize construction of a wWrkrThrd_t "object" - * rgerhards, 2008-01-17 - */ -static inline rsRetVal -qWrkrConstructFinalize(qWrkThrd_t *pThis, queue_t *pQueue, int i) -{ - assert(pThis != NULL); - ISOBJ_TYPE_assert(pQueue, queue); - - dbgprintf("Queue 0x%lx: finalizing construction of worker %d instance data\n", queueGetID(pQueue), i); - - /* initialize our thread instance descriptor */ - pThis = pQueue->pWrkThrds + i; - pThis->pQueue = pQueue; - pThis->iThrd = i; - pThis->pUsr = NULL; - - qWrkrSetState(pThis, eWRKTHRD_STOPPED); - - return RS_RET_OK; -} - - -/* Waits until the specified worker thread - * changed to full running state (aka have started up). This function - * MUST NOT be called while the queue mutex is locked as it does - * this itself. The wait is without timeout. - * rgerhards, 2008-01-17 - */ -static inline rsRetVal -qWrkrWaitStartup(qWrkThrd_t *pThis) -{ - int iCancelStateSave; - - assert(pThis != NULL); - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(pThis->pQueue->mut); - if((pThis->tCurrCmd == eWRKTHRD_RUN_CREATED) || (pThis->tCurrCmd == eWRKTHRD_RUN_CREATED)) { - dbgprintf("Queue 0x%lx: waiting on worker thread %d startup\n", queueGetID(pThis->pQueue), - pThis->iThrd); - pthread_cond_wait(&pThis->condInitDone, pThis->pQueue->mut); -dbgprintf("worker startup done!\n"); - } - d_pthread_mutex_unlock(pThis->pQueue->mut); - pthread_setcancelstate(iCancelStateSave, NULL); - - return RS_RET_OK; -} - - -/* waits until all worker threads that a currently initializing are fully started up - * rgerhards, 2008-01-18 - */ -static rsRetVal -qWrkrWaitAllWrkrStartup(queue_t *pThis) -{ - DEFiRet; - int i; - - for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - qWrkrWaitStartup(pThis->pWrkThrds + i); - } - - return iRet; -} - - -/* initialize the qWrkThrd_t structure - this MUST be called right after - * startup of a worker thread. -- rgerhards, 2008-01-17 - */ -static inline rsRetVal -qWrkrInit(qWrkThrd_t **ppThis, queue_t *pQueue) -{ - qWrkThrd_t *pThis; - int i; - - assert(ppThis != NULL); - ISOBJ_TYPE_assert(pQueue, queue); - - /* find myself in the queue's thread table */ - for(i = 0 ; i <= pQueue->iNumWorkerThreads ; ++i) - if(pQueue->pWrkThrds[i].thrdID == pthread_self()) - break; -dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pQueue, - (unsigned) pQueue->pWrkThrds[i].thrdID, i, (unsigned) pthread_self()); - assert(pQueue->pWrkThrds[i].thrdID == pthread_self()); - - /* initialize our thread instance descriptor */ - pThis = pQueue->pWrkThrds + i; - pThis->pQueue = pQueue; - pThis->iThrd = i; - pThis->pUsr = NULL; - - *ppThis = pThis; - qWrkrSetState(pThis, eWRKTHRD_RUN_INIT); - - return RS_RET_OK; -} - - -/* join a specific worker thread - */ -static inline rsRetVal -queueJoinWrkThrd(queue_t *pThis, int iIdx) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, queue); - assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads); - assert(pThis->pWrkThrds[iIdx].tCurrCmd != eWRKTHRD_STOPPED); - - dbgprintf("Queue 0x%lx: thread %d state %d, waiting for exit\n", queueGetID(pThis), iIdx, - pThis->pWrkThrds[iIdx].tCurrCmd); - pthread_join(pThis->pWrkThrds[iIdx].thrdID, NULL); - qWrkrSetState(&pThis->pWrkThrds[iIdx], eWRKTHRD_STOPPED); /* back to virgin... */ - pThis->pWrkThrds[iIdx].thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ - dbgprintf("Queue 0x%lx: thread %d state %d, has stopped\n", queueGetID(pThis), iIdx, - pThis->pWrkThrds[iIdx].tCurrCmd); - - return iRet; -} - - -/* Starts a worker thread (on a specific index [i]!) - */ -static inline rsRetVal -queueStrtWrkThrd(queue_t *pThis, int i) -{ - DEFiRet; - int iState; - - ISOBJ_TYPE_assert(pThis, queue); - assert(i >= 0 && i <= pThis->iNumWorkerThreads); - assert(pThis->pWrkThrds[i].tCurrCmd < eWRKTHRD_RUN_CREATED); - - qWrkrSetState(&pThis->pWrkThrds[i], eWRKTHRD_RUN_CREATED); - iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); - dbgprintf("Queue 0x%lx: starting Worker thread %x, index %d with state %d.\n", - (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState); - - return iRet; -} - - -/* start the DA worker thread (if not already running) - */ -static inline rsRetVal -queueStrtDAWrkr(queue_t *pThis) -{ - DEFiRet; - int iCancelStateSave; - - ISOBJ_TYPE_assert(pThis, queue); - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->pWrkThrds[0].mut); - if(pThis->pWrkThrds[0].tCurrCmd == eWRKTHRD_STOPPED) { - iRet = queueStrtWrkThrd(pThis, 0); - } - d_pthread_mutex_unlock(&pThis->pWrkThrds[0].mut); - pthread_setcancelstate(iCancelStateSave, NULL); - - return iRet; -} - -/* Starts a *new* worker thread. Function searches itself for a free index spot. It must only - * be called when we have less than max workers active. Pending wrkr thread requests MUST have - * been processed before calling this function. -- rgerhards, 2008-01-16 - */ -static inline rsRetVal -queueStrtNewWrkThrd(queue_t *pThis) -{ - DEFiRet; - int i; - int iStartingUp; - int iState; - - ISOBJ_TYPE_assert(pThis, queue); - - /* find free spot in thread table. If we find at least one worker that is in initializiation, - * we do NOT start a new one. Let's give the other one a chance, first. - */ - iStartingUp = -1; - for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) { -dbgprintf("Queue %p: search thrd tbl slot: i %d, CuccCmd %d\n", pThis, i, pThis->pWrkThrds[i].tCurrCmd); - if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_STOPPED) { - break; - } else if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_RUN_CREATED) { - iStartingUp = i; - break; - } - } - -dbgprintf("Queue %p: after thrd search: i %d, iStartingUp %d\n", pThis, i, iStartingUp); - if(iStartingUp > -1) - ABORT_FINALIZE(RS_RET_ALREADY_STARTING); - - assert(i <= pThis->iNumWorkerThreads); /* now there must be a free spot, else something is really wrong! */ - - qWrkrSetState(&pThis->pWrkThrds[i], eWRKTHRD_RUN_CREATED); - iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); - dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n", - (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState); - /* we try to give the starting worker a little boost. It won't help much as we still - * hold the queue's mutex, but at least it has a chance to start on a single-CPU system. - */ - pthread_yield(); - -finalize_it: - return iRet; -} - - -/* send a command to all active worker threads. A start index can be - * given. Usually, this is 0 or 1. Thread 0 is reserved to disk-assisted - * mode and this start index take care of the special handling it needs to - * receive. -- rgerhards, 2008-01-16 - */ -static inline rsRetVal -queueTellActWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd) -{ - DEFiRet; - int i; - - ISOBJ_TYPE_assert(pThis, queue); - assert(iStartIdx == 0 || iStartIdx == 1); - - /* tell the workers our request */ - for(i = iStartIdx ; i <= pThis->iNumWorkerThreads ; ++i) - if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) - queueTellActWrkThrd(pThis, i, tCmd); - - return iRet; -} - - -/* compute an absolute time timeout suitable for calls to pthread_cond_timedwait() - * rgerhards, 2008-01-14 - */ -static rsRetVal -queueTimeoutComp(struct timespec *pt, int iTimeout) -{ - assert(pt != NULL); - /* compute timeout */ - clock_gettime(CLOCK_REALTIME, pt); - pt->tv_nsec += (iTimeout % 1000) * 1000000; /* think INTEGER arithmetic! */ - if(pt->tv_nsec > 999999999) { /* overrun? */ - pt->tv_nsec -= 1000000000; - ++pt->tv_sec; - } - pt->tv_sec += iTimeout / 1000; - return RS_RET_OK; /* so far, this is static... */ -} - - -/* wake up all worker threads. Param bWithDAWrk tells if the DA worker - * is to be awaken, too. It needs special handling because it waits on - * two different conditions depending on processing state. - * rgerhards, 2008-01-16 - */ -static inline rsRetVal -queueWakeupWrkThrds(queue_t *pThis, int bWithDAWrk) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, queue); - - pthread_cond_broadcast(&pThis->notEmpty); - if(bWithDAWrk && pThis->qRunsDA != QRUNS_REGULAR) { - /* if running disk-assisted, workers may wait on that condition, too */ - pthread_cond_broadcast(&pThis->condDA); - } - - return iRet; -} - - -/* This function checks if (another) worker threads needs to be started. It - * must be called while the caller holds a lock on the queue mutex. So it must not - * do anything that either reaquires the mutex or forces somebody else to aquire - * it (that would lead to a deadlock). - * rgerhards, 2008-01-16 - */ -static inline rsRetVal -queueChkAndStrtWrk(queue_t *pThis) -{ - DEFiRet; - int iCancelStateSave; - - ISOBJ_TYPE_assert(pThis, queue); - - /* process any pending thread requests */ - queueChkWrkThrdChanges(pThis); - - if(pThis->bEnqOnly == 1) - FINALIZE; /* in enqueue-only mode we have no workers */ - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mutThrdMgmt); - pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt); - pthread_setcancelstate(iCancelStateSave, NULL); - /* check if we need to start up another worker */ - if(pThis->qRunsDA == QRUNS_REGULAR) { - if(pThis->iCurNumWrkThrd < pThis->iNumWorkerThreads) { -dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d, qRunsDA: %d\n", - pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA); - /* check if we satisfy the min nbr of messages per worker to start a new one */ - if(pThis->iCurNumWrkThrd == 0 || - pThis->iQueueSize / pThis->iCurNumWrkThrd > pThis->iMinMsgsPerWrkr) { - dbgprintf("Queue 0x%lx: high activity - starting additional worker thread.\n", - queueGetID(pThis)); - queueStrtNewWrkThrd(pThis); - } - } - } else { - if(pThis->iCurNumWrkThrd == 0 && pThis->bEnqOnly == 0) { -dbgprintf("Queue %p: DA worker is no longer running, restarting, qsize %d, workers %d, qRunsDA: %d\n", - pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA); - /* DA worker has timed out and needs to be restarted */ - iRet = queueStrtDAWrkr(pThis); - } - } - pthread_cleanup_pop(1); - -finalize_it: - return iRet; -} - /* --------------- code for disk-assisted (DA) queue modes -------------------- */ @@ -555,7 +87,7 @@ queueTurnOffDAMode(queue_t *pThis) DEFiRet; ISOBJ_TYPE_assert(pThis, queue); - assert(pThis->qRunsDA != QRUNS_REGULAR); + assert(pThis->bRunsDA); /* if we need to pull any data that we still need from the (child) disk queue, * now would be the time to do so. At present, we do not need this, but I'd like to @@ -572,62 +104,18 @@ queueTurnOffDAMode(queue_t *pThis) dbgprintf("Queue 0x%lx: disk-assistance being been turned off, bEnqOnly %d, bQueInDestr %d, NumWrkd %d\n", queueGetID(pThis), pThis->bEnqOnly,pThis->bQueueInDestruction,pThis->iCurNumWrkThrd); - // TODO: think about this code - there is a race - if(pThis->bEnqOnly == 0 && pThis->bQueueInDestruction == 0 && pThis->iCurNumWrkThrd < 2) - queueStrtNewWrkThrd(pThis); - pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */ + // TODO: mutex? + pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, * this will be quick. */ queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ - /* now free the remaining resources */ - pthread_mutex_destroy(&pThis->mutDA); - pthread_cond_destroy(&pThis->condDA); - - queueTellActWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE);/* finally, tell ourselves to shutdown */ dbgprintf("Queue 0x%lx: disk-assistance has been turned off, disk queue was empty (iRet %d)\n", queueGetID(pThis), iRet); - return iRet; -} - -/* check if we had any worker thread changes and, if so, act - * on them. At a minimum, terminated threads are harvested (joined). - * This function MUST NEVER block on the queue mutex! - */ -static rsRetVal -queueChkWrkThrdChanges(queue_t *pThis) -{ - DEFiRet; - int i; - - ISOBJ_TYPE_assert(pThis, queue); - - if(pThis->bThrdStateChanged == 0) - FINALIZE; - - /* go through all threads (including DA thread) */ - for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - switch(pThis->pWrkThrds[i].tCurrCmd) { - case eWRKTHRD_TERMINATING: - queueJoinWrkThrd(pThis, i); - break; - /* these cases just to satisfy the compiler, we do not act an them: */ - case eWRKTHRD_STOPPED: - case eWRKTHRD_RUN_CREATED: - case eWRKTHRD_RUN_INIT: - case eWRKTHRD_RUNNING: - case eWRKTHRD_SHUTDOWN: - case eWRKTHRD_SHUTDOWN_IMMEDIATE: - /* DO NOTHING */ - break; - } - } - -finalize_it: - return iRet; + RETiRet; } @@ -644,6 +132,7 @@ queueChkIsDA(queue_t *pThis) DEFiRet; ISOBJ_TYPE_assert(pThis, queue); +RUNLOG_VAR("%s", pThis->pszFilePrefix); if(pThis->pszFilePrefix != NULL) { pThis->bIsDA = 1; dbgprintf("Queue 0x%lx: is disk-assisted, disk will be used on demand\n", queueGetID(pThis)); @@ -651,7 +140,7 @@ queueChkIsDA(queue_t *pThis) dbgprintf("Queue 0x%lx: is NOT disk-assisted\n", queueGetID(pThis)); } - return iRet; + RETiRet; } @@ -660,11 +149,11 @@ queueChkIsDA(queue_t *pThis) * chore of this function is to create the DA queue object. If that function fails, * the DA worker should return with an appropriate state, which in turn should lead to * a re-set to non-DA mode in the Enq process. The queue mutex must be locked when this - * function is called, else a race on pThis->qRunsDA may happen. + * function is called, else a race on pThis->bRunsDA may happen. * rgerhards, 2008-01-15 */ static rsRetVal -queueStrtDA(queue_t *pThis) +queueStartDA(queue_t *pThis) { DEFiRet; @@ -682,30 +171,19 @@ dbgprintf("Queue %p: queueSTrtDA after child queue construct, q %p\n", pThis, pT /* as the created queue is the same object class, we take the * liberty to access its properties directly. */ - pThis->pqDA->condSignalOnEmpty = &pThis->condDA; - pThis->pqDA->mutSignalOnEmpty = &pThis->mutDA; - pThis->pqDA->condSignalOnEmpty2 = &pThis->notEmpty; - pThis->pqDA->bSignalOnEmpty = 2; pThis->pqDA->pqParent = pThis; -dbgprintf("Queue %p: queueSTrtDA after assign\n", pThis); CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq)); -dbgprintf("Queue %p: queueSTrtDA 10\n", pThis); CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly)); -dbgprintf("Queue %p: queueSTrtDA 15\n", pThis); CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0)); -dbgprintf("Queue %p: queueSTrtDA 20\n", pThis); CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0)); -dbgprintf("Queue %p: queueSTrtDA 25\n", pThis); if(pThis->toQShutdown == 0) { -dbgprintf("Queue %p: queueSTrtDA 30a\n", pThis); CHKiRet(queueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */ } else { -dbgprintf("Queue %p: queueSTrtDA 30b\n", pThis); /* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND * have an obviously large backlog, we can't finish it in any case. So there is no point * in holding shutdown longer than necessary. -- rgerhards, 2008-01-15 @@ -713,30 +191,21 @@ dbgprintf("Queue %p: queueSTrtDA 30b\n", pThis); CHKiRet(queueSettoQShutdown(pThis->pqDA, 1)); } -dbgprintf("Queue %p: queueSTrtDA pre start\n", pThis); +dbgprintf("Queue %p: queueStartDA pre start\n", pThis); iRet = queueStart(pThis->pqDA); /* file not found is expected, that means it is no previous QIF available */ if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) FINALIZE; /* something is wrong */ - /* tell our fellow workers to shut down - * NOTE: we do NOT join them by intension! If we did, we would hold draining - * the queue until some potentially long-running actions are finished. Having - * the ability to immediatly drain the queue was the primary intension of - * reserving worker thread 0 for DA queues. So if we would join the other - * workers, we would screw up and do against our design goal. - */ - CHKiRet(queueTellActWrkThrds(pThis, 1, eWRKTHRD_SHUTDOWN_IMMEDIATE)); - /* as we are right now starting DA mode because we are so busy, it is - * extremely unlikely that any worker is sleeping on empty queue. HOWEVER, + * extremely unlikely that any regular worker is sleeping on empty queue. HOWEVER, * we want to be on the safe side, and so we awake anyone that is waiting * on one. So even if the scheduler plays badly with us, things should be * quite well. -- rgerhards, 2008-01-15 */ - queueWakeupWrkThrds(pThis, 0); /* awake all workers, but not ourselves ;) */ + wtpWakeupWrkr(pThis->pWtpReg); /* awake all workers, but not ourselves ;) */ - pThis->qRunsDA = QRUNS_DA; /* we are now in DA mode! */ + pThis->bRunsDA = 1; /* we are now in DA mode! */ dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n", queueGetID(pThis), queueGetID(pThis->pqDA)); @@ -751,31 +220,63 @@ finalize_it: pThis->bIsDA = 0; } - return iRet; + RETiRet; } /* initiate DA mode * param bEnqOnly tells if the disk queue is to be run in enqueue-only mode. This may * be needed during shutdown of memory queues which need to be persisted to disk. + * If this function fails (should not happen), DA mode is not turned on. * rgerhards, 2008-01-16 */ static inline rsRetVal -queueInitDA(queue_t *pThis, int bEnqOnly) +queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) { DEFiRet; + DEFVARS_mutexProtection; + uchar pszBuf[64]; + size_t lenBuf; + + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex); + /* check if we already have a DA worker pool. If not, initiate one. Please note that the + * pool is created on first need but never again destructed (until the queue is). This + * is intentional. We assume that when we need it once, we may also need it on another + * occasion. Ressources used are quite minimal when no worker is running. + * rgerhards, 2008-01-24 + */ + if(pThis->pWtpDA == NULL) { + lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx/DA", (unsigned long) pThis); + CHKiRet(wtpConstruct (&pThis->pWtpDA)); + CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, queueChkStopWrkrDA)); + CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, queueIsIdleDA)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, queueConsumerDA)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, queueConsumerCancelCleanup)); + CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, queueStartDA)); + CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, queueTurnOffDAMode)); + CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); + CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); + CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); + CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis)); + CHKiRet(wtpConstructFinalize (pThis->pWtpDA)); + } + /* if we reach this point, we have a "good" DA worker pool */ /* indicate we now run in DA mode - this is reset by the DA worker if it fails */ - pThis->qRunsDA = QRUNS_DA_INIT; + pThis->bRunsDA = 1; pThis->bDAEnqOnly = bEnqOnly; - /* now we must start our DA worker thread - it does the rest of the initialization - * In enqueue-only mode, we do not start any workers. + /* now we must now adivse the wtp that we need one worker. If none is yet active, + * that will also start one up. If we forgot that step, everything would be stalled + * until the next enqueue request. */ if(pThis->bEnqOnly == 0) - iRet = queueStrtDAWrkr(pThis); + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* one worker only for disk queues! */ - return iRet; +finalize_it: + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + RETiRet; } @@ -783,7 +284,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly) * keep it running if we are already in it. * rgerhards, 2008-01-14 */ -static rsRetVal +static inline rsRetVal queueChkStrtDA(queue_t *pThis) { DEFiRet; @@ -794,35 +295,31 @@ queueChkStrtDA(queue_t *pThis) if(pThis->iQueueSize != pThis->iHighWtrMrk) ABORT_FINALIZE(RS_RET_OK); - if(pThis->qRunsDA != QRUNS_REGULAR) { +dbgprintf("Queue %p: chkStartDA\n", pThis); + if(pThis->bRunsDA) { /* then we need to signal that we are at the high water mark again. If that happens * on our way down the queue, that doesn't matter, because then nobody is waiting * on the condition variable. + * (Remember that a DA queue stops draining the queue once it has reached the low + * water mark and restarts it when the high water mark is reached again - this is + * what this code here is responsible for. Please note that all workers may have been + * terminated due to the inactivity timeout, thus we need to advise the pool that + * we need at least one). */ dbgprintf("Queue 0x%lx: %d entries - passed high water mark in DA mode, send notify\n", queueGetID(pThis), pThis->iQueueSize); - //pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - // TODO: mutex call order check! this must aquire the queue mutex - //d_pthread_mutex_lock(&pThis->mutDA); - pthread_cond_signal(&pThis->condDA); - //d_pthread_mutex_unlock(&pThis->mutDA); - //pthread_setcancelstate(iCancelStateSave, NULL); - queueChkWrkThrdChanges(pThis); /* the queue mode may have changed while we waited, so check! */ - } - - /* we need to re-check if we run disk-assisted, because that status may have changed - * in our high water mark processing. - */ - if(pThis->qRunsDA == QRUNS_REGULAR) { - /* if we reach this point, we are NOT currently running in DA mode. */ + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* run again [see comment above] ;) */ + } else { + /* this is the case when we are currently not running in DA mode. So it is time + * to turn it back on. + */ dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n", queueGetID(pThis), pThis->iQueueSize); - - queueInitDA(pThis, QUEUE_MODE_ENQDEQ); /* initiate DA mode */ + queueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ } finalize_it: - return iRet; + RETiRet; } @@ -855,7 +352,7 @@ static rsRetVal qConstructFixedArray(queue_t *pThis) queueChkIsDA(pThis); finalize_it: - return iRet; + RETiRet; } @@ -868,7 +365,7 @@ static rsRetVal qDestructFixedArray(queue_t *pThis) if(pThis->tVars.farray.pBuf != NULL) free(pThis->tVars.farray.pBuf); - return iRet; + RETiRet; } static rsRetVal qAddFixedArray(queue_t *pThis, void* in) @@ -881,7 +378,7 @@ static rsRetVal qAddFixedArray(queue_t *pThis, void* in) if (pThis->tVars.farray.tail == pThis->iMaxQueueSize) pThis->tVars.farray.tail = 0; - return iRet; + RETiRet; } static rsRetVal qDelFixedArray(queue_t *pThis, void **out) @@ -895,7 +392,7 @@ static rsRetVal qDelFixedArray(queue_t *pThis, void **out) if (pThis->tVars.farray.head == pThis->iMaxQueueSize) pThis->tVars.farray.head = 0; - return iRet; + RETiRet; } @@ -911,7 +408,7 @@ static rsRetVal qConstructLinkedList(queue_t *pThis) queueChkIsDA(pThis); - return iRet; + RETiRet; } @@ -925,7 +422,7 @@ static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis) * dynamic left with the linked list. */ - return iRet; + RETiRet; } static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr) @@ -949,7 +446,7 @@ static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr) } finalize_it: - return iRet; + RETiRet; } static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) @@ -971,7 +468,7 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) } free(pEntry); - return iRet; + RETiRet; } @@ -986,7 +483,7 @@ queueLoadPersStrmInfoFixup(strm_t *pStrm, queue_t *pThis) ISOBJ_TYPE_assert(pThis, queue); CHKiRet(strmSetDir(pStrm, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); finalize_it: - return iRet; + RETiRet; } @@ -1024,7 +521,7 @@ queueHaveQIF(queue_t *pThis) /* If we reach this point, we have a .qi file */ finalize_it: - return iRet; + RETiRet; } @@ -1092,7 +589,7 @@ finalize_it: queueGetID(pThis), iRet); } - return iRet; + RETiRet; } @@ -1150,7 +647,7 @@ CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize)); CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize)); finalize_it: - return iRet; + RETiRet; } @@ -1166,7 +663,7 @@ static rsRetVal qDestructDisk(queue_t *pThis) if(pThis->pszSpoolDir != NULL) free(pThis->pszSpoolDir); - return iRet; + RETiRet; } static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) @@ -1179,7 +676,7 @@ static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) CHKiRet(strmFlush(pThis->tVars.disk.pWrite)); finalize_it: - return iRet; + RETiRet; } static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) @@ -1216,7 +713,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr) * queue anything ;) */ - return iRet; + RETiRet; } static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) @@ -1228,6 +725,7 @@ static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__ /* --------------- end type-specific handlers -------------------- */ + /* generic code to add a queue entry */ static rsRetVal queueAdd(queue_t *pThis, void *pUsr) @@ -1242,7 +740,7 @@ queueAdd(queue_t *pThis, void *pUsr) dbgprintf("Queue 0x%lx: entry added, size now %d entries\n", queueGetID(pThis), pThis->iQueueSize); finalize_it: - return iRet; + RETiRet; } @@ -1265,121 +763,31 @@ queueDel(queue_t *pThis, void *pUsr) dbgprintf("Queue 0x%lx: entry deleted, state %d, size now %d entries\n", queueGetID(pThis), iRet, pThis->iQueueSize); - return iRet; -} - - -/* Send a shutdown command to all workers and awake them. This function - * does NOT wait for them to terminate. Set bIncludeDAWRk to send the - * termination command to the DA worker, too (else this does not happen). - * rgerhards, 2008-01-16 - */ -static inline rsRetVal -queueWrkThrdReqTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int bIncludeDAWrk) -{ - DEFiRet; - - if(bIncludeDAWrk) { - queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */ - queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ - } else { - queueTellActWrkThrds(pThis, 1, tShutdownCmd);/* first tell the workers our request */ - queueWakeupWrkThrds(pThis, 0); /* awake all workers but not DA-worker */ - } - - return iRet; -} - - -/* Send a shutdown command to all workers and see if they terminate. - * A timeout may be specified. - * rgerhards, 2008-01-14 - */ -static rsRetVal -queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) -{ - DEFiRet; - int bTimedOut; - struct timespec t; - int iCancelStateSave; - - queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */ - queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ - /* race: must make sure all are running! */ - queueTimeoutComp(&t, iTimeout);/* get timeout */ - - /* and wait for their termination */ -dbgprintf("Queue %p: waiting for mutex %p\n", pThis, &pThis->mutThrdMgmt); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mutThrdMgmt); - pthread_cleanup_push(queueMutexCleanup, &pThis->mutThrdMgmt); - pthread_setcancelstate(iCancelStateSave, NULL); - bTimedOut = 0; - while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { - dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n", - queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd); - - if(pthread_cond_timedwait(&pThis->condThrdTrm, &pThis->mutThrdMgmt, &t) != 0) { - dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", queueGetID(pThis)); - bTimedOut = 1; /* we exit the loop on timeout */ - } - } - pthread_cleanup_pop(1); - - if(bTimedOut) - iRet = RS_RET_TIMED_OUT; - - return iRet; + RETiRet; } -/* Unconditionally cancel all running worker threads. - * rgerhards, 2008-01-14 - */ -static rsRetVal -queueWrkThrdCancel(queue_t *pThis) -{ - DEFiRet; - int i; - // TODO: we need to implement peek(), without it (today!) we lose one message upon - // worker cancellation! -- rgerhards, 2008-01-14 - - /* process any pending thread requests so that we know who actually is still running */ - queueChkWrkThrdChanges(pThis); - - /* awake the workers one more time, just to be sure */ - queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ - - /* first tell the workers our request */ - for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) { - dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i); - pthread_cancel(pThis->pWrkThrds[i].thrdID); - } - } - - return iRet; -} - - -/* Worker thread management function carried out when the main - * worker is about to terminate. +/* This function shuts down all worker threads and waits until they + * have terminated. If they timeout, they are cancelled. Parameters have been set + * before this function is called so that DA queues will be fully persisted to + * disk (if configured to do so). + * rgerhards, 2008-01-24 */ static rsRetVal queueShutdownWorkers(queue_t *pThis) { DEFiRet; int i; - assert(pThis != NULL); + ISOBJ_TYPE_assert(pThis, queue); - dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", (unsigned long) pThis); + dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", queueGetID(pThis)); /* even if the timeout count is set to 0 (run endless), we still call the queueWrkThrdTrm(). This * is necessary so that all threads get sent the termination command. With a timeout of 0, however, * the function returns immediate with RS_RET_TIMED_OUT. We catch that state and accept it as * good. */ - iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN, pThis->toQShutdown); + wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, pThis->toQShutdown); if(iRet == RS_RET_TIMED_OUT) { if(pThis->toQShutdown == 0) { iRet = RS_RET_OK; @@ -1387,418 +795,69 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) /* OK, we now need to try force the shutdown */ dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", queueGetID(pThis)); - iRet = queueWrkThrdTrm(pThis, eWRKTHRD_SHUTDOWN_IMMEDIATE, pThis->toActShutdown); + iRet = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, pThis->toActShutdown); } } if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */ /* still didn't work out - so we now need to cancel the workers */ dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis); - iRet = queueWrkThrdCancel(pThis); + iRet = wtpCancelAll(pThis->pWtpReg); + } + + // TODO: do it just once but right ;) + if(pThis->pWtpDA != NULL) { + wtpShutdownAll(pThis->pWtpDA, pThis->toQShutdown, pThis->toQShutdown); + if(iRet == RS_RET_TIMED_OUT) { + if(pThis->toQShutdown == 0) { + iRet = RS_RET_OK; + } else { + /* OK, we now need to try force the shutdown */ + dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", + queueGetID(pThis)); + iRet = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, pThis->toActShutdown); + } + } + + if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */ + /* still didn't work out - so we now need to cancel the workers */ + dbgprintf("Queue 0x%lx: worker threads could not be shutdown, now canceling them\n", (unsigned long) pThis); + iRet = wtpCancelAll(pThis->pWtpDA); + } } + /* finally join the threads * In case of a cancellation, this may actually take some time. This is also * needed to clean up the thread descriptors, even with a regular termination. * And, most importantly, this is needed if we have an indifitite termination * time set (timeout == 0)! -- rgerhards, 2008-01-14 */ +#if 0 // totally wrong, we must implement something along these lines in wtp! +RUNLOG; for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { - if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRD_STOPPED) { - queueJoinWrkThrd(pThis, i); + if(pThis->pWtpReg->pWrkr[i]->tCurrCmd != eWRKTHRD_STOPPED) { + wtiJoinThrd(pThis->pWtpReg->pWrkr[i]); } } - dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n", - queueGetID(pThis), pThis->iQueueSize); - - return iRet; -} - -/* This is a special consumer to feed the disk-queue in disk-assited mode. - * When active, our own queue more or less acts as a memory buffer to the disk. - * So this consumer just needs to drain the memory queue and submit entries - * to the disk queue. The disk queue will then call the actual consumer from - * the app point of view (we chain two queues here). - * rgerhards, 2008-01-14 - */ -static inline rsRetVal -queueDAConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst) -{ - DEFiRet; - int iCancelStateSave; - - ISOBJ_TYPE_assert(pThis, queue); - assert(pThis->qRunsDA != QRUNS_REGULAR); - ISOBJ_assert(pWrkrInst->pUsr); - -dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize);/* dirty iQueueSize! */ - CHKiRet(queueEnqObj(pThis->pqDA, pWrkrInst->pUsr)); - - /* We check if we reached the low water mark (but only if we are not in shutdown mode) - * Note that the child queue now in almost all cases is non-empty, because we just enqueued - * a message. Note that we need a quick check below to see if we are still in running state. - * If not, we do not go into the wait, because that's not a good thing to do. We do not - * do a full termination check, as this is done when we go back to the main worker loop. - * We need to re-aquire the queue mutex here, because we need to have a consistent - * access to the queue's admin data. - */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); -dbgprintf("pre mutex lock (think about CLEANUP!)\n"); - d_pthread_mutex_lock(pThis->mut); - pthread_cleanup_push(queueMutexCleanup, pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); -dbgprintf("mutex locked (think about CLEANUP!)\n"); - if(pThis->iQueueSize <= pThis->iLowWtrMrk && pWrkrInst->tCurrCmd == eWRKTHRD_RUNNING) { - dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n", - queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize); - /* wait for either passing the high water mark or the child disk queue drain */ - pthread_cond_wait(&pThis->condDA, pThis->mut); - } - pthread_cleanup_pop(1); /* release mutex in an atomic way via cleanup handler */ - -finalize_it: -dbgprintf("DAConsumer returns with iRet %d\n", iRet); - return iRet; -} - - -/* This is a helper for queueWorker () it either calls the configured - * consumer or the DA-consumer (if in disk-assisted mode). It is - * protected by the queue mutex, but MUST release it as soon as possible. - * Most importantly, it must release it before the consumer is called. - * rgerhards, 2008-01-14 - */ -static inline rsRetVal -queueWorkerChkAndCallConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst, int iCancelStateSave) -{ - DEFiRet; - rsRetVal iRetLocal; - int iSeverity; - int iQueueSize; - void *pUsr; - int qRunsDA; - int iMyThrdIndx; - - ISOBJ_TYPE_assert(pThis, queue); - assert(pWrkrInst != NULL); - - iMyThrdIndx = pWrkrInst->iThrd; - - /* dequeue element (still protected from mutex) */ - iRet = queueDel(pThis, &pUsr); - queueChkPersist(pThis); // when we support peek(), we must do this down after the del! - iQueueSize = pThis->iQueueSize; /* cache this for after mutex release */ - pWrkrInst->pUsr = pUsr; /* save it for the cancel cleanup handler */ - qRunsDA = pThis->qRunsDA; - d_pthread_mutex_unlock(pThis->mut); - pthread_cond_signal(&pThis->notFull); - pthread_setcancelstate(iCancelStateSave, NULL); - /* WE ARE NO LONGER PROTECTED FROM THE MUTEX */ - - /* do actual processing (the lengthy part, runs in parallel) - * If we had a problem while dequeing, we do not call the consumer, - * but we otherwise ignore it. This is in the hopes that it will be - * self-healing. However, this is really not a good thing. - * rgerhards, 2008-01-03 - */ - if(iRet != RS_RET_OK) - FINALIZE; - - /* call consumer depending on queue mode (in DA mode, we have just one thread, so it can not change) */ - if(qRunsDA == QRUNS_DA) { - queueDAConsumer(pThis, pWrkrInst); - } else { - /* we are running in normal, non-disk-assisted mode - * do a quick check if we need to drain the queue. It is OK to use the cached - * iQueueSize here, because it does not hurt if it is slightly wrong. - */ - if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) { - iRetLocal = objGetSeverity(pUsr, &iSeverity); - if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { - dbgprintf("Queue 0x%lx/w%d: dequeue/queue nearly full (%d entries), " - "discarded severity %d message\n", - queueGetID(pThis), iMyThrdIndx, iQueueSize, iSeverity); - objDestruct(pUsr); - } - } else { - dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n", - queueGetID(pThis), iMyThrdIndx); - iRetLocal = pThis->pConsumer(pUsr); - if(iRetLocal != RS_RET_OK) { - dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n", - queueGetID(pThis), iMyThrdIndx, iRetLocal); +RUNLOG; + if(pThis->pWtpDA != NULL) { + for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { + if(pThis->pWtpDA->pWrkr[i]->tCurrCmd != eWRKTHRD_STOPPED) { + wtiJoinThrd(pThis->pWtpDA->pWrkr[i]); } } } +#endif -finalize_it: - if(iRet != RS_RET_OK) { - dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things " - "may happen\n", queueGetID(pThis), iMyThrdIndx, iRet); - } -dbgprintf("CallConsumer returns %d\n", iRet); - return iRet; -} - - - -/* cancellation cleanup handler for queueWorker () - * Updates admin structure and frees ressources. - * rgerhards, 2008-01-16 - */ -static void queueWorkerCancelCleanup(void *arg) -{ - qWrkThrd_t *pWrkrInst = (qWrkThrd_t*) arg; - queue_t *pThis; - int iCancelStateSave; - - assert(pWrkrInst != NULL); - ISOBJ_TYPE_assert(pWrkrInst->pQueue, queue); - pThis = pWrkrInst->pQueue; - - dbgprintf("Queue 0x%lx/w%d: cancelation cleanup handler called (NOT FULLY IMPLEMENTED, one msg lost!)\n", - queueGetID(pThis), pWrkrInst->iThrd); - - /* TODO: re-enqueue the data element! */ - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mutThrdMgmt); - qWrkrSetState(&pThis->pWrkThrds[pWrkrInst->iThrd], eWRKTHRD_TERMINATING); - pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ - - dbgprintf("Queue 0x%lx/w%d: thread CANCELED with %d entries left in queue, %d workers running.\n", - queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize, pThis->iCurNumWrkThrd - 1); - - pThis->iCurNumWrkThrd--; - pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ - d_pthread_mutex_unlock(&pThis->mutThrdMgmt); - pthread_setcancelstate(iCancelStateSave, NULL); -} - - -/* This function is created to keep the code in queueWorker () short. Thus it - * also does not abide to the usual calling conventions used in rsyslog. It is more - * like a macro. Its sole purpose is to have a handy shortcut for the queue - * termination condition. For the same reason, the calling parameters are a bit - * more verbose than the need to be in theory. The reasoning is the Worker has - * everything handy and so we do not need to access it from memory (OK, the - * optimized would probably have created the same code, but why not do it - * optimal right away...). The function returns 0 if the worker should terminate - * and something else if it should continue to run. - * rgerhards, 2008-01-18 - */ -static inline int -queueWorkerRemainActive(queue_t *pThis, qWrkThrd_t *pWrkrInst) -{ - register int b; /* this is a boolean! */ - - /* first check the usual termination condition that applies to all workers */ - b = ( (qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING) - || ((qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && (pThis->iQueueSize > 0))); -dbgprintf("Queue %p/w%d: chk 1 pre empty queue, qsize %d (high wtr %d), cont run: %d, cmd %d, DA qsize %d\n", pThis, - pWrkrInst->iThrd, - pThis->iQueueSize, pThis->iHighWtrMrk, b, qWrkrGetState(pWrkrInst), - (pThis->pqDA == NULL) ? -1 : pThis->pqDA->iQueueSize); - if(b && pWrkrInst->iThrd == 0 && pThis->qRunsDA == QRUNS_DA) { - b = pThis->iQueueSize >= pThis->iHighWtrMrk || pThis->pqDA->iQueueSize != 0; - } + dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n", + queueGetID(pThis), pThis->iQueueSize); -dbgprintf("Queue %p/w%d: pre empty queue, qsize %d, cont run: %d\n", pThis, pWrkrInst->iThrd, pThis->iQueueSize, b); - return b; + RETiRet; } -/* Each queue has at least one associated worker (consumer) thread. It will pull - * the message from the queue and pass it to a user-defined function. - * This function was provided on construction. It MUST be thread-safe. - * Worker thread 0 is always reserved for disk-assisted mode (if the queue - * is not DA, this worker will be dormant). All other workers are for - * regular operations mode. Workers are started and stopped as need arises. - * rgerhards, 2008-01-15 - */ -static void * -queueWorker(void *arg) -{ - queue_t *pThis = (queue_t*) arg; - sigset_t sigSet; - struct timespec t; - int iMyThrdIndx; /* index for this thread in queue thread table */ - int iCancelStateSave; - qWrkThrd_t *pWrkrInst; /* for cleanup handler */ - int bContinueRun; - - ISOBJ_TYPE_assert(pThis, queue); - - sigfillset(&sigSet); - pthread_sigmask(SIG_BLOCK, &sigSet, NULL); - - /* do some one-time thread initialization */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pThis->mutThrdMgmt); - - /* initialize our thread instance descriptor */ - qWrkrInit(&pWrkrInst, pThis); - - iMyThrdIndx = pWrkrInst->iThrd; - - dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx); - -dbgprintf("qRunsDA %d, check against %d\n", pThis->qRunsDA, QRUNS_DA); - if((iMyThrdIndx == 0) && (pThis->qRunsDA != QRUNS_DA)) { /* are we the DA worker? */ - if(queueStrtDA(pThis) != RS_RET_OK) { /* then fully initialize the DA queue! */ - /* if we could not init the DA queue, we have nothing to do, so shut down. */ - queueTellActWrkThrd(pThis, 0, eWRKTHRD_SHUTDOWN_IMMEDIATE); - } - } - - /* finally change to RUNNING state. We need to check if we actually should still run, - * because someone may have requested us to shut down even before we got a chance to do - * our init. That would be a bad race... -- rgerhards, 2008-01-16 - */ - pThis->iCurNumWrkThrd++; - if(qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT) - qWrkrSetState(pWrkrInst, eWRKTHRD_RUNNING); /* we are running now! */ - - pthread_cleanup_push(queueWorkerCancelCleanup, pWrkrInst); - - d_pthread_mutex_unlock(&pThis->mutThrdMgmt); - pthread_setcancelstate(iCancelStateSave, NULL); - /* end one-time stuff */ - - /* now we have our identity, on to real processing */ - bContinueRun = 1; /* we need this variable, because we need to check the actual termination condition - * while protected by mutex */ - while(bContinueRun) { -dbgprintf("Queue 0x%lx/w%d: start worker run, queue cmd currently %d\n", - queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd); - /* process any pending thread requests */ - queueChkWrkThrdChanges(pThis); - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(pThis->mut); -dbgprintf("pthis 2: %p\n", pThis); - - if((bContinueRun = queueWorkerRemainActive(pThis, pWrkrInst)) == 0) { -dbgprintf("pthis 2a: %p\n", pThis); - d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); -dbgprintf("pthis 2b: %p\n", pThis); - continue; /* and break loop */ - } - - /* if we reach this point, we are still protected by the mutex */ -dbgprintf("pthis 3: %p\n", pThis); - - if(pThis->iQueueSize == 0) { - dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", - queueGetID(pThis), iMyThrdIndx); - /* check if the parent DA worker is running and, if not, initiate it. Thanks - * to queueStrtDAWrkr (), we do not actually need to check (that routines does - * that for us, but we need to aquire the parent queue's mutex to call it. - */ - if(pThis->pqParent != NULL) { - dbgprintf("Queue %p: pre start parent %p worker\n", pThis, pThis->pqParent); - queueStrtDAWrkr(pThis->pqParent); - } - - if(pThis->bSignalOnEmpty > 0) { - /* we need to signal our parent queue that we are empty */ - dbgprintf("Queue %p/w%d: signal parent we are empty\n", pThis, iMyThrdIndx); - pthread_cond_signal(pThis->condSignalOnEmpty); - dbgprintf("Queue %p/w%d: signaling parent empty done\n", pThis, iMyThrdIndx); - } - if(pThis->bSignalOnEmpty > 1) { - /* no mutex associated with this condition, it's just a try (but needed - * to wakeup a parent worker if e.g. the queue was restarted from disk) */ - pthread_cond_signal(pThis->condSignalOnEmpty2); - } - /* If we arrive here, we have the regular case, where we can safely assume that - * iQueueSize and tCmd have not changed since the while(). - */ - dbgprintf("Queue %p/w%d: pre condwait ->notEmpty, worker shutdown %d\n", pThis, iMyThrdIndx, pThis->toWrkShutdown); - /* DA worker and first worker never have an inactivity timeout */ - if(pWrkrInst->iThrd < 2 || pThis->toWrkShutdown == -1) { - //xxx if(pThis->toWrkShutdown == -1) { - dbgprintf("worker never times out!\n"); - /* never shut down any started worker */ - pthread_cond_wait(&pThis->notEmpty, pThis->mut); - } else { - queueTimeoutComp(&t, pThis->toWrkShutdown);/* get absolute timeout */ - if(pthread_cond_timedwait(&pThis->notEmpty, pThis->mut, &t) != 0) { - dbgprintf("Queue 0x%lx/w%d: inactivity timeout, worker terminating...\n", - queueGetID(pThis), iMyThrdIndx); - /* we use SHUTDOWN (and not SHUTDOWN_IMMEDIATE) so that the worker - * does not terminate if in the mean time a new message arrived. - */ - qWrkrSetState(pWrkrInst, eWRKTHRD_SHUTDOWN); - } - } - dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx); - d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); - pthread_testcancel(); /* see big comment below */ - pthread_yield(); /* see big comment below */ - continue; /* request next iteration */ - } - - /* if we reach this point, we have a non-empty queue (and are still protected by mutex) */ - queueWorkerChkAndCallConsumer(pThis, pWrkrInst, iCancelStateSave); - - /* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is - * a cancellation point in itself. As we run most of the time without cancel enabled, I fear - * we may never get cancelled if we do not create a cancellation point ourselfs. - */ - pthread_testcancel(); - /* We now yield to give the other threads a chance to obtain the mutex. If we do not - * do that, this thread may very well aquire the mutex again before another thread - * has even a chance to run. The reason is that mutex operations are free to be - * implemented in the quickest possible way (and they typically are!). That is, the - * mutex lock/unlock most probably just does an atomic memory swap and does not necessarily - * schedule other threads waiting on the same mutex. That can lead to the same thread - * aquiring the mutex ever and ever again while all others are starving for it. We - * have exactly seen this behaviour when we deliberately introduced a long-running - * test action which basically did a sleep. I understand that with real actions the - * likelihood of this starvation condition is very low - but it could still happen - * and would be very hard to debug. The yield() is a sure fix, its performance overhead - * should be well accepted given the above facts. -- rgerhards, 2008-01-10 - */ - pthread_yield(); - if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0) - dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has " - " %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize); - } - - /* indicate termination */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); -dbgprintf("Queue %p: worker waiting for mutex\n", pThis); - d_pthread_mutex_lock(&pThis->mutThrdMgmt); - /* check if we are the DA worker and, if so, switch back to regular mode */ - if(pWrkrInst->iThrd == 0) { - queueTurnOffDAMode(pThis); - } - pthread_cleanup_pop(0); /* remove cleanup handler */ - - pThis->iCurNumWrkThrd--; /* one less ;) */ - /* if we ever need finalize_it, here would be the place for it! */ - if(qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN || - qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN_IMMEDIATE || - qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT || - qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_CREATED) { - /* in shutdown case, we need to flag termination. All other commands - * have a meaning to the thread harvester, so we can not overwrite them - */ -dbgprintf("Queue 0x%lx/w%d: setting termination state\n", queueGetID(pThis), iMyThrdIndx); - qWrkrSetState(pWrkrInst, eWRKTHRD_TERMINATING); - } - pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ - pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ - d_pthread_mutex_unlock(&pThis->mutThrdMgmt); - pthread_setcancelstate(iCancelStateSave, NULL); - - pthread_exit(0); -} - /* Constructor for the queue object * This constructs the data structure, but does not yet start the queue. That @@ -1811,6 +870,8 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, DEFiRet; queue_t *pThis; +int *pBoom = NULL; +//*pBoom = 'A'; assert(ppThis != NULL); assert(pConsumer != NULL); assert(iWorkerThreads >= 0); @@ -1866,7 +927,205 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, finalize_it: OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP - return iRet; + RETiRet; +} + + +/* cancellation cleanup handler for queueWorker () + * Updates admin structure and frees ressources. + * rgerhards, 2008-01-16 + */ +static rsRetVal +queueConsumerCancelCleanup(void *arg1, void *arg2) +{ + queue_t *pThis = (queue_t*) arg1; + wti_t *pWti = (wti_t*) arg2; + + ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pWti, wti); + + dbgprintf("Queue 0x%lx: cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n", + queueGetID(pThis)); + + /* TODO: re-enqueue the data element! */ + + return RS_RET_OK; +} + + + +/* This function checks if the provided message shall be discarded and does so, if needed. + * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to + * provide real-time creation of spool files. + * Note: cached copies of iQueueSize and bRunsDA are provided so that no mutex locks are required. + * The caller must have obtained them while the mutex was locked. Of course, these values may no + * longer be current, but that is OK for the discard check. At worst, the message is either processed + * or discarded when it should not have been. As discarding is in itself somewhat racy and erratic, + * that is no problems for us. This function MUST NOT lock the queue mutex, it could result in + * deadlocks! + * If the message is discarded, it can no longer be processed by the caller. So be sure to check + * the return state! + * rgerhards, 2008-01-24 + */ +static int queueChkDiscardMsg(queue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr) +{ + DEFiRet; + rsRetVal iRetLocal; + int iSeverity; + + ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_assert(pUsr); + + if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) { + iRetLocal = objGetSeverity(pUsr, &iSeverity); + if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { + dbgprintf("Queue 0x%lx: queue nearly full (%d entries), discarded severity %d message\n", + queueGetID(pThis), iQueueSize, iSeverity); + objDestruct(pUsr); + ABORT_FINALIZE(RS_RET_QUEUE_FULL); + } else { + dbgprintf("Queue 0x%lx: queue nearly full (%d entries), but could not drop msg " + "(iRet: %d, severity %d)\n", queueGetID(pThis), iQueueSize, + iRetLocal, iSeverity); + } + } + +finalize_it: + RETiRet; +} + + +/* dequeue the queued object for the queue consumers. + * rgerhards, 2008-10-21 + */ +static rsRetVal +queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave) +{ + DEFiRet; + void *pUsr; + int iQueueSize; + int bRunsDA; /* cache for early mutex release */ + + /* dequeue element (still protected from mutex) */ + iRet = queueDel(pThis, &pUsr); + queueChkPersist(pThis); // when we support peek(), we must do this down after the del! + iQueueSize = pThis->iQueueSize; /* cache this for after mutex release */ + bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ + pWti->pUsrp = pUsr; /* save it for the cancel cleanup handler */ + d_pthread_mutex_unlock(pThis->mut); + pthread_cond_signal(&pThis->notFull); + pthread_setcancelstate(iCancelStateSave, NULL); + /* WE ARE NO LONGER PROTECTED BY THE MUTEX */ + + /* do actual processing (the lengthy part, runs in parallel) + * If we had a problem while dequeing, we do not call the consumer, + * but we otherwise ignore it. This is in the hopes that it will be + * self-healing. However, this is really not a good thing. + * rgerhards, 2008-01-03 + */ + if(iRet != RS_RET_OK) + FINALIZE; + + /* we are running in normal, non-disk-assisted mode do a quick check if we need to drain the queue. + * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to + * provide real-time creation of spool files. + * Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong. + */ + CHKiRet(queueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr)); + +finalize_it: + if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { + dbgprintf("Queue 0x%lx/w?: error %d dequeueing element - ignoring, but strange things " + "may happen\n", queueGetID(pThis), iRet); + } + RETiRet; +} + + +/* This is the queue consumer in the regular (non-DA) case. It is + * protected by the queue mutex, but MUST release it as soon as possible. + * rgerhards, 2008-01-21 + */ +static rsRetVal +queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pWti, wti); + + CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave)); + CHKiRet(pThis->pConsumer(pWti->pUsrp)); + +finalize_it: +dbgprintf("Queue %p: regular consumer returns %d\n", pThis, iRet); + RETiRet; +} + + +/* This is a special consumer to feed the disk-queue in disk-assited mode. + * When active, our own queue more or less acts as a memory buffer to the disk. + * So this consumer just needs to drain the memory queue and submit entries + * to the disk queue. The disk queue will then call the actual consumer from + * the app point of view (we chain two queues here). + * When this method is entered, the mutex is always locked and needs to be unlocked + * as part of the processing. + * rgerhards, 2008-01-14 + */ +static rsRetVal +queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pWti, wti); + +dbgprintf("Queue %p/w?: queueDAConsumer, queue size %d\n", pThis, pThis->iQueueSize);/* dirty iQueueSize! */ + CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave)); + CHKiRet(queueEnqObj(pThis->pqDA, pWti->pUsrp)); + +finalize_it: +dbgprintf("DAConsumer returns with iRet %d\n", iRet); + RETiRet; +} + + +/* must only be called when the queue mutex is locked, else results + * are not stable! + * Version when running in DA mode. + */ +static int +queueChkStopWrkrDA(queue_t *pThis) +{ + return pThis->bEnqOnly || !pThis->bRunsDA; +} + +/* must only be called when the queue mutex is locked, else results + * are not stable! + * Version when running in non-DA mode. + */ +static int +queueChkStopWrkrReg(queue_t *pThis) +{ + return pThis->bEnqOnly || pThis->bRunsDA; +} + + +/* must only be called when the queue mutex is locked, else results + * are not stable! DA version + */ +static int +queueIsIdleDA(queue_t *pThis) +{ + return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk)); +} +/* must only be called when the queue mutex is locked, else results + * are not stable! Regular version + */ +static int +queueIsIdleReg(queue_t *pThis) +{ + return (pThis->iQueueSize == 0); } @@ -1878,7 +1137,8 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ DEFiRet; rsRetVal iRetLocal; int bInitialized = 0; /* is queue already initialized? */ - int i; + uchar pszBuf[64]; + size_t lenBuf; assert(pThis != NULL); @@ -1888,7 +1148,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ if(pThis->pqParent == NULL) { dbgprintf("Queue %p: no parent, alloc mutex\n", pThis); pThis->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t)); - pthread_mutex_init (pThis->mut, NULL); + pthread_mutex_init(pThis->mut, NULL); } else { /* child queue, we need to use parent's mutex */ pThis->mut = pThis->pqParent->mut; @@ -1909,15 +1169,23 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ - /* initialize worker thread instances - * TODO: move to separate function + /* create worker thread pools for regular operation. The DA pool is created on an as-needed + * basis, which potentially means never under most circumstances. */ - if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads + 1, sizeof(qWrkThrd_t))) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - for(i = 0 ; i < pThis->iNumWorkerThreads + 1 ; ++i) { - qWrkrConstructFinalize(&pThis->pWrkThrds[i], pThis, i); - } - + lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx/Reg", (unsigned long) pThis); + CHKiRet(wtpConstruct (&pThis->pWtpReg)); + CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, queueChkStopWrkrReg)); + CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, queueIsIdleReg)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, queueConsumerReg)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, queueConsumerCancelCleanup)); + CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); + CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); + CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); + CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis)); + CHKiRet(wtpConstructFinalize (pThis->pWtpReg)); + + /* initialize worker thread instances */ if(pThis->bIsDA) { /* If we are disk-assisted, we need to check if there is a QIF file * which we need to load. -- rgerhards, 2008-01-15 @@ -1927,7 +1195,11 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n", queueGetID(pThis)); - queueInitDA(pThis, QUEUE_MODE_ENQDEQ); /* initiate DA mode */ + queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ + /* we need to start the DA worker thread so that messages will be processed. So + * we advise the worker pool there is at least one needed. The wtp does the rest... + */ + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); bInitialized = 1; /* we are done */ } else { // TODO: use logerror? -- rgerhards, 2008-01-16 @@ -1938,16 +1210,14 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut); if(!bInitialized) { dbgprintf("Queue 0x%lx: queue starts up without (loading) any disk state\n", queueGetID(pThis)); - /* fire up the worker threads */ - if(pThis->bEnqOnly == 0) - queueStrtNewWrkThrd(pThis); - // TODO: preforked workers! queueStrtAllWrkThrds(pThis); + /* we do not fire up any worker threads here, this happens automatically when they are needed */ + // TODO: preforked workers? queueStrtAllWrkThrds(pThis); } pThis->bQueueStarted = 1; finalize_it: dbgprintf("queueStart() exit, iret %d\n", iRet); - return iRet; + RETiRet; } @@ -2023,7 +1293,7 @@ finalize_it: if(psQIF != NULL) strmDestruct(&psQIF); - return iRet; + RETiRet; } @@ -2043,15 +1313,16 @@ rsRetVal queueChkPersist(queue_t *pThis) pThis->iUpdsSincePersist = 0; } - return iRet; + RETiRet; } /* destructor for the queue object */ rsRetVal queueDestruct(queue_t **ppThis) { - queue_t *pThis; DEFiRet; + queue_t *pThis; + DEFVARS_mutexProtection; assert(ppThis != NULL); pThis = *ppThis; @@ -2064,52 +1335,38 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove /* we do not need to take care of any messages left in queue if we are in enqueue only mode */ if(!pThis->bEnqOnly) { /* in regular mode, need look at termination */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ /* optimize parameters for shutdown of DA-enabled queues */ if(pThis->bIsDA && pThis->iQueueSize > 0) { // TODO: atomic iQueueSize! dbgprintf("IsDA queue, modifying params for draining\n"); pThis->iHighWtrMrk = 1; /* make sure we drain */ pThis->iLowWtrMrk = 0; /* disable low water mark algo */ - if(pThis->qRunsDA == QRUNS_REGULAR) { + if(pThis->bRunsDA == 0) { if(pThis->iQueueSize > 0) { - queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* initiate DA mode */ + queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ } } else { queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */ + /* worker may have been waited on low water mark, reactivate */ + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); } if(pThis->bSaveOnShutdown) { dbgprintf("bSaveOnShutdown set, eternal timeout set\n"); pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL; } - /* now we need to activate workers (read doc/dev_queue.html) */ + END_MTX_PROTECTED_OPERATIONS(pThis->mut); } - - /* wait until all pending workers are started up */ - qWrkrWaitAllWrkrStartup(pThis); - - // We need to startup a worker if we are in non-DA mode and the queue is not empty and not in enque-only mode */ - dbgprintf("Queue %p: queueDestruct probing if any regular workers need to be started, CurWrkr %d, qsize %d, qRunsDA %d\n", - pThis, pThis->iCurNumWrkThrd, pThis->iQueueSize, pThis->qRunsDA); - d_pthread_mutex_lock(pThis->mut); - dbgprintf("queueDestruct mutex locked\n"); - if(pThis->iCurNumWrkThrd == 0 && pThis->iQueueSize > 0 && !pThis->bEnqOnly) { - dbgprintf("Queue %p: queueDestruct must start regular workers!\n", pThis); - // TODO check mutex call order - doies function aquire mutex? - queueStrtNewWrkThrd(pThis); - } - d_pthread_mutex_unlock(pThis->mut); - dbgprintf("queueDestruct mutex unlocked\n"); - - /* wait again in case a new worker was started */ - qWrkrWaitAllWrkrStartup(pThis); } - /* terminate our own worker threads */ - if(pThis->pWrkThrds != NULL) { + /* at this point, the queue is either empty with all workers being idle (or deact) or the queue + * is full and all workers are running. We now need to wait for everyone to become idle. + */ + if(pThis->qType != QUEUETYPE_DIRECT) { queueShutdownWorkers(pThis); } - /* if still running DA, terminate disk queue */ - if(pThis->qRunsDA != QRUNS_REGULAR) + /* if still running DA, terminate disk queue (note that the DA queue is NULL if it was never used) */ + if(pThis->bRunsDA && pThis->pqDA != NULL) queueDestruct(&pThis->pqDA); /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) */ @@ -2118,9 +1375,10 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove } /* ... then free resources */ - if(pThis->pWrkThrds != NULL) { - free(pThis->pWrkThrds); - pThis->pWrkThrds = NULL; + if(pThis->qType != QUEUETYPE_DIRECT) { + wtpDestruct(&pThis->pWtpReg); + if(pThis->pWtpDA != NULL) + wtpDestruct(&pThis->pWtpDA); } if(pThis->pqParent == NULL) { @@ -2141,7 +1399,7 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove free(pThis); *ppThis = NULL; - return iRet; + RETiRet; } @@ -2167,7 +1425,7 @@ queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) pThis->lenFilePrefix = iLenPrefix; finalize_it: - return iRet; + RETiRet; } /* set the queue's maximum file size @@ -2187,7 +1445,7 @@ queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize) pThis->iMaxFileSize = iMaxFileSize; finalize_it: - return iRet; + RETiRet; } @@ -2203,65 +1461,64 @@ queueEnqObj(queue_t *pThis, void *pUsr) { DEFiRet; int iCancelStateSave; + int iMaxWorkers; int i; struct timespec t; - int iSeverity = 8; - rsRetVal iRetLocal; ISOBJ_TYPE_assert(pThis, queue); - /* process any pending thread requests */ - queueChkWrkThrdChanges(pThis); - +dbgprintf("Queue %p: EnqObj() 1\n", pThis); /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE * during its execution. If that is not done, race conditions occur if the * thread is canceled (most important use case is input module termination). * rgerhards, 2008-01-08 */ - if(pThis->pWrkThrds != NULL) { + if(pThis->qType != QUEUETYPE_DIRECT) { pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(pThis->mut); } - /* first check if we can discard anything */ - if(pThis->iDiscardMrk > 0 && pThis->iQueueSize >= pThis->iDiscardMrk) { - iRetLocal = objGetSeverity(pUsr, &iSeverity); - if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { - dbgprintf("Queue 0x%lx: queue nearly full (%d entries), discarded severity %d message\n", - queueGetID(pThis), pThis->iQueueSize, iSeverity); - objDestruct(pUsr); - ABORT_FINALIZE(RS_RET_QUEUE_FULL); - } else { - dbgprintf("Queue 0x%lx: queue nearly full (%d entries), but could not drop msg " - "(iRet: %d, severity %d)\n", queueGetID(pThis), pThis->iQueueSize, - iRetLocal, iSeverity); - } - } + /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ + CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); +dbgprintf("Queue %p: EnqObj() 10\n", pThis); /* then check if we need to add an assistance disk queue */ if(pThis->bIsDA) CHKiRet(queueChkStrtDA(pThis)); - /* re-process any new pending thread requests and see if we need to start workers */ - queueChkAndStrtWrk(pThis); +RUNLOG_VAR("%d", pThis->bIsDA); + /* make sure at least one worker is running. */ + if(pThis->bRunsDA) { +RUNLOG; + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ + } else { + if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { + iMaxWorkers = 1; + } else { + iMaxWorkers = pThis->iQueueSize / pThis->iMinMsgsPerWrkr + 1; + } +RUNLOG; + wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); + } - /* and finally (try to) enqueue what is left over */ + /* wait for the queue to be ready... */ while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) { dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", queueGetID(pThis)); - queueTimeoutComp(&t, pThis->toEnq); + timeoutComp(&t, pThis->toEnq); if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", queueGetID(pThis)); objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } } + + /* and finally enqueue the message */ CHKiRet(queueAdd(pThis, pUsr)); queueChkPersist(pThis); finalize_it: - /* now awake sleeping worker threads */ - if(pThis->pWrkThrds != NULL) { + if(pThis->qType != QUEUETYPE_DIRECT) { d_pthread_mutex_unlock(pThis->mut); i = pthread_cond_signal(&pThis->notEmpty); dbgprintf("Queue 0x%lx: EnqueueMsg signaled condition (%d)\n", (unsigned long) pThis, i); @@ -2269,7 +1526,7 @@ finalize_it: } - return iRet; + RETiRet; } @@ -2307,7 +1564,8 @@ queueSetEnqOnly(queue_t *pThis, int bEnqOnly) /* this means we need to terminate all workers - that's it... */ dbgprintf("Queue 0x%lx: switching to enqueue-only mode, terminating all worker threads\n", queueGetID(pThis)); - queueWrkThrdReqTrm(pThis, eWRKTHRD_SHUTDOWN_IMMEDIATE, 0); + wtpWakeupAllWrkr(pThis->pWtpDA); + wtpWakeupAllWrkr(pThis->pWtpReg); } else { /* switch back to regular mode */ ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */ @@ -2321,7 +1579,7 @@ finalize_it: d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); } - return iRet; + RETiRet; } @@ -2360,19 +1618,18 @@ static rsRetVal queueSetProperty(queue_t *pThis, property_t *pProp) } finalize_it: - return iRet; + RETiRet; } #undef isProp - - /* Initialize the stream class. Must be called as the very first method * before anything else is called inside this class. * rgerhards, 2008-01-09 */ BEGINObjClassInit(queue, 1) - //OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize); OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty); //OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize); +//fprintf(stdout, "queueChkStopWrkrReg: %p\n", queueChkStopWrkrReg); +//fprintf(stdout, "queueChkStopWrkrDA: %p\n", queueChkStopWrkrDA); ENDObjClassInit(queue) /* |