summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2008-01-17 12:45:10 +0000
committerRainer Gerhards <rgerhards@adiscon.com>2008-01-17 12:45:10 +0000
commited0363210c34002e5cfbab553506573f5b8a13a5 (patch)
tree518ce37551ddb1803b3f8d0ced8599b8b04cb984
parent6b8b242250123d6c3105b48cde831ef749c88647 (diff)
downloadrsyslog-ed0363210c34002e5cfbab553506573f5b8a13a5.tar.gz
rsyslog-ed0363210c34002e5cfbab553506573f5b8a13a5.tar.xz
rsyslog-ed0363210c34002e5cfbab553506573f5b8a13a5.zip
worked on threading
-rw-r--r--action.c2
-rw-r--r--iminternal.c2
-rw-r--r--msg.c13
-rw-r--r--msg.h2
-rw-r--r--obj.h2
-rw-r--r--queue.c381
-rw-r--r--queue.h32
-rw-r--r--rsyslog.h2
-rw-r--r--stream.c6
-rw-r--r--stream.h2
-rw-r--r--syslogd.c16
11 files changed, 348 insertions, 112 deletions
diff --git a/action.c b/action.c
index 206afe29..04e72a94 100644
--- a/action.c
+++ b/action.c
@@ -52,7 +52,7 @@ rsRetVal actionDestruct(action_t *pThis)
pThis->pMod->freeInstance(pThis->pModData);
if(pThis->f_pMsg != NULL)
- MsgDestruct(pThis->f_pMsg);
+ MsgDestruct(&pThis->f_pMsg);
SYNC_OBJ_TOOL_EXIT(pThis);
if(pThis->ppTpl != NULL)
diff --git a/iminternal.c b/iminternal.c
index a45fd52e..86d5097c 100644
--- a/iminternal.c
+++ b/iminternal.c
@@ -49,7 +49,7 @@ static rsRetVal iminternalDestruct(iminternal_t *pThis)
assert(pThis != NULL);
if(pThis->pMsg != NULL)
- MsgDestruct(pThis->pMsg);
+ MsgDestruct(&pThis->pMsg);
free(pThis);
diff --git a/msg.c b/msg.c
index 948274bf..0961afd4 100644
--- a/msg.c
+++ b/msg.c
@@ -232,8 +232,12 @@ finalize_it:
/* Destructor for a msg "object". Must be called to dispose
* of a msg object.
*/
-rsRetVal MsgDestruct(msg_t * pM)
-{
+rsRetVal MsgDestruct(msg_t **ppM)
+{
+ msg_t *pM;
+
+ assert(ppM != NULL);
+ pM = *ppM;
assert(pM != NULL);
/* DEV Debugging only ! dbgprintf("MsgDestruct\t0x%lx, Ref now: %d\n", (unsigned long)pM, pM->iRefCount - 1); */
if(--pM->iRefCount == 0)
@@ -289,6 +293,7 @@ rsRetVal MsgDestruct(msg_t * pM)
rsCStrDestruct(pM->pCSMSGID);
funcDeleteMutex(pM);
free(pM);
+ *ppM = NULL;
}
return RS_RET_OK;
@@ -302,7 +307,7 @@ rsRetVal MsgDestruct(msg_t * pM)
#define tmpCOPYSZ(name) \
if(pOld->psz##name != NULL) { \
if((pNew->psz##name = srUtilStrDup(pOld->psz##name, pOld->iLen##name)) == NULL) {\
- MsgDestruct(pNew);\
+ MsgDestruct(&pNew);\
return NULL;\
}\
pNew->iLen##name = pOld->iLen##name;\
@@ -315,7 +320,7 @@ rsRetVal MsgDestruct(msg_t * pM)
#define tmpCOPYCSTR(name) \
if(pOld->pCS##name != NULL) {\
if(rsCStrConstructFromCStr(&(pNew->pCS##name), pOld->pCS##name) != RS_RET_OK) {\
- MsgDestruct(pNew);\
+ MsgDestruct(&pNew);\
return NULL;\
}\
}
diff --git a/msg.h b/msg.h
index bd8d4f89..842a9349 100644
--- a/msg.h
+++ b/msg.h
@@ -109,7 +109,7 @@ typedef struct msg msg_t; /* new name */
PROTOTYPEObjClassInit(Msg);
char* getProgramName(msg_t*);
rsRetVal MsgConstruct(msg_t **ppThis);
-rsRetVal MsgDestruct(msg_t * pM);
+rsRetVal MsgDestruct(msg_t **ppM);
msg_t* MsgDup(msg_t* pOld);
msg_t *MsgAddRef(msg_t *pM);
void setProtocolVersion(msg_t *pM, int iNewVersion);
diff --git a/obj.h b/obj.h
index f4a9aee6..6b965e10 100644
--- a/obj.h
+++ b/obj.h
@@ -78,7 +78,7 @@
#else
# define objConstructSetObjInfo(pThis) ((obj_t*) (pThis))->pObjInfo = pObjInfoOBJ
#endif
-#define objDestruct(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_DESTRUCT])(pThis)
+#define objDestruct(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_DESTRUCT])(&pThis)
#define objSerialize(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_SERIALIZE])
#define objGetSeverity(pThis, piSever) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_GETSEVERITY])(pThis, piSever)
diff --git a/queue.c b/queue.c
index 9003b344..2b241d82 100644
--- a/queue.c
+++ b/queue.c
@@ -1,6 +1,6 @@
- // DA-input only
+// TODO: start up the correct num of workers when switching to non-DA mode
// TODO: "preforked" worker threads
-// TODO: do an if(debug) in dbgrintf - performanc ein release build!
+// TODO: do an if(debug) in dbgrintf - performance in release build!
// TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in
// call consumer state. Facilitates retaining messages in queue until action could
// be called!
@@ -56,9 +56,61 @@ rsRetVal queueChkPersist(queue_t *pThis);
static void *queueWorker(void *arg);
static rsRetVal queueGetQueueSize(queue_t *pThis, int *piQueueSize);
static rsRetVal queueChkWrkThrdChanges(queue_t *pThis);
+static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly);
/* methods */
+/* get the current worker state. For simplicity and speed, we have
+ * NOT used our regular calling interface this time. I hope that won't
+ * bite in the long term... -- rgerhards, 2008-01-17
+ */
+static inline qWrkCmd_t
+qWrkrGetState(qWrkThrd_t *pThis)
+{
+ assert(pThis != NULL);
+ return pThis->tCurrCmd;
+}
+
+
+/* send a command to a specific thread
+ */
+static rsRetVal
+qWrkrSetState(qWrkThrd_t *pThis, qWrkCmd_t tCmd)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+ dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis->pQueue), tCmd, pThis->iThrd);
+
+ /* change some admin structures */
+ switch(tCmd) {
+ case eWRKTHRD_TERMINATING:
+ pthread_cond_destroy(&pThis->condInitDone);
+ dbgprintf("Queue 0x%lx/w%d: thread terminating with %d entries left in queue, %d workers running.\n",
+ queueGetID(pThis->pQueue), pThis->iThrd, pThis->pQueue->iQueueSize,
+ pThis->pQueue->iCurNumWrkThrd);
+ break;
+ case eWRKTHRD_RUN_CREATED:
+ pthread_cond_init(&pThis->condInitDone, NULL);
+ break;
+ case eWRKTHRD_RUN_INIT:
+ break;
+ case eWRKTHRD_RUNNING:
+ pthread_cond_signal(&pThis->condInitDone);
+ break;
+ /* these cases just to satisfy the compiler, we do (yet) not act an them: */
+ case eWRKTHRD_STOPPED:
+ case eWRKTHRD_SHUTDOWN:
+ case eWRKTHRD_SHUTDOWN_IMMEDIATE:
+ /* DO NOTHING */
+ break;
+ }
+
+ pThis->tCurrCmd = tCmd;
+
+ return iRet;
+}
+
/* send a command to a specific active thread. If the thread is not
* active, the command is not sent.
*/
@@ -70,9 +122,9 @@ queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd)
ISOBJ_TYPE_assert(pThis, queue);
assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads);
- if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRD_RUN_INIT) {
+ if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRD_RUN_CREATED) {
dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx);
- pThis->pWrkThrds[iIdx].tCurrCmd = tCmd;
+ qWrkrSetState(&pThis->pWrkThrds[iIdx], tCmd);
} else {
dbgprintf("Queue 0x%lx: command %d NOT sent to inactive thread %d\n", queueGetID(pThis), tCmd, iIdx);
}
@@ -80,21 +132,60 @@ queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd)
return iRet;
}
-/* send a command to a specific thread
- * TODO: check if we can run into trouble with inactive threads
+
+/* Finalize construction of a wWrkrThrd_t "object"
+ * rgerhards, 2008-01-17
*/
static inline rsRetVal
-queueTellWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd)
+qWrkrConstructFinalize(qWrkThrd_t *pThis, queue_t *pQueue, int i)
{
- DEFiRet;
+ assert(pThis != NULL);
+ ISOBJ_TYPE_assert(pQueue, queue);
- dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx);
- ISOBJ_TYPE_assert(pThis, queue);
- assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads);
+ dbgprintf("Queue 0x%lx: finalizing construction of worker %d instance data\n", queueGetID(pQueue), i);
- pThis->pWrkThrds[iIdx].tCurrCmd = tCmd;
+ /* initialize our thread instance descriptor */
+ pThis = pQueue->pWrkThrds + i;
+ pThis->pQueue = pQueue;
+ pThis->iThrd = i;
+ pThis->pUsr = NULL;
- return iRet;
+ qWrkrSetState(pThis, eWRKTHRD_STOPPED);
+
+ return RS_RET_OK;
+}
+
+
+/* initialize the qWrkThrd_t structure - this MUST be called right after
+ * startup of a worker thread. -- rgerhards, 2008-01-17
+ */
+static inline rsRetVal
+qWrkrInit(qWrkThrd_t **ppThis, queue_t *pQueue)
+{
+ qWrkThrd_t *pThis;
+ int i;
+
+ assert(ppThis != NULL);
+ ISOBJ_TYPE_assert(pQueue, queue);
+
+ /* find myself in the queue's thread table */
+ for(i = 0 ; i <= pQueue->iNumWorkerThreads ; ++i)
+ if(pQueue->pWrkThrds[i].thrdID == pthread_self())
+ break;
+dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pQueue,
+ (unsigned) pQueue->pWrkThrds[i].thrdID, i, (unsigned) pthread_self());
+ assert(pQueue->pWrkThrds[i].thrdID == pthread_self());
+
+ /* initialize our thread instance descriptor */
+ pThis = pQueue->pWrkThrds + i;
+ pThis->pQueue = pQueue;
+ pThis->iThrd = i;
+ pThis->pUsr = NULL;
+
+ *ppThis = pThis;
+ qWrkrSetState(pThis, eWRKTHRD_RUN_INIT);
+
+ return RS_RET_OK;
}
@@ -112,9 +203,9 @@ queueJoinWrkThrd(queue_t *pThis, int iIdx)
dbgprintf("Queue 0x%lx: thread %d state %d, waiting for exit\n", queueGetID(pThis), iIdx,
pThis->pWrkThrds[iIdx].tCurrCmd);
pthread_join(pThis->pWrkThrds[iIdx].thrdID, NULL);
- pThis->pWrkThrds[iIdx].tCurrCmd = eWRKTHRD_STOPPED; /* back to virgin... */
+ qWrkrSetState(&pThis->pWrkThrds[iIdx], eWRKTHRD_STOPPED); /* back to virgin... */
pThis->pWrkThrds[iIdx].thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
- dbgprintf("Queue 0x%lx: thread %d state %d, has exited\n", queueGetID(pThis), iIdx,
+ dbgprintf("Queue 0x%lx: thread %d state %d, has stopped\n", queueGetID(pThis), iIdx,
pThis->pWrkThrds[iIdx].tCurrCmd);
return iRet;
@@ -131,9 +222,9 @@ queueStrtWrkThrd(queue_t *pThis, int i)
ISOBJ_TYPE_assert(pThis, queue);
assert(i >= 0 && i <= pThis->iNumWorkerThreads);
- assert(pThis->pWrkThrds[i].tCurrCmd < eWRKTHRD_RUN_INIT);
+ assert(pThis->pWrkThrds[i].tCurrCmd < eWRKTHRD_RUN_CREATED);
- queueTellWrkThrd(pThis, i, eWRKTHRD_RUN_INIT);
+ qWrkrSetState(&pThis->pWrkThrds[i], eWRKTHRD_RUN_CREATED);
iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis);
dbgprintf("Queue 0x%lx: starting Worker thread %x, index %d with state %d.\n",
(unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState);
@@ -164,7 +255,7 @@ queueStrtNewWrkThrd(queue_t *pThis)
dbgprintf("Queue %p: search thrd tbl slot: i %d, CuccCmd %d\n", pThis, i, pThis->pWrkThrds[i].tCurrCmd);
if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_STOPPED) {
break;
- } else if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_RUN_INIT) {
+ } else if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_RUN_CREATED) {
iStartingUp = i;
break;
}
@@ -176,7 +267,7 @@ dbgprintf("Queue %p: after thrd search: i %d, iStartingUp %d\n", pThis, i, iStar
assert(i <= pThis->iNumWorkerThreads); /* now there must be a free spot, else something is really wrong! */
- queueTellWrkThrd(pThis, i, eWRKTHRD_RUN_INIT);
+ qWrkrSetState(&pThis->pWrkThrds[i], eWRKTHRD_RUN_CREATED);
iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis);
dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n",
(unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState);
@@ -206,7 +297,7 @@ queueTellActWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd)
/* tell the workers our request */
for(i = iStartIdx ; i <= pThis->iNumWorkerThreads ; ++i)
- if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATED)
+ if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING)
queueTellActWrkThrd(pThis, i, tCmd);
return iRet;
@@ -271,7 +362,7 @@ queueChkAndStrtWrk(queue_t *pThis)
queueChkWrkThrdChanges(pThis);
/* check if we need to start up another worker (only in regular mode) */
- if(pThis->qRunsDA == QRUNS_REGULAR) {
+ if(pThis->qRunsDA == QRUNS_REGULAR && pThis->bEnqOnly == 0) {
if(pThis->iCurNumWrkThrd < pThis->iNumWorkerThreads) {
dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d, qRunsDA: %d\n",
pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA);
@@ -320,14 +411,14 @@ queueTurnOffDAMode(queue_t *pThis)
* messages come into the queue, we may be well off with a single worker.
* rgerhards, 2008-01-16
*/
- queueStrtNewWrkThrd(pThis);
+ if(pThis->bEnqOnly == 0)
+ queueStrtNewWrkThrd(pThis);
pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */
/* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty,
* this will be quick.
*/
- queueDestruct(pThis->pqDA); /* and now we are ready to destruct the DA queue */
- pThis->pqDA = NULL;
+ queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */
/* now free the remaining resources */
pthread_mutex_destroy(&pThis->mutDA);
@@ -358,11 +449,12 @@ queueChkWrkThrdChanges(queue_t *pThis)
/* go through all threads (including DA thread) */
for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) {
switch(pThis->pWrkThrds[i].tCurrCmd) {
- case eWRKTHRD_TERMINATED:
+ case eWRKTHRD_TERMINATING:
queueJoinWrkThrd(pThis, i);
break;
/* these cases just to satisfy the compiler, we do not act an them: */
case eWRKTHRD_STOPPED:
+ case eWRKTHRD_RUN_CREATED:
case eWRKTHRD_RUN_INIT:
case eWRKTHRD_RUNNING:
case eWRKTHRD_SHUTDOWN:
@@ -428,7 +520,7 @@ dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx,
* Note that the child queue now in almost all cases is non-empty, because we just enqueued
* a message.
*/
- if(iQueueSize <= pThis->iLowWtrMrk && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) {
+ if(iQueueSize <= pThis->iLowWtrMrk && iQueueSize != 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) {
dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n",
queueGetID(pThis), iMyThrdIndx, iQueueSize);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
@@ -494,6 +586,7 @@ queueStrtDA(queue_t *pThis)
CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq));
+ CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly));
CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0));
CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0));
if(pThis->toQShutdown == 0) {
@@ -536,8 +629,7 @@ queueStrtDA(queue_t *pThis)
finalize_it:
if(iRet != RS_RET_OK) {
if(pThis->pqDA != NULL) {
- queueDestruct(pThis->pqDA);
- pThis->pqDA = NULL;
+ queueDestruct(&pThis->pqDA);
}
dbgprintf("Queue 0x%lx: error %d creating disk queue - giving up.\n",
queueGetID(pThis), iRet);
@@ -549,19 +641,24 @@ finalize_it:
/* initiate DA mode
+ * param bEnqOnly tells if the disk queue is to be run in enqueue-only mode. This may
+ * be needed during shutdown of memory queues which need to be persisted to disk.
* rgerhards, 2008-01-16
*/
static inline rsRetVal
-queueInitDA(queue_t *pThis)
+queueInitDA(queue_t *pThis, int bEnqOnly)
{
DEFiRet;
/* indicate we now run in DA mode - this is reset by the DA worker if it fails */
pThis->qRunsDA = QRUNS_DA_INIT;
+ pThis->bDAEnqOnly = bEnqOnly;
- /* now we must start our DA worker thread - it does the rest of the initialization */
- // DA-input only mode!
- iRet = queueStrtWrkThrd(pThis, 0);
+ /* now we must start our DA worker thread - it does the rest of the initialization
+ * In enqueue-only mode, we do not start any workers.
+ */
+ if(pThis->bEnqOnly == 0)
+ iRet = queueStrtWrkThrd(pThis, 0);
return iRet;
}
@@ -606,7 +703,7 @@ queueChkStrtDA(queue_t *pThis)
dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n",
queueGetID(pThis), pThis->iQueueSize);
- queueInitDA(pThis); /* initiate DA mode */
+ queueInitDA(pThis, QUEUE_MODE_ENQDEQ); /* initiate DA mode */
}
finalize_it:
@@ -800,7 +897,6 @@ queueHaveQIF(queue_t *pThis)
(char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
/* check if the file exists */
-dbgprintf("stat HaveQIF '%s'\n", pszQIFNam);
if(stat((char*) pszQIFNam, &stat_buf) == -1) {
if(errno == ENOENT) {
dbgprintf("Queue 0x%lx: no .qi file found\n", queueGetID(pThis));
@@ -874,7 +970,7 @@ queueTryLoadPersistedInfo(queue_t *pThis)
finalize_it:
if(psQIF != NULL)
- strmDestruct(psQIF);
+ strmDestruct(&psQIF);
if(iRet != RS_RET_OK) {
dbgprintf("Queue 0x%lx: error %d reading .qi file - can not read persisted info (if any)\n",
@@ -949,8 +1045,8 @@ static rsRetVal qDestructDisk(queue_t *pThis)
assert(pThis != NULL);
- strmDestruct(pThis->tVars.disk.pWrite);
- strmDestruct(pThis->tVars.disk.pRead);
+ strmDestruct(&pThis->tVars.disk.pWrite);
+ strmDestruct(&pThis->tVars.disk.pRead);
if(pThis->pszSpoolDir != NULL)
free(pThis->pszSpoolDir);
@@ -1068,7 +1164,6 @@ queueWrkThrdReqTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int bIncludeDAWrk)
{
DEFiRet;
- // DA-input only
if(bIncludeDAWrk) {
queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */
queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
@@ -1093,13 +1188,19 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout)
struct timespec t;
queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */
+dbgprintf("WrkThrdTrm 0\n");
queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
+ /* race: must make sure all are running! */
+dbgprintf("WrkThrdTrm 1\n");
queueTimeoutComp(&t, iTimeout);/* get timeout */
+dbgprintf("WrkThrdTrm 2\n");
/* and wait for their termination */
pthread_mutex_lock(pThis->mut);
bTimedOut = 0;
+dbgprintf("WrkThrdTrm 3, thrds: %d\n", pThis->iCurNumWrkThrd);
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
+dbgprintf("WrkThrdTrm 4 to %d\n", bTimedOut);
dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n",
queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd);
@@ -1128,12 +1229,15 @@ queueWrkThrdCancel(queue_t *pThis)
// TODO: we need to implement peek(), without it (today!) we lose one message upon
// worker cancellation! -- rgerhards, 2008-01-14
+ /* process any pending thread requests so that we know who actually is still running */
+ queueChkWrkThrdChanges(pThis);
+
/* awake the workers one more time, just to be sure */
queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */
/* first tell the workers our request */
for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i)
- if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATED) {
+ if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) {
dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i);
pthread_cancel(pThis->pWrkThrds[i].thrdID);
}
@@ -1196,14 +1300,14 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis)
}
-/* This is a helper for queueWorker() it either calls the configured
+/* This is a helper for queueWorker () it either calls the configured
* consumer or the DA-consumer (if in disk-assisted mode). It is
* protected by the queue mutex, but MUST release it as soon as possible.
* Most importantly, it must release it before the consumer is called.
* rgerhards, 2008-01-14
*/
static inline rsRetVal
-queueWorkerChkAndCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave)
+queueWorkerChkAndCallConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst, int iCancelStateSave)
{
DEFiRet;
rsRetVal iRetLocal;
@@ -1211,7 +1315,12 @@ queueWorkerChkAndCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateS
int iQueueSize;
void *pUsr;
int qRunsDA;
+ int iMyThrdIndx;
+ ISOBJ_TYPE_assert(pThis, queue);
+ assert(pWrkrInst != NULL);
+
+ iMyThrdIndx = pWrkrInst->iThrd;
/* first check if we have still something to process */
if(pThis->iQueueSize == 0 ||
@@ -1228,6 +1337,7 @@ queueWorkerChkAndCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateS
queueChkPersist(pThis); // when we support peek(), we must do this down after the del!
qRunsDA = pThis->qRunsDA; /* do a local copy so that we prevent a race after mutex release */
iQueueSize = pThis->iQueueSize; /* ... and the same for this property */
+ pWrkrInst->pUsr = pUsr; /* save it for the cancel cleanup handler */
pthread_mutex_unlock(pThis->mut);
pthread_cond_signal(pThis->notFull);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -1274,6 +1384,34 @@ dbgprintf("CallConsumer returns %d\n", iRet);
}
+
+/* cancellation cleanup handler for queueWorker ()
+ * Updates admin structure and frees ressources.
+ * rgerhards, 2008-01-16
+ */
+static void queueWorkerCancelCleanup(void *arg)
+{
+ qWrkThrd_t *pWrkrInst = (qWrkThrd_t*) arg;
+ queue_t *pThis;
+
+ assert(pWrkrInst != NULL);
+ ISOBJ_TYPE_assert(pWrkrInst->pQueue, queue);
+ pThis = pWrkrInst->pQueue;
+
+ dbgprintf("Queue 0x%lx/w%d: cancelation cleanup handler called (NOT FULLY IMPLEMENTED, one msgs lost!)\n",
+ queueGetID(pThis), pWrkrInst->iThrd);
+
+ pThis->iCurNumWrkThrd--; /* one worker less... */
+ pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */
+ qWrkrSetState(&pThis->pWrkThrds[pWrkrInst->iThrd], eWRKTHRD_TERMINATING);
+ pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
+
+ /* TODO: re-enqueue the data element! */
+ dbgprintf("Queue 0x%lx/w%d: thread CANCELED with %d entries left in queue, %d workers running.\n",
+ queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize, pThis->iCurNumWrkThrd);
+}
+
+
/* Each queue has at least one associated worker (consumer) thread. It will pull
* the message from the queue and pass it to a user-defined function.
* This function was provided on construction. It MUST be thread-safe.
@@ -1290,6 +1428,7 @@ queueWorker(void *arg)
struct timespec t;
int iMyThrdIndx; /* index for this thread in queue thread table */
int iCancelStateSave;
+ qWrkThrd_t *pWrkrInst; /* for cleanup handler */
ISOBJ_TYPE_assert(pThis, queue);
@@ -1300,17 +1439,13 @@ queueWorker(void *arg)
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
- /* first find myself in the queue's thread table */
- for(iMyThrdIndx = 0 ; iMyThrdIndx <= pThis->iNumWorkerThreads ; ++iMyThrdIndx)
- if(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self())
- break;
-dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pThis,
- (unsigned) pThis->pWrkThrds[iMyThrdIndx].thrdID, iMyThrdIndx, (unsigned) pthread_self());
- assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self());
+ /* initialize our thread instance descriptor */
+ qWrkrInit(&pWrkrInst, pThis);
- dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx);
+ iMyThrdIndx = pWrkrInst->iThrd;
pThis->iCurNumWrkThrd++; /* tell the world there is one more worker */
+ dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx);
if(iMyThrdIndx == 0) { /* are we the DA worker? */
if(queueStrtDA(pThis) != RS_RET_OK) { /* then fully initialize the DA queue! */
@@ -1323,16 +1458,18 @@ dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pThis,
* because someone may have requested us to shut down even before we got a chance to do
* our init. That would be a bad race... -- rgerhards, 2008-01-16
*/
- if(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUN_INIT)
- pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_RUNNING; /* we are running now! */
+ if(qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT)
+ qWrkrSetState(pWrkrInst, eWRKTHRD_RUNNING); /* we are running now! */
+
+ pthread_cleanup_push(queueWorkerCancelCleanup, pWrkrInst);
pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
/* end one-time stuff */
/* now we have our identity, on to real processing */
- while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING
- || (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN && pThis->iQueueSize > 0)) {
+ while( (qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING)
+ || (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN && pThis->iQueueSize > 0)) {
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
@@ -1340,7 +1477,7 @@ dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pThis,
queueChkWrkThrdChanges(pThis);
dbgprintf("Queue %p/w%d: pre empty queue, qsize %d\n", pThis, iMyThrdIndx, pThis->iQueueSize);
- while(pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) {
+ while(pThis->iQueueSize == 0 && qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING) {
dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n",
queueGetID(pThis), iMyThrdIndx);
if(pThis->bSignalOnEmpty > 0) {
@@ -1377,13 +1514,13 @@ dbgprintf("worker never times out!\n");
/* we use SHUTDOWN (and not SHUTDOWN_IMMEDIATE) so that the worker
* does not terminate if in the mean time a new message arrived.
*/
- pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_SHUTDOWN;
+ qWrkrSetState(pWrkrInst, eWRKTHRD_SHUTDOWN);
}
}
dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx);
}
- queueWorkerChkAndCallConsumer(pThis, iMyThrdIndx, iCancelStateSave);
+ queueWorkerChkAndCallConsumer(pThis, pWrkrInst, iCancelStateSave);
/* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
* a cancellation point in itself. As we run most of the time without cancel enabled, I fear
@@ -1406,7 +1543,7 @@ dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx);
pthread_yield();
dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n",
queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd);
- if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0)
+ if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0)
dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has "
" %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize);
}
@@ -1415,17 +1552,21 @@ dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n",
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_mutex_lock(pThis->mut);
pThis->iCurNumWrkThrd--;
- if(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN ||
- pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN_IMMEDIATE) {
+ pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */
+ pthread_cleanup_pop(0); /* remove cleanup handler */
+
+ /* if we ever need finalize_it, here would be the place for it! */
+ if(qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN ||
+ qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN_IMMEDIATE ||
+ qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT ||
+ qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_CREATED) {
/* in shutdown case, we need to flag termination. All other commands
* have a meaning to the thread harvester, so we can not overwrite them
*/
- pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_TERMINATED;
+dbgprintf("Queue 0x%lx/w%d: setting termination state\n", queueGetID(pThis), iMyThrdIndx);
+ qWrkrSetState(pWrkrInst, eWRKTHRD_TERMINATING);
}
pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
- pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */
- dbgprintf("Queue 0x%lx/w%d: thread terminates with %d entries left in queue, %d workers running.\n",
- queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd);
pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -1517,20 +1658,27 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
DEFiRet;
rsRetVal iRetLocal;
int bInitialized = 0; /* is queue already initialized? */
+ int i;
assert(pThis != NULL);
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
- dbgprintf("Queue 0x%lx: type %d, disk assisted %d, maxFileSz %ld starting\n", queueGetID(pThis), pThis->qType,
- pThis->bIsDA, pThis->iMaxFileSize);
+ dbgprintf("Queue 0x%lx: type %d, enq-only %d, disk assisted %d, maxFileSz %ld starting\n", queueGetID(pThis),
+ pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize);
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
+ /* initialize worker thread instances
+ * TODO: move to separate function
+ */
if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads + 1, sizeof(qWrkThrd_t))) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ for(i = 0 ; i < pThis->iNumWorkerThreads + 1 ; ++i) {
+ qWrkrConstructFinalize(&pThis->pWrkThrds[i], pThis, i);
+ }
if(pThis->bIsDA) {
/* If we are disk-assisted, we need to check if there is a QIF file
@@ -1541,7 +1689,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n",
queueGetID(pThis));
- queueInitDA(pThis); /* initiate DA mode */
+ queueInitDA(pThis, QUEUE_MODE_ENQDEQ); /* initiate DA mode */
bInitialized = 1; /* we are done */
} else {
// TODO: use logerror? -- rgerhards, 2008-01-16
@@ -1552,12 +1700,10 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
if(!bInitialized) {
dbgprintf("Queue 0x%lx: queue starts up without (loading) any disk state\n", queueGetID(pThis));
- /* worker 0 is reserved for disk-assisted mode, so do not start */
- queueTellWrkThrd(pThis, 0, eWRKTHRD_STOPPED);
-
/* fire up the worker threads */
// TODO: preforked workers! queueStrtAllWrkThrds(pThis);
}
+ pThis->bQueueStarted = 1;
finalize_it:
return iRet;
@@ -1634,7 +1780,7 @@ static rsRetVal queuePersist(queue_t *pThis)
finalize_it:
if(psQIF != NULL)
- strmDestruct(psQIF);
+ strmDestruct(&psQIF);
return iRet;
}
@@ -1661,44 +1807,60 @@ rsRetVal queueChkPersist(queue_t *pThis)
/* destructor for the queue object */
-rsRetVal queueDestruct(queue_t *pThis)
+rsRetVal queueDestruct(queue_t **ppThis)
{
+ queue_t *pThis;
DEFiRet;
- assert(pThis != NULL);
+ assert(ppThis != NULL);
+ pThis = *ppThis;
+ ISOBJ_TYPE_assert(pThis, queue);
- /* if running DA, tell the DA workers to shut down. This saves us some CPU cycles which
- * we can use to persist the remaining in-memory data to disk quicker. -- rgerhads, 2008-01-16
- * TODO: we actually need to change the queue to an "input-only" mode, that also prevents
- * startup of the thread again further down in the process. None of that really hurts, so we
- * leave it for the time being. -- rgerhards, 2008-01-16
+pThis->bSaveOnShutdown = 1; // TODO: Test remove
+ /* if running DA, switch the DA queue to enqueue-only mode. That saves us some CPU cycles as
+ * its workers do no longer need to run. It also prevents longer-running actions to spring into
+ * existence while we are draining the main (memory) queue. -- rgerhads, 2008-01-16
*/
- if(pThis->qRunsDA != QRUNS_REGULAR)
- queueWrkThrdReqTrm(pThis->pqDA, eWRKTHRD_SHUTDOWN_IMMEDIATE, 0);
- // DA-input only
+ if(pThis->qRunsDA != QRUNS_REGULAR) {
+ queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */
+ if(pThis->bSaveOnShutdown)
+ pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL;
+ }
/* then, terminate our own worker threads */
if(pThis->pWrkThrds != NULL) {
queueShutdownWorkers(pThis);
- free(pThis->pWrkThrds);
- pThis->pWrkThrds = NULL;
}
- /* of we have now data left in in-memory queues, this data will be lost if we do not
- * persist it to a disk queue.
- * TODO: implement code rgerhards, 2008-01-16
+ /* If we currently run in DA mode, the in-memory queue is already persisted to disk.
+ * If we are not in DA mode, we may have data left in in-memory queues, this data will
+ * be lost if we do not persist it to a disk queue. So, if configured to do so, we will
+ * now start DA mode just to drain our queue. -- rgerhards, 2008-01-16
+ * TODO: move to persist function!
*/
+ if(pThis->iQueueSize > 0 && pThis->bSaveOnShutdown && pThis->bIsDA) {
+ dbgprintf("Queue 0x%lx: in-memory queue contains %d entries after worker shutdown - using DA to save to disk\n",
+ queueGetID(pThis), pThis->iQueueSize);
+ pThis->iLowWtrMrk = 0; /* disable low water mark algo */
+ queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* start DA queue in enqueue-only mode */
+ pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL;
+ queueShutdownWorkers(pThis);
+ }
/* if running DA, terminate disk queue */
if(pThis->qRunsDA != QRUNS_REGULAR)
- queueDestruct(pThis->pqDA);
+ queueDestruct(&pThis->pqDA);
- /* persist the queue (we always do that - queuePersits() does cleanup it the queue is empty) */
+ /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) */
CHKiRet_Hdlr(queuePersist(pThis)) {
dbgprintf("Queue 0x%lx: error %d persisting queue - data lost!\n", (unsigned long) pThis, iRet);
}
/* ... then free resources */
+ if(pThis->pWrkThrds != NULL) {
+ free(pThis->pWrkThrds);
+ pThis->pWrkThrds = NULL;
+ }
pthread_mutex_destroy(pThis->mut);
free(pThis->mut);
pthread_cond_destroy(pThis->notFull);
@@ -1713,6 +1875,7 @@ rsRetVal queueDestruct(queue_t *pThis)
/* and finally delete the queue objet itself */
free(pThis);
+ *ppThis = NULL;
return iRet;
}
@@ -1845,6 +2008,50 @@ finalize_it:
return iRet;
}
+
+/* set queue mode to enqueue only or not
+ * rgerhards, 2008-01-16
+ */
+static rsRetVal
+queueSetEnqOnly(queue_t *pThis, int bEnqOnly)
+{
+ DEFiRet;
+ int iCancelStateSave;
+
+ ISOBJ_TYPE_assert(pThis, queue);
+
+ /* for simplicity, we do one big mutex lock. This method is extremely seldom
+ * called, so that doesn't matter... -- rgerhards, 2008-01-16
+ */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ pthread_mutex_lock(pThis->mut);
+
+ if(bEnqOnly == pThis->bEnqOnly)
+ FINALIZE; /* no change, nothing to do */
+
+ if(pThis->bQueueStarted) {
+ /* we need to adjust queue operation only if we are not during initial param setup */
+ if(bEnqOnly == 1) {
+ /* switch to enqueue-only mode */
+ /* this means we need to terminate all workers - that's it... */
+ dbgprintf("Queue 0x%lx: switching to enqueue-only mode, terminating all worker threads\n",
+ queueGetID(pThis));
+ queueWrkThrdReqTrm(pThis, eWRKTHRD_SHUTDOWN_IMMEDIATE, 0);
+ } else {
+ /* switch back to regular mode */
+ ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */
+ }
+ }
+
+ pThis->bEnqOnly = bEnqOnly;
+
+finalize_it:
+ pthread_mutex_unlock(pThis->mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ return iRet;
+}
+
+
/* some simple object access methods */
DEFpropSetMeth(queue, iPersistUpdCnt, int);
DEFpropSetMeth(queue, toQShutdown, long);
diff --git a/queue.h b/queue.h
index 03a3517b..e68467b9 100644
--- a/queue.h
+++ b/queue.h
@@ -63,23 +63,31 @@ typedef struct qLinkedList_S {
/* commands and states for worker threads. */
typedef enum {
eWRKTHRD_STOPPED = 0, /* worker thread is not running (either actually never ran or was shut down) */
- eWRKTHRD_TERMINATED = 1,/* worker thread has shut down, but some finalzing is still needed */
+ eWRKTHRD_TERMINATING = 1,/* worker thread has shut down, but some finalzing is still needed */
/* ALL active states MUST be numerically higher than eWRKTHRD_TERMINATED and NONE must be lower! */
- eWRKTHRD_RUN_INIT = 2, /* worker thread is initializing, but not yet fully running */
- eWRKTHRD_RUNNING = 3, /* worker thread is up and running and shall continue to do so */
- eWRKTHRD_SHUTDOWN = 4, /* worker thread is running but shall terminate when queue is empty */
- eWRKTHRD_SHUTDOWN_IMMEDIATE = 5/* worker thread is running but shall terminate even if queue is full */
+ eWRKTHRD_RUN_CREATED = 2,/* worker thread has been created, but not yet begun initialization (prob. not yet scheduled) */
+ eWRKTHRD_RUN_INIT = 3, /* worker thread is initializing, but not yet fully running */
+ eWRKTHRD_RUNNING = 4, /* worker thread is up and running and shall continue to do so */
+ eWRKTHRD_SHUTDOWN = 5, /* worker thread is running but shall terminate when queue is empty */
+ eWRKTHRD_SHUTDOWN_IMMEDIATE = 6/* worker thread is running but shall terminate even if queue is full */
} qWrkCmd_t;
typedef struct qWrkThrd_s {
pthread_t thrdID; /* thread ID */
qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
+ obj_t *pUsr; /* current user object being processed (or NULL if none) */
+ struct queue_s *pQueue; /* my queue (important if only the work thread instance is passed! */
+ int iThrd; /* my worker thread array index */
+ pthread_cond_t condInitDone; /* signaled when the thread startup is done (once per thread existance) */
} qWrkThrd_t; /* type for queue worker threads */
/* the queue object */
typedef struct queue_s {
BEGINobjInstance;
queueType_t qType;
+ int bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */
+ int bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */
+ int bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */
int iQueueSize; /* Current number of elements in the queue */
int iMaxQueueSize; /* how large can the queue grow? */
int iNumWorkerThreads;/* number of worker threads to use */
@@ -135,6 +143,7 @@ typedef struct queue_s {
pthread_mutex_t mutDA; /* mutex for low water mark algo */
pthread_cond_t condDA; /* and its matching condition */
struct queue_s *pqDA; /* queue for disk-assisted modes */
+ int bDAEnqOnly; /* EnqOnly setting for DA queue */
/* now follow queueing mode specific data elements */
union { /* different data elements based on queue type (qType) */
struct {
@@ -152,8 +161,19 @@ typedef struct queue_s {
} tVars;
} queue_t;
+/* some symbolic constants for easier reference */
+#define QUEUE_MODE_ENQDEQ 0
+#define QUEUE_MODE_ENQONLY 1
+
+/* the define below is an "eternal" timeout for the timeout settings which require a value.
+ * It is one day, which is not really eternal, but comes close to it if we think about
+ * rsyslog (e.g.: do you want to wait on shutdown for more than a day? ;))
+ * rgerhards, 2008-01-17
+ */
+#define QUEUE_TIMEOUT_ETERNAL 24 * 60 * 60 * 1000
+
/* prototypes */
-rsRetVal queueDestruct(queue_t *pThis);
+rsRetVal queueDestruct(queue_t **ppThis);
rsRetVal queueEnqObj(queue_t *pThis, void *pUsr);
rsRetVal queueStart(queue_t *pThis);
rsRetVal queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize);
diff --git a/rsyslog.h b/rsyslog.h
index e37263e3..619836d8 100644
--- a/rsyslog.h
+++ b/rsyslog.h
@@ -128,7 +128,7 @@ typedef enum rsRetVal_ rsRetVal; /**< friendly type for global return value */
#define CHKiRet_Hdlr(code) if((iRet = code) != RS_RET_OK)
/* macro below is used in conjunction with CHKiRet_Hdlr, else use ABORT_FINALIZE */
#define FINALIZE goto finalize_it;
-#if 0 /* DEV debug: set to 1 to get a rough call trace -- rgerhards, 2008-01-13 */
+#if 1 /* DEV debug: set to 1 to get a rough call trace -- rgerhards, 2008-01-13 */
# define DEFiRet dbgprintf("Entering %s, line %d\n", __FILE__, __LINE__); rsRetVal iRet = RS_RET_OK
#else
# define DEFiRet rsRetVal iRet = RS_RET_OK
diff --git a/stream.c b/stream.c
index 27fc8a41..15d9dcf6 100644
--- a/stream.c
+++ b/stream.c
@@ -324,10 +324,13 @@ finalize_it:
/* destructor for the strm object */
-rsRetVal strmDestruct(strm_t *pThis)
+rsRetVal strmDestruct(strm_t **ppThis)
{
+ strm_t *pThis;
DEFiRet;
+ assert(ppThis != NULL);
+ pThis = *ppThis;
ISOBJ_TYPE_assert(pThis, strm);
if(pThis->tOperationsMode == STREAMMODE_WRITE)
@@ -342,6 +345,7 @@ rsRetVal strmDestruct(strm_t *pThis)
/* and finally delete the strm objet itself */
free(pThis);
+ *ppThis = NULL;
return iRet;
}
diff --git a/stream.h b/stream.h
index 34eb78e2..0ac32b12 100644
--- a/stream.h
+++ b/stream.h
@@ -91,7 +91,7 @@ typedef struct strm_s {
/* prototypes */
rsRetVal strmConstruct(strm_t **ppThis);
rsRetVal strmConstructFinalize(strm_t __attribute__((unused)) *pThis);
-rsRetVal strmDestruct(strm_t *pThis);
+rsRetVal strmDestruct(strm_t **ppThis);
rsRetVal strmSetMaxFileSize(strm_t *pThis, size_t iMaxFileSize);
rsRetVal strmSetFileName(strm_t *pThis, uchar *pszName, size_t iLenName);
rsRetVal strmReadChar(strm_t *pThis, uchar *pC);
diff --git a/syslogd.c b/syslogd.c
index 99f90402..f62737cc 100644
--- a/syslogd.c
+++ b/syslogd.c
@@ -1729,7 +1729,7 @@ static rsRetVal callAction(msg_t *pMsg, action_t *pAction)
pAction->f_prevcount, time(NULL) - pAction->f_time,
repeatinterval[pAction->f_repeatcount]);
/* use current message, so we have the new timestamp (means we need to discard previous one) */
- MsgDestruct(pAction->f_pMsg);
+ MsgDestruct(&pAction->f_pMsg);
pAction->f_pMsg = MsgAddRef(pMsg);
/* If domark would have logged this by now, flush it now (so we don't hold
* isolated messages), but back off so we'll flush less often in the future.
@@ -1749,7 +1749,7 @@ static rsRetVal callAction(msg_t *pMsg, action_t *pAction)
/* we do not care about iRet above - I think it's right but if we have
* some troubles, you know where to look at ;) -- rgerhards, 2007-08-01
*/
- MsgDestruct(pAction->f_pMsg);
+ MsgDestruct(&pAction->f_pMsg);
}
pAction->f_pMsg = MsgAddRef(pMsg);
/* call the output driver */
@@ -1848,7 +1848,7 @@ msgConsumer(void *pUsr)
assert(pMsg != NULL);
processMsg(pMsg);
- MsgDestruct(pMsg);
+ MsgDestruct(&pMsg);
return RS_RET_OK;
}
@@ -2267,14 +2267,14 @@ logmsg(int pri, msg_t *pMsg, int flags)
dbgprintf("Message has syslog-protocol format.\n");
setProtocolVersion(pMsg, 1);
if(parseRFCSyslogMsg(pMsg, flags) == 1) {
- MsgDestruct(pMsg);
+ MsgDestruct(&pMsg);
return;
}
} else { /* we have legacy syslog */
dbgprintf("Message has legacy syslog format.\n");
setProtocolVersion(pMsg, 0);
if(parseLegacySyslogMsg(pMsg, flags) == 1) {
- MsgDestruct(pMsg);
+ MsgDestruct(&pMsg);
return;
}
}
@@ -2428,7 +2428,7 @@ finalize_it:
* message object will be discarded by our callers, so this is nothing
* of our business. rgerhards, 2007-07-10
*/
- MsgDestruct(pAction->f_pMsg);
+ MsgDestruct(&pAction->f_pMsg);
pAction->f_pMsg = pMsgSave; /* restore it */
}
@@ -2617,7 +2617,7 @@ die(int sig)
/* drain queue (if configured so) and stop main queue worker thread pool */
dbgprintf("Terminating main queue...\n");
- queueDestruct(pMsgQueue);
+ queueDestruct(&pMsgQueue);
pMsgQueue = NULL;
/* Free ressources and close connections. This includes flushing any remaining
@@ -3296,7 +3296,7 @@ init(void)
/* delete the message queue, which also flushes all messages left over */
if(pMsgQueue != NULL) {
dbgprintf("deleting main message queue\n");
- queueDestruct(pMsgQueue); /* delete pThis here! */
+ queueDestruct(&pMsgQueue); /* delete pThis here! */
pMsgQueue = NULL;
}