summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-17 12:45:10 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-17 12:45:10 +0000
commited0363210c34002e5cfbab553506573f5b8a13a5 (patch)
tree518ce37551ddb1803b3f8d0ced8599b8b04cb984 /queue.c
parent6b8b242250123d6c3105b48cde831ef749c88647 (diff)
downloadrsyslog-ed0363210c34002e5cfbab553506573f5b8a13a5.tar.gz
rsyslog-ed0363210c34002e5cfbab553506573f5b8a13a5.tar.xz
rsyslog-ed0363210c34002e5cfbab553506573f5b8a13a5.zip
worked on threading
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c381
1 files changed, 294 insertions, 87 deletions
diff --git a/queue.c b/queue.c
index 9003b344..2b241d82 100644
--- a/queue.c
+++ b/queue.c
@@ -1,6 +1,6 @@
- // DA-input only
+// TODO: start up the correct num of workers when switching to non-DA mode
// TODO: "preforked" worker threads
-// TODO: do an if(debug) in dbgrintf - performanc ein release build!
+// TODO: do an if(debug) in dbgrintf - performance in release build!
// TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in
// call consumer state. Facilitates retaining messages in queue until action could
// be called!
@@ -56,9 +56,61 @@ rsRetVal queueChkPersist(queue_t *pThis);
static void *queueWorker(void *arg);
static rsRetVal queueGetQueueSize(queue_t *pThis, int *piQueueSize);
static rsRetVal queueChkWrkThrdChanges(queue_t *pThis);
+static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly);
/* methods */
+/* get the current worker state. For simplicity and speed, we have
+ * NOT used our regular calling interface this time. I hope that won't
+ * bite in the long term... -- rgerhards, 2008-01-17
+ */
+static inline qWrkCmd_t
+qWrkrGetState(qWrkThrd_t *pThis)
+{
+ assert(pThis != NULL);
+ return pThis->tCurrCmd;
+}
+
+
+/* send a command to a specific thread
+ */
+static rsRetVal
+qWrkrSetState(qWrkThrd_t *pThis, qWrkCmd_t tCmd)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+ dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis->pQueue), tCmd, pThis->iThrd);
+
+ /* change some admin structures */
+ switch(tCmd) {
+ case eWRKTHRD_TERMINATING:
+ pthread_cond_destroy(&pThis->condInitDone);
+ dbgprintf("Queue 0x%lx/w%d: thread terminating with %d entries left in queue, %d workers running.\n",
+ queueGetID(pThis->pQueue), pThis->iThrd, pThis->pQueue->iQueueSize,
+ pThis->pQueue->iCurNumWrkThrd);
+ break;
+ case eWRKTHRD_RUN_CREATED:
+ pthread_cond_init(&pThis->condInitDone, NULL);
+ break;
+ case eWRKTHRD_RUN_INIT:
+ break;
+ case eWRKTHRD_RUNNING:
+ pthread_cond_signal(&pThis->condInitDone);
+ break;
+ /* these cases just to satisfy the compiler, we do (yet) not act an them: */
+ case eWRKTHRD_STOPPED:
+ case eWRKTHRD_SHUTDOWN:
+ case eWRKTHRD_SHUTDOWN_IMMEDIATE:
+ /* DO NOTHING */
+ break;
+ }
+
+ pThis->tCurrCmd = tCmd;
+
+ return iRet;
+}
+
/* send a command to a specific active thread. If the thread is not
* active, the command is not sent.
*/
@@ -70,9 +122,9 @@ queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd)
ISOBJ_TYPE_assert(pThis, queue);
assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads);
- if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRD_RUN_INIT) {
+ if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRD_RUN_CREATED) {
dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx);
- pThis->pWrkThrds[iIdx].tCurrCmd = tCmd;
+ qWrkrSetState(&pThis->pWrkThrds[iIdx], tCmd);
} else {
dbgprintf("Queue 0x%lx: command %d NOT sent to inactive thread %d\n", queueGetID(pThis), tCmd, iIdx);
}
@@ -80,21 +132,60 @@ queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd)
return iRet;
}
-/* send a command to a specific thread
- * TODO: check if we can run into trouble with inactive threads
+
+/* Finalize construction of a wWrkrThrd_t "object"
+ * rgerhards, 2008-01-17
*/
static inline rsRetVal
-queueTellWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd)
+qWrkrConstructFinalize(qWrkThrd_t *pThis, queue_t *pQueue, int i)
{
- DEFiRet;
+ assert(pThis != NULL);
+ ISOBJ_TYPE_assert(pQueue, queue);
- dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx);
- ISOBJ_TYPE_assert(pThis, queue);
- assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads);
+ dbgprintf("Queue 0x%lx: finalizing construction of worker %d instance data\n", queueGetID(pQueue), i);
- pThis->pWrkThrds[iIdx].tCurrCmd = tCmd;
+ /* initialize our thread instance descriptor */
+ pThis = pQueue->pWrkThrds + i;
+ pThis->pQueue = pQueue;
+ pThis->iThrd = i;
+ pThis->pUsr = NULL;
- return iRet;
+ qWrkrSetState(pThis, eWRKTHRD_STOPPED);
+
+ return RS_RET_OK;
+}
+
+
+/* initialize the qWrkThrd_t structure - this MUST be called right after
+ * startup of a worker thread. -- rgerhards, 2008-01-17
+ */
+static inline rsRetVal
+qWrkrInit(qWrkThrd_t **ppThis, queue_t *pQueue)
+{
+ qWrkThrd_t *pThis;
+ int i;
+
+ assert(ppThis != NULL);
+ ISOBJ_TYPE_assert(pQueue, queue);
+
+ /* find myself in the queue's thread table */
+ for(i = 0 ; i <= pQueue->iNumWorkerThreads ; ++i)
+ if(pQueue->pWrkThrds[i].thrdID == pthread_self())
+ break;
+dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pQueue,
+ (unsigned) pQueue->pWrkThrds[i].thrdID, i, (unsigned) pthread_self());
+ assert(pQueue->pWrkThrds[i].thrdID == pthread_self());
+
+ /* initialize our thread instance descriptor */
+ pThis = pQueue->pWrkThrds + i;
+ pThis->pQueue = pQueue;
+ pThis->iThrd = i;
+ pThis->pUsr = NULL;
+
+ *ppThis = pThis;
+ qWrkrSetState(pThis, eWRKTHRD_RUN_INIT);
+
+ return RS_RET_OK;
}
@@ -112,9 +203,9 @@ queueJoinWrkThrd(queue_t *pThis, int iIdx)
dbgprintf("Queue 0x%lx: thread %d state %d, waiting for exit\n", queueGetID(pThis), iIdx,
pThis->pWrkThrds[iIdx].tCurrCmd);
pthread_join(pThis->pWrkThrds[iIdx].thrdID, NULL);
- pThis->pWrkThrds[iIdx].tCurrCmd = eWRKTHRD_STOPPED; /* back to virgin... */
+ qWrkrSetState(&pThis->pWrkThrds[iIdx], eWRKTHRD_STOPPED); /* back to virgin... */
pThis->pWrkThrds[iIdx].thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
- dbgprintf("Queue 0x%lx: thread %d state %d, has exited\n", queueGetID(pThis), iIdx,
+ dbgprintf("Queue 0x%lx: thread %d state %d, has stopped\n", queueGetID(pThis), iIdx,
pThis->pWrkThrds[iIdx].tCurrCmd);
return iRet;
@@ -131,9 +222,9 @@ queueStrtWrkThrd(queue_t *pThis, int i)
ISOBJ_TYPE_assert(pThis, queue);
assert(i >= 0 && i <= pThis->iNumWorkerThreads);
- assert(pThis->pWrkThrds[i].tCurrCmd < eWRKTHRD_RUN_INIT);
+ assert(pThis->pWrkThrds[i].tCurrCmd < eWRKTHRD_RUN_CREATED);
- queueTellWrkThrd(pThis, i, eWRKTHRD_RUN_INIT);
+ qWrkrSetState(&pThis->pWrkThrds[i], eWRKTHRD_RUN_CREATED);
iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis);
dbgprintf("Queue 0x%lx: starting Worker thread %x, index %d with state %d.\n",
(unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState);
@@ -164,7 +255,7 @@ queueStrtNewWrkThrd(queue_t *pThis)
dbgprintf("Queue %p: search thrd tbl slot: i %d, CuccCmd %d\n", pThis, i, pThis->pWrkThrds[i].tCurrCmd);
if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_STOPPED) {
break;
- } else if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_RUN_INIT) {
+ } else if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_RUN_CREATED) {
iStartingUp = i;
break;
}
@@ -176,7 +267,7 @@ dbgprintf("Queue %p: after thrd search: i %d, iStartingUp %d\n", pThis, i, iStar
assert(i <= pThis->iNumWorkerThreads); /* now there must be a free spot, else something is really wrong! */
- queueTellWrkThrd(pThis, i, eWRKTHRD_RUN_INIT);
+ qWrkrSetState(&pThis->pWrkThrds[i], eWRKTHRD_RUN_CREATED);
iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis);
dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n",
(unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState);
@@ -206,7 +297,7 @@ queueTellActWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd)
/* tell the workers our request */
for(i = iStartIdx ; i <= pThis->iNumWorkerThreads ; ++i)
- if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATED)
+ if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING)
queueTellActWrkThrd(pThis, i, tCmd);
return iRet;
@@ -271,7 +362,7 @@ queueChkAndStrtWrk(queue_t *pThis)
queueChkWrkThrdChanges(pThis);
/* check if we need to start up another worker (only in regular mode) */
- if(pThis->qRunsDA == QRUNS_REGULAR) {
+ if(pThis->qRunsDA == QRUNS_REGULAR && pThis->bEnqOnly == 0) {
if(pThis->iCurNumWrkThrd < pThis->iNumWorkerThreads) {
dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d, qRunsDA: %d\n",
pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA);
@@ -320,14 +411,14 @@ queueTurnOffDAMode(queue_t *pThis)
* messages come into the queue, we may be well off with a single worker.
* rgerhards, 2008-01-16
*/
- queueStrtNewWrkThrd(pThis);
+ if(pThis->bEnqOnly == 0)
+ queueStrtNewWrkThrd(pThis);
pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */
/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
* this will be quick.
*/
- queueDestruct(pThis->pqDA); /* and now we are ready to destruct the DA queue */
- pThis->pqDA = NULL;
+ queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
/* now free the remaining resources */
pthread_mutex_destroy(&pThis->mutDA);
@@ -358,11 +449,12 @@ queueChkWrkThrdChanges(queue_t *pThis)
/* go through all threads (including DA thread) */
for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
switch(pThis->pWrkThrds[i].tCurrCmd) {
- case eWRKTHRD_TERMINATED:
+ case eWRKTHRD_TERMINATING:
queueJoinWrkThrd(pThis, i);
break;
/* these cases just to satisfy the compiler, we do not act an them: */
case eWRKTHRD_STOPPED:
+ case eWRKTHRD_RUN_CREATED:
case eWRKTHRD_RUN_INIT:
case eWRKTHRD_RUNNING:
case eWRKTHRD_SHUTDOWN:
@@ -428,7 +520,7 @@ dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx,
* Note that the child queue now in almost all cases is non-empty, because we just enqueued
* a message.
*/
- if(iQueueSize <= pThis->iLowWtrMrk && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) {
+ if(iQueueSize <= pThis->iLowWtrMrk && iQueueSize != 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) {
dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n",
queueGetID(pThis), iMyThrdIndx, iQueueSize);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
@@ -494,6 +586,7 @@ queueStrtDA(queue_t *pThis)
CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
+ CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly));
CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
if(pThis->toQShutdown == 0) {
@@ -536,8 +629,7 @@ queueStrtDA(queue_t *pThis)
finalize_it:
if(iRet != RS_RET_OK) {
if(pThis->pqDA != NULL) {
- queueDestruct(pThis->pqDA);
- pThis->pqDA = NULL;
+ queueDestruct(&pThis->pqDA);
}
dbgprintf("Queue 0x%lx: error %d creating disk queue - giving up.\n",
queueGetID(pThis), iRet);
@@ -549,19 +641,24 @@ finalize_it:
/* initiate DA mode
+ * param bEnqOnly tells if the disk queue is to be run in enqueue-only mode. This may
+ * be needed during shutdown of memory queues which need to be persisted to disk.
* rgerhards, 2008-01-16
*/
static inline rsRetVal
-queueInitDA(queue_t *pThis)
+queueInitDA(queue_t *pThis, int bEnqOnly)
{
DEFiRet;
/* indicate we now run in DA mode - this is reset by the DA worker if it fails */
pThis->qRunsDA = QRUNS_DA_INIT;
+ pThis->bDAEnqOnly = bEnqOnly;
- /* now we must start our DA worker thread - it does the rest of the initialization */
- // DA-input only mode!
- iRet = queueStrtWrkThrd(pThis, 0);
+ /* now we must start our DA worker thread - it does the rest of the initialization
+ * In enqueue-only mode, we do not start any workers.
+ */
+ if(pThis->bEnqOnly == 0)
+ iRet = queueStrtWrkThrd(pThis, 0);
return iRet;
}
@@ -606,7 +703,7 @@ queueChkStrtDA(queue_t *pThis)
dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n",
queueGetID(pThis), pThis->iQueueSize);
- queueInitDA(pThis); /* initiate DA mode */
+ queueInitDA(pThis, QUEUE_MODE_ENQDEQ); /* initiate DA mode */
}
finalize_it:
@@ -800,7 +897,6 @@ queueHaveQIF(queue_t *pThis)
(char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
/* check if the file exists */
-dbgprintf("stat HaveQIF '%s'\n", pszQIFNam);
if(stat((char*) pszQIFNam, &stat_buf) == -1) {
if(errno == ENOENT) {
dbgprintf("Queue 0x%lx: no .qi file found\n", queueGetID(pThis));
@@ -874,7 +970,7 @@ queueTryLoadPersistedInfo(queue_t *pThis)
finalize_it:
if(psQIF != NULL)
- strmDestruct(psQIF);
+ strmDestruct(&psQIF);
if(iRet != RS_RET_OK) {
dbgprintf("Queue 0x%lx: error %d reading .qi file - can not read persisted info (if any)\n",
@@ -949,8 +1045,8 @@ static rsRetVal qDestructDisk(queue_t *pThis)
assert(pThis != NULL);
- strmDestruct(pThis->tVars.disk.pWrite);
- strmDestruct(pThis->tVars.disk.pRead);
+ strmDestruct(&pThis->tVars.disk.pWrite);
+ strmDestruct(&pThis->tVars.disk.pRead);
if(pThis->pszSpoolDir != NULL)
free(pThis->pszSpoolDir);
@@ -1068,7 +1164,6 @@ queueWrkThrdReqTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int bIncludeDAWrk)
{
DEFiRet;
- // DA-input only
if(bIncludeDAWrk) {
queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */
queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
@@ -1093,13 +1188,19 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout)
struct timespec t;
queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */
+dbgprintf("WrkThrdTrm 0\n");
queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
+ /* race: must make sure all are running! */
+dbgprintf("WrkThrdTrm 1\n");
queueTimeoutComp(&t, iTimeout);/* get timeout */
+dbgprintf("WrkThrdTrm 2\n");
/* and wait for their termination */
pthread_mutex_lock(pThis->mut);
bTimedOut = 0;
+dbgprintf("WrkThrdTrm 3, thrds: %d\n", pThis->iCurNumWrkThrd);
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
+dbgprintf("WrkThrdTrm 4 to %d\n", bTimedOut);
dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n",
queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd);
@@ -1128,12 +1229,15 @@ queueWrkThrdCancel(queue_t *pThis)
// TODO: we need to implement peek(), without it (today!) we lose one message upon
// worker cancellation! -- rgerhards, 2008-01-14
+ /* process any pending thread requests so that we know who actually is still running */
+ queueChkWrkThrdChanges(pThis);
+
/* awake the workers one more time, just to be sure */
queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
/* first tell the workers our request */
for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i)
- if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATED) {
+ if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) {
dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i);
pthread_cancel(pThis->pWrkThrds[i].thrdID);
}
@@ -1196,14 +1300,14 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
}
-/* This is a helper for queueWorker() it either calls the configured
+/* This is a helper for queueWorker () it either calls the configured
* consumer or the DA-consumer (if in disk-assisted mode). It is
* protected by the queue mutex, but MUST release it as soon as possible.
* Most importantly, it must release it before the consumer is called.
* rgerhards, 2008-01-14
*/
static inline rsRetVal
-queueWorkerChkAndCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave)
+queueWorkerChkAndCallConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst, int iCancelStateSave)
{
DEFiRet;
rsRetVal iRetLocal;
@@ -1211,7 +1315,12 @@ queueWorkerChkAndCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateS
int iQueueSize;
void *pUsr;
int qRunsDA;
+ int iMyThrdIndx;
+ ISOBJ_TYPE_assert(pThis, queue);
+ assert(pWrkrInst != NULL);
+
+ iMyThrdIndx = pWrkrInst->iThrd;
/* first check if we have still something to process */
if(pThis->iQueueSize == 0 ||
@@ -1228,6 +1337,7 @@ queueWorkerChkAndCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateS
queueChkPersist(pThis); // when we support peek(), we must do this down after the del!
qRunsDA = pThis->qRunsDA; /* do a local copy so that we prevent a race after mutex release */
iQueueSize = pThis->iQueueSize; /* ... and the same for this property */
+ pWrkrInst->pUsr = pUsr; /* save it for the cancel cleanup handler */
pthread_mutex_unlock(pThis->mut);
pthread_cond_signal(pThis->notFull);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -1274,6 +1384,34 @@ dbgprintf("CallConsumer returns %d\n", iRet);
}
+
+/* cancellation cleanup handler for queueWorker ()
+ * Updates admin structure and frees ressources.
+ * rgerhards, 2008-01-16
+ */
+static void queueWorkerCancelCleanup(void *arg)
+{
+ qWrkThrd_t *pWrkrInst = (qWrkThrd_t*) arg;
+ queue_t *pThis;
+
+ assert(pWrkrInst != NULL);
+ ISOBJ_TYPE_assert(pWrkrInst->pQueue, queue);
+ pThis = pWrkrInst->pQueue;
+
+ dbgprintf("Queue 0x%lx/w%d: cancelation cleanup handler called (NOT FULLY IMPLEMENTED, one msgs lost!)\n",
+ queueGetID(pThis), pWrkrInst->iThrd);
+
+ pThis->iCurNumWrkThrd--; /* one worker less... */
+ pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */
+ qWrkrSetState(&pThis->pWrkThrds[pWrkrInst->iThrd], eWRKTHRD_TERMINATING);
+ pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
+
+ /* TODO: re-enqueue the data element! */
+ dbgprintf("Queue 0x%lx/w%d: thread CANCELED with %d entries left in queue, %d workers running.\n",
+ queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize, pThis->iCurNumWrkThrd);
+}
+
+
/* Each queue has at least one associated worker (consumer) thread. It will pull
* the message from the queue and pass it to a user-defined function.
* This function was provided on construction. It MUST be thread-safe.
@@ -1290,6 +1428,7 @@ queueWorker(void *arg)
struct timespec t;
int iMyThrdIndx; /* index for this thread in queue thread table */
int iCancelStateSave;
+ qWrkThrd_t *pWrkrInst; /* for cleanup handler */
ISOBJ_TYPE_assert(pThis, queue);
@@ -1300,17 +1439,13 @@ queueWorker(void *arg)
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
- /* first find myself in the queue's thread table */
- for(iMyThrdIndx = 0 ; iMyThrdIndx <= pThis->iNumWorkerThreads ; ++iMyThrdIndx)
- if(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self())
- break;
-dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pThis,
- (unsigned) pThis->pWrkThrds[iMyThrdIndx].thrdID, iMyThrdIndx, (unsigned) pthread_self());
- assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self());
+ /* initialize our thread instance descriptor */
+ qWrkrInit(&pWrkrInst, pThis);
- dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx);
+ iMyThrdIndx = pWrkrInst->iThrd;
pThis->iCurNumWrkThrd++; /* tell the world there is one more worker */
+ dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx);
if(iMyThrdIndx == 0) { /* are we the DA worker? */
if(queueStrtDA(pThis) != RS_RET_OK) { /* then fully initialize the DA queue! */
@@ -1323,16 +1458,18 @@ dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pThis,
* because someone may have requested us to shut down even before we got a chance to do
* our init. That would be a bad race... -- rgerhards, 2008-01-16
*/
- if(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUN_INIT)
- pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_RUNNING; /* we are running now! */
+ if(qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT)
+ qWrkrSetState(pWrkrInst, eWRKTHRD_RUNNING); /* we are running now! */
+
+ pthread_cleanup_push(queueWorkerCancelCleanup, pWrkrInst);
pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
/* end one-time stuff */
/* now we have our identity, on to real processing */
- while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING
- || (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN && pThis->iQueueSize > 0)) {
+ while( (qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING)
+ || (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN && pThis->iQueueSize > 0)) {
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
@@ -1340,7 +1477,7 @@ dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pThis,
queueChkWrkThrdChanges(pThis);
dbgprintf("Queue %p/w%d: pre empty queue, qsize %d\n", pThis, iMyThrdIndx, pThis->iQueueSize);
- while(pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) {
+ while(pThis->iQueueSize == 0 && qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING) {
dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
queueGetID(pThis), iMyThrdIndx);
if(pThis->bSignalOnEmpty > 0) {
@@ -1377,13 +1514,13 @@ dbgprintf("worker never times out!\n");
/* we use SHUTDOWN (and not SHUTDOWN_IMMEDIATE) so that the worker
* does not terminate if in the mean time a new message arrived.
*/
- pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_SHUTDOWN;
+ qWrkrSetState(pWrkrInst, eWRKTHRD_SHUTDOWN);
}
}
dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx);
}
- queueWorkerChkAndCallConsumer(pThis, iMyThrdIndx, iCancelStateSave);
+ queueWorkerChkAndCallConsumer(pThis, pWrkrInst, iCancelStateSave);
/* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
* a cancellation point in itself. As we run most of the time without cancel enabled, I fear
@@ -1406,7 +1543,7 @@ dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx);
pthread_yield();
dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n",
queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd);
- if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0)
+ if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0)
dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has "
" %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize);
}
@@ -1415,17 +1552,21 @@ dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n",
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
pThis->iCurNumWrkThrd--;
- if(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN ||
- pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN_IMMEDIATE) {
+ pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */
+ pthread_cleanup_pop(0); /* remove cleanup handler */
+
+ /* if we ever need finalize_it, here would be the place for it! */
+ if(qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN ||
+ qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN_IMMEDIATE ||
+ qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT ||
+ qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_CREATED) {
/* in shutdown case, we need to flag termination. All other commands
* have a meaning to the thread harvester, so we can not overwrite them
*/
- pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_TERMINATED;
+dbgprintf("Queue 0x%lx/w%d: setting termination state\n", queueGetID(pThis), iMyThrdIndx);
+ qWrkrSetState(pWrkrInst, eWRKTHRD_TERMINATING);
}
pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
- pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */
- dbgprintf("Queue 0x%lx/w%d: thread terminates with %d entries left in queue, %d workers running.\n",
- queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd);
pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -1517,20 +1658,27 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
DEFiRet;
rsRetVal iRetLocal;
int bInitialized = 0; /* is queue already initialized? */
+ int i;
assert(pThis != NULL);
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
- dbgprintf("Queue 0x%lx: type %d, disk assisted %d, maxFileSz %ld starting\n", queueGetID(pThis), pThis->qType,
- pThis->bIsDA, pThis->iMaxFileSize);
+ dbgprintf("Queue 0x%lx: type %d, enq-only %d, disk assisted %d, maxFileSz %ld starting\n", queueGetID(pThis),
+ pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize);
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
+ /* initialize worker thread instances
+ * TODO: move to separate function
+ */
if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads + 1, sizeof(qWrkThrd_t))) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ for(i = 0 ; i < pThis->iNumWorkerThreads + 1 ; ++i) {
+ qWrkrConstructFinalize(&pThis->pWrkThrds[i], pThis, i);
+ }
if(pThis->bIsDA) {
/* If we are disk-assisted, we need to check if there is a QIF file
@@ -1541,7 +1689,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n",
queueGetID(pThis));
- queueInitDA(pThis); /* initiate DA mode */
+ queueInitDA(pThis, QUEUE_MODE_ENQDEQ); /* initiate DA mode */
bInitialized = 1; /* we are done */
} else {
// TODO: use logerror? -- rgerhards, 2008-01-16
@@ -1552,12 +1700,10 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
if(!bInitialized) {
dbgprintf("Queue 0x%lx: queue starts up without (loading) any disk state\n", queueGetID(pThis));
- /* worker 0 is reserved for disk-assisted mode, so do not start */
- queueTellWrkThrd(pThis, 0, eWRKTHRD_STOPPED);
-
/* fire up the worker threads */
// TODO: preforked workers! queueStrtAllWrkThrds(pThis);
}
+ pThis->bQueueStarted = 1;
finalize_it:
return iRet;
@@ -1634,7 +1780,7 @@ static rsRetVal queuePersist(queue_t *pThis)
finalize_it:
if(psQIF != NULL)
- strmDestruct(psQIF);
+ strmDestruct(&psQIF);
return iRet;
}
@@ -1661,44 +1807,60 @@ rsRetVal queueChkPersist(queue_t *pThis)
/* destructor for the queue object */
-rsRetVal queueDestruct(queue_t *pThis)
+rsRetVal queueDestruct(queue_t **ppThis)
{
+ queue_t *pThis;
DEFiRet;
- assert(pThis != NULL);
+ assert(ppThis != NULL);
+ pThis = *ppThis;
+ ISOBJ_TYPE_assert(pThis, queue);
- /* if running DA, tell the DA workers to shut down. This saves us some CPU cycles which
- * we can use to persist the remaining in-memory data to disk quicker. -- rgerhads, 2008-01-16
- * TODO: we actually need to change the queue to an "input-only" mode, that also prevents
- * startup of the thread again further down in the process. None of that really hurts, so we
- * leave it for the time being. -- rgerhards, 2008-01-16
+pThis->bSaveOnShutdown = 1; // TODO: Test remove
+ /* if running DA, switch the DA queue to enqueue-only mode. That saves us some CPU cycles as
+ * its workers do no longer need to run. It also prevents longer-running actions to spring into
+ * existence while we are draining the main (memory) queue. -- rgerhads, 2008-01-16
*/
- if(pThis->qRunsDA != QRUNS_REGULAR)
- queueWrkThrdReqTrm(pThis->pqDA, eWRKTHRD_SHUTDOWN_IMMEDIATE, 0);
- // DA-input only
+ if(pThis->qRunsDA != QRUNS_REGULAR) {
+ queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */
+ if(pThis->bSaveOnShutdown)
+ pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL;
+ }
/* then, terminate our own worker threads */
if(pThis->pWrkThrds != NULL) {
queueShutdownWorkers(pThis);
- free(pThis->pWrkThrds);
- pThis->pWrkThrds = NULL;
}
- /* of we have now data left in in-memory queues, this data will be lost if we do not
- * persist it to a disk queue.
- * TODO: implement code rgerhards, 2008-01-16
+ /* If we currently run in DA mode, the in-memory queue is already persisted to disk.
+ * If we are not in DA mode, we may have data left in in-memory queues, this data will
+ * be lost if we do not persist it to a disk queue. So, if configured to do so, we will
+ * now start DA mode just to drain our queue. -- rgerhards, 2008-01-16
+ * TODO: move to persist function!
*/
+ if(pThis->iQueueSize > 0 && pThis->bSaveOnShutdown && pThis->bIsDA) {
+ dbgprintf("Queue 0x%lx: in-memory queue contains %d entries after worker shutdown - using DA to save to disk\n",
+ queueGetID(pThis), pThis->iQueueSize);
+ pThis->iLowWtrMrk = 0; /* disable low water mark algo */
+ queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* start DA queue in enqueue-only mode */
+ pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL;
+ queueShutdownWorkers(pThis);
+ }
/* if running DA, terminate disk queue */
if(pThis->qRunsDA != QRUNS_REGULAR)
- queueDestruct(pThis->pqDA);
+ queueDestruct(&pThis->pqDA);
- /* persist the queue (we always do that - queuePersits() does cleanup it the queue is empty) */
+ /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) */
CHKiRet_Hdlr(queuePersist(pThis)) {
dbgprintf("Queue 0x%lx: error %d persisting queue - data lost!\n", (unsigned long) pThis, iRet);
}
/* ... then free resources */
+ if(pThis->pWrkThrds != NULL) {
+ free(pThis->pWrkThrds);
+ pThis->pWrkThrds = NULL;
+ }
pthread_mutex_destroy(pThis->mut);
free(pThis->mut);
pthread_cond_destroy(pThis->notFull);
@@ -1713,6 +1875,7 @@ rsRetVal queueDestruct(queue_t *pThis)
/* and finally delete the queue objet itself */
free(pThis);
+ *ppThis = NULL;
return iRet;
}
@@ -1845,6 +2008,50 @@ finalize_it:
return iRet;
}
+
+/* set queue mode to enqueue only or not
+ * rgerhards, 2008-01-16
+ */
+static rsRetVal
+queueSetEnqOnly(queue_t *pThis, int bEnqOnly)
+{
+ DEFiRet;
+ int iCancelStateSave;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+
+ /* for simplicity, we do one big mutex lock. This method is extremely seldom
+ * called, so that doesn't matter... -- rgerhards, 2008-01-16
+ */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ pthread_mutex_lock(pThis->mut);
+
+ if(bEnqOnly == pThis->bEnqOnly)
+ FINALIZE; /* no change, nothing to do */
+
+ if(pThis->bQueueStarted) {
+ /* we need to adjust queue operation only if we are not during initial param setup */
+ if(bEnqOnly == 1) {
+ /* switch to enqueue-only mode */
+ /* this means we need to terminate all workers - that's it... */
+ dbgprintf("Queue 0x%lx: switching to enqueue-only mode, terminating all worker threads\n",
+ queueGetID(pThis));
+ queueWrkThrdReqTrm(pThis, eWRKTHRD_SHUTDOWN_IMMEDIATE, 0);
+ } else {
+ /* switch back to regular mode */
+ ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */
+ }
+ }
+
+ pThis->bEnqOnly = bEnqOnly;
+
+finalize_it:
+ pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ return iRet;
+}
+
+
/* some simple object access methods */
DEFpropSetMeth(queue, iPersistUpdCnt, int);
DEFpropSetMeth(queue, toQShutdown, long);