summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-28 11:35:33 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-28 11:35:33 +0000
commit94bfc28855393a1a688aa5fdc3339b9e2139e10a (patch)
treed253acc9e48aa6193046920c8d08401289197e96 /queue.c
parentba94662f209ccf17798d706eb4dc5df19360c7e1 (diff)
downloadrsyslog-94bfc28855393a1a688aa5fdc3339b9e2139e10a.tar.gz
rsyslog-94bfc28855393a1a688aa5fdc3339b9e2139e10a.tar.xz
rsyslog-94bfc28855393a1a688aa5fdc3339b9e2139e10a.zip
cleanup to prepare for release
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c84
1 files changed, 29 insertions, 55 deletions
diff --git a/queue.c b/queue.c
index 0e0a5a83..fa682765 100644
--- a/queue.c
+++ b/queue.c
@@ -1,13 +1,3 @@
-// TODO: DA worker must not wait eternal on shutdown when in enqueue only mode!
-// TODO: we need to implement peek(), without it (today!) we lose one message upon
-// worker cancellation! -- rgerhards, 2008-01-14
-// TODO: think about mutDA - I think it's no longer needed
-// TODO: start up the correct num of workers when switching to non-DA mode
-// TODO: "preforked" worker threads
-// TODO: do an if(debug) in dbgrintf - performance in release build!
-// TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in
-// call consumer state. Facilitates retaining messages in queue until action could
-// be called!
/* queue.c
*
* This file implements the queue object and its several queueing methods.
@@ -218,9 +208,6 @@ queueStartDA(queue_t *pThis)
if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */
FINALIZE; /* ... then we are already done! */
- /* set up sync objects */
- pthread_mutex_init(&pThis->mutDA, NULL);
-
/* create message queue */
dbgprintf("Queue %p: queueSTrtDA pre child queue construct,\n", pThis);
CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
@@ -309,12 +296,12 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex)
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx:DA", (unsigned long) pThis);
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, queueChkStopWrkrDA));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, queueIsIdleDA));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, queueConsumerDA));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, queueConsumerCancelCleanup));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, queueStartDA));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, queueTurnOffDAMode));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) queueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueStartDA));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueTurnOffDAMode));
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
@@ -564,7 +551,7 @@ queueHaveQIF(queue_t *pThis)
ISOBJ_TYPE_assert(pThis, queue);
if(pThis->pszFilePrefix == NULL)
- ABORT_FINALIZE(RS_RET_ERR); // TODO: change code!
+ ABORT_FINALIZE(RS_RET_NO_FILEPREFIX);
/* Construct file name */
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
@@ -921,17 +908,16 @@ RUNLOG_VAR("%d", pThis->toQShutdown);
* set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate as soon as its consumer
* is done. This is especially important as we otherwise may interfere with queue order while the
* DA consumer is running. -- rgerhards, 2008-01-27
+ * Note: there was a note that we should not wait eternally on the DA worker if we run in
+ * enqueue-only note. I have reviewed the code and think there is no need for this check. Howerver,
+ * I'd like to keep this note in here should we happen to run into some related trouble.
+ * rgerhards, 2008-01-28
*/
wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */
-// TODO: what about pure disk queues and bSaveOnShutdown?
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
/* optimize parameters for shutdown of DA-enabled queues */
-//RUNLOG_VAR("%d", pThis->bSaveOnShutdown);
-//RUNLOG_VAR("%d", pThis->bIsDA);
-//RUNLOG_VAR("%d", pThis->iQueueSize);
if(pThis->bIsDA && pThis->iQueueSize > 0 && pThis->bSaveOnShutdown) {
-//RUNLOG;
/* switch to enqueue-only mode so that no more actions happen */
if(pThis->bRunsDA == 0) {
queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
@@ -953,21 +939,17 @@ RUNLOG_VAR("%d", pThis->toQShutdown);
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
}
-RUNLOG;
/* now the primary queue is either empty, persisted to disk - or set to loose messages. So we
* can now request immediate shutdown of any remaining workers. Note that if bSaveOnShutdown was set,
* the queue is now empty. If regular workers are still running, and try to pull the next message,
* they will automatically terminate as there no longer is any message left to process.
*/
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
-RUNLOG_VAR("%d", pThis->iQueueSize);
- //old: if(pThis->iQueueSize > 0) {
if(pThis->iQueueSize > 0) {
timeoutComp(&tTimeout, pThis->toActShutdown);
if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
dbgprintf("Queue 0x%lx: trying immediate shutdown of regular workers\n", queueGetID(pThis));
- // TODO: ??? cut&paste error? iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
dbgprintf("Queue 0x%lx: immediate shutdown timed out on primary queue (this is acceptable and "
@@ -1047,8 +1029,6 @@ RUNLOG_VAR("%d", pThis->iQueueSize);
}
}
-// TODO: think about joining all workers, so that the destructors are called
-//
/* ... finally ... all worker threads have terminated :-)
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
* may still be running.
@@ -1152,7 +1132,7 @@ queueConsumerCancelCleanup(void *arg1, void *arg2)
dbgprintf("Queue 0x%lx: cancelation cleanup handler consumer called (NOT FULLY IMPLEMENTED, one msg lost!)\n",
queueGetID(pThis));
- /* TODO: re-enqueue the data element! */
+ /* TODO: re-enqueue the data element! This will also make the compiler warning go away... */
RETiRet;
}
@@ -1213,7 +1193,7 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
/* dequeue element (still protected from mutex) */
iRet = queueDel(pThis, &pUsr);
- queueChkPersist(pThis); // when we support peek(), we must do this down after the del!
+ queueChkPersist(pThis);
iQueueSize = pThis->iQueueSize; /* cache this for after mutex release */
bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
pWti->pUsrp = pUsr; /* save it for the cancel cleanup handler */
@@ -1316,12 +1296,6 @@ queueChkStopWrkrDA(queue_t *pThis)
bStopWrkr = 1;
} else {
if(pThis->bRunsDA) {
-#if 0
-RUNLOG_VAR("%d", pThis->iQueueSize);
-RUNLOG_VAR("%d", pThis->iHighWtrMrk);
-if(pThis->pqDA != NULL)
- RUNLOG_VAR("%d", pThis->pqDA->bQueueStarted);
-#endif
if(pThis->iQueueSize < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
bStopWrkr = 1;
} else {
@@ -1332,7 +1306,6 @@ if(pThis->pqDA != NULL)
}
}
-//RUNLOG_VAR("%d", bStopWrkr);
ENDfunc
return bStopWrkr;
}
@@ -1364,10 +1337,7 @@ queueIsIdleDA(queue_t *pThis)
{
/* remember: iQueueSize is the DA queue size, not the main queue! */
BEGINfunc
-RUNLOG_VAR("%d", pThis->iLowWtrMrk);
-dbgprintf("queueIsIdleDA(%p) returns %d, qsize %d\n", pThis, pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk), pThis->iQueueSize);
- //// TODO: I think we need just a single function...
- //return (pThis->iQueueSize == 0);
+ /* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */
ENDfunc
return (pThis->iQueueSize == 0 || (pThis->bRunsDA && pThis->iQueueSize <= pThis->iLowWtrMrk));
}
@@ -1483,12 +1453,12 @@ dbgprintf("Queue %p: post mutexes, mut %p\n", pThis, pThis->mut);
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "Queue 0x%lx:Reg", (unsigned long) pThis);
CHKiRet(wtpConstruct (&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, queueChkStopWrkrReg));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, queueIsIdleReg));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, queueConsumerReg));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, queueConsumerCancelCleanup));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, queueRegOnWrkrStartup));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, queueRegOnWrkrShutdown));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))queueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrStartup));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
@@ -1511,7 +1481,7 @@ RUNLOG;
queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
bInitialized = 1; /* we are done */
} else {
- // TODO: use logerror? -- rgerhards, 2008-01-16
+ /* TODO: use logerror? -- rgerhards, 2008-01-16 */
dbgprintf("Queue 0x%lx: error %d trying to access on-disk queue files, starting without them. "
"Some data may be lost\n", queueGetID(pThis), iRetLocal);
}
@@ -1556,9 +1526,14 @@ static rsRetVal queuePersist(queue_t *pThis)
assert(pThis != NULL);
if(pThis->qType != QUEUETYPE_DISK) {
- if(pThis->iQueueSize > 0)
- ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* TODO: later... */
- else
+ if(pThis->iQueueSize > 0) {
+ /* This error code is OK, but we will probably not implement this any time
+ * The reason is that persistence happens via DA queues. But I would like to
+ * leave the code as is, as we so have a hook in case we need one.
+ * -- rgerhards, 2008-01-28
+ */
+ ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED);
+ } else
FINALIZE; /* if the queue is empty, we are happy and done... */
}
@@ -1938,7 +1913,6 @@ finalize_it:
*/
BEGINObjClassInit(queue, 1)
OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty);
- //OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize);
ENDObjClassInit(queue)
/*