summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/rsyslog_conf.html3
-rw-r--r--queue.c124
-rw-r--r--queue.h6
-rw-r--r--stream.c25
-rw-r--r--stream.h3
-rw-r--r--syslogd.c48
6 files changed, 152 insertions, 57 deletions
diff --git a/doc/rsyslog_conf.html b/doc/rsyslog_conf.html
index ee54c00f..0018ca50 100644
--- a/doc/rsyslog_conf.html
+++ b/doc/rsyslog_conf.html
@@ -64,6 +64,9 @@ development and quite unstable...). So you have been warned ;)</p>
<li>$MainMsgQueueImmediateShutdown [on/<b>off</b>]</li>
<li><a href="rsconf1_mainmsgqueuesize.html">$MainMsgQueueSize</a></li>
<li>$MainMsgQueueMaxFileSize &lt;size_nbr&gt;, default 1m</li>
+ <li>$MainMsgQueueTimeoutActionCompletion &lt;number&gt; [number is timeout in ms (1000ms is 1sec!), default 1000, 0 means immediate!]</li>
+ <li>$MainMsgQueueTimeoutEnqueue &lt;number&gt; [number is timeout in ms (1000ms is 1sec!), default 2000, 0 means indefinite]</li>
+ <li>$MainMsgQueueTimeoutShutdown &lt;number&gt; [number is timeout in ms (1000ms is 1sec!), default 0 (indefinite)]</li>
<li>$MainMsgQueueType [<b>FixedArray</b>/LinkedList/Direct/Disk]</li>
<li>$MainMsgQueueWorkerThreads &lt;number&gt;, num worker threads, default 1,
recommended 1</li>
diff --git a/queue.c b/queue.c
index 07d2ccbf..1b90151b 100644
--- a/queue.c
+++ b/queue.c
@@ -53,6 +53,25 @@ rsRetVal queueChkPersist(queue_t *pThis);
/* methods */
+/* compute an absolute time timeout suitable for calls to pthread_cond_timedwait()
+ * rgerhards, 2008-01-14
+ */
+static rsRetVal
+queueTimeoutComp(struct timespec *pt, int iTimeout)
+{
+ assert(pt != NULL);
+ /* compute timeout */
+ clock_gettime(CLOCK_REALTIME, pt);
+ pt->tv_nsec += (iTimeout % 1000) * 1000000; /* think INTEGER arithmetic! */
+ if(pt->tv_nsec > 999999999) { /* overrun? */
+ pt->tv_nsec -= 1000000000;
+ ++pt->tv_sec;
+ }
+ pt->tv_sec += iTimeout / 1000;
+ return RS_RET_OK; /* so far, this is static... */
+}
+
+
/* first, we define type-specific handlers. The provide a generic functionality,
* but for this specific type of queue. The mapping to these handlers happens during
* queue construction. Later on, handlers are called by pointers present in the
@@ -206,7 +225,7 @@ queueTryLoadPersistedInfo(queue_t *pThis)
strm_t *psQIF = NULL;
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
- AsPropBagstruct stat stat_buf;
+ struct stat stat_buf;
}
#endif
@@ -406,7 +425,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr)
iRetLocal = pThis->pConsumer(pUsr);
if(iRetLocal != RS_RET_OK)
dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n",
- (unsigned long) pThis, iRetLocal);
+ queueGetID(pThis), iRetLocal);
--pThis->iQueueSize; /* this is kind of a hack, but its the smartest thing we can do given
* the somewhat astonishing fact that this queue type does not actually
* queue anything ;)
@@ -436,7 +455,7 @@ queueAdd(queue_t *pThis, void *pUsr)
++pThis->iQueueSize;
- dbgprintf("Queue 0x%lx: entry added, size now %d entries\n", (unsigned long) pThis, pThis->iQueueSize);
+ dbgprintf("Queue 0x%lx: entry added, size now %d entries\n", queueGetID(pThis), pThis->iQueueSize);
finalize_it:
return iRet;
@@ -460,7 +479,7 @@ queueDel(queue_t *pThis, void *pUsr)
--pThis->iQueueSize;
dbgprintf("Queue 0x%lx: entry deleted, state %d, size now %d entries\n",
- (unsigned long) pThis, iRet, pThis->iQueueSize);
+ queueGetID(pThis), iRet, pThis->iQueueSize);
return iRet;
}
@@ -471,7 +490,7 @@ queueDel(queue_t *pThis, void *pUsr)
* rgerhards, 2008-01-14
*/
static rsRetVal
-queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int iTimeout)
+queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout)
{
DEFiRet;
int i;
@@ -486,18 +505,18 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int iTimeout)
/* awake them... */
pthread_cond_broadcast(pThis->notEmpty);
- /* and wait for their termination */
- clock_gettime(CLOCK_REALTIME, &t); /* set the timeout */
- t.tv_sec += iTimeout; /* TODO: can we just add to the seconds? - check */
+ /* get timeout */
+ queueTimeoutComp(&t, iTimeout);
+ /* and wait for their termination */
pthread_mutex_lock(pThis->mut);
bTimedOut = 0;
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
- dbgprintf("Queue 0x%lx: waiting on worker thread termination, %d still running\n",
- (unsigned long) pThis, pThis->iCurNumWrkThrd);
+ dbgprintf("Queue 0x%lx: waiting %ld ms on worker thread termination, %d still running\n",
+ queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd);
if(pthread_cond_timedwait(&pThis->condThrdTrm, pThis->mut, &t) != 0) {
- dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", (unsigned long) pThis);
+ dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", queueGetID(pThis));
bTimedOut = 1; /* we exit the loop on timeout */
}
}
@@ -521,10 +540,13 @@ queueWrkThrdCancel(queue_t *pThis)
// TODO: we need to implement peek(), without it (today!) we lose one message upon
// worker cancellation! -- rgerhards, 2008-01-14
+ /* awake the workers one more time, just to be sure */
+ pthread_cond_broadcast(pThis->notEmpty);
+
/* first tell the workers our request */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED) {
- dbgprintf("Queue 0x%lx: canceling worker thread %d\n", (unsigned long) pThis, i);
+ dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i);
pthread_cancel(pThis->pWrkThrds[i].thrdID);
}
@@ -544,11 +566,21 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", (unsigned long) pThis);
- iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN, 2); // TODO: timeout configurable!
+ /* even if the timeout count is set to 0 (run endless), we still call the queueWrkThrdTrm(). This
+ * is necessary so that all threads get sent the termination command. With a timeout of 0, however,
+ * the function returns immediate with RS_RET_TIMED_OUT. We catch that state and accept it as
+ * good.
+ */
+ iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN, pThis->toQShutdown);
if(iRet == RS_RET_TIMED_OUT) {
- /* OK, we now need to try force the shutdown */
- dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", (unsigned long) pThis);
- iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, 4); // TODO: timeout configurable!
+ if(pThis->toQShutdown == 0) {
+ iRet = RS_RET_OK;
+ } else {
+ /* OK, we now need to try force the shutdown */
+ dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n",
+ queueGetID(pThis));
+ iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, pThis->toActShutdown);
+ }
}
if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */
@@ -559,9 +591,12 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
/* finally join the threads
* In case of a cancellation, this may actually take some time. This is also
- * needed to clean up the thread descriptors, even with a regular termination
+ * needed to clean up the thread descriptors, even with a regular termination.
+ * And, most importantly, this is needed if we have an indifitite termination
+ * time set (timeout == 0)! -- rgerhards, 2008-01-14
*/
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+dbgprintf("join thred %d\n", i);
pthread_join(pThis->pWrkThrds[i].thrdID, NULL);
}
@@ -573,7 +608,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
}
dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n",
- (unsigned long) pThis, pThis->iQueueSize);
+ queueGetID(pThis), pThis->iQueueSize);
return iRet;
}
@@ -591,6 +626,7 @@ queueWorker(void *arg)
void *pUsr;
sigset_t sigSet;
int iMyThrdIndx; /* index for this thread in queue thread table */
+ int iCancelStateSave;
assert(pThis != NULL);
@@ -603,28 +639,37 @@ queueWorker(void *arg)
break;
assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self());
- dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", (unsigned long) pThis, iMyThrdIndx);
+ dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx);
/* tell the world there is one more worker */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
pThis->iCurNumWrkThrd++;
pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
/* now we have our identity, on to real processing */
while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN
|| (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) {
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
while (pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) {
dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
- (unsigned long) pThis, iMyThrdIndx);
- pthread_cond_wait (pThis->notEmpty, pThis->mut);
+ queueGetID(pThis), iMyThrdIndx);
+ pthread_cond_wait(pThis->notEmpty, pThis->mut);
}
if(pThis->iQueueSize > 0) {
/* dequeue element (still protected from mutex) */
iRet = queueDel(pThis, &pUsr);
queueChkPersist(pThis); // when we support peek(), we must do this down after the del!
pthread_mutex_unlock(pThis->mut);
- pthread_cond_signal (pThis->notFull);
+ pthread_cond_signal(pThis->notFull);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ /* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
+ * a cancellation point in itself. As we run most of the time without cancel enabled, I fear
+ * we may never get cancelled if we do not create a cancellation point ourselfs.
+ */
+ pthread_testcancel();
/* do actual processing (the lengthy part, runs in parallel)
* If we had a problem while dequeing, we do not call the consumer,
* but we otherwise ignore it. This is in the hopes that it will be
@@ -634,19 +679,20 @@ queueWorker(void *arg)
if(iRet == RS_RET_OK) {
rsRetVal iRetLocal;
dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n",
- (unsigned long) pThis, iMyThrdIndx);
+ queueGetID(pThis), iMyThrdIndx);
iRetLocal = pThis->pConsumer(pUsr);
dbgprintf("Queue 0x%lx/w%d: worker: consumer returnd %d\n",
- (unsigned long) pThis, iMyThrdIndx, iRetLocal);
+ queueGetID(pThis), iMyThrdIndx, iRetLocal);
if(iRetLocal != RS_RET_OK)
dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n",
(unsigned long) pThis, iMyThrdIndx, iRetLocal);
} else {
dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things "
- "may happen\n", (unsigned long) pThis, iMyThrdIndx, iRet);
+ "may happen\n", queueGetID(pThis), iMyThrdIndx, iRet);
}
} else { /* the mutex must be unlocked in any case (important for termination) */
pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
}
/* We now yield to give the other threads a chance to obtain the mutex. If we do not
@@ -666,17 +712,19 @@ queueWorker(void *arg)
if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN) && pThis->iQueueSize > 0)
dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has "
- " %d messages to process.\n", (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize);
+ " %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize);
}
/* indicate termination */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
pThis->iCurNumWrkThrd--;
pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED;
pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */
dbgprintf("Queue 0x%lx/w%d: thread terminates with %d entries left in queue, %d workers running.\n",
- (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd);
+ queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd);
pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
pthread_exit(0);
}
@@ -772,7 +820,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis));
- dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", (unsigned long) pThis, pThis->qType,
+ dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", queueGetID(pThis), pThis->qType,
pThis->iMaxFileSize);
if(pThis->qType != QUEUETYPE_DIRECT) {
@@ -819,7 +867,7 @@ finalize_it:
static rsRetVal queuePersist(queue_t *pThis)
{
DEFiRet;
- strm_t *psQIF = NULL;; /* Queue Info File */
+ strm_t *psQIF = NULL; /* Queue Info File */
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
@@ -850,6 +898,7 @@ static rsRetVal queuePersist(queue_t *pThis)
CHKiRet(strmConstruct(&psQIF));
CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE));
+ CHKiRet(strmSetiAddtlOpenFlags(psQIF, O_TRUNC));
CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE));
CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam));
CHKiRet(strmConstructFinalize(psQIF));
@@ -1017,10 +1066,7 @@ queueEnqObj(queue_t *pThis, void *pUsr)
while(pThis->iQueueSize >= pThis->iMaxQueueSize) {
dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", (unsigned long) pThis);
-
- clock_gettime (CLOCK_REALTIME, &t);
- t.tv_sec += 2; /* TODO: configurable! */
-
+ queueTimeoutComp(&t, pThis->toEnq);
if(pthread_cond_timedwait (pThis->notFull,
pThis->mut, &t) != 0) {
dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", (unsigned long) pThis);
@@ -1047,15 +1093,9 @@ finalize_it:
/* some simple object access methods */
DEFpropSetMeth(queue, bImmediateShutdown, int);
DEFpropSetMeth(queue, iPersistUpdCnt, int);
-#if 0
-rsRetVal queueSetiPersistUpdCnt(queue_t *pThis, int pVal)
-{
- dbgprintf("queueSetiPersistUpdCnt(), val %d\n", pVal);
- pThis->iPersistUpdCnt = pVal;
-dbgprintf("queSetiPersist..(): PersUpdCnt %d, UpdsSincePers %d\n", pThis->iPersistUpdCnt, pThis->iUpdsSincePersist);
- return RS_RET_OK;
-}
-#endif
+DEFpropSetMeth(queue, toQShutdown, long);
+DEFpropSetMeth(queue, toActShutdown, long);
+DEFpropSetMeth(queue, toEnq, long);
/* This function can be used as a generic way to set properties. Only the subset
diff --git a/queue.h b/queue.h
index ee38d725..cc5243ad 100644
--- a/queue.h
+++ b/queue.h
@@ -85,6 +85,9 @@ typedef struct queue_s {
int iUpdsSincePersist;/* nbr of queue updates since the last persist call */
int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */
int bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */
+ int toQShutdown; /* timeout for regular queue shutdown in ms */
+ int toActShutdown; /* timeout for long-running action shutdown in ms */
+ int toEnq; /* enqueue timeout */
rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dequeued messages */
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
@@ -136,6 +139,9 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
PROTOTYPEObjClassInit(queue);
PROTOTYPEpropSetMeth(queue, bImmediateShutdown, int);
PROTOTYPEpropSetMeth(queue, iPersistUpdCnt, int);
+PROTOTYPEpropSetMeth(queue, toQShutdown, long);
+PROTOTYPEpropSetMeth(queue, toActShutdown, long);
+PROTOTYPEpropSetMeth(queue, toEnq, long);
#define queueGetID(pThis) ((unsigned long) pThis)
#endif /* #ifndef QUEUE_H_INCLUDED */
diff --git a/stream.c b/stream.c
index 811012ae..a641637b 100644
--- a/stream.c
+++ b/stream.c
@@ -87,9 +87,10 @@ dbgprintf("strmOpenFile actual open %p, iFileNumDigits: %d\n", pThis, pThis->iFi
if(pThis->tOperationsMode == STREAMMODE_READ)
iFlags = O_RDONLY;
else
- //iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND;
iFlags = O_WRONLY | O_CREAT;
+ iFlags |= pThis->iAddtlOpenFlags;
+
pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode);
if(pThis->fd == -1) {
int ierrnoSave = errno;
@@ -388,8 +389,8 @@ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
CHKiRet(strmOpenFile(pThis));
iWritten = write(pThis->fd, pBuf, lenBuf);
- dbgprintf("Stream 0x%lx: write wrote %d bytes to file %d, errno: %d, err %s\n", (unsigned long) pThis,
- iWritten, pThis->fd, errno, strerror(errno));
+ dbgprintf("Stream 0x%lx: write wrote %d bytes to file %d, errno: %d\n", (unsigned long) pThis,
+ iWritten, pThis->fd, errno);
/* TODO: handle error case -- rgerhards, 2008-01-07 */
/* Now indicate buffer empty again. We do this in any case, because there
@@ -564,10 +565,22 @@ rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal)
{
pThis->iMaxFiles = iNewVal;
pThis->iFileNumDigits = getNumberDigits(iNewVal);
-dbgprintf("strmSetiMaxFiles %p val %d, digits %d\n", pThis, iNewVal, pThis->iFileNumDigits);
return RS_RET_OK;
}
+rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal)
+{
+ DEFiRet;
+
+ if(iNewVal & O_APPEND)
+ ABORT_FINALIZE(RS_RET_PARAM_ERROR);
+
+ pThis->iAddtlOpenFlags = iNewVal;
+
+finalize_it:
+ return iRet;
+}
+
/* set the stream's file prefix
* The passed-in string is duplicated. So if the caller does not need
@@ -704,10 +717,6 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
l = (long) pThis->iCurrOffs;
objSerializeSCALAR_VAR(pStrm, iCurrOffs, LONG, l);
- // TODO: really serialize?
- //l = (long) pThis->iMaxFileSize;
- //objSerializeSCALAR_VAR(pStrm, iMaxFileSize, LONG, l);
-
CHKiRet(objEndSerialize(pStrm));
finalize_it:
diff --git a/stream.h b/stream.h
index 403eefaa..34eb78e2 100644
--- a/stream.h
+++ b/stream.h
@@ -69,6 +69,7 @@ typedef struct strm_s {
int lenFName;
strmMode_t tOperationsMode;
mode_t tOpenMode;
+ int iAddtlOpenFlags; /* can be used to specifiy additional (compatible!) open flags */
size_t iMaxFileSize;/* maximum size a file may grow to */
int iMaxFiles; /* maximum number of files if a circular mode is in use */
int iFileNumDigits;/* min number of digits to use in file number (only in circular mode) */
@@ -95,7 +96,6 @@ rsRetVal strmSetMaxFileSize(strm_t *pThis, size_t iMaxFileSize);
rsRetVal strmSetFileName(strm_t *pThis, uchar *pszName, size_t iLenName);
rsRetVal strmReadChar(strm_t *pThis, uchar *pC);
rsRetVal strmUnreadChar(strm_t *pThis, uchar c);
-//rsRetVal strmSeek(strm_t *pThis, off_t offs);
rsRetVal strmSeekCurrOffs(strm_t *pThis);
rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
rsRetVal strmWriteChar(strm_t *pThis, uchar c);
@@ -106,6 +106,7 @@ rsRetVal strmFlush(strm_t *pThis);
rsRetVal strmRecordBegin(strm_t *pThis);
rsRetVal strmRecordEnd(strm_t *pThis);
rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm);
+rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal);
PROTOTYPEObjClassInit(strm);
PROTOTYPEpropSetMeth(strm, bDeleteOnClose, int);
PROTOTYPEpropSetMeth(strm, iMaxFileSize, int);
diff --git a/syslogd.c b/syslogd.c
index ac7170e8..aec1a486 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -409,8 +409,10 @@ static int iMainMsgQueueNumWorkers = 1; /* number of worker threads for the m
static queueType_t MainMsgQueType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */
static uchar *pszMainMsgQFName = NULL; /* prefix for the main message queue file */
static size_t iMainMsgQueMaxFileSize = 1024*1024;
-static int bMainMsgQImmediateShutdown = 0; /* shut down the queue immediately? */
static int iMainMsgQPersistUpdCnt = 0; /* persist queue info every n updates */
+static int iMainMsgQtoQShutdown = 0; /* queue shutdown */
+static int iMainMsgQtoActShutdown = 1000; /* action shutdown (in phase 2) */
+static int iMainMsgQtoEnq = 2000; /* timeout for queue enque */
/* This structure represents the files that will have log
@@ -514,8 +516,10 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
iMainMsgQueueSize = 10000;
iMainMsgQueMaxFileSize = 1024 * 1024;
iMainMsgQueueNumWorkers = 1;
- bMainMsgQImmediateShutdown = 0;
iMainMsgQPersistUpdCnt = 0;
+ iMainMsgQtoQShutdown = 0;
+ iMainMsgQtoActShutdown = 1000;
+ iMainMsgQtoEnq = 2000;
MainMsgQueType = QUEUETYPE_FIXED_ARRAY;
return RS_RET_OK;
@@ -1649,12 +1653,23 @@ int shouldProcessThisMessage(selector_t *f, msg_t *pMsg)
}
+/* cancellation cleanup handler - frees the action mutex
+ * rgerhards, 2008-01-14
+ */
+static void callActionMutClean(void *arg)
+{
+ assert(arg != NULL);
+ pthread_mutex_unlock((pthread_mutex_t*) arg);
+}
+
+
/* call the configured action. Does all necessary housekeeping.
* rgerhards, 2007-08-01
*/
static rsRetVal callAction(msg_t *pMsg, action_t *pAction)
{
DEFiRet;
+ int iCancelStateSave;
assert(pMsg != NULL);
assert(pAction != NULL);
@@ -1664,7 +1679,10 @@ static rsRetVal callAction(msg_t *pMsg, action_t *pAction)
* become important when we (possibly) have multiple worker threads.
* rgerhards, 2007-12-11
*/
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
LockObj(pAction);
+ pthread_cleanup_push(callActionMutClean, pAction->Sync_mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
/* first, we need to check if this is a disabled
* entry. If so, we must not further process it.
@@ -1730,7 +1748,10 @@ static rsRetVal callAction(msg_t *pMsg, action_t *pAction)
}
finalize_it:
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
UnlockObj(pAction);
+ pthread_cleanup_pop(0); /* remove mutex cleanup handler */
+ pthread_setcancelstate(iCancelStateSave, NULL);
return iRet;
}
@@ -3098,8 +3119,10 @@ static void dbgPrintInitInfo(void)
cCCEscapeChar);
dbgprintf("Main queue size %d messages.\n", iMainMsgQueueSize);
- dbgprintf("Main queue worker threads: %d, ImmediateShutdown: %d, Perists every %d updates.\n",
- iMainMsgQueueNumWorkers, bMainMsgQImmediateShutdown, iMainMsgQPersistUpdCnt);
+ dbgprintf("Main queue worker threads: %d, Perists every %d updates.\n",
+ iMainMsgQueueNumWorkers, iMainMsgQPersistUpdCnt);
+ dbgprintf("Main queue timeouts: shutdown: %d, action completion shutdown: %d, enq: %d\n",
+ iMainMsgQtoQShutdown, iMainMsgQtoActShutdown, iMainMsgQtoEnq);
dbgprintf("Work Directory: '%s'.\n", pszWorkDir);
}
@@ -3354,11 +3377,13 @@ init(void)
logerrorInt("Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
}
- setQPROP(queueSetbImmediateShutdown, "$MainMsgQueueImmediateShutdown", bMainMsgQImmediateShutdown);
setQPROP(queueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize);
setQPROPstr(queueSetFilePrefix, "$MainMsgQueueFileName",
(pszMainMsgQFName == NULL ? (uchar*) "mainq" : pszMainMsgQFName));
setQPROP(queueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt);
+ setQPROP(queueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown );
+ setQPROP(queueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown);
+ setQPROP(queueSettoEnq, "$MainMsgQueueTimeoutEnqueue", iMainMsgQtoEnq);
# undef setQPROP
# undef setQPROPstr
@@ -4267,6 +4292,7 @@ extern void dbgprintf(char *fmt, ...) __attribute__((format(printf,1, 2)));
void
dbgprintf(char *fmt, ...)
{
+ static pthread_t ptLastThrdID = 0;
static int bWasNL = FALSE;
va_list ap;
@@ -4283,6 +4309,14 @@ dbgprintf(char *fmt, ...)
* pretty well.
* rgerhards, 2007-06-15
*/
+ if(ptLastThrdID != pthread_self()) {
+ if(!bWasNL) {
+ fprintf(stdout, "\n");
+ bWasNL = 1;
+ }
+ ptLastThrdID = pthread_self();
+ }
+
if(bWasNL) {
fprintf(stdout, "%8.8x: ", (unsigned int) pthread_self());
//fprintf(stderr, "%8.8x: ", (unsigned int) pthread_self());
@@ -4517,10 +4551,12 @@ static rsRetVal loadBuildInModules(void)
CHKiRet(regCfSysLineHdlr((uchar *)"workdirectory", 0, eCmdHdlrGetWord, NULL, &pszWorkDir, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszMainMsgQFName, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesize", 0, eCmdHdlrInt, NULL, &iMainMsgQueueSize, NULL));
- CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueimmediateshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQImmediateShutdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iMainMsgQPersistUpdCnt, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetype", 0, eCmdHdlrGetWord, setMainMsgQueType, NULL, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iMainMsgQueueNumWorkers, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iMainMsgQtoQShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iMainMsgQtoActShutdown, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iMainMsgQtoEnq, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL));