diff options
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 298 |
1 files changed, 256 insertions, 42 deletions
@@ -1,3 +1,4 @@ +// TODO: remove bIsDA? // TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in // call consumer state. Facilitates retaining messages in queue until action could // be called! @@ -50,9 +51,69 @@ DEFobjStaticHelpers /* forward-definitions */ rsRetVal queueChkPersist(queue_t *pThis); +static void *queueWorker(void *arg); /* methods */ +/* send a command to a specific thread + */ +static inline rsRetVal +queueTellWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads); + + pThis->pWrkThrds[iIdx].tCurrCmd = tCmd; + + return iRet; +} + + +/* Starts a worker thread (on a specific index [i]!) + */ +static inline rsRetVal +queueStrtWrkThrd(queue_t *pThis, int i) +{ + DEFiRet; + int iState; + + ISOBJ_TYPE_assert(pThis, queue); + assert(i >= 0 && i <= pThis->iNumWorkerThreads); + + queueTellWrkThrd(pThis, i, eWRKTHRDCMD_RUN); + iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); + dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n", + (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState); + + return iRet; +} + + +/* send a command to all worker threads. A start index can be + * given. Usually, this is 0 or 1. Thread 0 is reserved to disk-assisted + * mode and this start index take care of the special handling it needs to + * receive. + */ +static inline rsRetVal +queueTellWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd) +{ + DEFiRet; + int i; + + ISOBJ_TYPE_assert(pThis, queue); + assert(iStartIdx == 0 || iStartIdx == 1); + + /* tell the workers our request */ + for(i = iStartIdx ; i <= pThis->iNumWorkerThreads ; ++i) + if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRDCMD_TERMINATED) + queueTellWrkThrd(pThis, i, tCmd); + + return iRet; +} + + /* compute an absolute time timeout suitable for calls to pthread_cond_timedwait() * rgerhards, 2008-01-14 */ @@ -72,7 +133,123 @@ queueTimeoutComp(struct timespec *pt, int iTimeout) } -/* first, we define type-specific handlers. The provide a generic functionality, + +/* --------------- code for disk-assisted (DA) queue modes -------------------- */ + + +/* check if we run in disk-assisted mode and record that + * setting for easy (and quick!) access in the future. This + * function must only be called from constructors and only + * from those that support disk-assisted modes (aka memory- + * based queue drivers). + * rgerhards, 2008-01-14 + */ +static rsRetVal +queueChkIsDA(queue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + if(pThis->pszFilePrefix != NULL) { + pThis->bIsDA = 1; + dbgprintf("Queue 0x%lx: is disk-assisted, disk will be used on demand\n", queueGetID(pThis)); + } else { + dbgprintf("Queue 0x%lx: is NOT disk-assisted\n", queueGetID(pThis)); + } + + return iRet; +} + + + +/* This is a special consumer to feed the disk-queue in disk-assited mode. + * When active, our own queue more or less acts as a memory buffer to the disk. + * So this consumer just needs to drain the memory queue and submit entries + * to the disk queue. The disk queue will then call the actual consumer from + * the app point of view (we chain two queues here). + * This function must also handle the LowWaterMark situation, at which it is + * switched back to in-memory queueing. + * rgerhards, 2008-01-14 + */ +static inline rsRetVal +queueDAConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_assert(pUsr); + assert(pThis->bRunsDA); + +dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, pThis->iQueueSize); + CHKiRet(queueEnqObj(pThis->pqDA, pUsr)); + +finalize_it: + return iRet; +} + + +/* check if we need to start disk assisted mode + * rgerhards, 2008-01-14 + */ +static rsRetVal +queueChkStrtDA(queue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + if(pThis->iQueueSize < pThis->iHighWtrMrk || pThis->bRunsDA) + ABORT_FINALIZE(RS_RET_OK); + + dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n", + queueGetID(pThis), pThis->iQueueSize); + + /* create message queue */ + CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer)); + + CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); + CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); + CHKiRet(queueSettoQShutdown(pThis->pqDA, pThis->toQShutdown)); + CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); + CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq)); + CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0)); + CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0)); + + iRet = queueStart(pThis->pqDA); + /* file not found is expected, that means it is no previous QIF available */ + if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) + FINALIZE; /* something is wrong */ + + /* if we reach this point, we have a working disk queue + * so we now need to change our consumer to utilize it. + */ + pThis->bRunsDA = 1; /* and that's all we need to do - the worker handles the rest ;) */ + + /* now we must start our DA worker thread and shutdown all others */ + CHKiRet(queueStrtWrkThrd(pThis, 0)); + CHKiRet(queueTellWrkThrds(pThis, 1, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE)); + + dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n", + queueGetID(pThis), queueGetID(pThis->pqDA)); +finalize_it: + if(iRet != RS_RET_OK) { + if(pThis->pqDA != NULL) { + queueDestruct(pThis->pqDA); + pThis->pqDA = NULL; + } + dbgprintf("Queue 0x%lx: error %d creating disk queue - giving up.\n", + queueGetID(pThis), iRet); + pThis->bIsDA = 0; + } + + return iRet; +} + + +/* --------------- end code for disk-assisted queue modes -------------------- */ + + +/* Now, we define type-specific handlers. The provide a generic functionality, * but for this specific type of queue. The mapping to these handlers happens during * queue construction. Later on, handlers are called by pointers present in the * queue instance object. @@ -95,6 +272,8 @@ static rsRetVal qConstructFixedArray(queue_t *pThis) pThis->tVars.farray.head = 0; pThis->tVars.farray.tail = 0; + queueChkIsDA(pThis); + finalize_it: return iRet; } @@ -150,6 +329,8 @@ static rsRetVal qConstructLinkedList(queue_t *pThis) pThis->tVars.linklist.pRoot = 0; pThis->tVars.linklist.pLast = 0; + queueChkIsDA(pThis); + return iRet; } @@ -443,7 +624,6 @@ static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__ } - /* --------------- end type-specific handlers -------------------- */ @@ -496,14 +676,11 @@ static rsRetVal queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) { DEFiRet; - int i; int bTimedOut; struct timespec t; /* first tell the workers our request */ - for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) - if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED) - pThis->pWrkThrds[i].tCurrCmd = tShutdownCmd; + queueTellWrkThrds(pThis, 0, tShutdownCmd); /* awake them... */ pthread_cond_broadcast(pThis->notEmpty); @@ -547,8 +724,8 @@ queueWrkThrdCancel(queue_t *pThis) pthread_cond_broadcast(pThis->notEmpty); /* first tell the workers our request */ - for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) - if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED) { + for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) + if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRDCMD_TERMINATED) { dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i); pthread_cancel(pThis->pWrkThrds[i].thrdID); } @@ -598,16 +775,20 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) * And, most importantly, this is needed if we have an indifitite termination * time set (timeout == 0)! -- rgerhards, 2008-01-14 */ - for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { -dbgprintf("join thred %d\n", i); - pthread_join(pThis->pWrkThrds[i].thrdID, NULL); + for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { + if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_NEVER_RAN) { + dbgprintf("Queue 0x%lx: joining worker thread %d\n", queueGetID(pThis), i); + pthread_join(pThis->pWrkThrds[i].thrdID, NULL); + } } /* as we may have cancelled a thread, clean up our internal structure. All are * terminated now. For simplicity, we simply overwrite the states. */ - for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_TERMINATED; + for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { + if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_NEVER_RAN) { + pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_TERMINATED; + } } dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n", @@ -617,6 +798,45 @@ dbgprintf("join thred %d\n", i); } +/* This is a helper for queueWorker() it either calls the configured + * consumer or the DA-consumer (if in disk-assisted mode). It is NOT + * protected by the queue mutex. + * rgerhards, 2008-01-14 + */ +static inline rsRetVal +queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr) +{ + DEFiRet; + rsRetVal iRetLocal; + int iSeverity; + + if(pThis->bRunsDA) { + queueDAConsumer(pThis, iMyThrdIndx, pUsr); + } else { + /* we are running in normal, non-disk-assisted mode */ + /* do a quick check if we need to drain the queue */ + if(pThis->iDiscardMrk > 0 && pThis->iQueueSize >= pThis->iDiscardMrk) { + iRetLocal = objGetSeverity(pUsr, &iSeverity); + if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { + dbgprintf("Queue 0x%lx/w%d: dequeue/queue nearly full (%d entries), " + "discarded severity %d message\n", + queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize, iSeverity); + objDestruct(pUsr); + } + } else { + dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n", + queueGetID(pThis), iMyThrdIndx); + iRetLocal = pThis->pConsumer(pUsr); + if(iRetLocal != RS_RET_OK) + dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n", + queueGetID(pThis), iMyThrdIndx, iRetLocal); + } + } + + return iRet; +} + + /* Each queue has one associated worker (consumer) thread. It will pull * the message from the queue and pass it to a user-defined function. * This function was provided on construction. It MUST be thread-safe. @@ -630,8 +850,6 @@ queueWorker(void *arg) sigset_t sigSet; int iMyThrdIndx; /* index for this thread in queue thread table */ int iCancelStateSave; - int iSeverity; - rsRetVal iRetLocal; assert(pThis != NULL); @@ -639,7 +857,7 @@ queueWorker(void *arg) pthread_sigmask(SIG_BLOCK, &sigSet, NULL); /* first find myself in the queue's thread table */ - for(iMyThrdIndx = 0 ; iMyThrdIndx < pThis->iNumWorkerThreads ; ++iMyThrdIndx) + for(iMyThrdIndx = 0 ; iMyThrdIndx <= pThis->iNumWorkerThreads ; ++iMyThrdIndx) if(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self()) break; assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self()); @@ -678,27 +896,11 @@ queueWorker(void *arg) /* do actual processing (the lengthy part, runs in parallel) * If we had a problem while dequeing, we do not call the consumer, * but we otherwise ignore it. This is in the hopes that it will be - * self-healing. Howerver, this is really not a good thing. + * self-healing. However, this is really not a good thing. * rgerhards, 2008-01-03 */ if(iRet == RS_RET_OK) { - /* do a quick check if we need to drain the queue */ - if(pThis->iDiscardMrk > 0 && pThis->iQueueSize >= pThis->iDiscardMrk) { - iRetLocal = objGetSeverity(pUsr, &iSeverity); - if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { - dbgprintf("Queue 0x%lx/w%d: dequeue/queue nearly full (%d entries), " - "discarded severity %d message\n", - queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize, iSeverity); - objDestruct(pUsr); - } - } else { - dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n", - queueGetID(pThis), iMyThrdIndx); - iRetLocal = pThis->pConsumer(pUsr); - if(iRetLocal != RS_RET_OK) - dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n", - queueGetID(pThis), iMyThrdIndx, iRetLocal); - } + queueWorkerCallConsumer(pThis, iMyThrdIndx, pUsr); } else { dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things " "may happen\n", queueGetID(pThis), iMyThrdIndx, iRet); @@ -825,7 +1027,6 @@ finalize_it: rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; - int iState; int i; assert(pThis != NULL); @@ -837,15 +1038,15 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ pThis->iMaxFileSize); if(pThis->qType != QUEUETYPE_DIRECT) { - if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads, sizeof(qWrkThrd_t))) == NULL) + if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads + 1, sizeof(qWrkThrd_t))) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + /* worker 0 is reserved for disk-assisted mode */ + queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_NEVER_RAN); + /* fire up the worker threads */ - for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_RUN; - iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); - dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n", - (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState); + for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) { + queueStrtWrkThrd(pThis, i); } } @@ -982,6 +1183,10 @@ rsRetVal queueDestruct(queue_t *pThis) pThis->pWrkThrds = NULL; } + /* if running DA, terminate disk queue */ + if(pThis->bRunsDA) + queueDestruct(pThis->pqDA); + /* persist the queue (we always do that - queuePersits() does cleanup it the queue is empty) */ CHKiRet_Hdlr(queuePersist(pThis)) { dbgprintf("Queue 0x%lx: error %d persisting queue - data lost!\n", (unsigned long) pThis, iRet); @@ -1020,6 +1225,9 @@ queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) if(pThis->pszFilePrefix != NULL) free(pThis->pszFilePrefix); + if(pszPrefix == NULL) /* just unset the prefix! */ + ABORT_FINALIZE(RS_RET_OK); + if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1); @@ -1037,7 +1245,7 @@ queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize) { DEFiRet; - assert(pThis != NULL); + ISOBJ_TYPE_assert(pThis, queue); if(iMaxFileSize < 1024) { ABORT_FINALIZE(RS_RET_VALUE_TOO_LOW); @@ -1080,6 +1288,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) pthread_mutex_lock(pThis->mut); } + /* first check if we can discard anything */ if(pThis->iDiscardMrk > 0 && pThis->iQueueSize >= pThis->iDiscardMrk) { iRetLocal = objGetSeverity(pUsr, &iSeverity); if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { @@ -1094,7 +1303,11 @@ queueEnqObj(queue_t *pThis, void *pUsr) } } + /* then check if we need to add an assistance disk queue */ + if(pThis->bIsDA) + CHKiRet(queueChkStrtDA(pThis)); + /* and finally (try to) enqueue what is left over */ while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) { dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", queueGetID(pThis)); queueTimeoutComp(&t, pThis->toEnq); @@ -1130,6 +1343,7 @@ DEFpropSetMeth(queue, iHighWtrMrk, int); DEFpropSetMeth(queue, iLowWtrMrk, int); DEFpropSetMeth(queue, iDiscardMrk, int); DEFpropSetMeth(queue, iDiscardSeverity, int); +DEFpropSetMeth(queue, bIsDA, int); /* This function can be used as a generic way to set properties. Only the subset |