summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--runtime/queue.c253
-rw-r--r--runtime/queue.h1
-rw-r--r--runtime/rsyslog.h3
-rw-r--r--runtime/stream.c44
-rw-r--r--runtime/stream.h1
-rw-r--r--runtime/wti.c13
-rw-r--r--runtime/wtp.c13
-rw-r--r--tests/DiagTalker.java32
-rwxr-xr-xtests/arrayqueue.sh2
-rw-r--r--tests/chkseq.c63
-rwxr-xr-xtests/da-mainmsg-q.sh2
-rwxr-xr-xtests/diskqueue.sh2
-rwxr-xr-xtests/imtcp-multiport.sh6
-rwxr-xr-xtests/linkedlistqueue.sh2
-rwxr-xr-xtests/manytcp.sh2
-rwxr-xr-xtests/memq-persist.sh12
-rw-r--r--tests/rscript.c4
17 files changed, 319 insertions, 136 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 0019297b..539cf4ec 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -202,9 +202,10 @@ getLogicalQueueSize(qqueue_t *pThis)
static inline void queueDrain(qqueue_t *pThis)
{
void *pUsr;
-
ASSERT(pThis != NULL);
+ BEGINfunc
+ dbgoprint((obj_t*) pThis, "queue will lose %d messages, destroying...\n", pThis->iQueueSize);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
while(pThis->iQueueSize-- > 0) {
pThis->qDeq(pThis, &pUsr);
@@ -213,6 +214,7 @@ static inline void queueDrain(qqueue_t *pThis)
}
pThis->qDel(pThis);
}
+ ENDfunc
}
@@ -617,6 +619,7 @@ static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out)
RETiRet;
}
+
static rsRetVal qDelFixedArray(qqueue_t *pThis)
{
DEFiRet;
@@ -631,6 +634,26 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis)
}
+/* reset the logical dequeue pointer to the physical dequeue position.
+ * This is only needed after we cancelled workers (during queue shutdown).
+ */
+static rsRetVal
+qUnDeqAllFixedArray(qqueue_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+
+ dbgoprint((obj_t*) pThis, "resetting FixedArray deq index to %ld (was %ld), logical dequeue count %d\n",
+ pThis->tVars.farray.head, pThis->tVars.farray.deqhead, pThis->nLogDeq);
+
+ pThis->tVars.farray.deqhead = pThis->tVars.farray.head;
+ pThis->nLogDeq = 0;
+
+ RETiRet;
+}
+
+
/* -------------------- linked list -------------------- */
@@ -695,7 +718,6 @@ static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr)
qLinkedList_t *pEntry;
DEFiRet;
-RUNLOG_VAR("%p", pThis->tVars.linklist.pDeqRoot);
pEntry = pThis->tVars.linklist.pDeqRoot;
*ppUsr = pEntry->pUsr;
pThis->tVars.linklist.pDeqRoot = pEntry->pNext;
@@ -723,6 +745,26 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis)
}
+/* reset the logical dequeue pointer to the physical dequeue position.
+ * This is only needed after we cancelled workers (during queue shutdown).
+ */
+static rsRetVal
+qUnDeqAllLinkedList(qqueue_t *pThis)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ dbgoprint((obj_t*) pThis, "resetting LinkedList deq ptr to %p (was %p), logical dequeue count %d\n",
+ pThis->tVars.linklist.pDelRoot, pThis->tVars.linklist.pDeqRoot, pThis->nLogDeq);
+
+ pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pDelRoot;
+ pThis->nLogDeq = 0;
+
+ RETiRet;
+}
+
+
/* -------------------- disk -------------------- */
@@ -822,25 +864,16 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF,
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
- CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite));
- CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDel));
-
/* we now need to take care of the Deq handle. It is not persisted, so we can create
* a virgin copy based on pReadDel. // TODO duplicat code, same as blow - single function!
*/
- CHKiRet(strmConstruct(&pThis->tVars.disk.pReadDeq));
- CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0));
- CHKiRet(strmSetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
- CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000));
- CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ));
- CHKiRet(strmSetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR));
+ CHKiRet(strmDup(pThis->tVars.disk.pReadDel, &pThis->tVars.disk.pReadDeq));
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); /* deq must NOT delete the files! */
CHKiRet(strmConstructFinalize(pThis->tVars.disk.pReadDeq));
- /* TODO: dirty, need stream methods --> */
- pThis->tVars.disk.pReadDeq->iCurrFNum = pThis->tVars.disk.pReadDel->iCurrFNum;
- pThis->tVars.disk.pReadDeq->iCurrOffs = pThis->tVars.disk.pReadDel->iCurrOffs;
- /* <-- dirty, need stream methods :TODO */
+ CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite));
+ CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDel));
CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDeq));
/* OK, we could successfully read the file, so we now can request that it be
@@ -1012,6 +1045,16 @@ finalize_it:
}
+/* This is a dummy function for disks - we do not need to reset anything
+ * because everything is already persisted...
+ */
+static rsRetVal
+qUnDeqAllDisk(__attribute__((unused)) qqueue_t *pThis)
+{
+ return RS_RET_OK;
+}
+
+
/* -------------------- direct (no queueing) -------------------- */
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis)
{
@@ -1055,6 +1098,12 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
return RS_RET_OK;
}
+static rsRetVal
+qUnDeqAllDirect(__attribute__((unused)) qqueue_t *pThis)
+{
+ return RS_RET_OK;
+}
+
/* --------------- end type-specific handlers -------------------- */
@@ -1109,22 +1158,29 @@ qqueueDeq(qqueue_t *pThis, void *pUsr)
/* This function shuts down all worker threads and waits until they
- * have terminated. If they timeout, they are cancelled. Parameters have been set
- * before this function is called so that DA queues will be fully persisted to
- * disk (if configured to do so).
+ * have terminated. If they timeout, they are cancelled.
* rgerhards, 2008-01-24
* Please note that this function shuts down BOTH the parent AND the child queue
* in DA case. This is necessary because their timeouts are tightly coupled. Most
* importantly, the timeouts would be applied twice (or logic be extremely
* complex) if each would have its own shutdown. The function does not self check
* this condition - the caller must make sure it is not called with a parent.
+ * rgerhards, 2009-05-26: we do NO logner persist the queue here if bSaveOnShutdown
+ * is set. This must be handled by the caller. Not doing that cleans up the queue
+ * shutdown considerably. Also, older engines had a potential hang condition when
+ * the DA queue was already started and the DA worker configured for infinite
+ * retries and the action was during retry processing. This was a design issue,
+ * which is solved as of now. Note that the shutdown now may take a little bit
+ * longer, because we no longer can persist the queue in parallel to waiting
+ * on worker timeouts.
*/
-static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
+static rsRetVal
+ShutdownWorkers(qqueue_t *pThis)
{
- DEFiRet;
DEFVARS_mutexProtection;
struct timespec tTimeout;
rsRetVal iRetLocal;
+ DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
@@ -1193,53 +1249,18 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
}
/* when we reach this point, both queues are either empty or the regular queue shutdown timeout
- * has expired. Now we need to check if we are configured to not loose messages. If so, we need
- * to persist the queue to disk (this is only possible if the queue is DA-enabled). We must also
- * set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate as soon as its consumer
- * is done. This is especially important as we otherwise may interfere with queue order while the
- * DA consumer is running. -- rgerhards, 2008-01-27
- * Note: there was a note that we should not wait eternally on the DA worker if we run in
- * enqueue-only note. I have reviewed the code and think there is no need for this check. Howerver,
- * I'd like to keep this note in here should we happen to run into some related trouble.
- * rgerhards, 2008-01-28
+ * has expired. We must set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate
+ * as soon as its consumer is done. In particular, it does no longer need try to empty the queue.
*/
wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */
/* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
- if(pThis->bRunsDA)
+ if(pThis->bRunsDA) {
qqueueWaitDAModeInitialized(pThis);
-
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
- /* optimize parameters for shutdown of DA-enabled queues */
- if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
- /* switch to enqueue-only mode so that no more actions happen */
- if(pThis->bRunsDA == 0) {
- qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */
- } else {
- /* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1)
- * but is not yet running (state 2). In this case, pThis->pqDA is NULL! rgerhards, 2008-02-27
- */
- qqueueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */
- }
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- /* make sure we do not timeout before we are done */
- dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, eternal timeout set\n");
- timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
- /* and run the primary queue's DA worker to drain the queue */
- iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
- if(iRetLocal != RS_RET_OK) {
- dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, "
- "continuing, but results are unpredictable\n", iRetLocal);
- }
- } else {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
+ wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE); /* also stop DA queue */
}
- /* now the primary queue is either empty, persisted to disk - or set to loose messages. So we
- * can now request immediate shutdown of any remaining workers. Note that if bSaveOnShutdown was set,
- * the queue is now empty. If regular workers are still running, and try to pull the next message,
- * they will automatically terminate as there no longer is any message left to process.
- */
+ /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
if(getPhysicalQueueSize(pThis) > 0) {
timeoutComp(&tTimeout, pThis->toActShutdown);
@@ -1278,10 +1299,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
/* Now queue workers should have terminated. If not, we need to cancel them as we have applied
* all timeout setting. If any worker in any queue still executes, its consumer is possibly
- * long-running and cancelling is the only way to get rid of it. Note that the
- * cancellation handler will probably re-queue a user pointer, so the queue's enqueue
- * function is still needed (what is no problem as we do not yet destroy the queue - but I
- * thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25
+ * long-running and cancelling is the only way to get rid of it.
*/
dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the primary queue\n");
iRetLocal = wtpCancelAll(pThis->pWtpReg); /* returns immediately if all threads already have terminated */
@@ -1290,12 +1308,6 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
"threads, continuing, but results are unpredictable\n", iRetLocal);
}
-
- /* TODO: think: do we really need to do this here? Can't it happen on DA queue destruction? If we
- * disable it, we get an assertion... I think this is OK, as we need to have a certain order and
- * canceling the DA workers here ensures that order. But in any instant, we may have a look at this
- * code after we have reaced the milestone. -- rgerhards, 2008-01-27
- */
/* ... and now the DA queue, if it exists (should always be after the primary one) */
if(pThis->pqDA != NULL) {
dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the DA queue\n");
@@ -1310,7 +1322,8 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
* may still be running.
*/
- dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", getPhysicalQueueSize(pThis));
+ dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size log %d, phys %d.\n",
+ getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
}
@@ -1367,6 +1380,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddFixedArray;
pThis->qDeq = qDeqFixedArray;
pThis->qDel = qDelFixedArray;
+ pThis->qUnDeqAll = qUnDeqAllFixedArray;
break;
case QUEUETYPE_LINKEDLIST:
pThis->qConstruct = qConstructLinkedList;
@@ -1374,6 +1388,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddLinkedList;
pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
+ pThis->qUnDeqAll = qUnDeqAllLinkedList;
break;
case QUEUETYPE_DISK:
pThis->qConstruct = qConstructDisk;
@@ -1381,6 +1396,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddDisk;
pThis->qDeq = qDeqDisk;
pThis->qDel = qDelDisk;
+ pThis->qUnDeqAll = qUnDeqAllDisk;
/* special handling */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
break;
@@ -1389,6 +1405,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qDestruct = qDestructDirect;
pThis->qAdd = qAddDirect;
pThis->qDel = qDelDirect;
+ pThis->qUnDeqAll = qUnDeqAllDirect;
break;
}
@@ -1817,20 +1834,17 @@ finalize_it:
* If we are a child, we have done our duty when the queue is empty. In that case,
* we can terminate.
* Version for the DA worker thread. NOTE: the pThis->bRunsDA is different from
- * the DA queue
+ * the DA queue.
+ * If our queue is in destruction, we drain to the DA queue and so we shall not terminate
+ * until we have done so.
*/
-static int
+static rsRetVal
qqueueChkStopWrkrDA(qqueue_t *pThis)
{
- /* if our queue is in destruction, we drain to the DA queue and so we shall not terminate
- * until we have done so.
- */
- int bStopWrkr;
-
- BEGINfunc
+ DEFiRet;
if(pThis->bEnqOnly) {
- bStopWrkr = 1;
+ iRet = RS_RET_TERMINATE_NOW;
} else {
if(pThis->bRunsDA) {
ASSERT(pThis->pqDA != NULL);
@@ -1838,19 +1852,16 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
&& pThis->pqDA->sizeOnDiskMax > 0
&& pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
/* this queue can never grow, so we can give up... */
- bStopWrkr = 1;
+ iRet = RS_RET_TERMINATE_NOW;
} else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
- bStopWrkr = 1;
- } else {
- bStopWrkr = 0;
+ iRet = RS_RET_TERMINATE_NOW;
}
} else {
- bStopWrkr = 1;
+ iRet = RS_RET_TERMINATE_NOW;
}
}
- ENDfunc
- return bStopWrkr;
+ RETiRet;
}
@@ -1861,10 +1872,20 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
* Version for the regular worker thread. NOTE: the pThis->bRunsDA is different from
* the DA queue
*/
-static int
+static rsRetVal
ChkStopWrkrReg(qqueue_t *pThis)
{
+ DEFiRet;
+ /* original condition
return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0);
+ * TODO: remove when verified! -- rgerhards, 2009-05-26
+ */
+ if(pThis->bEnqOnly || pThis->bRunsDA)
+ iRet = RS_RET_TERMINATE_NOW;
+ else if(pThis->pqParent != NULL)
+ iRet = RS_RET_TERMINATE_WHEN_IDLE;
+
+ RETiRet;
}
@@ -1881,7 +1902,6 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal)
}
-/* common function for the idle functions that deletes the last batch if TODO MULTI */
/* must only be called when the queue mutex is locked, else results
* are not stable! DA queue version
*/
@@ -1890,7 +1910,7 @@ qqueueIsIdleDA(qqueue_t *pThis)
{
/* remember: iQueueSize is the DA queue size, not the main queue! */
/* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */
- return(getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk));
+ return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk));
}
/* must only be called when the queue mutex is locked, else results
* are not stable! Regular queue version
@@ -1905,7 +1925,7 @@ IsIdleReg(qqueue_t *pThis)
return ret;
#else
/* regular code! */
- return(getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk));
+ return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk));
#endif
}
@@ -2176,19 +2196,62 @@ finalize_it:
}
+/* persist a queue with all data elements to disk - this is used to handle
+ * bSaveOnShutdown. We utilize the DA worker to do this. This must only
+ * be called after all workers have been shut down and if bSaveOnShutdown
+ * is actually set. Note that this function may potentially run long,
+ * depending on the queue configuration (e.g. store on remote machine).
+ * rgerhards, 2009-05-26
+ */
+static inline rsRetVal
+DoSaveOnShutdown(qqueue_t *pThis)
+{
+ struct timespec tTimeout;
+ rsRetVal iRetLocal;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+
+ qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */
+ /* make sure we do not timeout before we are done */
+ dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, eternal timeout set\n");
+ timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
+ /* and run the primary queue's DA worker to drain the queue */
+ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
+ dbgoprint((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n",
+ iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+ if(iRetLocal != RS_RET_OK) {
+ dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, "
+ "continuing, but results are unpredictable\n", iRetLocal);
+ }
+
+ RETiRet;
+}
+
+
/* destructor for the queue object */
BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(qqueue)
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
- /* shut down all workers (handles *all* of the persistence logic)
- * See function head comment of queueShutdownWorkers () on why we don't call it
- * We also do not need to shutdown workers when we are in enqueue-only mode or we are a
+ /* shut down all workers
+ * We do not need to shutdown workers when we are in enqueue-only mode or we are a
* direct queue - because in both cases we have none... ;)
* with a child! -- rgerhards, 2008-01-28
*/
if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL)
- qqueueShutdownWorkers(pThis);
+ ShutdownWorkers(pThis);
+
+ /* now all workers are terminated. Messages may exist. Also, some logically dequeued
+ * messages may never have been processed because their worker was terminated. So
+ * we need to reset the logical dequeue pointer, persist the queue if configured to do
+ * so and then destruct everything. -- rgerhards, 2009-05-26
+ */
+ CHKiRet(pThis->qUnDeqAll(pThis));
+
+ if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
+ CHKiRet(DoSaveOnShutdown(pThis));
+ }
/* finally destruct our (regular) worker thread pool
* Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen,
diff --git a/runtime/queue.h b/runtime/queue.h
index 954a7fd4..c1fe597d 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -123,6 +123,7 @@ typedef struct queue_s {
rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr);
rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr);
rsRetVal (*qDel)(struct queue_s *pThis);
+ rsRetVal (*qUnDeqAll)(struct queue_s *pThis);
/* end type-specific handler */
/* synchronization variables */
pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 0c671f03..a43c0327 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -298,7 +298,8 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */
RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */
RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */
- RS_RET_IDLE = 4 /**< operation successful, but callee is idle (e.g. because queue is empty) */
+ RS_RET_IDLE = 4, /**< operation successful, but callee is idle (e.g. because queue is empty) */
+ RS_RET_TERMINATE_WHEN_IDLE = 5 /**< operation successful, function is requested to terminate when idle */
};
/* some helpful macros to work with srRetVals.
diff --git a/runtime/stream.c b/runtime/stream.c
index 50d419be..59e8be3a 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -45,6 +45,7 @@
#include "srUtils.h"
#include "obj.h"
#include "stream.h"
+#include "unicode-helper.h"
/* static data */
DEFobjStaticHelpers
@@ -80,7 +81,7 @@ static rsRetVal strmOpenFile(strm_t *pThis)
pThis->pszFName, pThis->lenFName, pThis->iCurrFNum, pThis->iFileNumDigits));
} else {
if(pThis->pszDir == NULL) {
- if((pThis->pszCurrFName = (uchar*) strdup((char*) pThis->pszFName)) == NULL)
+ if((pThis->pszCurrFName = ustrdup(pThis->pszFName)) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
} else {
CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir,
@@ -814,6 +815,47 @@ finalize_it:
}
+/* duplicate a stream object excluding dynamic properties. This function is
+ * primarily meant to provide a duplicate that later on can be used to access
+ * the data. This is needed, for example, for a restart of the disk queue.
+ * Note that ConstructFinalize() is NOT called. So our caller may change some
+ * properties before finalizing things.
+ * rgerhards, 2009-05-26
+ */
+rsRetVal
+strmDup(strm_t *pThis, strm_t **ppNew)
+{
+ strm_t *pNew = NULL;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, strm);
+ assert(ppNew != NULL);
+
+ CHKiRet(strmConstruct(&pNew));
+ pNew->sType = pThis->sType;
+ pNew->iCurrFNum = pThis->iCurrFNum;
+ CHKmalloc(pNew->pszFName = ustrdup(pThis->pszFName));
+ pNew->lenFName = pThis->lenFName;
+ CHKmalloc(pNew->pszDir = ustrdup(pThis->pszDir));
+ pNew->lenDir = pThis->lenDir;
+ pNew->tOperationsMode = pThis->tOperationsMode;
+ pNew->tOpenMode = pThis->tOpenMode;
+ pNew->iAddtlOpenFlags = pThis->iAddtlOpenFlags;
+ pNew->iMaxFileSize = pThis->iMaxFileSize;
+ pNew->iMaxFiles = pThis->iMaxFiles;
+ pNew->iFileNumDigits = pThis->iFileNumDigits;
+ pNew->bDeleteOnClose = pThis->bDeleteOnClose;
+ pNew->iCurrOffs = pThis->iCurrOffs;
+
+ *ppNew = pNew;
+ pNew = NULL;
+
+finalize_it:
+ if(pNew != NULL)
+ strmDestruct(&pNew);
+
+ RETiRet;
+}
/* set a user write-counter. This counter is initialized to zero and
* receives the number of bytes written. It is accurate only after a
diff --git a/runtime/stream.h b/runtime/stream.h
index 371358ab..e5d05b55 100644
--- a/runtime/stream.h
+++ b/runtime/stream.h
@@ -119,6 +119,7 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm);
rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal);
rsRetVal strmGetCurrOffset(strm_t *pThis, int64 *pOffs);
rsRetVal strmSetWCntr(strm_t *pThis, number_t *pWCnt);
+rsRetVal strmDup(strm_t *pThis, strm_t **ppNew);
PROTOTYPEObjClassInit(strm);
PROTOTYPEpropSetMeth(strm, bDeleteOnClose, int);
PROTOTYPEpropSetMeth(strm, iMaxFileSize, int);
diff --git a/runtime/wti.c b/runtime/wti.c
index 1be008df..3b6bf1b9 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -380,6 +380,7 @@ wtiWorker(wti_t *pThis)
wtp_t *pWtp; /* our worker thread pool */
int bInactivityTOOccured = 0;
rsRetVal localRet;
+ rsRetVal terminateRet;
DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
@@ -406,15 +407,21 @@ wtiWorker(wti_t *pThis)
wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
- /* first check if we are in shutdown process */
- if(wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
- break; /* end worker thread run */
+ /* first check if we are in shutdown process (but evaluate a bit later) */
+ terminateRet = wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED);
+ if(terminateRet == RS_RET_TERMINATE_NOW) {
+ // TODO: we need to free the old batch! -- rgerhards, 2009-05-26 MULTI
+ break;
}
/* try to execute and process whatever we have */
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis, iCancelStateSave);
if(localRet == RS_RET_IDLE) {
+ if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE) {
+ break; /* end of loop */
+ }
+
if(bInactivityTOOccured) {
/* we had an inactivity timeout in the last run and are still idle, so it is time to exit... */
break; /* end worker thread run */
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 40a9095b..41fcd8d9 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -261,16 +261,19 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
ISOBJ_TYPE_assert(pThis, wtp);
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
- if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE)
- || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, pThis)))
- iRet = RS_RET_TERMINATE_NOW;
+ if(pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) {
+ ABORT_FINALIZE(RS_RET_TERMINATE_NOW);
+ } else if(pThis->wtpState == wtpState_SHUTDOWN) {
+ ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE);
+ }
/* try customer handler if one was set and we do not yet have a definite result */
- if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) {
+ if(pThis->pfChkStopWrkr != NULL) {
iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex);
}
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+finalize_it:
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
RETiRet;
}
diff --git a/tests/DiagTalker.java b/tests/DiagTalker.java
index e33a5867..85a6671e 100644
--- a/tests/DiagTalker.java
+++ b/tests/DiagTalker.java
@@ -1,3 +1,24 @@
+/* A yet very simple tool to talk to imdiag.
+ *
+ * Copyright 2009 Rainer Gerhards and Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Rsyslog is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Rsyslog is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * A copy of the GPL can be found in the file "COPYING" in this distribution.
+ */
//package com.rsyslog.diag;
import java.io.*;
import java.net.*;
@@ -29,9 +50,14 @@ public class DiagTalker {
new InputStreamReader(System.in));
String userInput;
- while ((userInput = stdIn.readLine()) != null) {
- out.println(userInput);
- System.out.println("imdiag returns: " + in.readLine());
+ try {
+ while ((userInput = stdIn.readLine()) != null) {
+ out.println(userInput);
+ System.out.println("imdiag returns: " + in.readLine());
+ }
+ } catch (SocketException e) {
+ System.err.println("We had a socket exception and consider this to be OK: "
+ + e.getMessage());
}
out.close();
diff --git a/tests/arrayqueue.sh b/tests/arrayqueue.sh
index 5b8ebb5f..7791ed57 100755
--- a/tests/arrayqueue.sh
+++ b/tests/arrayqueue.sh
@@ -17,7 +17,7 @@ sleep 4 # we need this so that rsyslogd can receive all outstanding messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 39999
+./chkseq -fwork -e 39999
if [ "$?" -ne "0" ]; then
# rm -f work rsyslog.out.log
echo "sequence error detected"
diff --git a/tests/chkseq.c b/tests/chkseq.c
index 3203c250..5ffe855c 100644
--- a/tests/chkseq.c
+++ b/tests/chkseq.c
@@ -3,9 +3,10 @@
* be set.
*
* Params
- * argv[1] file to check
- * argv[2] start number
- * argv[3] end number
+ * -f<filename> MUST be given!
+ * -s<starting number> -e<ending number>
+ * default for s is 0. -e should be given (else it is also 0)
+ * -d may be specified, in which case duplicate messages are permitted.
*
* Part of the testbench for rsyslog.
*
@@ -31,6 +32,7 @@
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
+#include <getopt.h>
int main(int argc, char *argv[])
{
@@ -38,16 +40,36 @@ int main(int argc, char *argv[])
int val;
int i;
int ret = 0;
- int start, end;
+ int dupsPermitted = 0;
+ int start = 0, end = 0;
+ int opt;
+ int nDups = 0;
+ char *file = NULL;
- if(argc != 4) {
- printf("Invalid call of chkseq\n");
- printf("Usage: chkseq file start end\n");
+ while((opt = getopt(argc, argv, "e:f:ds:")) != EOF) {
+ switch((char)opt) {
+ case 'f':
+ file = optarg;
+ break;
+ case 'd':
+ dupsPermitted = 1;
+ break;
+ case 'e':
+ end = atoi(optarg);
+ break;
+ case 's':
+ start = atoi(optarg);
+ break;
+ default:printf("Invalid call of chkseq\n");
+ printf("Usage: chkseq file -sstart -eend -d\n");
+ exit(1);
+ }
+ }
+
+ if(file == NULL) {
+ printf("file must be given!\n");
exit(1);
}
-
- start = atoi(argv[2]);
- end = atoi(argv[3]);
if(start > end) {
printf("start must be less than or equal end!\n");
@@ -55,22 +77,35 @@ int main(int argc, char *argv[])
}
/* read file */
- fp = fopen(argv[1], "r");
+ fp = fopen(file, "r");
if(fp == NULL) {
perror(argv[1]);
exit(1);
}
- for(i = start ; i < end ; ++i) {
+ for(i = start ; i < end+1 ; ++i) {
if(fscanf(fp, "%d\n", &val) != 1) {
printf("scanf error in index i=%d\n", i);
exit(1);
}
if(val != i) {
- printf("read value %d, but expected value %d\n", val, i);
- exit(1);
+ if(val == i - 1 && dupsPermitted) {
+ --i;
+ ++nDups;
+ } else {
+ printf("read value %d, but expected value %d\n", val, i);
+ exit(1);
+ }
}
}
+ if(nDups != 0)
+ printf("info: had %d duplicates (this is no error)\n", nDups);
+
+ if(i - 1 != end) {
+ printf("only %d records in file, expected %d\n", i - 1, end);
+ exit(1);
+ }
+
exit(ret);
}
diff --git a/tests/da-mainmsg-q.sh b/tests/da-mainmsg-q.sh
index 91addf68..fde9e06e 100755
--- a/tests/da-mainmsg-q.sh
+++ b/tests/da-mainmsg-q.sh
@@ -52,7 +52,7 @@ sleep 1 # we need this so that rsyslogd can receive all outstanding messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 20099
+./chkseq -fwork -e20099
if [ "$?" -ne "0" ]; then
# rm -f work rsyslog.out.log
echo "sequence error detected"
diff --git a/tests/diskqueue.sh b/tests/diskqueue.sh
index 20767a90..42018b15 100755
--- a/tests/diskqueue.sh
+++ b/tests/diskqueue.sh
@@ -26,7 +26,7 @@ $srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 19999
+./chkseq -fwork -e19999
if [ "$?" -ne "0" ]; then
# rm -f work rsyslog.out.log
echo "sequence error detected"
diff --git a/tests/imtcp-multiport.sh b/tests/imtcp-multiport.sh
index 17480dae..73ab9558 100755
--- a/tests/imtcp-multiport.sh
+++ b/tests/imtcp-multiport.sh
@@ -21,7 +21,7 @@ $srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 9999
+./chkseq -fwork -e9999
if [ "$?" -ne "0" ]; then
# rm -f work rsyslog.out.log
echo "sequence error detected"
@@ -46,7 +46,7 @@ $srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 9999
+./chkseq -fwork -e9999
if [ "$?" -ne "0" ]; then
# rm -f work rsyslog.out.log
echo "sequence error detected"
@@ -71,7 +71,7 @@ $srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 9999
+./chkseq -fwork -e9999
if [ "$?" -ne "0" ]; then
# rm -f work rsyslog.out.log
echo "sequence error detected"
diff --git a/tests/linkedlistqueue.sh b/tests/linkedlistqueue.sh
index aac1abb6..aa574bd1 100755
--- a/tests/linkedlistqueue.sh
+++ b/tests/linkedlistqueue.sh
@@ -17,7 +17,7 @@ sleep 4 # we need this so that rsyslogd can receive all outstanding messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 39999
+./chkseq -fwork -e39999
if [ "$?" -ne "0" ]; then
# rm -f work rsyslog.out.log
echo "sequence error detected"
diff --git a/tests/manytcp.sh b/tests/manytcp.sh
index 06bd38b6..861f12ff 100755
--- a/tests/manytcp.sh
+++ b/tests/manytcp.sh
@@ -12,7 +12,7 @@ $srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 39999
+./chkseq -fwork -e39999
if [ "$?" -ne "0" ]; then
rm -f work rsyslog.out.log
echo "sequence error detected"
diff --git a/tests/memq-persist.sh b/tests/memq-persist.sh
index 108cba57..e935d8db 100755
--- a/tests/memq-persist.sh
+++ b/tests/memq-persist.sh
@@ -10,9 +10,11 @@
#export RSYSLOG_DEBUGLOG="log"
echo testing memory queue persisting to disk
$srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
+rm -f core.*
rm -rf test-spool
mkdir test-spool
rm -f work rsyslog.out.log rsyslog.out.log.save # work files
+#valgrind ../tools/rsyslogd -c4 -u2 -n -irsyslog.pid -M../runtime/.libs:../.libs -f$srcdir/testsuites/memq-persist1.conf &
../tools/rsyslogd -c4 -u2 -n -irsyslog.pid -M../runtime/.libs:../.libs -f$srcdir/testsuites/memq-persist1.conf &
sleep 1
echo "rsyslogd started with pid " `cat rsyslog.pid`
@@ -22,20 +24,22 @@ if [ "$?" -ne "0" ]; then
echo "error during tcpflood! see rsyslog.out.log.save for what was written"
cp rsyslog.out.log rsyslog.out.log.save
fi
-sleep 3 # we need to wait to ensure everything is received (less 1 second would be better)
+sleep 4 # we need to wait to ensure everything is received (less 1 second would be better)
kill `cat rsyslog.pid`
-sleep 5 # wait for engine to terminate
+echo wait for shutdown
+$srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages
echo There must exist some files now:
ls -l test-spool
# restart engine and have rest processed
../tools/rsyslogd -c4 -u2 -n -irsyslog.pid -M../runtime/.libs:../.libs -f$srcdir/testsuites/memq-persist2.conf &
+sleep 1
$srcdir/waitqueueempty.sh # wait until rsyslogd is done processing messages
kill `cat rsyslog.pid`
rm -f work
sort < rsyslog.out.log > work
-./chkseq work 0 9999
+./chkseq -fwork -e9999 -d
if [ "$?" -ne "0" ]; then
- # rm -f work rsyslog.out.log
+ rm -f work rsyslog.out.log
echo "sequence error detected"
exit 1
fi
diff --git a/tests/rscript.c b/tests/rscript.c
index ce81491c..6361aec4 100644
--- a/tests/rscript.c
+++ b/tests/rscript.c
@@ -104,8 +104,8 @@ PerformTest(cstr_t *pstrIn, rsRetVal iRetExpected, cstr_t *pstrOut)
if(strcmp((char*)rsCStrGetSzStr(pstrPrg), (char*)rsCStrGetSzStr(pstrOut))) {
printf("error: compiled program different from expected result!\n");
- printf("generated vmprg (%d bytes):\n%s\n", strlen((char*)rsCStrGetSzStr(pstrPrg)), rsCStrGetSzStr(pstrPrg));
- printf("expected (%d bytes):\n%s\n", strlen((char*)rsCStrGetSzStr(pstrOut)), rsCStrGetSzStr(pstrOut));
+ printf("generated vmprg (%d bytes):\n%s\n", (int)strlen((char*)rsCStrGetSzStr(pstrPrg)), rsCStrGetSzStr(pstrPrg));
+ printf("expected (%d bytes):\n%s\n", (int)strlen((char*)rsCStrGetSzStr(pstrOut)), rsCStrGetSzStr(pstrOut));
ABORT_FINALIZE(RS_RET_ERR);
}