/* queue.c
*
* This file implements the queue object and its several queueing methods.
*
* File begun on 2008-01-03 by RGerhards
*
* There is some in-depth documentation available in doc/dev_queue.html
* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
* if you are getting acquainted with the object.
*
* Copyright 2008 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
* The rsyslog runtime library is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* The rsyslog runtime library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
*
* A copy of the GPL can be found in the file "COPYING" in this distribution.
* A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
*/
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <signal.h>
#include <pthread.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h> /* required for HP UX */
#include <time.h>
#include <errno.h>
#include "rsyslog.h"
#include "syslogd.h"
#include "queue.h"
#include "stringbuf.h"
#include "srUtils.h"
#include "obj.h"
#include "wtp.h"
#include "wti.h"
/* static data */
/* NOTE(review): project macro whose expansion is not visible here; presumably it
 * provides the file-scope "obj" interface used further down (e.g. obj.GetName()) --
 * confirm in obj.h.
 */
DEFobjStaticHelpers
/* forward-definitions */
/* Prototypes for functions of this file that are referenced before their
 * definition. Note that queueChkPersist() is the only one declared without
 * "static", so it has external linkage; all others are private to this file.
 */
rsRetVal queueChkPersist(queue_t *pThis);
static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex);
static rsRetVal queueRateLimiter(queue_t *pThis);
static int queueChkStopWrkrDA(queue_t *pThis);
static int queueIsIdleDA(queue_t *pThis);
static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave);
static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2);
static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex);
/* some constants for queuePersist () */
/* NOTE(review): presumably these select whether a checkpoint is written when the
 * queue is persisted -- confirm at the queuePersist() definition.
 */
#define QUEUE_CHECKPOINT 1
#define QUEUE_NO_CHECKPOINT 0
/* methods */
/* Compute the overall queue size, i.e. the number of regular queue entries
 * plus the number of ungotten objects.
 * The queue mutex must be held by the caller for the whole call.
 * rgerhards, 2008-01-29
 */
static inline int
queueGetOverallQueueSize(queue_t *pThis)
{
	const int iOverallSize = pThis->iQueueSize + pThis->iUngottenObjs;

#if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */
	BEGINfunc
	dbgoprint((obj_t*) pThis, "queue size: %d (regular %d, ungotten %d)\n",
		  pThis->iQueueSize + pThis->iUngottenObjs, pThis->iQueueSize, pThis->iUngottenObjs);
	ENDfunc
#endif

	return iOverallSize;
}
/* --------------- code for disk-assisted (DA) queue modes -------------------- */
/* Advise the responsible worker thread pool of the maximum number of
 * workers that should be active at this point in time.
 *
 * - If the queue is in enqueue-only mode, nothing is advised.
 * - If the queue runs in DA mode, the DA pool is advised to run one worker,
 *   but only once the overall queue size has reached the high water mark
 *   or while the queue has not yet been started.
 * - Otherwise the regular pool is advised: a single worker for disk queues
 *   (or when iMinMsgsPerWrkr is 0), else one worker per iMinMsgsPerWrkr
 *   queued messages plus one.
 *
 * The result of wtpAdviseMaxWorkers() is not propagated to the caller.
 * The queue mutex must be locked when this function is called.
 * -- rgerhards, 2008-01-25
 */
static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis)
{
	DEFiRet;
	int nWrkrsToAdvise;

	ISOBJ_TYPE_assert(pThis, queue);

	if(pThis->bEnqOnly) {
		/* enqueue-only mode: no worker is to be advised at all */
	} else if(pThis->bRunsDA) {
		/* below the high water mark there is no need to start a DA worker,
		 * except when the queue has not been started yet.
		 */
		if(queueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
			wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
		}
	} else {
		nWrkrsToAdvise = 1; /* disk queues (and iMinMsgsPerWrkr == 0) always get exactly one */
		if(pThis->qType != QUEUETYPE_DISK && pThis->iMinMsgsPerWrkr != 0) {
			nWrkrsToAdvise = queueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
		}
		wtpAdviseMaxWorkers(pThis->pWtpReg, nWrkrsToAdvise);
	}

	RETiRet;
}
/* Block until the DA queue is fully initialized. This is needed at places
 * that have to sync with the DA queue because they rely on it being present.
 * "Fully initialized" is signified by bRunsDA having the value 2; until then
 * we wait on condDAReady.
 * NOTE(review): as the wait is done with pThis->mut, the caller presumably
 * must hold that mutex -- confirm against the callers.
 * rgerhards, 2008-02-27
 */
static rsRetVal
queueWaitDAModeInitialized(queue_t *pThis)
{
	DEFiRet;

	ISOBJ_TYPE_assert(pThis, queue);
	ASSERT(pThis->bRunsDA);

	for(;;) {
		if(pThis->bRunsDA == 2) {
			break; /* DA mode is up and running, nothing (more) to wait for */
		}
		d_pthread_cond_wait(&pThis->condDAReady, pThis->mut);
	}

	RETiRet;
}
/* Destruct the DA queue. This is the last part of the DA-to-normal-mode
 * transition. It is called asynchronously and possibly quite a while after
 * the actual transition. The key point is that it needs to be done at some
 * later time, because the DA queue must be destructed and that can not be
 * done in a thread that has been signalled.
 * This is to be called when we revert back to our own queue.
 * This function must be called with the queue mutex locked (the wti
 * class ensures this).
 *
 * If the DA queue still holds data, nothing is changed; the function is
 * expected to be called again later. The return code of queueDestruct() is
 * only used for the debug message and does not alter this function's own
 * return value.
 * rgerhards, 2008-01-15
 */
static rsRetVal
queueTurnOffDAMode(queue_t *pThis)
{
	DEFiRet;
	rsRetVal iRetDestruct; /* outcome of destructing the DA queue, for diagnostics only */

	ISOBJ_TYPE_assert(pThis, queue);
	ASSERT(pThis->bRunsDA);

	/* at this point, we need a fully initialized DA queue. So if it isn't, we finally need
	 * to wait for its startup... -- rgerhards, 2008-01-25
	 */
	queueWaitDAModeInitialized(pThis);

	/* if we need to pull any data that we still need from the (child) disk queue,
	 * now would be the time to do so. At present, we do not need this, but I'd like to
	 * keep that comment if future need arises.
	 */

	/* we need to check if the DA queue is empty because the DA worker may simply have
	 * terminated due to no new messages arriving. That does not, however, mean that the
	 * DA queue is empty. If there is still data in that queue, we do nothing and leave
	 * that for a later incarnation of this function (it will be called multiple times
	 * during the lifetime of DA-mode, depending on how often the DA worker receives an
	 * inactivity timeout). -- rgerhards, 2008-01-25
	 */
	if(pThis->pqDA->iQueueSize == 0) {
		pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
		/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
		 * this will be quick. The result is kept so that the debug message below reports what
		 * actually happened (previously it always printed the never-modified iRet).
		 */
		iRetDestruct = queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
		dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
			  iRetDestruct);
		/* now we need to check if the regular queue has some messages. This may be the case
		 * when it is waiting that the high water mark is reached again. If so, we need to start up
		 * a regular worker. -- rgerhards, 2008-01-26
		 */
		if(queueGetOverallQueueSize(pThis) > 0) {
			queueAdviseMaxWorkers(pThis);
		}
	}

	/* TODO: we have a *really biiiiig* memory leak here: if the queue could not be persisted, all of
	 * its data elements are still in memory. That doesn't really matter if we are terminated, but on
	 * HUP this memory leaks. We MUST add a loop of destructor calls here. However, this takes time
	 * (possibly a lot), so it is probably best to have a config variable for that.
	 * Something for 3.11.1!
	 * rgerhards, 2008-01-30
	 */
	RETiRet;
}
/* Detect whether this queue is configured for disk-assisted (DA) mode and
 * remember the result in bIsDA so that it can be checked quickly later on.
 * A queue counts as disk-assisted if a file prefix has been set for it.
 * This function must only be called from constructors, and only from those
 * that support disk-assisted modes (aka memory-based queue drivers).
 * rgerhards, 2008-01-14
 */
static rsRetVal
queueChkIsDA(queue_t *pThis)
{
	DEFiRet;

	ISOBJ_TYPE_assert(pThis, queue);

	if(pThis->pszFilePrefix == NULL) {
		/* no file prefix --> bIsDA is left untouched */
		dbgoprint((obj_t*) pThis, "is NOT disk-assisted\n");
	} else {
		pThis->bIsDA = 1;
		dbgoprint((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n");
	}

	RETiRet;
}
/* Start disk-assisted queue mode. All internal settings are changed. This is supposed
* to be called from the DA worker, which must have been started before. The most important
* chore of this function is to create the DA queue object. If that function fails,
* the DA worker should return with an appropriate state, which in turn should lead to
* a re-set to non-DA mode in the Enq process. The queue mutex must be locked when this
* function is called, else a number of races will happen.
* Please note that this function may be called *while* we are in DA mode. This is due to the
* fact that the DA worker calls it and the DA worker may be suspended (and restarted) due
* to inactivity timeouts.
* rgerhards, 2008-01-15
*/
static rsRetVal
queueStartDA(queue_t *pThis)
{
DEFiRet;
uchar pszDAQName[128];
ISOBJ_TYPE_assert(pThis, queue);
if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */
FINALIZE; /* ... then we are already done! */
/* create message queue */
CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer));
/* give it a name */
snprintf((char*) pszDAQName, sizeof(pszDAQName)/sizeof(uchar), "%s[DA]", obj.GetName((obj_t*) pThis));
obj.SetName((obj_t*) pThis->pqDA, pszDAQName);
/* as the created queue is the same object class, we take the
* liberty to access its properties directly.
*/
pThis->pqDA->pqParent = pThis;
CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr));
CHKiRet(queueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax));
CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
CHKiRet(queueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
CHKiRet(queueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
if(pThis->toQShutdown == 0) {
CHKiRet(queueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
} else {
/* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND
|