/* threads.c
*
* This file implements threading support helpers (and maybe the thread object)
* for rsyslog.
*
* File begun on 2007-12-14 by RGerhards
*
* Copyright 2007, 2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
* Rsyslog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Rsyslog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Rsyslog. If not, see http://www.gnu.org/licenses/.
*
* A copy of the GPL can be found in the file "COPYING" in this distribution.
*/
#include "config.h"
#include
#include
#include
#include
#include
#include
#include "rsyslog.h"
#include "dirty.h"
#include "linkedlist.h"
#include "threads.h"
#include "srUtils.h"
/* Linked list of currently-known threads.
 * Within this file it is initialized by thrdInit() (which hands
 * thrdDestruct() to llInit(), presumably as the element destructor),
 * extended by thrdCreate() and destroyed by thrdTerminateAll() and
 * thrdExit().
 */
static linkedList_t llThrds;
/* methods */
/* Allocate and set up a new thread object.
 *
 * The object is zero-filled by calloc(); afterwards its mutex and its
 * termination condition variable are initialized with default attributes.
 *
 * ppThis - out parameter that receives the new object. It is only
 *          written when the allocation succeeded.
 *
 * If calloc() fails, CHKmalloc jumps to finalize_it (presumably with an
 * out-of-memory return code set in iRet).
 */
static rsRetVal
thrdConstruct(thrdInfo_t **ppThis)
{
	DEFiRet;
	thrdInfo_t *pNew;

	assert(ppThis != NULL);

	CHKmalloc(pNew = calloc(1, sizeof(*pNew)));

	/* synchronization primitives used for the termination handshake */
	pthread_cond_init(&pNew->condThrdTerm, NULL);
	pthread_mutex_init(&pNew->mutThrd, NULL);

	*ppThis = pNew;

finalize_it:
	RETiRet;
}
/* Destroy a thread object and free its memory.
 *
 * The object must no longer be linked into the list of threads.
 * Ideally the thread has been stopped by the caller already; if it is
 * still flagged as active, we terminate it here via thrdTerminate().
 *
 * pThis - the thread object to destroy; must not be used afterwards.
 *
 * iRet is never modified, so the DEFiRet default is returned.
 */
static rsRetVal thrdDestruct(thrdInfo_t *pThis)
{
	DEFiRet;

	assert(pThis != NULL);

	/* nobody stopped the thread so far - do it ourselves */
	if(pThis->bIsActive == 1)
		thrdTerminate(pThis);

	/* release the synchronization primitives, then the object itself */
	pthread_cond_destroy(&pThis->condThrdTerm);
	pthread_mutex_destroy(&pThis->mutThrd);
	free(pThis);

	RETiRet;
}
/* Terminate a thread via the non-cancel interface.
 * This is a separate function as it involves a bit more of code.
 * rgerhards, 2009-10-15
 *
 * The thread is asked to stop by setting bShallStop. We then repeatedly
 * send it SIGTTIN and wait on condThrdTerm (with a short timeout) until
 * the thread has cleared bIsActive. There is no upper bound on the number
 * of iterations.
 *
 * pThis - the thread to terminate.
 *
 * iRet is never modified, so the DEFiRet default is returned.
 */
static inline rsRetVal
thrdTerminateNonCancel(thrdInfo_t *pThis)
{
	struct timespec tTimeout;
	int ret;
	DEFiRet;

	assert(pThis != NULL);

	DBGPRINTF("request term via SIGTTIN for input thread 0x%x\n", (unsigned) pThis->thrdID);
	pThis->bShallStop = TRUE;
	do {
		d_pthread_mutex_lock(&pThis->mutThrd);
		pthread_kill(pThis->thrdID, SIGTTIN);
		timeoutComp(&tTimeout, 10); /* a fixed 10ms timeout, do after lock (may take long!) */
		ret = d_pthread_cond_timedwait(&pThis->condThrdTerm, &pThis->mutThrd, &tTimeout);
		d_pthread_mutex_unlock(&pThis->mutThrd);
		if(Debug) {
			if(ret == ETIMEDOUT) {
				dbgprintf("input thread term: had a timeout waiting on thread termination\n");
			} else if(ret == 0) {
				dbgprintf("input thread term: thread returned normally and is terminated\n");
			} else {
				/* pthread calls report errors via their return value and do
				 * not set errno, so the error to format is ret (this assumes
				 * d_pthread_cond_timedwait passes the pthread return code on).
				 */
				char errStr[1024];
				rs_strerror_r(ret, errStr, sizeof(errStr));
				dbgprintf("input thread term: cond_wait returned with error %d: %s\n",
					ret, errStr);
			}
		}
	} while(pThis->bIsActive);
	DBGPRINTF("non-cancel input thread termination succeeded for thread 0x%x\n", (unsigned) pThis->thrdID);

	RETiRet;
}
/* Terminate a single thread and wait for it to finish.
 *
 * Threads that do not need cancellation are shut down cooperatively via
 * thrdTerminateNonCancel(). Threads flagged with bNeedsCancel are
 * cancelled with pthread_cancel() and marked inactive by us. In both
 * cases the thread is joined and then the optional pAfterRun cleanup
 * callback is invoked.
 *
 * pThis - the thread to terminate.
 *
 * iRet is never modified, so the DEFiRet default is returned.
 */
rsRetVal thrdTerminate(thrdInfo_t *pThis)
{
	DEFiRet;

	assert(pThis != NULL);

	if(!pThis->bNeedsCancel) {
		thrdTerminateNonCancel(pThis);
	} else {
		DBGPRINTF("request term via canceling for input thread 0x%x\n", (unsigned) pThis->thrdID);
		pthread_cancel(pThis->thrdID);
		pThis->bIsActive = 0;
	}

	/* wait for input thread to complete */
	pthread_join(pThis->thrdID, NULL);

	/* run the user-supplied cleanup function, if there is one */
	if(pThis->pAfterRun != NULL) {
		pThis->pAfterRun(pThis);
	}

	RETiRet;
}
/* Terminate all known threads gracefully.
 *
 * This is done by destroying the list of threads. The list was set up
 * in thrdInit() with thrdDestruct() handed to llInit(), so presumably
 * every element is run through thrdDestruct(), which stops threads that
 * are still active.
 *
 * The result of llDestroy() is intentionally not propagated; iRet keeps
 * its DEFiRet default.
 */
rsRetVal thrdTerminateAll(void)
{
	DEFiRet;

	(void) llDestroy(&llThrds);

	RETiRet;
}
/* Internal wrapper around the user thread function. It does all the
 * thread housekeeping so that the user function does not need to know
 * about the threading calls and can be written with plain, non-threading
 * semantics.
 * rgerhards, 2007-12-17
 *
 * arg - the thrdInfo_t object describing this thread (passed as void*
 *       through pthread_create).
 *
 * The function does not return; it ends the thread via pthread_exit().
 */
static void* thrdStarter(void *arg)
{
	DEFiRet;
	thrdInfo_t *pThis = (thrdInfo_t*) arg;
	sigset_t sigAll;
	sigset_t sigTerm;

	assert(pThis != NULL);
	assert(pThis->pUsrThrdMain != NULL);

	/* prepare the two signal sets we need: "everything" and "SIGTTIN only" */
	sigfillset(&sigAll);
	sigemptyset(&sigTerm);
	sigaddset(&sigTerm, SIGTTIN);

	/* block all signals ... */
	pthread_sigmask(SIG_BLOCK, &sigAll, NULL);
	/* ... except SIGTTIN, which we (ab)use to signal the thread to
	 * shut down -- rgerhards, 2009-07-20
	 */
	pthread_sigmask(SIG_UNBLOCK, &sigTerm, NULL);

	/* Setup is complete, so hand over to the user code. We do not regain
	 * control until the user code has finished, at which point the
	 * thread is terminated.
	 */
	iRet = pThis->pUsrThrdMain(pThis);

	dbgprintf("thrdStarter: usrThrdMain 0x%lx returned with iRet %d, exiting now.\n", (unsigned long) pThis->thrdID, iRet);

	/* Tell master control that we exit. The mutex is taken mostly to keep
	 * the thread debugger happy; with the logic employed here it would
	 * not really be necessary.
	 */
	pThis->bIsActive = 0;
	d_pthread_mutex_lock(&pThis->mutThrd);
	pthread_cond_signal(&pThis->condThrdTerm);
	d_pthread_mutex_unlock(&pThis->mutThrd);

	ENDfunc
	pthread_exit(0);
}
/* Start a new thread and add it to the list of currently
 * executing threads. It is added at the end of the list.
 * rgerhards, 2007-12-14
 *
 * thrdMain     - user function executed inside the new thread (mandatory).
 * afterRun     - optional cleanup function, called by thrdTerminate()
 *                after the thread has been joined; may be NULL.
 * bNeedsCancel - if set, thrdTerminate() stops the thread via
 *                pthread_cancel() instead of the SIGTTIN handshake.
 *
 * Returns the error from thrdConstruct() or llAppend() if one of them
 * fails, RS_RET_ERR if the thread could not be started, and the DEFiRet
 * default otherwise.
 */
rsRetVal thrdCreate(rsRetVal (*thrdMain)(thrdInfo_t*), rsRetVal(*afterRun)(thrdInfo_t *), sbool bNeedsCancel)
{
	DEFiRet;
	thrdInfo_t *pThis;
	int i;

	assert(thrdMain != NULL);

	CHKiRet(thrdConstruct(&pThis));
	/* must be flagged active before the thread starts, as the thread
	 * itself clears the flag when it exits
	 */
	pThis->bIsActive = 1;
	pThis->pUsrThrdMain = thrdMain;
	pThis->pAfterRun = afterRun;
	pThis->bNeedsCancel = bNeedsCancel;
	i = pthread_create(&pThis->thrdID, NULL, thrdStarter, pThis);
	if(i != 0) {
		/* No thread exists, so thrdID is not valid. Do not put the object
		 * into the list, where it would later be signalled and joined.
		 * Clear the active flag first so that thrdDestruct() does not try
		 * to terminate a thread that was never started.
		 */
		DBGPRINTF("thrdCreate: pthread_create failed with error %d\n", i);
		pThis->bIsActive = 0;
		thrdDestruct(pThis);
		iRet = RS_RET_ERR;
		goto finalize_it;
	}
	/* NOTE(review): if llAppend fails, the thread keeps running but is not
	 * tracked in llThrds and pThis is not released here.
	 */
	CHKiRet(llAppend(&llThrds, NULL, pThis));

finalize_it:
	RETiRet;
}
/* Initialize the thread-support subsystem.
 * Must be called once at the start of the program.
 *
 * Sets up the list of threads, handing thrdDestruct() to llInit().
 * Returns whatever llInit() returns.
 */
rsRetVal thrdInit(void)
{
	DEFiRet;

	CHKiRet(llInit(&llThrds, thrdDestruct, NULL, NULL));

finalize_it:
	RETiRet;
}
/* De-initialize the thread subsystem.
 * Must be called once at the end of the program.
 *
 * Destroys the list of threads and returns whatever llDestroy() returns.
 */
rsRetVal thrdExit(void)
{
	DEFiRet;

	CHKiRet(llDestroy(&llThrds));

finalize_it:
	RETiRet;
}
/* vi:set ai:
*/