From ed0363210c34002e5cfbab553506573f5b8a13a5 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 17 Jan 2008 12:45:10 +0000 Subject: worked on threading --- action.c | 2 +- iminternal.c | 2 +- msg.c | 13 +- msg.h | 2 +- obj.h | 2 +- queue.c | 381 +++++++++++++++++++++++++++++++++++++++++++++-------------- queue.h | 32 ++++- rsyslog.h | 2 +- stream.c | 6 +- stream.h | 2 +- syslogd.c | 16 +-- 11 files changed, 348 insertions(+), 112 deletions(-) diff --git a/action.c b/action.c index 206afe29..04e72a94 100644 --- a/action.c +++ b/action.c @@ -52,7 +52,7 @@ rsRetVal actionDestruct(action_t *pThis) pThis->pMod->freeInstance(pThis->pModData); if(pThis->f_pMsg != NULL) - MsgDestruct(pThis->f_pMsg); + MsgDestruct(&pThis->f_pMsg); SYNC_OBJ_TOOL_EXIT(pThis); if(pThis->ppTpl != NULL) diff --git a/iminternal.c b/iminternal.c index a45fd52e..86d5097c 100644 --- a/iminternal.c +++ b/iminternal.c @@ -49,7 +49,7 @@ static rsRetVal iminternalDestruct(iminternal_t *pThis) assert(pThis != NULL); if(pThis->pMsg != NULL) - MsgDestruct(pThis->pMsg); + MsgDestruct(&pThis->pMsg); free(pThis); diff --git a/msg.c b/msg.c index 948274bf..0961afd4 100644 --- a/msg.c +++ b/msg.c @@ -232,8 +232,12 @@ finalize_it: /* Destructor for a msg "object". Must be called to dispose * of a msg object. */ -rsRetVal MsgDestruct(msg_t * pM) -{ +rsRetVal MsgDestruct(msg_t **ppM) +{ + msg_t *pM; + + assert(ppM != NULL); + pM = *ppM; assert(pM != NULL); /* DEV Debugging only ! dbgprintf("MsgDestruct\t0x%lx, Ref now: %d\n", (unsigned long)pM, pM->iRefCount - 1); */ if(--pM->iRefCount == 0) @@ -289,6 +293,7 @@ rsRetVal MsgDestruct(msg_t * pM) rsCStrDestruct(pM->pCSMSGID); funcDeleteMutex(pM); free(pM); + *ppM = NULL; } return RS_RET_OK; @@ -302,7 +307,7 @@ rsRetVal MsgDestruct(msg_t * pM) #define tmpCOPYSZ(name) \ if(pOld->psz##name != NULL) { \ if((pNew->psz##name = srUtilStrDup(pOld->psz##name, pOld->iLen##name)) == NULL) {\ - MsgDestruct(pNew);\ + MsgDestruct(&pNew);\ return NULL;\ }\ pNew->iLen##name = pOld->iLen##name;\ @@ -315,7 +320,7 @@ rsRetVal MsgDestruct(msg_t * pM) #define tmpCOPYCSTR(name) \ if(pOld->pCS##name != NULL) {\ if(rsCStrConstructFromCStr(&(pNew->pCS##name), pOld->pCS##name) != RS_RET_OK) {\ - MsgDestruct(pNew);\ + MsgDestruct(&pNew);\ return NULL;\ }\ } diff --git a/msg.h b/msg.h index bd8d4f89..842a9349 100644 --- a/msg.h +++ b/msg.h @@ -109,7 +109,7 @@ typedef struct msg msg_t; /* new name */ PROTOTYPEObjClassInit(Msg); char* getProgramName(msg_t*); rsRetVal MsgConstruct(msg_t **ppThis); -rsRetVal MsgDestruct(msg_t * pM); +rsRetVal MsgDestruct(msg_t **ppM); msg_t* MsgDup(msg_t* pOld); msg_t *MsgAddRef(msg_t *pM); void setProtocolVersion(msg_t *pM, int iNewVersion); diff --git a/obj.h b/obj.h index f4a9aee6..6b965e10 100644 --- a/obj.h +++ b/obj.h @@ -78,7 +78,7 @@ #else # define objConstructSetObjInfo(pThis) ((obj_t*) (pThis))->pObjInfo = pObjInfoOBJ #endif -#define objDestruct(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_DESTRUCT])(pThis) +#define objDestruct(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_DESTRUCT])(&pThis) #define objSerialize(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_SERIALIZE]) #define objGetSeverity(pThis, piSever) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_GETSEVERITY])(pThis, piSever) diff --git a/queue.c b/queue.c index 9003b344..2b241d82 100644 --- a/queue.c +++ b/queue.c @@ -1,6 +1,6 @@ - // DA-input only +// TODO: start up the correct num of workers when switching to non-DA mode // TODO: "preforked" worker threads -// TODO: do an if(debug) in dbgrintf - performanc ein release build! +// TODO: do an if(debug) in dbgrintf - performance in release build! // TODO: peekmsg() on first entry, with new/inprogress/deleted entry, destruction in // call consumer state. Facilitates retaining messages in queue until action could // be called! @@ -56,9 +56,61 @@ rsRetVal queueChkPersist(queue_t *pThis); static void *queueWorker(void *arg); static rsRetVal queueGetQueueSize(queue_t *pThis, int *piQueueSize); static rsRetVal queueChkWrkThrdChanges(queue_t *pThis); +static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly); /* methods */ +/* get the current worker state. For simplicity and speed, we have + * NOT used our regular calling interface this time. I hope that won't + * bite in the long term... -- rgerhards, 2008-01-17 + */ +static inline qWrkCmd_t +qWrkrGetState(qWrkThrd_t *pThis) +{ + assert(pThis != NULL); + return pThis->tCurrCmd; +} + + +/* send a command to a specific thread + */ +static rsRetVal +qWrkrSetState(qWrkThrd_t *pThis, qWrkCmd_t tCmd) +{ + DEFiRet; + + assert(pThis != NULL); + dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis->pQueue), tCmd, pThis->iThrd); + + /* change some admin structures */ + switch(tCmd) { + case eWRKTHRD_TERMINATING: + pthread_cond_destroy(&pThis->condInitDone); + dbgprintf("Queue 0x%lx/w%d: thread terminating with %d entries left in queue, %d workers running.\n", + queueGetID(pThis->pQueue), pThis->iThrd, pThis->pQueue->iQueueSize, + pThis->pQueue->iCurNumWrkThrd); + break; + case eWRKTHRD_RUN_CREATED: + pthread_cond_init(&pThis->condInitDone, NULL); + break; + case eWRKTHRD_RUN_INIT: + break; + case eWRKTHRD_RUNNING: + pthread_cond_signal(&pThis->condInitDone); + break; + /* these cases just to satisfy the compiler, we do (yet) not act an them: */ + case eWRKTHRD_STOPPED: + case eWRKTHRD_SHUTDOWN: + case eWRKTHRD_SHUTDOWN_IMMEDIATE: + /* DO NOTHING */ + break; + } + + pThis->tCurrCmd = tCmd; + + return iRet; +} + /* send a command to a specific active thread. If the thread is not * active, the command is not sent. */ @@ -70,9 +122,9 @@ queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd) ISOBJ_TYPE_assert(pThis, queue); assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads); - if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRD_RUN_INIT) { + if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRD_RUN_CREATED) { dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx); - pThis->pWrkThrds[iIdx].tCurrCmd = tCmd; + qWrkrSetState(&pThis->pWrkThrds[iIdx], tCmd); } else { dbgprintf("Queue 0x%lx: command %d NOT sent to inactive thread %d\n", queueGetID(pThis), tCmd, iIdx); } @@ -80,21 +132,60 @@ queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd) return iRet; } -/* send a command to a specific thread - * TODO: check if we can run into trouble with inactive threads + +/* Finalize construction of a wWrkrThrd_t "object" + * rgerhards, 2008-01-17 */ static inline rsRetVal -queueTellWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd) +qWrkrConstructFinalize(qWrkThrd_t *pThis, queue_t *pQueue, int i) { - DEFiRet; + assert(pThis != NULL); + ISOBJ_TYPE_assert(pQueue, queue); - dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx); - ISOBJ_TYPE_assert(pThis, queue); - assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads); + dbgprintf("Queue 0x%lx: finalizing construction of worker %d instance data\n", queueGetID(pQueue), i); - pThis->pWrkThrds[iIdx].tCurrCmd = tCmd; + /* initialize our thread instance descriptor */ + pThis = pQueue->pWrkThrds + i; + pThis->pQueue = pQueue; + pThis->iThrd = i; + pThis->pUsr = NULL; - return iRet; + qWrkrSetState(pThis, eWRKTHRD_STOPPED); + + return RS_RET_OK; +} + + +/* initialize the qWrkThrd_t structure - this MUST be called right after + * startup of a worker thread. -- rgerhards, 2008-01-17 + */ +static inline rsRetVal +qWrkrInit(qWrkThrd_t **ppThis, queue_t *pQueue) +{ + qWrkThrd_t *pThis; + int i; + + assert(ppThis != NULL); + ISOBJ_TYPE_assert(pQueue, queue); + + /* find myself in the queue's thread table */ + for(i = 0 ; i <= pQueue->iNumWorkerThreads ; ++i) + if(pQueue->pWrkThrds[i].thrdID == pthread_self()) + break; +dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pQueue, + (unsigned) pQueue->pWrkThrds[i].thrdID, i, (unsigned) pthread_self()); + assert(pQueue->pWrkThrds[i].thrdID == pthread_self()); + + /* initialize our thread instance descriptor */ + pThis = pQueue->pWrkThrds + i; + pThis->pQueue = pQueue; + pThis->iThrd = i; + pThis->pUsr = NULL; + + *ppThis = pThis; + qWrkrSetState(pThis, eWRKTHRD_RUN_INIT); + + return RS_RET_OK; } @@ -112,9 +203,9 @@ queueJoinWrkThrd(queue_t *pThis, int iIdx) dbgprintf("Queue 0x%lx: thread %d state %d, waiting for exit\n", queueGetID(pThis), iIdx, pThis->pWrkThrds[iIdx].tCurrCmd); pthread_join(pThis->pWrkThrds[iIdx].thrdID, NULL); - pThis->pWrkThrds[iIdx].tCurrCmd = eWRKTHRD_STOPPED; /* back to virgin... */ + qWrkrSetState(&pThis->pWrkThrds[iIdx], eWRKTHRD_STOPPED); /* back to virgin... */ pThis->pWrkThrds[iIdx].thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */ - dbgprintf("Queue 0x%lx: thread %d state %d, has exited\n", queueGetID(pThis), iIdx, + dbgprintf("Queue 0x%lx: thread %d state %d, has stopped\n", queueGetID(pThis), iIdx, pThis->pWrkThrds[iIdx].tCurrCmd); return iRet; @@ -131,9 +222,9 @@ queueStrtWrkThrd(queue_t *pThis, int i) ISOBJ_TYPE_assert(pThis, queue); assert(i >= 0 && i <= pThis->iNumWorkerThreads); - assert(pThis->pWrkThrds[i].tCurrCmd < eWRKTHRD_RUN_INIT); + assert(pThis->pWrkThrds[i].tCurrCmd < eWRKTHRD_RUN_CREATED); - queueTellWrkThrd(pThis, i, eWRKTHRD_RUN_INIT); + qWrkrSetState(&pThis->pWrkThrds[i], eWRKTHRD_RUN_CREATED); iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); dbgprintf("Queue 0x%lx: starting Worker thread %x, index %d with state %d.\n", (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState); @@ -164,7 +255,7 @@ queueStrtNewWrkThrd(queue_t *pThis) dbgprintf("Queue %p: search thrd tbl slot: i %d, CuccCmd %d\n", pThis, i, pThis->pWrkThrds[i].tCurrCmd); if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_STOPPED) { break; - } else if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_RUN_INIT) { + } else if(pThis->pWrkThrds[i].tCurrCmd == eWRKTHRD_RUN_CREATED) { iStartingUp = i; break; } @@ -176,7 +267,7 @@ dbgprintf("Queue %p: after thrd search: i %d, iStartingUp %d\n", pThis, i, iStar assert(i <= pThis->iNumWorkerThreads); /* now there must be a free spot, else something is really wrong! */ - queueTellWrkThrd(pThis, i, eWRKTHRD_RUN_INIT); + qWrkrSetState(&pThis->pWrkThrds[i], eWRKTHRD_RUN_CREATED); iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); dbgprintf("Queue 0x%lx: Worker thread %x, index %d started with state %d.\n", (unsigned long) pThis, (unsigned) pThis->pWrkThrds[i].thrdID, i, iState); @@ -206,7 +297,7 @@ queueTellActWrkThrds(queue_t *pThis, int iStartIdx, qWrkCmd_t tCmd) /* tell the workers our request */ for(i = iStartIdx ; i <= pThis->iNumWorkerThreads ; ++i) - if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATED) + if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) queueTellActWrkThrd(pThis, i, tCmd); return iRet; @@ -271,7 +362,7 @@ queueChkAndStrtWrk(queue_t *pThis) queueChkWrkThrdChanges(pThis); /* check if we need to start up another worker (only in regular mode) */ - if(pThis->qRunsDA == QRUNS_REGULAR) { + if(pThis->qRunsDA == QRUNS_REGULAR && pThis->bEnqOnly == 0) { if(pThis->iCurNumWrkThrd < pThis->iNumWorkerThreads) { dbgprintf("Queue %p: less than max workers are running, qsize %d, workers %d, qRunsDA: %d\n", pThis, pThis->iQueueSize, pThis->iCurNumWrkThrd, pThis->qRunsDA); @@ -320,14 +411,14 @@ queueTurnOffDAMode(queue_t *pThis) * messages come into the queue, we may be well off with a single worker. * rgerhards, 2008-01-16 */ - queueStrtNewWrkThrd(pThis); + if(pThis->bEnqOnly == 0) + queueStrtNewWrkThrd(pThis); pThis->qRunsDA = QRUNS_REGULAR; /* tell the world we are back in non-DA mode */ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, * this will be quick. */ - queueDestruct(pThis->pqDA); /* and now we are ready to destruct the DA queue */ - pThis->pqDA = NULL; + queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ /* now free the remaining resources */ pthread_mutex_destroy(&pThis->mutDA); @@ -358,11 +449,12 @@ queueChkWrkThrdChanges(queue_t *pThis) /* go through all threads (including DA thread) */ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { switch(pThis->pWrkThrds[i].tCurrCmd) { - case eWRKTHRD_TERMINATED: + case eWRKTHRD_TERMINATING: queueJoinWrkThrd(pThis, i); break; /* these cases just to satisfy the compiler, we do not act an them: */ case eWRKTHRD_STOPPED: + case eWRKTHRD_RUN_CREATED: case eWRKTHRD_RUN_INIT: case eWRKTHRD_RUNNING: case eWRKTHRD_SHUTDOWN: @@ -428,7 +520,7 @@ dbgprintf("Queue %p/w%d: queueDAConsumer, queue size %d\n", pThis, iMyThrdIndx, * Note that the child queue now in almost all cases is non-empty, because we just enqueued * a message. */ - if(iQueueSize <= pThis->iLowWtrMrk && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) { + if(iQueueSize <= pThis->iLowWtrMrk && iQueueSize != 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) { dbgprintf("Queue 0x%lx/w%d: %d entries - passed low water mark in DA mode, sleeping\n", queueGetID(pThis), iMyThrdIndx, iQueueSize); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); @@ -494,6 +586,7 @@ queueStrtDA(queue_t *pThis) CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq)); + CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly)); CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0)); CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0)); if(pThis->toQShutdown == 0) { @@ -536,8 +629,7 @@ queueStrtDA(queue_t *pThis) finalize_it: if(iRet != RS_RET_OK) { if(pThis->pqDA != NULL) { - queueDestruct(pThis->pqDA); - pThis->pqDA = NULL; + queueDestruct(&pThis->pqDA); } dbgprintf("Queue 0x%lx: error %d creating disk queue - giving up.\n", queueGetID(pThis), iRet); @@ -549,19 +641,24 @@ finalize_it: /* initiate DA mode + * param bEnqOnly tells if the disk queue is to be run in enqueue-only mode. This may + * be needed during shutdown of memory queues which need to be persisted to disk. * rgerhards, 2008-01-16 */ static inline rsRetVal -queueInitDA(queue_t *pThis) +queueInitDA(queue_t *pThis, int bEnqOnly) { DEFiRet; /* indicate we now run in DA mode - this is reset by the DA worker if it fails */ pThis->qRunsDA = QRUNS_DA_INIT; + pThis->bDAEnqOnly = bEnqOnly; - /* now we must start our DA worker thread - it does the rest of the initialization */ - // DA-input only mode! - iRet = queueStrtWrkThrd(pThis, 0); + /* now we must start our DA worker thread - it does the rest of the initialization + * In enqueue-only mode, we do not start any workers. + */ + if(pThis->bEnqOnly == 0) + iRet = queueStrtWrkThrd(pThis, 0); return iRet; } @@ -606,7 +703,7 @@ queueChkStrtDA(queue_t *pThis) dbgprintf("Queue 0x%lx: %d entries - passed high water mark for disk-assisted mode, initiating...\n", queueGetID(pThis), pThis->iQueueSize); - queueInitDA(pThis); /* initiate DA mode */ + queueInitDA(pThis, QUEUE_MODE_ENQDEQ); /* initiate DA mode */ } finalize_it: @@ -800,7 +897,6 @@ queueHaveQIF(queue_t *pThis) (char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix); /* check if the file exists */ -dbgprintf("stat HaveQIF '%s'\n", pszQIFNam); if(stat((char*) pszQIFNam, &stat_buf) == -1) { if(errno == ENOENT) { dbgprintf("Queue 0x%lx: no .qi file found\n", queueGetID(pThis)); @@ -874,7 +970,7 @@ queueTryLoadPersistedInfo(queue_t *pThis) finalize_it: if(psQIF != NULL) - strmDestruct(psQIF); + strmDestruct(&psQIF); if(iRet != RS_RET_OK) { dbgprintf("Queue 0x%lx: error %d reading .qi file - can not read persisted info (if any)\n", @@ -949,8 +1045,8 @@ static rsRetVal qDestructDisk(queue_t *pThis) assert(pThis != NULL); - strmDestruct(pThis->tVars.disk.pWrite); - strmDestruct(pThis->tVars.disk.pRead); + strmDestruct(&pThis->tVars.disk.pWrite); + strmDestruct(&pThis->tVars.disk.pRead); if(pThis->pszSpoolDir != NULL) free(pThis->pszSpoolDir); @@ -1068,7 +1164,6 @@ queueWrkThrdReqTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, int bIncludeDAWrk) { DEFiRet; - // DA-input only if(bIncludeDAWrk) { queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */ queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ @@ -1093,13 +1188,19 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) struct timespec t; queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */ +dbgprintf("WrkThrdTrm 0\n"); queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ + /* race: must make sure all are running! */ +dbgprintf("WrkThrdTrm 1\n"); queueTimeoutComp(&t, iTimeout);/* get timeout */ +dbgprintf("WrkThrdTrm 2\n"); /* and wait for their termination */ pthread_mutex_lock(pThis->mut); bTimedOut = 0; +dbgprintf("WrkThrdTrm 3, thrds: %d\n", pThis->iCurNumWrkThrd); while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { +dbgprintf("WrkThrdTrm 4 to %d\n", bTimedOut); dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n", queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd); @@ -1128,12 +1229,15 @@ queueWrkThrdCancel(queue_t *pThis) // TODO: we need to implement peek(), without it (today!) we lose one message upon // worker cancellation! -- rgerhards, 2008-01-14 + /* process any pending thread requests so that we know who actually is still running */ + queueChkWrkThrdChanges(pThis); + /* awake the workers one more time, just to be sure */ queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ /* first tell the workers our request */ for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) - if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATED) { + if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) { dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i); pthread_cancel(pThis->pWrkThrds[i].thrdID); } @@ -1196,14 +1300,14 @@ static rsRetVal queueShutdownWorkers(queue_t *pThis) } -/* This is a helper for queueWorker() it either calls the configured +/* This is a helper for queueWorker () it either calls the configured * consumer or the DA-consumer (if in disk-assisted mode). It is * protected by the queue mutex, but MUST release it as soon as possible. * Most importantly, it must release it before the consumer is called. * rgerhards, 2008-01-14 */ static inline rsRetVal -queueWorkerChkAndCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateSave) +queueWorkerChkAndCallConsumer(queue_t *pThis, qWrkThrd_t *pWrkrInst, int iCancelStateSave) { DEFiRet; rsRetVal iRetLocal; @@ -1211,7 +1315,12 @@ queueWorkerChkAndCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateS int iQueueSize; void *pUsr; int qRunsDA; + int iMyThrdIndx; + ISOBJ_TYPE_assert(pThis, queue); + assert(pWrkrInst != NULL); + + iMyThrdIndx = pWrkrInst->iThrd; /* first check if we have still something to process */ if(pThis->iQueueSize == 0 || @@ -1228,6 +1337,7 @@ queueWorkerChkAndCallConsumer(queue_t *pThis, int iMyThrdIndx, int iCancelStateS queueChkPersist(pThis); // when we support peek(), we must do this down after the del! qRunsDA = pThis->qRunsDA; /* do a local copy so that we prevent a race after mutex release */ iQueueSize = pThis->iQueueSize; /* ... and the same for this property */ + pWrkrInst->pUsr = pUsr; /* save it for the cancel cleanup handler */ pthread_mutex_unlock(pThis->mut); pthread_cond_signal(pThis->notFull); pthread_setcancelstate(iCancelStateSave, NULL); @@ -1274,6 +1384,34 @@ dbgprintf("CallConsumer returns %d\n", iRet); } + +/* cancellation cleanup handler for queueWorker () + * Updates admin structure and frees ressources. + * rgerhards, 2008-01-16 + */ +static void queueWorkerCancelCleanup(void *arg) +{ + qWrkThrd_t *pWrkrInst = (qWrkThrd_t*) arg; + queue_t *pThis; + + assert(pWrkrInst != NULL); + ISOBJ_TYPE_assert(pWrkrInst->pQueue, queue); + pThis = pWrkrInst->pQueue; + + dbgprintf("Queue 0x%lx/w%d: cancelation cleanup handler called (NOT FULLY IMPLEMENTED, one msgs lost!)\n", + queueGetID(pThis), pWrkrInst->iThrd); + + pThis->iCurNumWrkThrd--; /* one worker less... */ + pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */ + qWrkrSetState(&pThis->pWrkThrds[pWrkrInst->iThrd], eWRKTHRD_TERMINATING); + pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ + + /* TODO: re-enqueue the data element! */ + dbgprintf("Queue 0x%lx/w%d: thread CANCELED with %d entries left in queue, %d workers running.\n", + queueGetID(pThis), pWrkrInst->iThrd, pThis->iQueueSize, pThis->iCurNumWrkThrd); +} + + /* Each queue has at least one associated worker (consumer) thread. It will pull * the message from the queue and pass it to a user-defined function. * This function was provided on construction. It MUST be thread-safe. @@ -1290,6 +1428,7 @@ queueWorker(void *arg) struct timespec t; int iMyThrdIndx; /* index for this thread in queue thread table */ int iCancelStateSave; + qWrkThrd_t *pWrkrInst; /* for cleanup handler */ ISOBJ_TYPE_assert(pThis, queue); @@ -1300,17 +1439,13 @@ queueWorker(void *arg) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); - /* first find myself in the queue's thread table */ - for(iMyThrdIndx = 0 ; iMyThrdIndx <= pThis->iNumWorkerThreads ; ++iMyThrdIndx) - if(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self()) - break; -dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pThis, - (unsigned) pThis->pWrkThrds[iMyThrdIndx].thrdID, iMyThrdIndx, (unsigned) pthread_self()); - assert(pThis->pWrkThrds[iMyThrdIndx].thrdID == pthread_self()); + /* initialize our thread instance descriptor */ + qWrkrInit(&pWrkrInst, pThis); - dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx); + iMyThrdIndx = pWrkrInst->iThrd; pThis->iCurNumWrkThrd++; /* tell the world there is one more worker */ + dbgprintf("Queue 0x%lx/w%d: worker thread startup.\n", queueGetID(pThis), iMyThrdIndx); if(iMyThrdIndx == 0) { /* are we the DA worker? */ if(queueStrtDA(pThis) != RS_RET_OK) { /* then fully initialize the DA queue! */ @@ -1323,16 +1458,18 @@ dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pThis, * because someone may have requested us to shut down even before we got a chance to do * our init. That would be a bad race... -- rgerhards, 2008-01-16 */ - if(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUN_INIT) - pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_RUNNING; /* we are running now! */ + if(qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT) + qWrkrSetState(pWrkrInst, eWRKTHRD_RUNNING); /* we are running now! */ + + pthread_cleanup_push(queueWorkerCancelCleanup, pWrkrInst); pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); /* end one-time stuff */ /* now we have our identity, on to real processing */ - while(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING - || (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN && pThis->iQueueSize > 0)) { + while( (qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING) + || (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN && pThis->iQueueSize > 0)) { pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); @@ -1340,7 +1477,7 @@ dbgprintf("Queue %p, worker start, thrd id found %x, idx %d, self %x\n", pThis, queueChkWrkThrdChanges(pThis); dbgprintf("Queue %p/w%d: pre empty queue, qsize %d\n", pThis, iMyThrdIndx, pThis->iQueueSize); - while(pThis->iQueueSize == 0 && pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_RUNNING) { + while(pThis->iQueueSize == 0 && qWrkrGetState(pWrkrInst) == eWRKTHRD_RUNNING) { dbgprintf("Queue 0x%lx/w%d: queue EMPTY, waiting for next message.\n", queueGetID(pThis), iMyThrdIndx); if(pThis->bSignalOnEmpty > 0) { @@ -1377,13 +1514,13 @@ dbgprintf("worker never times out!\n"); /* we use SHUTDOWN (and not SHUTDOWN_IMMEDIATE) so that the worker * does not terminate if in the mean time a new message arrived. */ - pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_SHUTDOWN; + qWrkrSetState(pWrkrInst, eWRKTHRD_SHUTDOWN); } } dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx); } - queueWorkerChkAndCallConsumer(pThis, iMyThrdIndx, iCancelStateSave); + queueWorkerChkAndCallConsumer(pThis, pWrkrInst, iCancelStateSave); /* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is * a cancellation point in itself. As we run most of the time without cancel enabled, I fear @@ -1406,7 +1543,7 @@ dbgprintf("Queue %p/w%d: post condwait ->notEmpty\n", pThis, iMyThrdIndx); pthread_yield(); dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n", queueGetID(pThis), iMyThrdIndx, pThis->pWrkThrds[iMyThrdIndx].tCurrCmd); - if(Debug && (pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0) + if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0) dbgprintf("Queue 0x%lx/w%d: worker does not yet terminate because it still has " " %d messages to process.\n", queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize); } @@ -1415,17 +1552,21 @@ dbgprintf("Queue 0x%lx/w%d: end worker run, queue cmd currently %d\n", pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_mutex_lock(pThis->mut); pThis->iCurNumWrkThrd--; - if(pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN || - pThis->pWrkThrds[iMyThrdIndx].tCurrCmd == eWRKTHRD_SHUTDOWN_IMMEDIATE) { + pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */ + pthread_cleanup_pop(0); /* remove cleanup handler */ + + /* if we ever need finalize_it, here would be the place for it! */ + if(qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN || + qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN_IMMEDIATE || + qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT || + qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_CREATED) { /* in shutdown case, we need to flag termination. All other commands * have a meaning to the thread harvester, so we can not overwrite them */ - pThis->pWrkThrds[iMyThrdIndx].tCurrCmd = eWRKTHRD_TERMINATED; +dbgprintf("Queue 0x%lx/w%d: setting termination state\n", queueGetID(pThis), iMyThrdIndx); + qWrkrSetState(pWrkrInst, eWRKTHRD_TERMINATING); } pThis->bThrdStateChanged = 1; /* indicate change, so harverster will be called */ - pthread_cond_signal(&pThis->condThrdTrm); /* important for shutdown situation */ - dbgprintf("Queue 0x%lx/w%d: thread terminates with %d entries left in queue, %d workers running.\n", - queueGetID(pThis), iMyThrdIndx, pThis->iQueueSize, pThis->iCurNumWrkThrd); pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); @@ -1517,20 +1658,27 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ DEFiRet; rsRetVal iRetLocal; int bInitialized = 0; /* is queue already initialized? */ + int i; assert(pThis != NULL); /* call type-specific constructor */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ - dbgprintf("Queue 0x%lx: type %d, disk assisted %d, maxFileSz %ld starting\n", queueGetID(pThis), pThis->qType, - pThis->bIsDA, pThis->iMaxFileSize); + dbgprintf("Queue 0x%lx: type %d, enq-only %d, disk assisted %d, maxFileSz %ld starting\n", queueGetID(pThis), + pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ + /* initialize worker thread instances + * TODO: move to separate function + */ if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads + 1, sizeof(qWrkThrd_t))) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + for(i = 0 ; i < pThis->iNumWorkerThreads + 1 ; ++i) { + qWrkrConstructFinalize(&pThis->pWrkThrds[i], pThis, i); + } if(pThis->bIsDA) { /* If we are disk-assisted, we need to check if there is a QIF file @@ -1541,7 +1689,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n", queueGetID(pThis)); - queueInitDA(pThis); /* initiate DA mode */ + queueInitDA(pThis, QUEUE_MODE_ENQDEQ); /* initiate DA mode */ bInitialized = 1; /* we are done */ } else { // TODO: use logerror? -- rgerhards, 2008-01-16 @@ -1552,12 +1700,10 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ if(!bInitialized) { dbgprintf("Queue 0x%lx: queue starts up without (loading) any disk state\n", queueGetID(pThis)); - /* worker 0 is reserved for disk-assisted mode, so do not start */ - queueTellWrkThrd(pThis, 0, eWRKTHRD_STOPPED); - /* fire up the worker threads */ // TODO: preforked workers! queueStrtAllWrkThrds(pThis); } + pThis->bQueueStarted = 1; finalize_it: return iRet; @@ -1634,7 +1780,7 @@ static rsRetVal queuePersist(queue_t *pThis) finalize_it: if(psQIF != NULL) - strmDestruct(psQIF); + strmDestruct(&psQIF); return iRet; } @@ -1661,44 +1807,60 @@ rsRetVal queueChkPersist(queue_t *pThis) /* destructor for the queue object */ -rsRetVal queueDestruct(queue_t *pThis) +rsRetVal queueDestruct(queue_t **ppThis) { + queue_t *pThis; DEFiRet; - assert(pThis != NULL); + assert(ppThis != NULL); + pThis = *ppThis; + ISOBJ_TYPE_assert(pThis, queue); - /* if running DA, tell the DA workers to shut down. This saves us some CPU cycles which - * we can use to persist the remaining in-memory data to disk quicker. -- rgerhads, 2008-01-16 - * TODO: we actually need to change the queue to an "input-only" mode, that also prevents - * startup of the thread again further down in the process. None of that really hurts, so we - * leave it for the time being. -- rgerhards, 2008-01-16 +pThis->bSaveOnShutdown = 1; // TODO: Test remove + /* if running DA, switch the DA queue to enqueue-only mode. That saves us some CPU cycles as + * its workers do no longer need to run. It also prevents longer-running actions to spring into + * existence while we are draining the main (memory) queue. -- rgerhads, 2008-01-16 */ - if(pThis->qRunsDA != QRUNS_REGULAR) - queueWrkThrdReqTrm(pThis->pqDA, eWRKTHRD_SHUTDOWN_IMMEDIATE, 0); - // DA-input only + if(pThis->qRunsDA != QRUNS_REGULAR) { + queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY); /* turn on enqueue-only mode */ + if(pThis->bSaveOnShutdown) + pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL; + } /* then, terminate our own worker threads */ if(pThis->pWrkThrds != NULL) { queueShutdownWorkers(pThis); - free(pThis->pWrkThrds); - pThis->pWrkThrds = NULL; } - /* of we have now data left in in-memory queues, this data will be lost if we do not - * persist it to a disk queue. - * TODO: implement code rgerhards, 2008-01-16 + /* If we currently run in DA mode, the in-memory queue is already persisted to disk. + * If we are not in DA mode, we may have data left in in-memory queues, this data will + * be lost if we do not persist it to a disk queue. So, if configured to do so, we will + * now start DA mode just to drain our queue. -- rgerhards, 2008-01-16 + * TODO: move to persist function! */ + if(pThis->iQueueSize > 0 && pThis->bSaveOnShutdown && pThis->bIsDA) { + dbgprintf("Queue 0x%lx: in-memory queue contains %d entries after worker shutdown - using DA to save to disk\n", + queueGetID(pThis), pThis->iQueueSize); + pThis->iLowWtrMrk = 0; /* disable low water mark algo */ + queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* start DA queue in enqueue-only mode */ + pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL; + queueShutdownWorkers(pThis); + } /* if running DA, terminate disk queue */ if(pThis->qRunsDA != QRUNS_REGULAR) - queueDestruct(pThis->pqDA); + queueDestruct(&pThis->pqDA); - /* persist the queue (we always do that - queuePersits() does cleanup it the queue is empty) */ + /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) */ CHKiRet_Hdlr(queuePersist(pThis)) { dbgprintf("Queue 0x%lx: error %d persisting queue - data lost!\n", (unsigned long) pThis, iRet); } /* ... then free resources */ + if(pThis->pWrkThrds != NULL) { + free(pThis->pWrkThrds); + pThis->pWrkThrds = NULL; + } pthread_mutex_destroy(pThis->mut); free(pThis->mut); pthread_cond_destroy(pThis->notFull); @@ -1713,6 +1875,7 @@ rsRetVal queueDestruct(queue_t *pThis) /* and finally delete the queue objet itself */ free(pThis); + *ppThis = NULL; return iRet; } @@ -1845,6 +2008,50 @@ finalize_it: return iRet; } + +/* set queue mode to enqueue only or not + * rgerhards, 2008-01-16 + */ +static rsRetVal +queueSetEnqOnly(queue_t *pThis, int bEnqOnly) +{ + DEFiRet; + int iCancelStateSave; + + ISOBJ_TYPE_assert(pThis, queue); + + /* for simplicity, we do one big mutex lock. This method is extremely seldom + * called, so that doesn't matter... -- rgerhards, 2008-01-16 + */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + pthread_mutex_lock(pThis->mut); + + if(bEnqOnly == pThis->bEnqOnly) + FINALIZE; /* no change, nothing to do */ + + if(pThis->bQueueStarted) { + /* we need to adjust queue operation only if we are not during initial param setup */ + if(bEnqOnly == 1) { + /* switch to enqueue-only mode */ + /* this means we need to terminate all workers - that's it... */ + dbgprintf("Queue 0x%lx: switching to enqueue-only mode, terminating all worker threads\n", + queueGetID(pThis)); + queueWrkThrdReqTrm(pThis, eWRKTHRD_SHUTDOWN_IMMEDIATE, 0); + } else { + /* switch back to regular mode */ + ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */ + } + } + + pThis->bEnqOnly = bEnqOnly; + +finalize_it: + pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + return iRet; +} + + /* some simple object access methods */ DEFpropSetMeth(queue, iPersistUpdCnt, int); DEFpropSetMeth(queue, toQShutdown, long); diff --git a/queue.h b/queue.h index 03a3517b..e68467b9 100644 --- a/queue.h +++ b/queue.h @@ -63,23 +63,31 @@ typedef struct qLinkedList_S { /* commands and states for worker threads. */ typedef enum { eWRKTHRD_STOPPED = 0, /* worker thread is not running (either actually never ran or was shut down) */ - eWRKTHRD_TERMINATED = 1,/* worker thread has shut down, but some finalzing is still needed */ + eWRKTHRD_TERMINATING = 1,/* worker thread has shut down, but some finalzing is still needed */ /* ALL active states MUST be numerically higher than eWRKTHRD_TERMINATED and NONE must be lower! */ - eWRKTHRD_RUN_INIT = 2, /* worker thread is initializing, but not yet fully running */ - eWRKTHRD_RUNNING = 3, /* worker thread is up and running and shall continue to do so */ - eWRKTHRD_SHUTDOWN = 4, /* worker thread is running but shall terminate when queue is empty */ - eWRKTHRD_SHUTDOWN_IMMEDIATE = 5/* worker thread is running but shall terminate even if queue is full */ + eWRKTHRD_RUN_CREATED = 2,/* worker thread has been created, but not yet begun initialization (prob. not yet scheduled) */ + eWRKTHRD_RUN_INIT = 3, /* worker thread is initializing, but not yet fully running */ + eWRKTHRD_RUNNING = 4, /* worker thread is up and running and shall continue to do so */ + eWRKTHRD_SHUTDOWN = 5, /* worker thread is running but shall terminate when queue is empty */ + eWRKTHRD_SHUTDOWN_IMMEDIATE = 6/* worker thread is running but shall terminate even if queue is full */ } qWrkCmd_t; typedef struct qWrkThrd_s { pthread_t thrdID; /* thread ID */ qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */ + obj_t *pUsr; /* current user object being processed (or NULL if none) */ + struct queue_s *pQueue; /* my queue (important if only the work thread instance is passed! */ + int iThrd; /* my worker thread array index */ + pthread_cond_t condInitDone; /* signaled when the thread startup is done (once per thread existance) */ } qWrkThrd_t; /* type for queue worker threads */ /* the queue object */ typedef struct queue_s { BEGINobjInstance; queueType_t qType; + int bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */ + int bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */ + int bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */ int iQueueSize; /* Current number of elements in the queue */ int iMaxQueueSize; /* how large can the queue grow? */ int iNumWorkerThreads;/* number of worker threads to use */ @@ -135,6 +143,7 @@ typedef struct queue_s { pthread_mutex_t mutDA; /* mutex for low water mark algo */ pthread_cond_t condDA; /* and its matching condition */ struct queue_s *pqDA; /* queue for disk-assisted modes */ + int bDAEnqOnly; /* EnqOnly setting for DA queue */ /* now follow queueing mode specific data elements */ union { /* different data elements based on queue type (qType) */ struct { @@ -152,8 +161,19 @@ typedef struct queue_s { } tVars; } queue_t; +/* some symbolic constants for easier reference */ +#define QUEUE_MODE_ENQDEQ 0 +#define QUEUE_MODE_ENQONLY 1 + +/* the define below is an "eternal" timeout for the timeout settings which require a value. + * It is one day, which is not really eternal, but comes close to it if we think about + * rsyslog (e.g.: do you want to wait on shutdown for more than a day? ;)) + * rgerhards, 2008-01-17 + */ +#define QUEUE_TIMEOUT_ETERNAL 24 * 60 * 60 * 1000 + /* prototypes */ -rsRetVal queueDestruct(queue_t *pThis); +rsRetVal queueDestruct(queue_t **ppThis); rsRetVal queueEnqObj(queue_t *pThis, void *pUsr); rsRetVal queueStart(queue_t *pThis); rsRetVal queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize); diff --git a/rsyslog.h b/rsyslog.h index e37263e3..619836d8 100644 --- a/rsyslog.h +++ b/rsyslog.h @@ -128,7 +128,7 @@ typedef enum rsRetVal_ rsRetVal; /**< friendly type for global return value */ #define CHKiRet_Hdlr(code) if((iRet = code) != RS_RET_OK) /* macro below is used in conjunction with CHKiRet_Hdlr, else use ABORT_FINALIZE */ #define FINALIZE goto finalize_it; -#if 0 /* DEV debug: set to 1 to get a rough call trace -- rgerhards, 2008-01-13 */ +#if 1 /* DEV debug: set to 1 to get a rough call trace -- rgerhards, 2008-01-13 */ # define DEFiRet dbgprintf("Entering %s, line %d\n", __FILE__, __LINE__); rsRetVal iRet = RS_RET_OK #else # define DEFiRet rsRetVal iRet = RS_RET_OK diff --git a/stream.c b/stream.c index 27fc8a41..15d9dcf6 100644 --- a/stream.c +++ b/stream.c @@ -324,10 +324,13 @@ finalize_it: /* destructor for the strm object */ -rsRetVal strmDestruct(strm_t *pThis) +rsRetVal strmDestruct(strm_t **ppThis) { + strm_t *pThis; DEFiRet; + assert(ppThis != NULL); + pThis = *ppThis; ISOBJ_TYPE_assert(pThis, strm); if(pThis->tOperationsMode == STREAMMODE_WRITE) @@ -342,6 +345,7 @@ rsRetVal strmDestruct(strm_t *pThis) /* and finally delete the strm objet itself */ free(pThis); + *ppThis = NULL; return iRet; } diff --git a/stream.h b/stream.h index 34eb78e2..0ac32b12 100644 --- a/stream.h +++ b/stream.h @@ -91,7 +91,7 @@ typedef struct strm_s { /* prototypes */ rsRetVal strmConstruct(strm_t **ppThis); rsRetVal strmConstructFinalize(strm_t __attribute__((unused)) *pThis); -rsRetVal strmDestruct(strm_t *pThis); +rsRetVal strmDestruct(strm_t **ppThis); rsRetVal strmSetMaxFileSize(strm_t *pThis, size_t iMaxFileSize); rsRetVal strmSetFileName(strm_t *pThis, uchar *pszName, size_t iLenName); rsRetVal strmReadChar(strm_t *pThis, uchar *pC); diff --git a/syslogd.c b/syslogd.c index 99f90402..f62737cc 100644 --- a/syslogd.c +++ b/syslogd.c @@ -1729,7 +1729,7 @@ static rsRetVal callAction(msg_t *pMsg, action_t *pAction) pAction->f_prevcount, time(NULL) - pAction->f_time, repeatinterval[pAction->f_repeatcount]); /* use current message, so we have the new timestamp (means we need to discard previous one) */ - MsgDestruct(pAction->f_pMsg); + MsgDestruct(&pAction->f_pMsg); pAction->f_pMsg = MsgAddRef(pMsg); /* If domark would have logged this by now, flush it now (so we don't hold * isolated messages), but back off so we'll flush less often in the future. @@ -1749,7 +1749,7 @@ static rsRetVal callAction(msg_t *pMsg, action_t *pAction) /* we do not care about iRet above - I think it's right but if we have * some troubles, you know where to look at ;) -- rgerhards, 2007-08-01 */ - MsgDestruct(pAction->f_pMsg); + MsgDestruct(&pAction->f_pMsg); } pAction->f_pMsg = MsgAddRef(pMsg); /* call the output driver */ @@ -1848,7 +1848,7 @@ msgConsumer(void *pUsr) assert(pMsg != NULL); processMsg(pMsg); - MsgDestruct(pMsg); + MsgDestruct(&pMsg); return RS_RET_OK; } @@ -2267,14 +2267,14 @@ logmsg(int pri, msg_t *pMsg, int flags) dbgprintf("Message has syslog-protocol format.\n"); setProtocolVersion(pMsg, 1); if(parseRFCSyslogMsg(pMsg, flags) == 1) { - MsgDestruct(pMsg); + MsgDestruct(&pMsg); return; } } else { /* we have legacy syslog */ dbgprintf("Message has legacy syslog format.\n"); setProtocolVersion(pMsg, 0); if(parseLegacySyslogMsg(pMsg, flags) == 1) { - MsgDestruct(pMsg); + MsgDestruct(&pMsg); return; } } @@ -2428,7 +2428,7 @@ finalize_it: * message object will be discarded by our callers, so this is nothing * of our business. rgerhards, 2007-07-10 */ - MsgDestruct(pAction->f_pMsg); + MsgDestruct(&pAction->f_pMsg); pAction->f_pMsg = pMsgSave; /* restore it */ } @@ -2617,7 +2617,7 @@ die(int sig) /* drain queue (if configured so) and stop main queue worker thread pool */ dbgprintf("Terminating main queue...\n"); - queueDestruct(pMsgQueue); + queueDestruct(&pMsgQueue); pMsgQueue = NULL; /* Free ressources and close connections. This includes flushing any remaining @@ -3296,7 +3296,7 @@ init(void) /* delete the message queue, which also flushes all messages left over */ if(pMsgQueue != NULL) { dbgprintf("deleting main message queue\n"); - queueDestruct(pMsgQueue); /* delete pThis here! */ + queueDestruct(&pMsgQueue); /* delete pThis here! */ pMsgQueue = NULL; } -- cgit