summaryrefslogtreecommitdiffstats
path: root/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-14 11:55:24 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-14 11:55:24 +0000
commit77a338e180fd51811041363f615760a14a2dc889 (patch)
tree8edab9c877db4c25e6291d68f833b5bd4e6feb57 /queue.c
parentdd36718bd11c85af49546ab589fa42bf512075ce (diff)
downloadrsyslog-77a338e180fd51811041363f615760a14a2dc889.tar.gz
rsyslog-77a338e180fd51811041363f615760a14a2dc889.tar.xz
rsyslog-77a338e180fd51811041363f615760a14a2dc889.zip
- implemented $MainMsgQueueTimeoutActionCompletion config directive
- implemented $MainMsgQueueTimeoutEnqueue config directive - implemented $MainMsgQueueTimeoutShutdown config directive - some cleanup
Diffstat (limited to 'queue.c')
-rw-r--r--queue.c124
1 files changed, 82 insertions, 42 deletions
diff --git a/queue.c b/queue.c
index 07d2ccbf..1b90151b 100644
--- a/queue.c
+++ b/queue.c
@@ -53,6 +53,25 @@ rsRetVal queueChkPersist(queue_t *pThis);
/* methods */
+/* compute an absolute timeout (now + iTimeout milliseconds) in *pt, suitable for calls
+ * to pthread_cond_timedwait(); always returns RS_RET_OK. rgerhards, 2008-01-14
+ */
+static rsRetVal
+queueTimeoutComp(struct timespec *pt, long iTimeout)
+{
+ assert(pt != NULL);
+ /* compute timeout; iTimeout is long like the callers' timeout values (no truncation) */
+ clock_gettime(CLOCK_REALTIME, pt);
+ pt->tv_nsec += (iTimeout % 1000) * 1000000; /* think INTEGER arithmetic! */
+ if(pt->tv_nsec > 999999999) { /* overrun? */
+ pt->tv_nsec -= 1000000000;
+ ++pt->tv_sec;
+ }
+ pt->tv_sec += iTimeout / 1000;
+ return RS_RET_OK; /* so far, this is static... */
+}
+
+
/* first, we define type-specific handlers. They provide generic functionality,
* but for this specific type of queue. The mapping to these handlers happens during
* queue construction. Later on, handlers are called by pointers present in the
@@ -206,7 +225,7 @@ queueTryLoadPersistedInfo(queue_t *pThis)
strm_t *psQIF = NULL;
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
- AsPropBagstruct stat stat_buf;
+ struct stat stat_buf;
}
#endif
@@ -406,7 +425,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
iRetLocal = pThis->pConsumer(pUsr);
if(iRetLocal != RS_RET_OK)
dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n",
- (unsigned long) pThis, iRetLocal);
+ queueGetID(pThis), iRetLocal);
--pThis->iQueueSize; /* this is kind of a hack, but its the smartest thing we can do given
* the somewhat astonishing fact that this queue type does not actually
* queue anything ;)
@@ -436,7 +455,7 @@ queueAdd(queue_t *pThis, void *pUsr)
++pThis->iQueueSize;
- dbgprintf("Queue 0x%lx: entry added, size now %d entries\n", (unsigned long) pThis, pThis->iQueueSize);
+ dbgprintf("Queue 0x%lx: entry added, size now %d entries\n", queueGetID(pThis), pThis->iQueueSize);
finalize_it:
return iRet;
@@ -460,7 +479,7 @@ queueDel(queue_t *pThis, void *pUsr)
--pThis->iQueueSize;
dbgprintf("Queue 0x%lx: entry deleted, state %d, size now %d entries\n",
- (unsigned long) pThis, iRet, pThis->iQueueSize);
+ queueGetID(pThis), iRet, pThis->iQueueSize);
return iRet;
}
@@ -471,7 +490,7 @@ queueDel(queue_t *pThis, void *pUsr)
* rgerhards, 2008-01-14
*/
static rsRetVal
-queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int iTimeout)
+queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout)
{
DEFiRet;
int i;
@@ -486,18 +505,18 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int iTimeout)
/* awake them... */
pthread_cond_broadcast(pThis->notEmpty);
- /* and wait for their termination */
- clock_gettime(CLOCK_REALTIME, &t); /* set the timeout */
- t.tv_sec += iTimeout; /* TODO: can we just add to the seconds? - check */
+ /* get timeout */
+ queueTimeoutComp(&t, iTimeout);
+ /* and wait for their termination */
pthread_mutex_lock(pThis->mut);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
- dbgprintf("Queue 0x%lx: waiting on worker thread termination, %d still running\n",
- (unsigned long) pThis, pThis->iCurNumWrkThrd);
+ dbgprintf("Queue 0x%lx: waiting %ld ms on worker thread termination, %d still running\n",
+ queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd);
if(pthread_cond_timedwait(&pThis->condThrdTrm, pThis->mut, &t) != 0) {
- dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", (unsigned long) pThis);
+ dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", queueGetID(pThis));
bTimedOut = 1; /* we exit the loop on timeout */
}
}
@@ -521,10 +540,13 @@ queueWrkThrdCancel(queue_t *pThis)
// TODO: we need to implement peek(), without it (today!) we lose one message upon
// worker cancellation! -- rgerhards, 2008-01-14
+ /* awake the workers one more time, just to be sure */
+ pthread_cond_broadcast(pThis->notEmpty);
+
/* first tell the workers our request */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED) {
- dbgprintf("Queue 0x%lx: canceling worker thread %d\n", (unsigned long) pThis, i);
+ dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i);
pthread_cancel(pThis->pWrkThrds[i].thrdID);
}
@@ -544,11 +566,21 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", (unsigned long) pThis);
- iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN, 2); // TODO: timeout configurable!
+ /* even if the timeout count is set to 0 (run endless), we still call the queueWrkThrdTrm(). This
+ * is necessary so that all threads get sent the termination command. With a timeout of 0, however,
+ * the function returns immediately with RS_RET_TIMED_OUT. We catch that state and accept it as
+ * good.
+ */
+ iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN, pThis->toQShutdown);
if(iRet == RS_RET_TIMED_OUT) {
- /* OK, we now need to try force the shutdown */
- dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", (unsigned long) pThis);
- iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, 4); // TODO: timeout configurable!
+ if(pThis->toQShutdown == 0) {
+ iRet = RS_RET_OK;
+ } else {
+ /* OK, we now need to try force the shutdown */
+ dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n",
+ queueGetID(pThis));
+ iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, pThis->toActShutdown);
+ }
}
if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */
@@ -559,9 +591,12 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* finally join the threads
* In case of a cancellation, this may actually take some time. This is also
- * needed to clean up the thread descriptors, even with a regular termination
+ * needed to clean up the thread descriptors, even with a regular termination.
+ * And, most importantly, this is needed if we have an indefinite termination
+ * time set (timeout == 0)! -- rgerhards, 2008-01-14
*/
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+dbgprintf("join thred %d\n", i);
pthread_join(pThis->pWrkThrds[i].thrdID, NULL);
}
@@ -573,7 +608,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
}
dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n",
- (unsigned long) pThis, pThis->iQueueSize);
+ queueGetID(pThis), pThis->iQueueSize);
return iRet;
}
@@ -591,6 +626,7 @@ queueWorker(void *arg)
void *pUsr;
sigset_t sigSet;
int iMyThrdIndx; /* index for this thread in queue thread table */
+ int iCancelStateSave;
assert(pThis != NULL);
@@ -603,28 +639,37 @@ queueWorker(void *arg)
break;
assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self());
- dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", (unsigned long) pThis, iMyThrdIndx);
+ dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx);
/* tell the world there is one more worker */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
pThis->iCurNumWrkThrd++;
pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
/* now we have our identity, on to real processing */
while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN
|| (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) {
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
while (pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) {
dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
- (unsigned long) pThis, iMyThrdIndx);
- pthread_cond_wait (pThis->notEmpty, pThis->mut);
+ queueGetID(pThis), iMyThrdIndx);
+ pthread_cond_wait(pThis->notEmpty, pThis->mut);
}
if(pThis->iQueueSize > 0) {
/* dequeue element (still protected from mutex) */
iRet = queueDel(pThis, &pUsr);
queueChkPersist(pThis); // when we support peek(), we must do this down after the del!
pthread_mutex_unlock(pThis->mut);
- pthread_cond_signal (pThis->notFull);
+ pthread_cond_signal(pThis->notFull);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ /* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
+ * a cancellation point in itself. As we run most of the time without cancel enabled, I fear
+ * we may never get cancelled if we do not create a cancellation point ourselves.
+ */
+ pthread_testcancel();
/* do actual processing (the lengthy part, runs in parallel)
* If we had a problem while dequeing, we do not call the consumer,
* but we otherwise ignore it. This is in the hopes that it will be
@@ -634,19 +679,20 @@ queueWorker(void *arg)
if(iRet == RS_RET_OK) {
rsRetVal iRetLocal;
dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n",
- (unsigned long) pThis, iMyThrdIndx);
+ queueGetID(pThis), iMyThrdIndx);
iRetLocal = pThis->pConsumer(pUsr);
dbgprintf("Queue 0x%lx/w%d: worker: consumer returnd %d\n",
- (unsigned long) pThis, iMyThrdIndx, iRetLocal);
+ queueGetID(pThis), iMyThrdIndx, iRetLocal);
if(iRetLocal != RS_RET_OK)
dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n",
(unsigned long) pThis, iMyThrdIndx, iRetLocal);
} else {
dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things "
- "may happen\n", (unsigned long) pThis, iMyThrdIndx, iRet);
+ "may happen\n", queueGetID(pThis), iMyThrdIndx, iRet);
}
} else { /* the mutex must be unlocked in any case (important for termination) */
pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
}
/* We now yield to give the other threads a chance to obtain the mutex. If we do not
@@ -666,17 +712,19 @@ queueWorker(void *arg)
if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN) && pThis->iQueueSize > 0)
dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has "
- " %d messages to process.\n", (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize);
+ " %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize);
}
/* indicate termination */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
pThis->iCurNumWrkThrd--;
pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED;
pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */
dbgprintf("Queue 0x%lx/w%d: thread terminates with %d entries left in queue, %d workers running.\n",
- (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd);
+ queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd);
pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
pthread_exit(0);
}
@@ -772,7 +820,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis));
- dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", (unsigned long) pThis, pThis->qType,
+ dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", queueGetID(pThis), pThis->qType,
pThis->iMaxFileSize);
if(pThis->qType != QUEUETYPE_DIRECT) {
@@ -819,7 +867,7 @@ finalize_it:
static rsRetVal queuePersist(queue_t *pThis)
{
DEFiRet;
- strm_t *psQIF = NULL;; /* Queue Info File */
+ strm_t *psQIF = NULL; /* Queue Info File */
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
@@ -850,6 +898,7 @@ static rsRetVal queuePersist(queue_t *pThis)
CHKiRet(strmConstruct(&psQIF));
CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE));
+ CHKiRet(strmSetiAddtlOpenFlags(psQIF, O_TRUNC));
CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE));
CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam));
CHKiRet(strmConstructFinalize(psQIF));
@@ -1017,10 +1066,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
while(pThis->iQueueSize >= pThis->iMaxQueueSize) {
dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", (unsigned long) pThis);
-
- clock_gettime (CLOCK_REALTIME, &t);
- t.tv_sec += 2; /* TODO: configurable! */
-
+ queueTimeoutComp(&t, pThis->toEnq);
if(pthread_cond_timedwait (pThis->notFull,
pThis->mut, &t) != 0) {
dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", (unsigned long) pThis);
@@ -1047,15 +1093,9 @@ finalize_it:
/* some simple object access methods */
DEFpropSetMeth(queue, bImmediateShutdown, int);
DEFpropSetMeth(queue, iPersistUpdCnt, int);
-#if 0
-rsRetVal queueSetiPersistUpdCnt(queue_t *pThis, int pVal)
-{
- dbgprintf("queueSetiPersistUpdCnt(), val %d\n", pVal);
- pThis->iPersistUpdCnt = pVal;
-dbgprintf("queSetiPersist..(): PersUpdCnt %d, UpdsSincePers %d\n", pThis->iPersistUpdCnt, pThis->iUpdsSincePersist);
- return RS_RET_OK;
-}
-#endif
+DEFpropSetMeth(queue, toQShutdown, long);
+DEFpropSetMeth(queue, toActShutdown, long);
+DEFpropSetMeth(queue, toEnq, long);
/* This function can be used as a generic way to set properties. Only the subset