diff options
-rw-r--r-- | doc/rsyslog_conf.html | 3 | ||||
-rw-r--r-- | queue.c | 124 | ||||
-rw-r--r-- | queue.h | 6 | ||||
-rw-r--r-- | stream.c | 25 | ||||
-rw-r--r-- | stream.h | 3 | ||||
-rw-r--r-- | syslogd.c | 48 |
6 files changed, 152 insertions, 57 deletions
diff --git a/doc/rsyslog_conf.html b/doc/rsyslog_conf.html index ee54c00f..0018ca50 100644 --- a/doc/rsyslog_conf.html +++ b/doc/rsyslog_conf.html @@ -64,6 +64,9 @@ development and quite unstable...). So you have been warned ;)</p> <li>$MainMsgQueueImmediateShutdown [on/<b>off</b>]</li> <li><a href="rsconf1_mainmsgqueuesize.html">$MainMsgQueueSize</a></li> <li>$MainMsgQueueMaxFileSize <size_nbr>, default 1m</li> + <li>$MainMsgQueueTimeoutActionCompletion <number> [number is timeout in ms (1000ms is 1sec!), default 1000, 0 means immediate!]</li> + <li>$MainMsgQueueTimeoutEnqueue <number> [number is timeout in ms (1000ms is 1sec!), default 2000, 0 means indefinite]</li> + <li>$MainMsgQueueTimeoutShutdown <number> [number is timeout in ms (1000ms is 1sec!), default 0 (indefinite)]</li> <li>$MainMsgQueueType [<b>FixedArray</b>/LinkedList/Direct/Disk]</li> <li>$MainMsgQueueWorkerThreads <number>, num worker threads, default 1, recommended 1</li> @@ -53,6 +53,25 @@ rsRetVal queueChkPersist(queue_t *pThis); /* methods */ +/* compute an absolute time timeout suitable for calls to pthread_cond_timedwait() + * rgerhards, 2008-01-14 + */ +static rsRetVal +queueTimeoutComp(struct timespec *pt, int iTimeout) +{ + assert(pt != NULL); + /* compute timeout */ + clock_gettime(CLOCK_REALTIME, pt); + pt->tv_nsec += (iTimeout % 1000) * 1000000; /* think INTEGER arithmetic! */ + if(pt->tv_nsec > 999999999) { /* overrun? */ + pt->tv_nsec -= 1000000000; + ++pt->tv_sec; + } + pt->tv_sec += iTimeout / 1000; + return RS_RET_OK; /* so far, this is static... */ +} + + /* first, we define type-specific handlers. The provide a generic functionality, * but for this specific type of queue. The mapping to these handlers happens during * queue construction. Later on, handlers are called by pointers present in the @@ -206,7 +225,7 @@ queueTryLoadPersistedInfo(queue_t *pThis) strm_t *psQIF = NULL; uchar pszQIFNam[MAXFNAME]; size_t lenQIFNam; - AsPropBagstruct stat stat_buf; + struct stat stat_buf; } #endif @@ -406,7 +425,7 @@ static rsRetVal qAddDirect(queue_t *pThis, void* pUsr) iRetLocal = pThis->pConsumer(pUsr); if(iRetLocal != RS_RET_OK) dbgprintf("Queue 0x%lx: Consumer returned iRet %d\n", - (unsigned long) pThis, iRetLocal); + queueGetID(pThis), iRetLocal); --pThis->iQueueSize; /* this is kind of a hack, but its the smartest thing we can do given * the somewhat astonishing fact that this queue type does not actually * queue anything ;) @@ -436,7 +455,7 @@ queueAdd(queue_t *pThis, void *pUsr) ++pThis->iQueueSize; - dbgprintf("Queue 0x%lx: entry added, size now %d entries\n", (unsigned long) pThis, pThis->iQueueSize); + dbgprintf("Queue 0x%lx: entry added, size now %d entries\n", queueGetID(pThis), pThis->iQueueSize); finalize_it: return iRet; @@ -460,7 +479,7 @@ queueDel(queue_t *pThis, void *pUsr) --pThis->iQueueSize; dbgprintf("Queue 0x%lx: entry deleted, state %d, size now %d entries\n", - (unsigned long) pThis, iRet, pThis->iQueueSize); + queueGetID(pThis), iRet, pThis->iQueueSize); return iRet; } @@ -471,7 +490,7 @@ queueDel(queue_t *pThis, void *pUsr) * rgerhards, 2008-01-14 */ static rsRetVal -queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int iTimeout) +queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) { DEFiRet; int i; @@ -486,18 +505,18 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int iTimeout) /* awake them... */ pthread_cond_broadcast(pThis->notEmpty); - /* and wait for their termination */ - clock_gettime(CLOCK_REALTIME, &t); /* set the timeout */ - t.tv_sec += iTimeout; /* TODO: can we just add to the seconds? - check */ + /* get timeout */ + queueTimeoutComp(&t, iTimeout); + /* and wait for their termination */ pthread_mutex_lock(pThis->mut); bTimedOut = 0; while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { - dbgprintf("Queue 0x%lx: waiting on worker thread termination, %d still running\n", - (unsigned long) pThis, pThis->iCurNumWrkThrd); + dbgprintf("Queue 0x%lx: waiting %ld ms on worker thread termination, %d still running\n", + queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd); if(pthread_cond_timedwait(&pThis->condThrdTrm, pThis->mut, &t) != 0) { - dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", (unsigned long) pThis); + dbgprintf("Queue 0x%lx: timeout waiting on worker thread termination\n", queueGetID(pThis)); bTimedOut = 1; /* we exit the loop on timeout */ } } @@ -521,10 +540,13 @@ queueWrkThrdCancel(queue_t *pThis) // TODO: we need to implement peek(), without it (today!) we lose one message upon // worker cancellation! -- rgerhards, 2008-01-14 + /* awake the workers one more time, just to be sure */ + pthread_cond_broadcast(pThis->notEmpty); + /* first tell the workers our request */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) if(pThis->pWrkThrds[i].tCurrCmd != eWRKTHRDCMD_TERMINATED) { - dbgprintf("Queue 0x%lx: canceling worker thread %d\n", (unsigned long) pThis, i); + dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i); pthread_cancel(pThis->pWrkThrds[i].thrdID); } @@ -544,11 +566,21 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) dbgprintf("Queue 0x%lx: initiating worker thread shutdown sequence\n", (unsigned long) pThis); - iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN, 2); // TODO: timeout configurable! + /* even if the timeout count is set to 0 (run endless), we still call the queueWrkThrdTrm(). This + * is necessary so that all threads get sent the termination command. With a timeout of 0, however, + * the function returns immediate with RS_RET_TIMED_OUT. We catch that state and accept it as + * good. + */ + iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN, pThis->toQShutdown); if(iRet == RS_RET_TIMED_OUT) { - /* OK, we now need to try force the shutdown */ - dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", (unsigned long) pThis); - iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, 4); // TODO: timeout configurable! + if(pThis->toQShutdown == 0) { + iRet = RS_RET_OK; + } else { + /* OK, we now need to try force the shutdown */ + dbgprintf("Queue 0x%lx: regular worker shutdown timed out, now trying immediate\n", + queueGetID(pThis)); + iRet = queueWrkThrdTrm(pThis, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, pThis->toActShutdown); + } } if(iRet != RS_RET_OK) { /* this is true on actual error on first try or timeout and error on second */ @@ -559,9 +591,12 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) /* finally join the threads * In case of a cancellation, this may actually take some time. This is also - * needed to clean up the thread descriptors, even with a regular termination + * needed to clean up the thread descriptors, even with a regular termination. + * And, most importantly, this is needed if we have an indifitite termination + * time set (timeout == 0)! -- rgerhards, 2008-01-14 */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { +dbgprintf("join thred %d\n", i); pthread_join(pThis->pWrkThrds[i].thrdID, NULL); } @@ -573,7 +608,7 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) } dbgprintf("Queue 0x%lx: worker threads terminated, remaining queue size %d.\n", - (unsigned long) pThis, pThis->iQueueSize); + queueGetID(pThis), pThis->iQueueSize); return iRet; } @@ -591,6 +626,7 @@ queueWorker(void *arg) void *pUsr; sigset_t sigSet; int iMyThrdIndx; /* index for this thread in queue thread table */ + int iCancelStateSave; assert(pThis != NULL); @@ -603,28 +639,37 @@ queueWorker(void *arg) break; assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self()); - dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", (unsigned long) pThis, iMyThrdIndx); + dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx); /* tell the world there is one more worker */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); pThis->iCurNumWrkThrd++; pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); /* now we have our identity, on to real processing */ while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN || (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN && pThis->iQueueSize > 0)) { + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); while (pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_RUN) { dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", - (unsigned long) pThis, iMyThrdIndx); - pthread_cond_wait (pThis->notEmpty, pThis->mut); + queueGetID(pThis), iMyThrdIndx); + pthread_cond_wait(pThis->notEmpty, pThis->mut); } if(pThis->iQueueSize > 0) { /* dequeue element (still protected from mutex) */ iRet = queueDel(pThis, &pUsr); queueChkPersist(pThis); // when we support peek(), we must do this down after the del! pthread_mutex_unlock(pThis->mut); - pthread_cond_signal (pThis->notFull); + pthread_cond_signal(pThis->notFull); + pthread_setcancelstate(iCancelStateSave, NULL); + /* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is + * a cancellation point in itself. As we run most of the time without cancel enabled, I fear + * we may never get cancelled if we do not create a cancellation point ourselfs. + */ + pthread_testcancel(); /* do actual processing (the lengthy part, runs in parallel) * If we had a problem while dequeing, we do not call the consumer, * but we otherwise ignore it. This is in the hopes that it will be @@ -634,19 +679,20 @@ queueWorker(void *arg) if(iRet == RS_RET_OK) { rsRetVal iRetLocal; dbgprintf("Queue 0x%lx/w%d: worker executes consumer...\n", - (unsigned long) pThis, iMyThrdIndx); + queueGetID(pThis), iMyThrdIndx); iRetLocal = pThis->pConsumer(pUsr); dbgprintf("Queue 0x%lx/w%d: worker: consumer returnd %d\n", - (unsigned long) pThis, iMyThrdIndx, iRetLocal); + queueGetID(pThis), iMyThrdIndx, iRetLocal); if(iRetLocal != RS_RET_OK) dbgprintf("Queue 0x%lx/w%d: Consumer returned iRet %d\n", (unsigned long) pThis, iMyThrdIndx, iRetLocal); } else { dbgprintf("Queue 0x%lx/w%d: error %d dequeueing element - ignoring, but strange things " - "may happen\n", (unsigned long) pThis, iMyThrdIndx, iRet); + "may happen\n", queueGetID(pThis), iMyThrdIndx, iRet); } } else { /* the mutex must be unlocked in any case (important for termination) */ pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); } /* We now yield to give the other threads a chance to obtain the mutex. If we do not @@ -666,17 +712,19 @@ queueWorker(void *arg) if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRDCMD_SHUTDOWN) && pThis->iQueueSize > 0) dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has " - " %d messages to process.\n", (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize); + " %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize); } /* indicate termination */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); pThis->iCurNumWrkThrd--; pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRDCMD_TERMINATED; pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */ dbgprintf("Queue 0x%lx/w%d: thread terminates with %d entries left in queue, %d workers running.\n", - (unsigned long) pThis, iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd); + queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd); pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); pthread_exit(0); } @@ -772,7 +820,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ /* call type-specific constructor */ CHKiRet(pThis->qConstruct(pThis)); - dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", (unsigned long) pThis, pThis->qType, + dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", queueGetID(pThis), pThis->qType, pThis->iMaxFileSize); if(pThis->qType != QUEUETYPE_DIRECT) { @@ -819,7 +867,7 @@ finalize_it: static rsRetVal queuePersist(queue_t *pThis) { DEFiRet; - strm_t *psQIF = NULL;; /* Queue Info File */ + strm_t *psQIF = NULL; /* Queue Info File */ uchar pszQIFNam[MAXFNAME]; size_t lenQIFNam; @@ -850,6 +898,7 @@ static rsRetVal queuePersist(queue_t *pThis) CHKiRet(strmConstruct(&psQIF)); CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir()))); CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_WRITE)); + CHKiRet(strmSetiAddtlOpenFlags(psQIF, O_TRUNC)); CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE)); CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam)); CHKiRet(strmConstructFinalize(psQIF)); @@ -1017,10 +1066,7 @@ queueEnqObj(queue_t *pThis, void *pUsr) while(pThis->iQueueSize >= pThis->iMaxQueueSize) { dbgprintf("Queue 0x%lx: enqueueMsg: queue FULL - waiting to drain.\n", (unsigned long) pThis); - - clock_gettime (CLOCK_REALTIME, &t); - t.tv_sec += 2; /* TODO: configurable! */ - + queueTimeoutComp(&t, pThis->toEnq); if(pthread_cond_timedwait (pThis->notFull, pThis->mut, &t) != 0) { dbgprintf("Queue 0x%lx: enqueueMsg: cond timeout, dropping message!\n", (unsigned long) pThis); @@ -1047,15 +1093,9 @@ finalize_it: /* some simple object access methods */ DEFpropSetMeth(queue, bImmediateShutdown, int); DEFpropSetMeth(queue, iPersistUpdCnt, int); -#if 0 -rsRetVal queueSetiPersistUpdCnt(queue_t *pThis, int pVal) -{ - dbgprintf("queueSetiPersistUpdCnt(), val %d\n", pVal); - pThis->iPersistUpdCnt = pVal; -dbgprintf("queSetiPersist..(): PersUpdCnt %d, UpdsSincePers %d\n", pThis->iPersistUpdCnt, pThis->iUpdsSincePersist); - return RS_RET_OK; -} -#endif +DEFpropSetMeth(queue, toQShutdown, long); +DEFpropSetMeth(queue, toActShutdown, long); +DEFpropSetMeth(queue, toEnq, long); /* This function can be used as a generic way to set properties. Only the subset @@ -85,6 +85,9 @@ typedef struct queue_s { int iUpdsSincePersist;/* nbr of queue updates since the last persist call */ int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */ int bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */ + int toQShutdown; /* timeout for regular queue shutdown in ms */ + int toActShutdown; /* timeout for long-running action shutdown in ms */ + int toEnq; /* enqueue timeout */ rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dequeued messages */ /* type-specific handlers (set during construction) */ rsRetVal (*qConstruct)(struct queue_s *pThis); @@ -136,6 +139,9 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, PROTOTYPEObjClassInit(queue); PROTOTYPEpropSetMeth(queue, bImmediateShutdown, int); PROTOTYPEpropSetMeth(queue, iPersistUpdCnt, int); +PROTOTYPEpropSetMeth(queue, toQShutdown, long); +PROTOTYPEpropSetMeth(queue, toActShutdown, long); +PROTOTYPEpropSetMeth(queue, toEnq, long); #define queueGetID(pThis) ((unsigned long) pThis) #endif /* #ifndef QUEUE_H_INCLUDED */ @@ -87,9 +87,10 @@ dbgprintf("strmOpenFile actual open %p, iFileNumDigits: %d\n", pThis, pThis->iFi if(pThis->tOperationsMode == STREAMMODE_READ) iFlags = O_RDONLY; else - //iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND; iFlags = O_WRONLY | O_CREAT; + iFlags |= pThis->iAddtlOpenFlags; + pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode); if(pThis->fd == -1) { int ierrnoSave = errno; @@ -388,8 +389,8 @@ static rsRetVal strmWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) CHKiRet(strmOpenFile(pThis)); iWritten = write(pThis->fd, pBuf, lenBuf); - dbgprintf("Stream 0x%lx: write wrote %d bytes to file %d, errno: %d, err %s\n", (unsigned long) pThis, - iWritten, pThis->fd, errno, strerror(errno)); + dbgprintf("Stream 0x%lx: write wrote %d bytes to file %d, errno: %d\n", (unsigned long) pThis, + iWritten, pThis->fd, errno); /* TODO: handle error case -- rgerhards, 2008-01-07 */ /* Now indicate buffer empty again. We do this in any case, because there @@ -564,10 +565,22 @@ rsRetVal strmSetiMaxFiles(strm_t *pThis, int iNewVal) { pThis->iMaxFiles = iNewVal; pThis->iFileNumDigits = getNumberDigits(iNewVal); -dbgprintf("strmSetiMaxFiles %p val %d, digits %d\n", pThis, iNewVal, pThis->iFileNumDigits); return RS_RET_OK; } +rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal) +{ + DEFiRet; + + if(iNewVal & O_APPEND) + ABORT_FINALIZE(RS_RET_PARAM_ERROR); + + pThis->iAddtlOpenFlags = iNewVal; + +finalize_it: + return iRet; +} + /* set the stream's file prefix * The passed-in string is duplicated. So if the caller does not need @@ -704,10 +717,6 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) l = (long) pThis->iCurrOffs; objSerializeSCALAR_VAR(pStrm, iCurrOffs, LONG, l); - // TODO: really serialize? - //l = (long) pThis->iMaxFileSize; - //objSerializeSCALAR_VAR(pStrm, iMaxFileSize, LONG, l); - CHKiRet(objEndSerialize(pStrm)); finalize_it: @@ -69,6 +69,7 @@ typedef struct strm_s { int lenFName; strmMode_t tOperationsMode; mode_t tOpenMode; + int iAddtlOpenFlags; /* can be used to specifiy additional (compatible!) open flags */ size_t iMaxFileSize;/* maximum size a file may grow to */ int iMaxFiles; /* maximum number of files if a circular mode is in use */ int iFileNumDigits;/* min number of digits to use in file number (only in circular mode) */ @@ -95,7 +96,6 @@ rsRetVal strmSetMaxFileSize(strm_t *pThis, size_t iMaxFileSize); rsRetVal strmSetFileName(strm_t *pThis, uchar *pszName, size_t iLenName); rsRetVal strmReadChar(strm_t *pThis, uchar *pC); rsRetVal strmUnreadChar(strm_t *pThis, uchar c); -//rsRetVal strmSeek(strm_t *pThis, off_t offs); rsRetVal strmSeekCurrOffs(strm_t *pThis); rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); rsRetVal strmWriteChar(strm_t *pThis, uchar c); @@ -106,6 +106,7 @@ rsRetVal strmFlush(strm_t *pThis); rsRetVal strmRecordBegin(strm_t *pThis); rsRetVal strmRecordEnd(strm_t *pThis); rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm); +rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal); PROTOTYPEObjClassInit(strm); PROTOTYPEpropSetMeth(strm, bDeleteOnClose, int); PROTOTYPEpropSetMeth(strm, iMaxFileSize, int); @@ -409,8 +409,10 @@ static int iMainMsgQueueNumWorkers = 1; /* number of worker threads for the m static queueType_t MainMsgQueType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */ static uchar *pszMainMsgQFName = NULL; /* prefix for the main message queue file */ static size_t iMainMsgQueMaxFileSize = 1024*1024; -static int bMainMsgQImmediateShutdown = 0; /* shut down the queue immediately? */ static int iMainMsgQPersistUpdCnt = 0; /* persist queue info every n updates */ +static int iMainMsgQtoQShutdown = 0; /* queue shutdown */ +static int iMainMsgQtoActShutdown = 1000; /* action shutdown (in phase 2) */ +static int iMainMsgQtoEnq = 2000; /* timeout for queue enque */ /* This structure represents the files that will have log @@ -514,8 +516,10 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a iMainMsgQueueSize = 10000; iMainMsgQueMaxFileSize = 1024 * 1024; iMainMsgQueueNumWorkers = 1; - bMainMsgQImmediateShutdown = 0; iMainMsgQPersistUpdCnt = 0; + iMainMsgQtoQShutdown = 0; + iMainMsgQtoActShutdown = 1000; + iMainMsgQtoEnq = 2000; MainMsgQueType = QUEUETYPE_FIXED_ARRAY; return RS_RET_OK; @@ -1649,12 +1653,23 @@ int shouldProcessThisMessage(selector_t *f, msg_t *pMsg) } +/* cancellation cleanup handler - frees the action mutex + * rgerhards, 2008-01-14 + */ +static void callActionMutClean(void *arg) +{ + assert(arg != NULL); + pthread_mutex_unlock((pthread_mutex_t*) arg); +} + + /* call the configured action. Does all necessary housekeeping. * rgerhards, 2007-08-01 */ static rsRetVal callAction(msg_t *pMsg, action_t *pAction) { DEFiRet; + int iCancelStateSave; assert(pMsg != NULL); assert(pAction != NULL); @@ -1664,7 +1679,10 @@ static rsRetVal callAction(msg_t *pMsg, action_t *pAction) * become important when we (possibly) have multiple worker threads. * rgerhards, 2007-12-11 */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); LockObj(pAction); + pthread_cleanup_push(callActionMutClean, pAction->Sync_mut); + pthread_setcancelstate(iCancelStateSave, NULL); /* first, we need to check if this is a disabled * entry. If so, we must not further process it. @@ -1730,7 +1748,10 @@ static rsRetVal callAction(msg_t *pMsg, action_t *pAction) } finalize_it: + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); UnlockObj(pAction); + pthread_cleanup_pop(0); /* remove mutex cleanup handler */ + pthread_setcancelstate(iCancelStateSave, NULL); return iRet; } @@ -3098,8 +3119,10 @@ static void dbgPrintInitInfo(void) cCCEscapeChar); dbgprintf("Main queue size %d messages.\n", iMainMsgQueueSize); - dbgprintf("Main queue worker threads: %d, ImmediateShutdown: %d, Perists every %d updates.\n", - iMainMsgQueueNumWorkers, bMainMsgQImmediateShutdown, iMainMsgQPersistUpdCnt); + dbgprintf("Main queue worker threads: %d, Perists every %d updates.\n", + iMainMsgQueueNumWorkers, iMainMsgQPersistUpdCnt); + dbgprintf("Main queue timeouts: shutdown: %d, action completion shutdown: %d, enq: %d\n", + iMainMsgQtoQShutdown, iMainMsgQtoActShutdown, iMainMsgQtoEnq); dbgprintf("Work Directory: '%s'.\n", pszWorkDir); } @@ -3354,11 +3377,13 @@ init(void) logerrorInt("Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ } - setQPROP(queueSetbImmediateShutdown, "$MainMsgQueueImmediateShutdown", bMainMsgQImmediateShutdown); setQPROP(queueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize); setQPROPstr(queueSetFilePrefix, "$MainMsgQueueFileName", (pszMainMsgQFName == NULL ? (uchar*) "mainq" : pszMainMsgQFName)); setQPROP(queueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt); + setQPROP(queueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown ); + setQPROP(queueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown); + setQPROP(queueSettoEnq, "$MainMsgQueueTimeoutEnqueue", iMainMsgQtoEnq); # undef setQPROP # undef setQPROPstr @@ -4267,6 +4292,7 @@ extern void dbgprintf(char *fmt, ...) __attribute__((format(printf,1, 2))); void dbgprintf(char *fmt, ...) { + static pthread_t ptLastThrdID = 0; static int bWasNL = FALSE; va_list ap; @@ -4283,6 +4309,14 @@ dbgprintf(char *fmt, ...) * pretty well. * rgerhards, 2007-06-15 */ + if(ptLastThrdID != pthread_self()) { + if(!bWasNL) { + fprintf(stdout, "\n"); + bWasNL = 1; + } + ptLastThrdID = pthread_self(); + } + if(bWasNL) { fprintf(stdout, "%8.8x: ", (unsigned int) pthread_self()); //fprintf(stderr, "%8.8x: ", (unsigned int) pthread_self()); @@ -4517,10 +4551,12 @@ static rsRetVal loadBuildInModules(void) CHKiRet(regCfSysLineHdlr((uchar *)"workdirectory", 0, eCmdHdlrGetWord, NULL, &pszWorkDir, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszMainMsgQFName, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesize", 0, eCmdHdlrInt, NULL, &iMainMsgQueueSize, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueimmediateshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQImmediateShutdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iMainMsgQPersistUpdCnt, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetype", 0, eCmdHdlrGetWord, setMainMsgQueType, NULL, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iMainMsgQueueNumWorkers, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iMainMsgQtoQShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iMainMsgQtoActShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iMainMsgQtoEnq, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL)); |