summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c22
1 files changed, 18 insertions, 4 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index e4922f37..ef6e843b 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -246,6 +246,7 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis)
if(!pThis->bEnqOnly) {
if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) {
+ DBGOPRINT((obj_t*) pThis, "(re)activating DA worker\n");
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
} else {
if(getLogicalQueueSize(pThis) == 0) {
@@ -841,6 +842,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
batch_t singleBatch;
batch_obj_t batchObj;
+ int i;
DEFiRet;
//TODO: init batchObj (states _OK and new fields -- CHECK)
@@ -862,6 +864,10 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate);
+ /* delete the batch string params: TODO: create its own "class" for this */
+ for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
+ free(batchObj.staticActStrings[i]);
+ }
objDestruct(pUsr);
RETiRet;
@@ -1211,7 +1217,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
/* set some water marks so that we have useful defaults if none are set specifically */
pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */
pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */
-
pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir);
pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
pThis->iQueueSize = 0;
@@ -1497,7 +1502,7 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
* now that we dequeue batches of pointers, this is much less an issue...
* rgerhards, 2009-04-22
*/
- if(iQueueSize < pThis->iFullDlyMrk / 2) {
+ if(iQueueSize < pThis->iFullDlyMrk / 2 || glbl.GetGlobalInputTermState() == 1) {
pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk);
}
@@ -1819,6 +1824,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
{
DEFiRet;
uchar pszBuf[64];
+ int wrk;
uchar *qName;
size_t lenBuf;
@@ -1841,7 +1847,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
}
pthread_mutex_init(&pThis->mutThrdMgmt, NULL);
- pthread_cond_init (&pThis->condDAReady, NULL);
pthread_cond_init (&pThis->notFull, NULL);
pthread_cond_init (&pThis->notEmpty, NULL);
pthread_cond_init (&pThis->belowFullDlyWtrMrk, NULL);
@@ -1850,6 +1855,16 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
+ /* re-adjust some params if required */
+ if(pThis->bIsDA) {
+ /* if we are in DA mode, we must make sure full delayable messages do not
+ * initiate going to disk!
+ */
+ wrk = pThis->iHighWtrMrk - (pThis->iHighWtrMrk / 100) * 50; /* 50% of high water mark */
+ if(wrk < pThis->iFullDlyMrk)
+ pThis->iFullDlyMrk = wrk;
+ }
+
DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, lqsize %d, pqsize %d, child %d, "
"full delay %d, light delay %d, deq batch size %d starting\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
@@ -2129,7 +2144,6 @@ CODESTARTobjDestruct(qqueue)
free(pThis->mut);
}
pthread_mutex_destroy(&pThis->mutThrdMgmt);
- pthread_cond_destroy(&pThis->condDAReady);
pthread_cond_destroy(&pThis->notFull);
pthread_cond_destroy(&pThis->notEmpty);
pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);