summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-05-26 12:43:43 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-05-26 12:43:43 +0200
commitaa9426f683fa6af9280bc63050ee0187ba4c57e1 (patch)
tree5ba68517cc2661ab3de4afb417592ed67bdab183
parent210f43137d6a077abbd8b77c1f72193dcd81cc99 (diff)
downloadrsyslog-aa9426f683fa6af9280bc63050ee0187ba4c57e1.tar.gz
rsyslog-aa9426f683fa6af9280bc63050ee0187ba4c57e1.tar.xz
rsyslog-aa9426f683fa6af9280bc63050ee0187ba4c57e1.zip
solved design issue with queue termination
... and also improved the test suite. There is a design issue in the v3 queue engine that manifested to some serious problems with the new processing mode. However, in v3 shutdown may take eternally if a queue runs in DA mode, is configured to preserve data AND the action fails and retries immediately. There is no cure available for v3, it would require doing much of the work we have done on the new engine. The window of exposure, as one might guess from the description, is very small. That is probably the reason why we have not seen it in practice.
-rw-r--r--runtime/queue.c253
-rw-r--r--runtime/queue.h1
-rw-r--r--runtime/rsyslog.h3
-rw-r--r--runtime/stream.c44
-rw-r--r--runtime/stream.h1
-rw-r--r--runtime/wti.c13
-rw-r--r--runtime/wtp.c13
-rw-r--r--tests/DiagTalker.java32
-rwxr-xr-xtests/arrayqueue.sh2
-rw-r--r--tests/chkseq.c63
-rwxr-xr-xtests/da-mainmsg-q.sh2
-rwxr-xr-xtests/diskqueue.sh2
-rwxr-xr-xtests/imtcp-multiport.sh6
-rwxr-xr-xtests/linkedlistqueue.sh2
-rwxr-xr-xtests/manytcp.sh2
-rwxr-xr-xtests/memq-persist.sh12
-rw-r--r--tests/rscript.c4
17 files changed, 319 insertions, 136 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 0019297b..539cf4ec 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -202,9 +202,10 @@ getLogicalQueueSize(qqueue_t *pThis)
static inline void queueDrain(qqueue_t *pThis)
{
void *pUsr;
-
ASSERT(pThis != NULL);
+ BEGINfunc
+ dbgoprint((obj_t*) pThis, "queue will lose %d messages, destroying...\n", pThis->iQueueSize);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
while(pThis->iQueueSize-- > 0) {
pThis->qDeq(pThis, &pUsr);
@@ -213,6 +214,7 @@ static inline void queueDrain(qqueue_t *pThis)
}
pThis->qDel(pThis);
}
+ ENDfunc
}
@@ -617,6 +619,7 @@ static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out)
RETiRet;
}
+
static rsRetVal qDelFixedArray(qqueue_t *pThis)
{
DEFiRet;
@@ -631,6 +634,26 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis)
}
+/* reset the logical dequeue pointer to the physical dequeue position.
+ * This is only needed after we cancelled workers (during queue shutdown).
+ */
+static rsRetVal
+qUnDeqAllFixedArray(qqueue_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+
+ dbgoprint((obj_t*) pThis, "resetting FixedArray deq index to %ld (was %ld), logical dequeue count %d\n",
+ pThis->tVars.farray.head, pThis->tVars.farray.deqhead, pThis->nLogDeq);
+
+ pThis->tVars.farray.deqhead = pThis->tVars.farray.head;
+ pThis->nLogDeq = 0;
+
+ RETiRet;
+}
+
+
/* -------------------- linked list -------------------- */
@@ -695,7 +718,6 @@ static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr)
qLinkedList_t *pEntry;
DEFiRet;
-RUNLOG_VAR("%p", pThis->tVars.linklist.pDeqRoot);
pEntry = pThis->tVars.linklist.pDeqRoot;
*ppUsr = pEntry->pUsr;
pThis->tVars.linklist.pDeqRoot = pEntry->pNext;
@@ -723,6 +745,26 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis)
}
+/* reset the logical dequeue pointer to the physical dequeue position.
+ * This is only needed after we cancelled workers (during queue shutdown).
+ */
+static rsRetVal
+qUnDeqAllLinkedList(qqueue_t *pThis)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ dbgoprint((obj_t*) pThis, "resetting LinkedList deq ptr to %p (was %p), logical dequeue count %d\n",
+ pThis->tVars.linklist.pDelRoot, pThis->tVars.linklist.pDeqRoot, pThis->nLogDeq);
+
+ pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pDelRoot;
+ pThis->nLogDeq = 0;
+
+ RETiRet;
+}
+
+
/* -------------------- disk -------------------- */
@@ -822,25 +864,16 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF,
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
- CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite));
- CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDel));
-
/* we now need to take care of the Deq handle. It is not persisted, so we can create
* a virgin copy based on pReadDel. // TODO duplicat code, same as blow - single function!
*/
- CHKiRet(strmConstruct(&pThis->tVars.disk.pReadDeq));
- CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0));
- CHKiRet(strmSetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
- CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000));
- CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ));
- CHKiRet(strmSetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR));
+ CHKiRet(strmDup(pThis->tVars.disk.pReadDel, &pThis->tVars.disk.pReadDeq));
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); /* deq must NOT delete the files! */
CHKiRet(strmConstructFinalize(pThis->tVars.disk.pReadDeq));
- /* TODO: dirty, need stream methods --> */
- pThis->tVars.disk.pReadDeq->iCurrFNum = pThis->tVars.disk.pReadDel->iCurrFNum;
- pThis->tVars.disk.pReadDeq->iCurrOffs = pThis->tVars.disk.pReadDel->iCurrOffs;
- /* <-- dirty, need stream methods :TODO */
+ CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite));
+ CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDel));
CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDeq));
/* OK, we could successfully read the file, so we now can request that it be
@@ -1012,6 +1045,16 @@ finalize_it:
}
+/* This is a dummy function for disks - we do not need to reset anything
+ * because everything is already persisted...
+ */
+static rsRetVal
+qUnDeqAllDisk(__attribute__((unused)) qqueue_t *pThis)
+{
+ return RS_RET_OK;
+}
+
+
/* -------------------- direct (no queueing) -------------------- */
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis)
{
@@ -1055,6 +1098,12 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
return RS_RET_OK;
}
+static rsRetVal
+qUnDeqAllDirect(__attribute__((unused)) qqueue_t *pThis)
+{
+ return RS_RET_OK;
+}
+
/* --------------- end type-specific handlers -------------------- */
@@ -1109,22 +1158,29 @@ qqueueDeq(qqueue_t *pThis, void *pUsr)
/* This function shuts down all worker threads and waits until they
- * have terminated. If they timeout, they are cancelled. Parameters have been set
- * before this function is called so that DA queues will be fully persisted to
- * disk (if configured to do so).
+ * have terminated. If they timeout, they are cancelled.
* rgerhards, 2008-01-24
* Please note that this function shuts down BOTH the parent AND the child queue
* in DA case. This is necessary because their timeouts are tightly coupled. Most
* importantly, the timeouts would be applied twice (or logic be extremely
* complex) if each would have its own shutdown. The function does not self check
* this condition - the caller must make sure it is not called with a parent.
+ * rgerhards, 2009-05-26: we do NO logner persist the queue here if bSaveOnShutdown
+ * is set. This must be handled by the caller. Not doing that cleans up the queue
+ * shutdown considerably. Also, older engines had a potential hang condition when
+ * the DA queue was already started and the DA worker configured for infinite
+ * retries and the action was during retry processing. This was a design issue,
+ * which is solved as of now. Note that the shutdown now may take a little bit
+ * longer, because we no longer can persist the queue in parallel to waiting
+ * on worker timeouts.
*/
-static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
+static rsRetVal
+ShutdownWorkers(qqueue_t *pThis)
{
- DEFiRet;
DEFVARS_mutexProtection;
struct timespec tTimeout;
rsRetVal iRetLocal;
+ DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
@@ -1193,53 +1249,18 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
}
/* when we reach this point, both queues are either empty or the regular queue shutdown timeout
- * has expired. Now we need to check if we are configured to not loose messages. If so, we need
- * to persist the queue to disk (this is only possible if the queue is DA-enabled). We must also
- * set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate as soon as its consumer
- * is done. This is especially important as we otherwise may interfere with queue order while the
- * DA consumer is running. -- rgerhards, 2008-01-27
- * Note: there was a note that we should not wait eternally on the DA worker if we run in
- * enqueue-only note. I have reviewed the code and think there is no need for this check. Howerver,
- * I'd like to keep this note in here should we happen to run into some related trouble.
- * rgerhards, 2008-01-28
+ * has expired. We must set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate
+ * as soon as its consumer is done. In particular, it does no longer need try to empty the queue.
*/
wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */
/* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
- if(pThis->bRunsDA)
+ if(pThis->bRunsDA) {
qqueueWaitDAModeInitialized(pThis);
-
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- /* optimize parameters for shutdown of DA-enabled queues */
- if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
- /* switch to enqueue-only mode so that no more actions happen */
- if(pThis->bRunsDA == 0) {
- qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
- } else {
- /* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1)
- * but is not yet running (state 2). In this case, pThis->pqDA is NULL! rgerhards, 2008-02-27
- */
- qqueueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
- }
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- /* make sure we do not timeout before we are done */
- dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, eternal timeout set\n");
- timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
- /* and run the primary queue's DA worker to drain the queue */
- iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
- if(iRetLocal != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, "
- "continuing, but results are unpredictable\n", iRetLocal);
- }
- } else {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE); /* also stop DA queue */
}
- /* now the primary queue is either empty, persisted to disk - or set to loose messages. So we
- * can now request immediate shutdown of any remaining workers. Note that if bSaveOnShutdown was set,
- * the queue is now empty. If regular workers are still running, and try to pull the next message,
- * they will automatically terminate as there no longer is any message left to process.
- */
+ /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
if(getPhysicalQueueSize(pThis) > 0) {
timeoutComp(&tTimeout, pThis->toActShutdown);
@@ -1278,10 +1299,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
/* Now queue workers should have terminated. If not, we need to cancel them as we have applied
* all timeout setting. If any worker in any queue still executes, its consumer is possibly
- * long-running and cancelling is the only way to get rid of it. Note that the
- * cancellation handler will probably re-queue a user pointer, so the queue's enqueue
- * function is still needed (what is no problem as we do not yet destroy the queue - but I
- * thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25
+ * long-running and cancelling is the only way to get rid of it.
*/
dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the primary queue\n");
iRetLocal = wtpCancelAll(pThis->pWtpReg); /* returns immediately if all threads already have terminated */
@@ -1290,12 +1308,6 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
"threads, continuing, but results are unpredictable\n", iRetLocal);
}
-
- /* TODO: think: do we really need to do this here? Can't it happen on DA queue destruction? If we
- * disable it, we get an assertion... I think this is OK, as we need to have a certain order and
- * canceling the DA workers here ensures that order. But in any instant, we may have a look at this
- * code after we have reaced the milestone. -- rgerhards, 2008-01-27
- */
/* ... and now the DA queue, if it exists (should always be after the primary one) */
if(pThis->pqDA != NULL) {
dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the DA queue\n");
@@ -1310,7 +1322,8 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
* may still be running.
*/
- dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", getPhysicalQueueSize(pThis));
+ dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size log %d, phys %d.\n",
+ getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
}
@@ -1367,6 +1380,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddFixedArray;
pThis->qDeq = qDeqFixedArray;
pThis->qDel = qDelFixedArray;
+ pThis->qUnDeqAll = qUnDeqAllFixedArray;
break;
case QUEUETYPE_LINKEDLIST:
pThis->qConstruct = qConstructLinkedList;
@@ -1374,6 +1388,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddLinkedList;
pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
+ pThis->qUnDeqAll = qUnDeqAllLinkedList;
break;
case QUEUETYPE_DISK:
pThis->qConstruct = qConstructDisk;
@@ -1381,6 +1396,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddDisk;
pThis->qDeq = qDeqDisk;
pThis->qDel = qDelDisk;
+ pThis->qUnDeqAll = qUnDeqAllDisk;
/* special handling */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
break;
@@ -1389,6 +1405,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qDestruct = qDestructDirect;
pThis->qAdd = qAddDirect;
pThis->qDel = qDelDirect;
+ pThis->qUnDeqAll = qUnDeqAllDirect;
break;
}
@@ -1817,20 +1834,17 @@ finalize_it:
* If we are a child, we have done our duty when the queue is empty. In that case,
* we can terminate.
* Version for the DA worker thread. NOTE: the pThis->bRunsDA is different from
- * the DA queue
+ * the DA queue.
+ * If our queue is in destruction, we drain to the DA queue and so we shall not terminate
+ * until we have done so.
*/
-static int
+static rsRetVal
qqueueChkStopWrkrDA(qqueue_t *pThis)
{
- /* if our queue is in destruction, we drain to the DA queue and so we shall not terminate
- * until we have done so.
- */
- int bStopWrkr;
-
- BEGINfunc
+ DEFiRet;
if(pThis->bEnqOnly) {
- bStopWrkr = 1;
+ iRet = RS_RET_TERMINATE_NOW;
} else {
if(pThis->bRunsDA) {
ASSERT(pThis->pqDA != NULL);
@@ -1838,19 +1852,16 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
&& pThis->pqDA->sizeOnDiskMax > 0
&& pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
/* this queue can never grow, so we can give up... */
- bStopWrkr = 1;
+ iRet = RS_RET_TERMINATE_NOW;
} else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
- bStopWrkr = 1;
- } else {
- bStopWrkr = 0;
+ iRet = RS_RET_TERMINATE_NOW;
}
} else {
- bStopWrkr = 1;
+ iRet = RS_RET_TERMINATE_NOW;
}
}
- ENDfunc
- return bStopWrkr;
+ RETiRet;
}
@@ -1861,10 +1872,20 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
* Version for the regular worker thread. NOTE: the pThis->bRunsDA is different from
* the DA queue
*/
-static int
+static rsRetVal
ChkStopWrkrReg(qqueue_t *pThis)
{
+ DEFiRet;
+ /* original condition
return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0);
+ * TODO: remove when verified! -- rgerhards, 2009-05-26
+ */
+ if(pThis->bEnqOnly || pThis->bRunsDA)
+ iRet = RS_RET_TERMINATE_NOW;
+ else if(pThis->pqParent != NULL)
+ iRet = RS_RET_TERMINATE_WHEN_IDLE;
+
+ RETiRet;
}
@@ -1881,7 +1902,6 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal)
}
-/* common function for the idle functions that deletes the last batch if TODO MULTI */
/* must only be called when the queue mutex is locked, else results
* are not stable! DA queue version
*/
@@ -1890,7 +1910,7 @@ qqueueIsIdleDA(qqueue_t *pThis)
{
/* remember: iQueueSize is the DA queue size, not the main queue! */
/* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */
- return(getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk));
+ return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk));
}
/* must only be called when the queue mutex is locked, else results
* are not stable! Regular queue version
@@ -1905,7 +1925,7 @@ IsIdleReg(qqueue_t *pThis)
return ret;
#else
/* regular code! */
- return(getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk));
+ return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk));
#endif
}
@@ -2176,19 +2196,62 @@ finalize_it:
}
+/* persist a queue with all data elements to disk - this is used to handle
+ * bSaveOnShutdown. We utilize the DA worker to do this. This must only
+ * be called after all workers have been shut down and if bSaveOnShutdown
+ * is actually set. Note that this function may potentially run long,
+ * depending on the queue configuration (e.g. store on remote machine).
+ * rgerhards, 2009-05-26
+ */
+static inline rsRetVal
+DoSaveOnShutdown(qqueue_t *pThis)
+{
+ struct timespec tTimeout;
+ rsRetVal iRetLocal;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+
+ qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */
+ /* make sure we do not timeout before we are done */
+ dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, eternal timeout set\n");
+ timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
+ /* and run the primary queue's DA worker to drain the queue */
+ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
+ dbgoprint((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n",
+ iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+ if(iRetLocal != RS_RET_OK) {
+ dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, "
+ "continuing, but results are unpredictable\n", iRetLocal);
+ }
+
+ RETiRet;
+}
+
+
/* destructor for the queue object */
BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(qqueue)
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
- /* shut down all workers (handles *all* of the persistence logic)
- * See function head comment of queueShutdownWorkers () on why we don't call it
- * We also do not need to shutdown workers when we are in enqueue-only mode or we are a
+ /* shut down all workers
+ * We do not need to shutdown workers when we are in enqueue-only mode or we are a
* direct queue - because in both cases we have none... ;)
* with a child! -- rgerhards, 2008-01-28
*/
if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL)
- qqueueShutdownWorkers(pThis);
+ ShutdownWorkers(pThis);
+
+ /* now all workers are terminated. Messages may exist. Also, some logically dequeued
+ * messages may never have been processed because their worker was terminated. So
+ * we need to reset the logical dequeue pointer, persist the queue if configured to do
+ * so and then destruct everything. -- rgerhards, 2009-05-26
+ */
+ CHKiRet(pThis->qUnDeqAll(pThis));
+
+ if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
+ CHKiRet(DoSaveOnShutdown(pThis));
+ }
/* finally destruct our (regular) worker thread pool
* Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen,
diff --git a/runtime/queue.h b/runtime/queue.h
index 954a7fd4..c1fe597d 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -123,6 +123,7 @@ typedef struct queue_s {
rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr);
rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr);
rsRetVal (*qDel)(struct queue_s *pThis);
+ rsRetVal (*qUnDeqAll)(struct queue_s *pThis);
/* end type-specific handler */
/* synchronization variables */
pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 0c671f03..a43c0327 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -298,7 +298,8 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */
RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */
RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */
- RS_RET_IDLE = 4 /**< operation successful, but callee is idle (e.g. because queue is empty) */
+ RS_RET_IDLE = 4, /**< operation successful, but callee is idle (e.g. because queue is empty) */
+ RS_RET_TERMINATE_WHEN_IDLE = 5 /**< operation successful, function is requested to terminate when idle */
};
/* some helpful macros to work with srRetVals.
diff --git a/runtime/stream.c b/runtime/stream.c
index 50d419be..59e8be3a 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -45,6 +45,7 @@
#include "srUtils.h"
#include "obj.h"
#include "stream.h"
+#include "unicode-helper.h"
/* static data */
DEFobjStaticHelpers
@@ -80,7 +81,7 @@ static rsRetVal strmOpenFile(strm_t *pThis)
pThis->pszFName, pThis->lenFName, pThis->iCurrFNum, pThis->iFileNumDigits));
} else {
if(pThis->pszDir == NULL) {
- if((pThis->pszCurrFName = (uchar*) strdup((char*) pThis->pszFName)) == NULL)
+ if((pThis->pszCurrFName = ustrdup(pThis->pszFName)) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
} else {
CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir,
@@ -814,6 +815,47 @@ finalize_it:
}
+/* duplicate a stream object excluding dynamic properties. This function is
+ * primarily meant to provide a duplicate that later on can be used to access
+ * the data. This is needed, for example, for a restart of the disk queue.
+ * Note that ConstructFinalize() is NOT called. So our caller may change some
+ * properties before finalizing things.
+ * rgerhards, 2009-05-26
+ */
+rsRetVal
+strmDup(strm_t *pThis, strm_t **ppNew)
+{
+ strm_t *pNew = NULL;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, strm);
+ assert(ppNew != NULL);
+
+ CHKiRet(strmConstruct(&pNew));
+ pNew->sType = pThis->sType;
+ pNew->iCurrFNum = pThis->iCurrFNum;
+ CHKmalloc(pNew->pszFName = ustrdup(pThis->pszFName));
+ pNew->lenFName = pThis->lenFName;
+ CHKmalloc(pNew->pszDir = ustrdup(pThis->pszDir));
+ pNew->lenDir = pThis->lenDir;
+ pNew->tOperationsMode = pThis->tOperationsMode;
+ pNew->tOpenMode = pThis->tOpenMode;
+ pNew->iAddtlOpenFlags = pThis->iAddtlOpenFlags;
+ pNew->iMaxFileSize = pThis->iMaxFileSize;
+ pNew->iMaxFiles = pThis->iMaxFiles;
+ pNew->iFileNumDigits = pThis->iFileNumDigits;
+ pNew->bDeleteOnClose = pThis->bDeleteOnClose;
+ pNew->iCurrOffs = pThis->iCurrOffs;
+
+ *ppNew = pNew;
+ pNew = NULL;
+
+finalize_it:
+ if(pNew != NULL)
+ strmDestruct(&pNew);
+
+ RETiRet;
+}
/* set a user write-counter. This counter is initialized to zero and
* receives the number of bytes written. It is accurate only after a
diff --git a/runtime/stream.h b/runtime/stream.h
index 371358ab..e5d05b55 100644
--- a/runtime/stream.h
+++ b/runtime/stream.h
@@ -119,6 +119,7 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm);
rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal);
rsRetVal strmGetCurrOffset(strm_t *pThis, int64 *pOffs);
rsRetVal strmSetWCntr(strm_t *pThis, number_t *pWCnt);
+rsRetVal strmDup(strm_t *pThis, strm_t **ppNew);
PROTOTYPEObjClassInit(strm);
PROTOTYPEpropSetMeth(strm, bDeleteOnClose, int);
PROTOTYPEpropSetMeth(strm, iMaxFileSize, int);
diff --git a/runtime/wti.c b/runtime/wti.c
index 1be008df..3b6bf1b9 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -380,6 +380,7 @@ wtiWorker(wti_t *pThis)
wtp_t *pWtp; /* our worker thread pool */
int bInactivityTOOccured = 0;
rsRetVal localRet;
+ rsRetVal terminateRet;
DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
@@ -406,15 +407,21 @@ wtiWorker(wti_t *pThis)
wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
- /* first check if we are in shutdown process */
- if(wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
- break; /* end worker thread run */
+ /* first check if we are in shutdown process (but evaluate a bit later) */
+ terminateRet = wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED);
+ if(terminateRet == RS_RET_TERMINATE_NOW) {
+ // TODO: we need to free the old batch! -- rgerhards, 2009-05-26 MULTI
+ break;
}
/* try to execute and process whatever we have */
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
if(localRet == RS_RET_IDLE) {
+ if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE) {
+ break; /* end of loop */
+ }
+
if(bInactivityTOOccured) {
/* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */
break; /* end worker thread run */
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 40a9095b..41fcd8d9 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -261,16 +261,19 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
ISOBJ_TYPE_assert(pThis, wtp);
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
- if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE)
- || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, pThis)))
- iRet = RS_RET_TERMINATE_NOW;
+ if(pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) {
+ ABORT_FINALIZE(RS_RET_TERMINATE_NOW);
+ } else if(pThis->wtpState == wtpState_SHUTDOWN) {
+ ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE);
+ }
/* try customer handler if one was set and we do not yet have a definite result */
- if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) {
+ if(pThis->pfChkStopWrkr != NULL) {
iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex);
}
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+finalize_it:
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
RETiRet;
}
diff --git a/tests/DiagTalker.java b/tests/DiagTalker.java
index e33a5867..85a6671e 100644
--- a/tests/DiagTalker.java
+++ b/tests/DiagTalker.java
@@ -1,3 +1,24 @@
+/* A yet very simple tool to talk to imdiag.
+ *
+ * Copyright 2009 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Rsyslog is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Rsyslog is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
//package com.rsyslog.diag;
import java.io.*;
import java.net.*;
@@ -29,9 +50,14 @@ public class DiagTalker {
new InputStreamReader(System.in));
String userInput;
- while ((userInput = stdIn.readLine()) != null) {
- out.println(userInput);
- System.out.println("imdiag returns: " + in.readLine());
+ try {
+ while ((userInput = stdIn.readLine()) != null) {
+ out.println(userInput);
+ System.out.println("imdiag returns: " + in.readLine());
+ }
+ } catch (SocketException e) {
+ System.err.println("We had a socket exception and consider this to be OK: "
+ + e.getMessage());
}
out.close();
diff --git a/tests/arrayqueue.sh b/tests/arrayqueue.sh
index 5b8ebb5f..7791ed57 100755
--- a/tests/arrayqueue.sh
+++ b/tests/arrayqueue.sh
@@ -17,7 +17,7 @@ sleep 4 # we need this so that rsyslogd can receive all outstanding messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 39999
+./chkseq -fwork -e 39999
if [ "$?" -ne "0" ]; then
# rm -f work rsyslog.out.log
echo "sequence error detected"
diff --git a/tests/chkseq.c b/tests/chkseq.c
index 3203c250..5ffe855c 100644
--- a/tests/chkseq.c
+++ b/tests/chkseq.c
@@ -3,9 +3,10 @@
* be set.
*
* Params
- * argv[1] file to check
- * argv[2] start number
- * argv[3] end number
+ * -f<filename> MUST be given!
+ * -s<starting number> -e<ending number>
+ * default for s is 0. -e should be given (else it is also 0)
+ * -d may be specified, in which case duplicate messages are permitted.
*
* Part of the testbench for rsyslog.
*
@@ -31,6 +32,7 @@
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
+#include <getopt.h>
int main(int argc, char *argv[])
{
@@ -38,16 +40,36 @@ int main(int argc, char *argv[])
int val;
int i;
int ret = 0;
- int start, end;
+ int dupsPermitted = 0;
+ int start = 0, end = 0;
+ int opt;
+ int nDups = 0;
+ char *file = NULL;
- if(argc != 4) {
- printf("Invalid call of chkseq\n");
- printf("Usage: chkseq file start end\n");
+ while((opt = getopt(argc, argv, "e:f:ds:")) != EOF) {
+ switch((char)opt) {
+ case 'f':
+ file = optarg;
+ break;
+ case 'd':
+ dupsPermitted = 1;
+ break;
+ case 'e':
+ end = atoi(optarg);
+ break;
+ case 's':
+ start = atoi(optarg);
+ break;
+ default:printf("Invalid call of chkseq\n");
+ printf("Usage: chkseq file -sstart -eend -d\n");
+ exit(1);
+ }
+ }
+
+ if(file == NULL) {
+ printf("file must be given!\n");
exit(1);
}
-
- start = atoi(argv[2]);
- end = atoi(argv[3]);
if(start > end) {
printf("start must be less than or equal end!\n");
@@ -55,22 +77,35 @@ int main(int argc, char *argv[])
}
/* read file */
- fp = fopen(argv[1], "r");
+ fp = fopen(file, "r");
if(fp == NULL) {
perror(argv[1]);
exit(1);
}
- for(i = start ; i < end ; ++i) {
+ for(i = start ; i < end+1 ; ++i) {
if(fscanf(fp, "%d\n", &val) != 1) {
printf("scanf error in index i=%d\n", i);
exit(1);
}
if(val != i) {
- printf("read value %d, but expected value %d\n", val, i);
- exit(1);
+ if(val == i - 1 && dupsPermitted) {
+ --i;
+ ++nDups;
+ } else {
+ printf("read value %d, but expected value %d\n", val, i);
+ exit(1);
+ }
}
}
+ if(nDups != 0)
+ printf("info: had %d duplicates (this is no error)\n", nDups);
+
+ if(i - 1 != end) {
+ printf("only %d records in file, expected %d\n", i - 1, end);
+ exit(1);
+ }
+
exit(ret);
}
diff --git a/tests/da-mainmsg-q.sh b/tests/da-mainmsg-q.sh
index 91addf68..fde9e06e 100755
--- a/tests/da-mainmsg-q.sh
+++ b/tests/da-mainmsg-q.sh
@@ -52,7 +52,7 @@ sleep 1 # we need this so that rsyslogd can receive all outstanding messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 20099
+./chkseq -fwork -e20099
if [ "$?" -ne "0" ]; then
# rm -f work rsyslog.out.log
echo "sequence error detected"
diff --git a/tests/diskqueue.sh b/tests/diskqueue.sh
index 20767a90..42018b15 100755
--- a/tests/diskqueue.sh
+++ b/tests/diskqueue.sh
@@ -26,7 +26,7 @@ $srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 19999
+./chkseq -fwork -e19999
if [ "$?" -ne "0" ]; then
# rm -f work rsyslog.out.log
echo "sequence error detected"
diff --git a/tests/imtcp-multiport.sh b/tests/imtcp-multiport.sh
index 17480dae..73ab9558 100755
--- a/tests/imtcp-multiport.sh
+++ b/tests/imtcp-multiport.sh
@@ -21,7 +21,7 @@ $srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 9999
+./chkseq -fwork -e9999
if [ "$?" -ne "0" ]; then
# rm -f work rsyslog.out.log
echo "sequence error detected"
@@ -46,7 +46,7 @@ $srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 9999
+./chkseq -fwork -e9999
if [ "$?" -ne "0" ]; then
# rm -f work rsyslog.out.log
echo "sequence error detected"
@@ -71,7 +71,7 @@ $srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 9999
+./chkseq -fwork -e9999
if [ "$?" -ne "0" ]; then
# rm -f work rsyslog.out.log
echo "sequence error detected"
diff --git a/tests/linkedlistqueue.sh b/tests/linkedlistqueue.sh
index aac1abb6..aa574bd1 100755
--- a/tests/linkedlistqueue.sh
+++ b/tests/linkedlistqueue.sh
@@ -17,7 +17,7 @@ sleep 4 # we need this so that rsyslogd can receive all outstanding messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 39999
+./chkseq -fwork -e39999
if [ "$?" -ne "0" ]; then
# rm -f work rsyslog.out.log
echo "sequence error detected"
diff --git a/tests/manytcp.sh b/tests/manytcp.sh
index 06bd38b6..861f12ff 100755
--- a/tests/manytcp.sh
+++ b/tests/manytcp.sh
@@ -12,7 +12,7 @@ $srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 39999
+./chkseq -fwork -e39999
if [ "$?" -ne "0" ]; then
rm -f work rsyslog.out.log
echo "sequence error detected"
diff --git a/tests/memq-persist.sh b/tests/memq-persist.sh
index 108cba57..e935d8db 100755
--- a/tests/memq-persist.sh
+++ b/tests/memq-persist.sh
@@ -10,9 +10,11 @@
#export RSYSLOG_DEBUGLOG="log"
echo testing memory queue persisting to disk
$srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
+rm -f core.*
rm -rf test-spool
mkdir test-spool
rm -f work rsyslog.out.log rsyslog.out.log.save # work files
+#valgrind ../tools/rsyslogd -c4 -u2 -n -irsyslog.pid -M../runtime/.libs:../.libs -f$srcdir/testsuites/memq-persist1.conf &
../tools/rsyslogd -c4 -u2 -n -irsyslog.pid -M../runtime/.libs:../.libs -f$srcdir/testsuites/memq-persist1.conf &
sleep 1
echo "rsyslogd started with pid " `cat rsyslog.pid`
@@ -22,20 +24,22 @@ if [ "$?" -ne "0" ]; then
echo "error during tcpflood! see rsyslog.out.log.save for what was written"
cp rsyslog.out.log rsyslog.out.log.save
fi
-sleep 3 # we need to wait to ensure everything is received (less 1 second would be better)
+sleep 4 # we need to wait to ensure everything is received (less 1 second would be better)
kill `cat rsyslog.pid`
-sleep 5 # wait for engine to terminate
+echo wait for shutdown
+$srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages
echo There must exist some files now:
ls -l test-spool
# restart engine and have rest processed
../tools/rsyslogd -c4 -u2 -n -irsyslog.pid -M../runtime/.libs:../.libs -f$srcdir/testsuites/memq-persist2.conf &
+sleep 1
$srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 9999
+./chkseq -fwork -e9999 -d
if [ "$?" -ne "0" ]; then
- # rm -f work rsyslog.out.log
+ rm -f work rsyslog.out.log
echo "sequence error detected"
exit 1
fi
diff --git a/tests/rscript.c b/tests/rscript.c
index ce81491c..6361aec4 100644
--- a/tests/rscript.c
+++ b/tests/rscript.c
@@ -104,8 +104,8 @@ PerformTest(cstr_t *pstrIn, rsRetVal iRetExpected, cstr_t *pstrOut)
if(strcmp((char*)rsCStrGetSzStr(pstrPrg), (char*)rsCStrGetSzStr(pstrOut))) {
printf("error: compiled program different from expected result!\n");
- printf("generated vmprg (%d bytes):\n%s\n", strlen((char*)rsCStrGetSzStr(pstrPrg)), rsCStrGetSzStr(pstrPrg));
- printf("expected (%d bytes):\n%s\n", strlen((char*)rsCStrGetSzStr(pstrOut)), rsCStrGetSzStr(pstrOut));
+ printf("generated vmprg (%d bytes):\n%s\n", (int)strlen((char*)rsCStrGetSzStr(pstrPrg)), rsCStrGetSzStr(pstrPrg));
+ printf("expected (%d bytes):\n%s\n", (int)strlen((char*)rsCStrGetSzStr(pstrOut)), rsCStrGetSzStr(pstrOut));
ABORT_FINALIZE(RS_RET_ERR);
}