summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c298
1 files changed, 256 insertions, 42 deletions
diff --git a/queue.c b/queue.c
index 17992473..87cea59d 100644
--- a/queue.c
+++ b/queue.c
@@ -1,3 +1,4 @@
+// TODO: remove bIsDA?
// TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in
// call consumer state. Facilitates retaining messages in queue until action could
// be called!
@@ -50,9 +51,69 @@ DEFobjStaticHelpers
/* forward-definitions */
rsRetVal queueChkPersist(queue_t *pThis);
+static void *queueWorker(void *arg);
/* methods */
+/* send a command to a specific thread
+ */
+static inline rsRetVal
+queueTellWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads);
+
+ pThis->pWrkThrds[iIdx].tCurrCmd = tCmd;
+
+ return iRet;
+}
+
+
+/* Starts a worker thread (on a specific index [i]!)
+ */
+static inline rsRetVal
+queueStrtWrkThrd(queue_t *pThis, int i)
+{
+ DEFiRet;
+ int iState;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ assert(i >= 0 && i <= pThis->iNumWorkerThreads);
+
+ queueTellWrkThrd(pThis, i, eWRKTHRDCMD_RUN);
+ iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis);
+ dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n",
+ (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState);
+
+ return iRet;
+}
+
+
+/* send a command to all worker threads. A start index can be
+ * given. Usually, this is 0 or 1. Thread 0 is reserved to disk-assisted
+ * mode and this start index take care of the special handling it needs to
+ * receive.
+ */
+static inline rsRetVal
+queueTellWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd)
+{
+ DEFiRet;
+ int i;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ assert(iStartIdx == 0 || iStartIdx == 1);
+
+ /* tell the workers our request */
+ for(i = iStartIdx ; i <= pThis->iNumWorkerThreads ; ++i)
+ if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRDCMD_TERMINATED)
+ queueTellWrkThrd(pThis, i, tCmd);
+
+ return iRet;
+}
+
+
/* compute an absolute time timeout suitable for calls to pthread_cond_timedwait()
* rgerhards, 2008-01-14
*/
@@ -72,7 +133,123 @@ queueTimeoutComp(struct timespec *pt, int iTimeout)
}
-/* first, we define type-specific handlers. The provide a generic functionality,
+
+/* --------------- code for disk-assisted (DA) queue modes -------------------- */
+
+
+/* check if we run in disk-assisted mode and record that
+ * setting for easy (and quick!) access in the future. This
+ * function must only be called from constructors and only
+ * from those that support disk-assisted modes (aka memory-
+ * based queue drivers).
+ * rgerhards, 2008-01-14
+ */
+static rsRetVal
+queueChkIsDA(queue_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ if(pThis->pszFilePrefix != NULL) {
+ pThis->bIsDA = 1;
+ dbgprintf("Queue 0x%lx: is disk-assisted, disk will be used on demand\n", queueGetID(pThis));
+ } else {
+ dbgprintf("Queue 0x%lx: is NOT disk-assisted\n", queueGetID(pThis));
+ }
+
+ return iRet;
+}
+
+
+
+/* This is a special consumer to feed the disk-queue in disk-assited mode.
+ * When active, our own queue more or less acts as a memory buffer to the disk.
+ * So this consumer just needs to drain the memory queue and submit entries
+ * to the disk queue. The disk queue will then call the actual consumer from
+ * the app point of view (we chain two queues here).
+ * This function must also handle the LowWaterMark situation, at which it is
+ * switched back to in-memory queueing.
+ * rgerhards, 2008-01-14
+ */
+static inline rsRetVal
+queueDAConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ ISOBJ_assert(pUsr);
+ assert(pThis->bRunsDA);
+
+dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, pThis->iQueueSize);
+ CHKiRet(queueEnqObj(pThis->pqDA, pUsr));
+
+finalize_it:
+ return iRet;
+}
+
+
+/* check if we need to start disk assisted mode
+ * rgerhards, 2008-01-14
+ */
+static rsRetVal
+queueChkStrtDA(queue_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+ if(pThis->iQueueSize < pThis->iHighWtrMrk || pThis->bRunsDA)
+ ABORT_FINALIZE(RS_RET_OK);
+
+ dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n",
+ queueGetID(pThis), pThis->iQueueSize);
+
+ /* create message queue */
+ CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
+
+ CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
+ CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
+ CHKiRet(queueSettoQShutdown(pThis->pqDA, pThis->toQShutdown));
+ CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
+ CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
+ CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
+ CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
+
+ iRet = queueStart(pThis->pqDA);
+ /* file not found is expected, that means it is no previous QIF available */
+ if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
+ FINALIZE; /* something is wrong */
+
+ /* if we reach this point, we have a working disk queue
+ * so we now need to change our consumer to utilize it.
+ */
+ pThis->bRunsDA = 1; /* and that's all we need to do - the worker handles the rest ;) */
+
+ /* now we must start our DA worker thread and shutdown all others */
+ CHKiRet(queueStrtWrkThrd(pThis, 0));
+ CHKiRet(queueTellWrkThrds(pThis, 1, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE));
+
+ dbgprintf("Queue 0x%lx: is now running in disk assisted mode, disk queue 0x%lx\n",
+ queueGetID(pThis), queueGetID(pThis->pqDA));
+finalize_it:
+ if(iRet != RS_RET_OK) {
+ if(pThis->pqDA != NULL) {
+ queueDestruct(pThis->pqDA);
+ pThis->pqDA = NULL;
+ }
+ dbgprintf("Queue 0x%lx: error %d creating disk queue - giving up.\n",
+ queueGetID(pThis), iRet);
+ pThis->bIsDA = 0;
+ }
+
+ return iRet;
+}
+
+
+/* --------------- end code for disk-assisted queue modes -------------------- */
+
+
+/* Now, we define type-specific handlers. The provide a generic functionality,
* but for this specific type of queue. The mapping to these handlers happens during
* queue construction. Later on, handlers are called by pointers present in the
* queue instance object.
@@ -95,6 +272,8 @@ static rsRetVal qConstructFixedArray(queue_t *pThis)
pThis->tVars.farray.head = 0;
pThis->tVars.farray.tail = 0;
+ queueChkIsDA(pThis);
+
finalize_it:
return iRet;
}
@@ -150,6 +329,8 @@ static rsRetVal qConstructLinkedList(queue_t *pThis)
pThis->tVars.linklist.pRoot = 0;
pThis->tVars.linklist.pLast = 0;
+ queueChkIsDA(pThis);
+
return iRet;
}
@@ -443,7 +624,6 @@ static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__
}
-
/* --------------- end type-specific handlers -------------------- */
@@ -496,14 +676,11 @@ static rsRetVal
queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout)
{
DEFiRet;
- int i;
int bTimedOut;
struct timespec t;
/* first tell the workers our request */
- for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
- if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED)
- pThis->pWrkThrds[i].tCurrCmd = tShutdownCmd;
+ queueTellWrkThrds(pThis, 0, tShutdownCmd);
/* awake them... */
pthread_cond_broadcast(pThis->notEmpty);
@@ -547,8 +724,8 @@ queueWrkThrdCancel(queue_t *pThis)
pthread_cond_broadcast(pThis->notEmpty);
/* first tell the workers our request */
- for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
- if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED) {
+ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i)
+ if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRDCMD_TERMINATED) {
dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i);
pthread_cancel(pThis->pWrkThrds[i].thrdID);
}
@@ -598,16 +775,20 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
* And, most importantly, this is needed if we have an indifitite termination
* time set (timeout == 0)! -- rgerhards, 2008-01-14
*/
- for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
-dbgprintf("join thred %d\n", i);
- pthread_join(pThis->pWrkThrds[i].thrdID, NULL);
+ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
+ if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_NEVER_RAN) {
+ dbgprintf("Queue 0x%lx: joining worker thread %d\n", queueGetID(pThis), i);
+ pthread_join(pThis->pWrkThrds[i].thrdID, NULL);
+ }
}
/* as we may have cancelled a thread, clean up our internal structure. All are
* terminated now. For simplicity, we simply overwrite the states.
*/
- for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_TERMINATED;
+ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
+ if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_NEVER_RAN) {
+ pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_TERMINATED;
+ }
}
dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n",
@@ -617,6 +798,45 @@ dbgprintf("join thred %d\n", i);
}
+/* This is a helper for queueWorker() it either calls the configured
+ * consumer or the DA-consumer (if in disk-assisted mode). It is NOT
+ * protected by the queue mutex.
+ * rgerhards, 2008-01-14
+ */
+static inline rsRetVal
+queueWorkerCallConsumer(queue_t *pThis, int iMyThrdIndx, void *pUsr)
+{
+ DEFiRet;
+ rsRetVal iRetLocal;
+ int iSeverity;
+
+ if(pThis->bRunsDA) {
+ queueDAConsumer(pThis, iMyThrdIndx, pUsr);
+ } else {
+ /* we are running in normal, non-disk-assisted mode */
+ /* do a quick check if we need to drain the queue */
+ if(pThis->iDiscardMrk > 0 && pThis->iQueueSize >= pThis->iDiscardMrk) {
+ iRetLocal = objGetSeverity(pUsr, &iSeverity);
+ if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
+ dbgprintf("Queue 0x%lx/w%d: dequeue/queue nearly full (%d entries), "
+ "discarded severity %d message\n",
+ queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize, iSeverity);
+ objDestruct(pUsr);
+ }
+ } else {
+ dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n",
+ queueGetID(pThis), iMyThrdIndx);
+ iRetLocal = pThis->pConsumer(pUsr);
+ if(iRetLocal != RS_RET_OK)
+ dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n",
+ queueGetID(pThis), iMyThrdIndx, iRetLocal);
+ }
+ }
+
+ return iRet;
+}
+
+
/* Each queue has one associated worker (consumer) thread. It will pull
* the message from the queue and pass it to a user-defined function.
* This function was provided on construction. It MUST be thread-safe.
@@ -630,8 +850,6 @@ queueWorker(void *arg)
sigset_t sigSet;
int iMyThrdIndx; /* index for this thread in queue thread table */
int iCancelStateSave;
- int iSeverity;
- rsRetVal iRetLocal;
assert(pThis != NULL);
@@ -639,7 +857,7 @@ queueWorker(void *arg)
pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
/* first find myself in the queue's thread table */
- for(iMyThrdIndx = 0 ; iMyThrdIndx < pThis->iNumWorkerThreads ; ++iMyThrdIndx)
+ for(iMyThrdIndx = 0 ; iMyThrdIndx <= pThis->iNumWorkerThreads ; ++iMyThrdIndx)
if(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self())
break;
assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self());
@@ -678,27 +896,11 @@ queueWorker(void *arg)
/* do actual processing (the lengthy part, runs in parallel)
* If we had a problem while dequeing, we do not call the consumer,
* but we otherwise ignore it. This is in the hopes that it will be
- * self-healing. Howerver, this is really not a good thing.
+ * self-healing. However, this is really not a good thing.
* rgerhards, 2008-01-03
*/
if(iRet == RS_RET_OK) {
- /* do a quick check if we need to drain the queue */
- if(pThis->iDiscardMrk > 0 && pThis->iQueueSize >= pThis->iDiscardMrk) {
- iRetLocal = objGetSeverity(pUsr, &iSeverity);
- if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
- dbgprintf("Queue 0x%lx/w%d: dequeue/queue nearly full (%d entries), "
- "discarded severity %d message\n",
- queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize, iSeverity);
- objDestruct(pUsr);
- }
- } else {
- dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n",
- queueGetID(pThis), iMyThrdIndx);
- iRetLocal = pThis->pConsumer(pUsr);
- if(iRetLocal != RS_RET_OK)
- dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n",
- queueGetID(pThis), iMyThrdIndx, iRetLocal);
- }
+ queueWorkerCallConsumer(pThis, iMyThrdIndx, pUsr);
} else {
dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things "
"may happen\n", queueGetID(pThis), iMyThrdIndx, iRet);
@@ -825,7 +1027,6 @@ finalize_it:
rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
{
DEFiRet;
- int iState;
int i;
assert(pThis != NULL);
@@ -837,15 +1038,15 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
pThis->iMaxFileSize);
if(pThis->qType != QUEUETYPE_DIRECT) {
- if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads, sizeof(qWrkThrd_t))) == NULL)
+ if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads + 1, sizeof(qWrkThrd_t))) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ /* worker 0 is reserved for disk-assisted mode */
+ queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_NEVER_RAN);
+
/* fire up the worker threads */
- for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- pThis->pWrkThrds[i].tCurrCmd = eWRKTHRDCMD_RUN;
- iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis);
- dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n",
- (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState);
+ for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) {
+ queueStrtWrkThrd(pThis, i);
}
}
@@ -982,6 +1183,10 @@ rsRetVal queueDestruct(queue_t *pThis)
pThis->pWrkThrds = NULL;
}
+ /* if running DA, terminate disk queue */
+ if(pThis->bRunsDA)
+ queueDestruct(pThis->pqDA);
+
/* persist the queue (we always do that - queuePersits() does cleanup it the queue is empty) */
CHKiRet_Hdlr(queuePersist(pThis)) {
dbgprintf("Queue 0x%lx: error %d persisting queue - data lost!\n", (unsigned long) pThis, iRet);
@@ -1020,6 +1225,9 @@ queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
if(pThis->pszFilePrefix != NULL)
free(pThis->pszFilePrefix);
+ if(pszPrefix == NULL) /* just unset the prefix! */
+ ABORT_FINALIZE(RS_RET_OK);
+
if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1);
@@ -1037,7 +1245,7 @@ queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize)
{
DEFiRet;
- assert(pThis != NULL);
+ ISOBJ_TYPE_assert(pThis, queue);
if(iMaxFileSize < 1024) {
ABORT_FINALIZE(RS_RET_VALUE_TOO_LOW);
@@ -1080,6 +1288,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
pthread_mutex_lock(pThis->mut);
}
+ /* first check if we can discard anything */
if(pThis->iDiscardMrk > 0 && pThis->iQueueSize >= pThis->iDiscardMrk) {
iRetLocal = objGetSeverity(pUsr, &iSeverity);
if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
@@ -1094,7 +1303,11 @@ queueEnqObj(queue_t *pThis, void *pUsr)
}
}
+ /* then check if we need to add an assistance disk queue */
+ if(pThis->bIsDA)
+ CHKiRet(queueChkStrtDA(pThis));
+ /* and finally (try to) enqueue what is left over */
while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) {
dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", queueGetID(pThis));
queueTimeoutComp(&t, pThis->toEnq);
@@ -1130,6 +1343,7 @@ DEFpropSetMeth(queue, iHighWtrMrk, int);
DEFpropSetMeth(queue, iLowWtrMrk, int);
DEFpropSetMeth(queue, iDiscardMrk, int);
DEFpropSetMeth(queue, iDiscardSeverity, int);
+DEFpropSetMeth(queue, bIsDA, int);
/* This function can be used as a generic way to set properties. Only the subset