diff options
-rw-r--r-- | runtime/queue.c | 253 | ||||
-rw-r--r-- | runtime/queue.h | 1 | ||||
-rw-r--r-- | runtime/rsyslog.h | 3 | ||||
-rw-r--r-- | runtime/stream.c | 44 | ||||
-rw-r--r-- | runtime/stream.h | 1 | ||||
-rw-r--r-- | runtime/wti.c | 13 | ||||
-rw-r--r-- | runtime/wtp.c | 13 | ||||
-rw-r--r-- | tests/DiagTalker.java | 32 | ||||
-rwxr-xr-x | tests/arrayqueue.sh | 2 | ||||
-rw-r--r-- | tests/chkseq.c | 63 | ||||
-rwxr-xr-x | tests/da-mainmsg-q.sh | 2 | ||||
-rwxr-xr-x | tests/diskqueue.sh | 2 | ||||
-rwxr-xr-x | tests/imtcp-multiport.sh | 6 | ||||
-rwxr-xr-x | tests/linkedlistqueue.sh | 2 | ||||
-rwxr-xr-x | tests/manytcp.sh | 2 | ||||
-rwxr-xr-x | tests/memq-persist.sh | 12 | ||||
-rw-r--r-- | tests/rscript.c | 4 |
17 files changed, 319 insertions, 136 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 0019297b..539cf4ec 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -202,9 +202,10 @@ getLogicalQueueSize(qqueue_t *pThis) static inline void queueDrain(qqueue_t *pThis) { void *pUsr; - ASSERT(pThis != NULL); + BEGINfunc + dbgoprint((obj_t*) pThis, "queue will lose %d messages, destroying...\n", pThis->iQueueSize); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(pThis->iQueueSize-- > 0) { pThis->qDeq(pThis, &pUsr); @@ -213,6 +214,7 @@ static inline void queueDrain(qqueue_t *pThis) } pThis->qDel(pThis); } + ENDfunc } @@ -617,6 +619,7 @@ static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out) RETiRet; } + static rsRetVal qDelFixedArray(qqueue_t *pThis) { DEFiRet; @@ -631,6 +634,26 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis) } +/* reset the logical dequeue pointer to the physical dequeue position. + * This is only needed after we cancelled workers (during queue shutdown). + */ +static rsRetVal +qUnDeqAllFixedArray(qqueue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + + dbgoprint((obj_t*) pThis, "resetting FixedArray deq index to %ld (was %ld), logical dequeue count %d\n", + pThis->tVars.farray.head, pThis->tVars.farray.deqhead, pThis->nLogDeq); + + pThis->tVars.farray.deqhead = pThis->tVars.farray.head; + pThis->nLogDeq = 0; + + RETiRet; +} + + /* -------------------- linked list -------------------- */ @@ -695,7 +718,6 @@ static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr) qLinkedList_t *pEntry; DEFiRet; -RUNLOG_VAR("%p", pThis->tVars.linklist.pDeqRoot); pEntry = pThis->tVars.linklist.pDeqRoot; *ppUsr = pEntry->pUsr; pThis->tVars.linklist.pDeqRoot = pEntry->pNext; @@ -723,6 +745,26 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis) } +/* reset the logical dequeue pointer to the physical dequeue position. + * This is only needed after we cancelled workers (during queue shutdown). 
+ */ +static rsRetVal +qUnDeqAllLinkedList(qqueue_t *pThis) +{ + DEFiRet; + + ASSERT(pThis != NULL); + + dbgoprint((obj_t*) pThis, "resetting LinkedList deq ptr to %p (was %p), logical dequeue count %d\n", + pThis->tVars.linklist.pDelRoot, pThis->tVars.linklist.pDeqRoot, pThis->nLogDeq); + + pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pDelRoot; + pThis->nLogDeq = 0; + + RETiRet; +} + + /* -------------------- disk -------------------- */ @@ -822,25 +864,16 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF, (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); - CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite)); - CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDel)); - /* we now need to take care of the Deq handle. It is not persisted, so we can create * a virgin copy based on pReadDel. // TODO duplicat code, same as blow - single function! */ - CHKiRet(strmConstruct(&pThis->tVars.disk.pReadDeq)); - CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); - CHKiRet(strmSetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); - CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000)); - CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ)); - CHKiRet(strmSetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR)); + CHKiRet(strmDup(pThis->tVars.disk.pReadDel, &pThis->tVars.disk.pReadDeq)); + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); /* deq must NOT delete the files! 
*/ CHKiRet(strmConstructFinalize(pThis->tVars.disk.pReadDeq)); - /* TODO: dirty, need stream methods --> */ - pThis->tVars.disk.pReadDeq->iCurrFNum = pThis->tVars.disk.pReadDel->iCurrFNum; - pThis->tVars.disk.pReadDeq->iCurrOffs = pThis->tVars.disk.pReadDel->iCurrOffs; - /* <-- dirty, need stream methods :TODO */ + CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite)); + CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDel)); CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDeq)); /* OK, we could successfully read the file, so we now can request that it be @@ -1012,6 +1045,16 @@ finalize_it: } +/* This is a dummy function for disks - we do not need to reset anything + * because everything is already persisted... + */ +static rsRetVal +qUnDeqAllDisk(__attribute__((unused)) qqueue_t *pThis) +{ + return RS_RET_OK; +} + + /* -------------------- direct (no queueing) -------------------- */ static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis) { @@ -1055,6 +1098,12 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } +static rsRetVal +qUnDeqAllDirect(__attribute__((unused)) qqueue_t *pThis) +{ + return RS_RET_OK; +} + /* --------------- end type-specific handlers -------------------- */ @@ -1109,22 +1158,29 @@ qqueueDeq(qqueue_t *pThis, void *pUsr) /* This function shuts down all worker threads and waits until they - * have terminated. If they timeout, they are cancelled. Parameters have been set - * before this function is called so that DA queues will be fully persisted to - * disk (if configured to do so). + * have terminated. If they timeout, they are cancelled. * rgerhards, 2008-01-24 * Please note that this function shuts down BOTH the parent AND the child queue * in DA case. This is necessary because their timeouts are tightly coupled. Most * importantly, the timeouts would be applied twice (or logic be extremely * complex) if each would have its own shutdown. 
The function does not self check * this condition - the caller must make sure it is not called with a parent. + * rgerhards, 2009-05-26: we do NO longer persist the queue here if bSaveOnShutdown + * is set. This must be handled by the caller. Not doing that cleans up the queue + * shutdown considerably. Also, older engines had a potential hang condition when + * the DA queue was already started and the DA worker configured for infinite + * retries and the action was during retry processing. This was a design issue, + * which is solved as of now. Note that the shutdown now may take a little bit + * longer, because we no longer can persist the queue in parallel to waiting + * on worker timeouts. */ -static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) +static rsRetVal +ShutdownWorkers(qqueue_t *pThis) { - DEFiRet; DEFVARS_mutexProtection; struct timespec tTimeout; rsRetVal iRetLocal; + DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ @@ -1193,53 +1249,18 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) } /* when we reach this point, both queues are either empty or the regular queue shutdown timeout - * has expired. Now we need to check if we are configured to not loose messages. If so, we need - * to persist the queue to disk (this is only possible if the queue is DA-enabled). We must also - * set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate as soon as its consumer - * is done. This is especially important as we otherwise may interfere with queue order while the - * DA consumer is running. -- rgerhards, 2008-01-27 - * Note: there was a note that we should not wait eternally on the DA worker if we run in - * enqueue-only note. I have reviewed the code and think there is no need for this check. Howerver, - * I'd like to keep this note in here should we happen to run into some related trouble. - * rgerhards, 2008-01-28 + * has expired. 
We must set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate + * as soon as its consumer is done. In particular, it no longer needs to try to empty the queue. */ wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */ /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ - if(pThis->bRunsDA) + if(pThis->bRunsDA) { qqueueWaitDAModeInitialized(pThis); - - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - /* optimize parameters for shutdown of DA-enabled queues */ - if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { - /* switch to enqueue-only mode so that no more actions happen */ - if(pThis->bRunsDA == 0) { - qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ - } else { - /* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1) - * but is not yet running (state 2). In this case, pThis->pqDA is NULL! 
rgerhards, 2008-02-27 - */ - qqueueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */ - } - END_MTX_PROTECTED_OPERATIONS(pThis->mut); - /* make sure we do not timeout before we are done */ - dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, eternal timeout set\n"); - timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); - /* and run the primary queue's DA worker to drain the queue */ - iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); - if(iRetLocal != RS_RET_OK) { - dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, " - "continuing, but results are unpredictable\n", iRetLocal); - } - } else { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); + wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE); /* also stop DA queue */ } - /* now the primary queue is either empty, persisted to disk - or set to loose messages. So we - * can now request immediate shutdown of any remaining workers. Note that if bSaveOnShutdown was set, - * the queue is now empty. If regular workers are still running, and try to pull the next message, - * they will automatically terminate as there no longer is any message left to process. - */ + /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ if(getPhysicalQueueSize(pThis) > 0) { timeoutComp(&tTimeout, pThis->toActShutdown); @@ -1278,10 +1299,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) /* Now queue workers should have terminated. If not, we need to cancel them as we have applied * all timeout setting. If any worker in any queue still executes, its consumer is possibly - * long-running and cancelling is the only way to get rid of it. 
Note that the - * cancellation handler will probably re-queue a user pointer, so the queue's enqueue - * function is still needed (what is no problem as we do not yet destroy the queue - but I - * thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25 + * long-running and cancelling is the only way to get rid of it. */ dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the primary queue\n"); iRetLocal = wtpCancelAll(pThis->pWtpReg); /* returns immediately if all threads already have terminated */ @@ -1290,12 +1308,6 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) "threads, continuing, but results are unpredictable\n", iRetLocal); } - - /* TODO: think: do we really need to do this here? Can't it happen on DA queue destruction? If we - * disable it, we get an assertion... I think this is OK, as we need to have a certain order and - * canceling the DA workers here ensures that order. But in any instant, we may have a look at this - * code after we have reaced the milestone. -- rgerhards, 2008-01-27 - */ /* ... and now the DA queue, if it exists (should always be after the primary one) */ if(pThis->pqDA != NULL) { dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the DA queue\n"); @@ -1310,7 +1322,8 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) * Well, more precisely, they *are in termination*. Some cancel cleanup handlers * may still be running. 
*/ - dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", getPhysicalQueueSize(pThis)); + dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size log %d, phys %d.\n", + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); RETiRet; } @@ -1367,6 +1380,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddFixedArray; pThis->qDeq = qDeqFixedArray; pThis->qDel = qDelFixedArray; + pThis->qUnDeqAll = qUnDeqAllFixedArray; break; case QUEUETYPE_LINKEDLIST: pThis->qConstruct = qConstructLinkedList; @@ -1374,6 +1388,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddLinkedList; pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; + pThis->qUnDeqAll = qUnDeqAllLinkedList; break; case QUEUETYPE_DISK: pThis->qConstruct = qConstructDisk; @@ -1381,6 +1396,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddDisk; pThis->qDeq = qDeqDisk; pThis->qDel = qDelDisk; + pThis->qUnDeqAll = qUnDeqAllDisk; /* special handling */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ break; @@ -1389,6 +1405,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qDestruct = qDestructDirect; pThis->qAdd = qAddDirect; pThis->qDel = qDelDirect; + pThis->qUnDeqAll = qUnDeqAllDirect; break; } @@ -1817,20 +1834,17 @@ finalize_it: * If we are a child, we have done our duty when the queue is empty. In that case, * we can terminate. * Version for the DA worker thread. NOTE: the pThis->bRunsDA is different from - * the DA queue + * the DA queue. + * If our queue is in destruction, we drain to the DA queue and so we shall not terminate + * until we have done so. 
*/ -static int +static rsRetVal qqueueChkStopWrkrDA(qqueue_t *pThis) { - /* if our queue is in destruction, we drain to the DA queue and so we shall not terminate - * until we have done so. - */ - int bStopWrkr; - - BEGINfunc + DEFiRet; if(pThis->bEnqOnly) { - bStopWrkr = 1; + iRet = RS_RET_TERMINATE_NOW; } else { if(pThis->bRunsDA) { ASSERT(pThis->pqDA != NULL); @@ -1838,19 +1852,16 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) && pThis->pqDA->sizeOnDiskMax > 0 && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { /* this queue can never grow, so we can give up... */ - bStopWrkr = 1; + iRet = RS_RET_TERMINATE_NOW; } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { - bStopWrkr = 1; - } else { - bStopWrkr = 0; + iRet = RS_RET_TERMINATE_NOW; } } else { - bStopWrkr = 1; + iRet = RS_RET_TERMINATE_NOW; } } - ENDfunc - return bStopWrkr; + RETiRet; } @@ -1861,10 +1872,20 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) * Version for the regular worker thread. NOTE: the pThis->bRunsDA is different from * the DA queue */ -static int +static rsRetVal ChkStopWrkrReg(qqueue_t *pThis) { + DEFiRet; + /* original condition return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0); + * TODO: remove when verified! -- rgerhards, 2009-05-26 + */ + if(pThis->bEnqOnly || pThis->bRunsDA) + iRet = RS_RET_TERMINATE_NOW; + else if(pThis->pqParent != NULL) + iRet = RS_RET_TERMINATE_WHEN_IDLE; + + RETiRet; } @@ -1881,7 +1902,6 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal) } -/* common function for the idle functions that deletes the last batch if TODO MULTI */ /* must only be called when the queue mutex is locked, else results * are not stable! DA queue version */ @@ -1890,7 +1910,7 @@ qqueueIsIdleDA(qqueue_t *pThis) { /* remember: iQueueSize is the DA queue size, not the main queue! 
*/ /* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */ - return(getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk)); + return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk)); } /* must only be called when the queue mutex is locked, else results * are not stable! Regular queue version @@ -1905,7 +1925,7 @@ IsIdleReg(qqueue_t *pThis) return ret; #else /* regular code! */ - return(getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk)); + return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk)); #endif } @@ -2176,19 +2196,62 @@ finalize_it: } +/* persist a queue with all data elements to disk - this is used to handle + * bSaveOnShutdown. We utilize the DA worker to do this. This must only + * be called after all workers have been shut down and if bSaveOnShutdown + * is actually set. Note that this function may potentially run long, + * depending on the queue configuration (e.g. store on remote machine). 
+ * rgerhards, 2009-05-26 + */ +static inline rsRetVal +DoSaveOnShutdown(qqueue_t *pThis) +{ + struct timespec tTimeout; + rsRetVal iRetLocal; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + + qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */ + /* make sure we do not timeout before we are done */ + dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, eternal timeout set\n"); + timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); + /* and run the primary queue's DA worker to drain the queue */ + iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); + dbgoprint((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n", + iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); + if(iRetLocal != RS_RET_OK) { + dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, " + "continuing, but results are unpredictable\n", iRetLocal); + } + + RETiRet; +} + + /* destructor for the queue object */ BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(qqueue) pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ - /* shut down all workers (handles *all* of the persistence logic) - * See function head comment of queueShutdownWorkers () on why we don't call it - * We also do not need to shutdown workers when we are in enqueue-only mode or we are a + /* shut down all workers + * We do not need to shutdown workers when we are in enqueue-only mode or we are a * direct queue - because in both cases we have none... ;) * with a child! -- rgerhards, 2008-01-28 */ if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL) - qqueueShutdownWorkers(pThis); + ShutdownWorkers(pThis); + + /* now all workers are terminated. Messages may exist. 
Also, some logically dequeued + * messages may never have been processed because their worker was terminated. So + * we need to reset the logical dequeue pointer, persist the queue if configured to do + * so and then destruct everything. -- rgerhards, 2009-05-26 + */ + CHKiRet(pThis->qUnDeqAll(pThis)); + + if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { + CHKiRet(DoSaveOnShutdown(pThis)); + } /* finally destruct our (regular) worker thread pool * Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen, diff --git a/runtime/queue.h b/runtime/queue.h index 954a7fd4..c1fe597d 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -123,6 +123,7 @@ typedef struct queue_s { rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr); rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr); rsRetVal (*qDel)(struct queue_s *pThis); + rsRetVal (*qUnDeqAll)(struct queue_s *pThis); /* end type-specific handler */ /* synchronization variables */ pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */ diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 0c671f03..a43c0327 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -298,7 +298,8 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */ RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */ RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */ - RS_RET_IDLE = 4 /**< operation successful, but callee is idle (e.g. because queue is empty) */ + RS_RET_IDLE = 4, /**< operation successful, but callee is idle (e.g. because queue is empty) */ + RS_RET_TERMINATE_WHEN_IDLE = 5 /**< operation successful, function is requested to terminate when idle */ }; /* some helpful macros to work with srRetVals. 
diff --git a/runtime/stream.c b/runtime/stream.c index 50d419be..59e8be3a 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -45,6 +45,7 @@ #include "srUtils.h" #include "obj.h" #include "stream.h" +#include "unicode-helper.h" /* static data */ DEFobjStaticHelpers @@ -80,7 +81,7 @@ static rsRetVal strmOpenFile(strm_t *pThis) pThis->pszFName, pThis->lenFName, pThis->iCurrFNum, pThis->iFileNumDigits)); } else { if(pThis->pszDir == NULL) { - if((pThis->pszCurrFName = (uchar*) strdup((char*) pThis->pszFName)) == NULL) + if((pThis->pszCurrFName = ustrdup(pThis->pszFName)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } else { CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir, @@ -814,6 +815,47 @@ finalize_it: } +/* duplicate a stream object excluding dynamic properties. This function is + * primarily meant to provide a duplicate that later on can be used to access + * the data. This is needed, for example, for a restart of the disk queue. + * Note that ConstructFinalize() is NOT called. So our caller may change some + * properties before finalizing things. 
+ * rgerhards, 2009-05-26 + */ +rsRetVal +strmDup(strm_t *pThis, strm_t **ppNew) +{ + strm_t *pNew = NULL; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, strm); + assert(ppNew != NULL); + + CHKiRet(strmConstruct(&pNew)); + pNew->sType = pThis->sType; + pNew->iCurrFNum = pThis->iCurrFNum; + CHKmalloc(pNew->pszFName = ustrdup(pThis->pszFName)); + pNew->lenFName = pThis->lenFName; + CHKmalloc(pNew->pszDir = ustrdup(pThis->pszDir)); + pNew->lenDir = pThis->lenDir; + pNew->tOperationsMode = pThis->tOperationsMode; + pNew->tOpenMode = pThis->tOpenMode; + pNew->iAddtlOpenFlags = pThis->iAddtlOpenFlags; + pNew->iMaxFileSize = pThis->iMaxFileSize; + pNew->iMaxFiles = pThis->iMaxFiles; + pNew->iFileNumDigits = pThis->iFileNumDigits; + pNew->bDeleteOnClose = pThis->bDeleteOnClose; + pNew->iCurrOffs = pThis->iCurrOffs; + + *ppNew = pNew; + pNew = NULL; + +finalize_it: + if(pNew != NULL) + strmDestruct(&pNew); + + RETiRet; +} /* set a user write-counter. This counter is initialized to zero and * receives the number of bytes written. 
It is accurate only after a diff --git a/runtime/stream.h b/runtime/stream.h index 371358ab..e5d05b55 100644 --- a/runtime/stream.h +++ b/runtime/stream.h @@ -119,6 +119,7 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm); rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal); rsRetVal strmGetCurrOffset(strm_t *pThis, int64 *pOffs); rsRetVal strmSetWCntr(strm_t *pThis, number_t *pWCnt); +rsRetVal strmDup(strm_t *pThis, strm_t **ppNew); PROTOTYPEObjClassInit(strm); PROTOTYPEpropSetMeth(strm, bDeleteOnClose, int); PROTOTYPEpropSetMeth(strm, iMaxFileSize, int); diff --git a/runtime/wti.c b/runtime/wti.c index 1be008df..3b6bf1b9 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -380,6 +380,7 @@ wtiWorker(wti_t *pThis) wtp_t *pWtp; /* our worker thread pool */ int bInactivityTOOccured = 0; rsRetVal localRet; + rsRetVal terminateRet; DEFiRet; ISOBJ_TYPE_assert(pThis, wti); @@ -406,15 +407,21 @@ wtiWorker(wti_t *pThis) wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */ BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX); - /* first check if we are in shutdown process */ - if(wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) { - break; /* end worker thread run */ + /* first check if we are in shutdown process (but evaluate a bit later) */ + terminateRet = wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED); + if(terminateRet == RS_RET_TERMINATE_NOW) { + // TODO: we need to free the old batch! -- rgerhards, 2009-05-26 MULTI + break; } /* try to execute and process whatever we have */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave); if(localRet == RS_RET_IDLE) { + if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE) { + break; /* end of loop */ + } + if(bInactivityTOOccured) { /* we had an inactivity timeout in the last run and are still idle, so it is time to exit... 
*/ break; /* end worker thread run */ diff --git a/runtime/wtp.c b/runtime/wtp.c index 40a9095b..41fcd8d9 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -261,16 +261,19 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) ISOBJ_TYPE_assert(pThis, wtp); BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) - || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, pThis))) - iRet = RS_RET_TERMINATE_NOW; + if(pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) { + ABORT_FINALIZE(RS_RET_TERMINATE_NOW); + } else if(pThis->wtpState == wtpState_SHUTDOWN) { + ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE); + } /* try customer handler if one was set and we do not yet have a definite result */ - if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) { + if(pThis->pfChkStopWrkr != NULL) { iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex); } - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); +finalize_it: + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); RETiRet; } diff --git a/tests/DiagTalker.java b/tests/DiagTalker.java index e33a5867..85a6671e 100644 --- a/tests/DiagTalker.java +++ b/tests/DiagTalker.java @@ -1,3 +1,24 @@ +/* A yet very simple tool to talk to imdiag. + * + * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Rsyslog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Rsyslog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. 
+ * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ //package com.rsyslog.diag; import java.io.*; import java.net.*; @@ -29,9 +50,14 @@ public class DiagTalker { new InputStreamReader(System.in)); String userInput; - while ((userInput = stdIn.readLine()) != null) { - out.println(userInput); - System.out.println("imdiag returns: " + in.readLine()); + try { + while ((userInput = stdIn.readLine()) != null) { + out.println(userInput); + System.out.println("imdiag returns: " + in.readLine()); + } + } catch (SocketException e) { + System.err.println("We had a socket exception and consider this to be OK: " + + e.getMessage()); } out.close(); diff --git a/tests/arrayqueue.sh b/tests/arrayqueue.sh index 5b8ebb5f..7791ed57 100755 --- a/tests/arrayqueue.sh +++ b/tests/arrayqueue.sh @@ -17,7 +17,7 @@ sleep 4 # we need this so that rsyslogd can receive all outstanding messages kill `cat rsyslog.pid` rm -f work sort < rsyslog.out.log > work -./chkseq work 0 39999 +./chkseq -fwork -e 39999 if [ "$?" -ne "0" ]; then # rm -f work rsyslog.out.log echo "sequence error detected" diff --git a/tests/chkseq.c b/tests/chkseq.c index 3203c250..5ffe855c 100644 --- a/tests/chkseq.c +++ b/tests/chkseq.c @@ -3,9 +3,10 @@ * be set. * * Params - * argv[1] file to check - * argv[2] start number - * argv[3] end number + * -f<filename> MUST be given! + * -s<starting number> -e<ending number> + * default for s is 0. -e should be given (else it is also 0) + * -d may be specified, in which case duplicate messages are permitted. * * Part of the testbench for rsyslog. 
* @@ -31,6 +32,7 @@ #include "config.h" #include <stdio.h> #include <stdlib.h> +#include <getopt.h> int main(int argc, char *argv[]) { @@ -38,16 +40,36 @@ int main(int argc, char *argv[]) int val; int i; int ret = 0; - int start, end; + int dupsPermitted = 0; + int start = 0, end = 0; + int opt; + int nDups = 0; + char *file = NULL; - if(argc != 4) { - printf("Invalid call of chkseq\n"); - printf("Usage: chkseq file start end\n"); + while((opt = getopt(argc, argv, "e:f:ds:")) != EOF) { + switch((char)opt) { + case 'f': + file = optarg; + break; + case 'd': + dupsPermitted = 1; + break; + case 'e': + end = atoi(optarg); + break; + case 's': + start = atoi(optarg); + break; + default:printf("Invalid call of chkseq\n"); + printf("Usage: chkseq file -sstart -eend -d\n"); + exit(1); + } + } + + if(file == NULL) { + printf("file must be given!\n"); exit(1); } - - start = atoi(argv[2]); - end = atoi(argv[3]); if(start > end) { printf("start must be less than or equal end!\n"); @@ -55,22 +77,35 @@ int main(int argc, char *argv[]) } /* read file */ - fp = fopen(argv[1], "r"); + fp = fopen(file, "r"); if(fp == NULL) { perror(argv[1]); exit(1); } - for(i = start ; i < end ; ++i) { + for(i = start ; i < end+1 ; ++i) { if(fscanf(fp, "%d\n", &val) != 1) { printf("scanf error in index i=%d\n", i); exit(1); } if(val != i) { - printf("read value %d, but expected value %d\n", val, i); - exit(1); + if(val == i - 1 && dupsPermitted) { + --i; + ++nDups; + } else { + printf("read value %d, but expected value %d\n", val, i); + exit(1); + } } } + if(nDups != 0) + printf("info: had %d duplicates (this is no error)\n", nDups); + + if(i - 1 != end) { + printf("only %d records in file, expected %d\n", i - 1, end); + exit(1); + } + exit(ret); } diff --git a/tests/da-mainmsg-q.sh b/tests/da-mainmsg-q.sh index 91addf68..fde9e06e 100755 --- a/tests/da-mainmsg-q.sh +++ b/tests/da-mainmsg-q.sh @@ -52,7 +52,7 @@ sleep 1 # we need this so that rsyslogd can receive all outstanding messages kill `cat 
rsyslog.pid` rm -f work sort < rsyslog.out.log > work -./chkseq work 0 20099 +./chkseq -fwork -e20099 if [ "$?" -ne "0" ]; then # rm -f work rsyslog.out.log echo "sequence error detected" diff --git a/tests/diskqueue.sh b/tests/diskqueue.sh index 20767a90..42018b15 100755 --- a/tests/diskqueue.sh +++ b/tests/diskqueue.sh @@ -26,7 +26,7 @@ $srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages kill `cat rsyslog.pid` rm -f work sort < rsyslog.out.log > work -./chkseq work 0 19999 +./chkseq -fwork -e19999 if [ "$?" -ne "0" ]; then # rm -f work rsyslog.out.log echo "sequence error detected" diff --git a/tests/imtcp-multiport.sh b/tests/imtcp-multiport.sh index 17480dae..73ab9558 100755 --- a/tests/imtcp-multiport.sh +++ b/tests/imtcp-multiport.sh @@ -21,7 +21,7 @@ $srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages kill `cat rsyslog.pid` rm -f work sort < rsyslog.out.log > work -./chkseq work 0 9999 +./chkseq -fwork -e9999 if [ "$?" -ne "0" ]; then # rm -f work rsyslog.out.log echo "sequence error detected" @@ -46,7 +46,7 @@ $srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages kill `cat rsyslog.pid` rm -f work sort < rsyslog.out.log > work -./chkseq work 0 9999 +./chkseq -fwork -e9999 if [ "$?" -ne "0" ]; then # rm -f work rsyslog.out.log echo "sequence error detected" @@ -71,7 +71,7 @@ $srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages kill `cat rsyslog.pid` rm -f work sort < rsyslog.out.log > work -./chkseq work 0 9999 +./chkseq -fwork -e9999 if [ "$?" 
-ne "0" ]; then # rm -f work rsyslog.out.log echo "sequence error detected" diff --git a/tests/linkedlistqueue.sh b/tests/linkedlistqueue.sh index aac1abb6..aa574bd1 100755 --- a/tests/linkedlistqueue.sh +++ b/tests/linkedlistqueue.sh @@ -17,7 +17,7 @@ sleep 4 # we need this so that rsyslogd can receive all outstanding messages kill `cat rsyslog.pid` rm -f work sort < rsyslog.out.log > work -./chkseq work 0 39999 +./chkseq -fwork -e39999 if [ "$?" -ne "0" ]; then # rm -f work rsyslog.out.log echo "sequence error detected" diff --git a/tests/manytcp.sh b/tests/manytcp.sh index 06bd38b6..861f12ff 100755 --- a/tests/manytcp.sh +++ b/tests/manytcp.sh @@ -12,7 +12,7 @@ $srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages kill `cat rsyslog.pid` rm -f work sort < rsyslog.out.log > work -./chkseq work 0 39999 +./chkseq -fwork -e39999 if [ "$?" -ne "0" ]; then rm -f work rsyslog.out.log echo "sequence error detected" diff --git a/tests/memq-persist.sh b/tests/memq-persist.sh index 108cba57..e935d8db 100755 --- a/tests/memq-persist.sh +++ b/tests/memq-persist.sh @@ -10,9 +10,11 @@ #export RSYSLOG_DEBUGLOG="log" echo testing memory queue persisting to disk $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason +rm -f core.* rm -rf test-spool mkdir test-spool rm -f work rsyslog.out.log rsyslog.out.log.save # work files +#valgrind ../tools/rsyslogd -c4 -u2 -n -irsyslog.pid -M../runtime/.libs:../.libs -f$srcdir/testsuites/memq-persist1.conf & ../tools/rsyslogd -c4 -u2 -n -irsyslog.pid -M../runtime/.libs:../.libs -f$srcdir/testsuites/memq-persist1.conf & sleep 1 echo "rsyslogd started with pid " `cat rsyslog.pid` @@ -22,20 +24,22 @@ if [ "$?" -ne "0" ]; then echo "error during tcpflood! 
see rsyslog.out.log.save for what was written" cp rsyslog.out.log rsyslog.out.log.save fi -sleep 3 # we need to wait to ensure everything is received (less 1 second would be better) +sleep 4 # we need to wait to ensure everything is received (less 1 second would be better) kill `cat rsyslog.pid` -sleep 5 # wait for engine to terminate +echo wait for shutdown +$srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages echo There must exist some files now: ls -l test-spool # restart engine and have rest processed ../tools/rsyslogd -c4 -u2 -n -irsyslog.pid -M../runtime/.libs:../.libs -f$srcdir/testsuites/memq-persist2.conf & +sleep 1 $srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages kill `cat rsyslog.pid` rm -f work sort < rsyslog.out.log > work -./chkseq work 0 9999 +./chkseq -fwork -e9999 -d if [ "$?" -ne "0" ]; then - # rm -f work rsyslog.out.log + rm -f work rsyslog.out.log echo "sequence error detected" exit 1 fi diff --git a/tests/rscript.c b/tests/rscript.c index ce81491c..6361aec4 100644 --- a/tests/rscript.c +++ b/tests/rscript.c @@ -104,8 +104,8 @@ PerformTest(cstr_t *pstrIn, rsRetVal iRetExpected, cstr_t *pstrOut) if(strcmp((char*)rsCStrGetSzStr(pstrPrg), (char*)rsCStrGetSzStr(pstrOut))) { printf("error: compiled program different from expected result!\n"); - printf("generated vmprg (%d bytes):\n%s\n", strlen((char*)rsCStrGetSzStr(pstrPrg)), rsCStrGetSzStr(pstrPrg)); - printf("expected (%d bytes):\n%s\n", strlen((char*)rsCStrGetSzStr(pstrOut)), rsCStrGetSzStr(pstrOut)); + printf("generated vmprg (%d bytes):\n%s\n", (int)strlen((char*)rsCStrGetSzStr(pstrPrg)), rsCStrGetSzStr(pstrPrg)); + printf("expected (%d bytes):\n%s\n", (int)strlen((char*)rsCStrGetSzStr(pstrOut)), rsCStrGetSzStr(pstrOut)); ABORT_FINALIZE(RS_RET_ERR); } |