diff options
author | Simo Sorce <simo@redhat.com> | 2012-01-17 19:19:44 -0500 |
---|---|---|
committer | Simo Sorce <simo@redhat.com> | 2012-01-17 23:51:17 -0500 |
commit | 1d62ecb4261c30c8312f765f81ad9b4c75334a33 (patch) | |
tree | 0b3ac8467a000d0f3d59b1ff756346f7d3fce992 | |
parent | 6e78f9028693fa17bbdc89dfd64111c76c2c9981 (diff) | |
download | gss-proxy-1d62ecb4261c30c8312f765f81ad9b4c75334a33.tar.gz gss-proxy-1d62ecb4261c30c8312f765f81ad9b4c75334a33.tar.xz gss-proxy-1d62ecb4261c30c8312f765f81ad9b4c75334a33.zip |
Add worker threads
-rw-r--r-- | proxy/Makefile.am | 1 | ||||
-rw-r--r-- | proxy/configure.ac | 7 | ||||
-rw-r--r-- | proxy/external/ax_pthread.m4 | 309 | ||||
-rw-r--r-- | proxy/src/gp_config.c | 2 | ||||
-rw-r--r-- | proxy/src/gp_socket.c | 17 | ||||
-rw-r--r-- | proxy/src/gp_utils.h | 16 | ||||
-rw-r--r-- | proxy/src/gp_workers.c | 398 | ||||
-rw-r--r-- | proxy/src/gssproxy.c | 7 |
8 files changed, 750 insertions, 7 deletions
diff --git a/proxy/Makefile.am b/proxy/Makefile.am index d1e4940..29ecfa1 100644 --- a/proxy/Makefile.am +++ b/proxy/Makefile.am @@ -85,6 +85,7 @@ gssproxy_SOURCES = \ src/gp_config.c \ src/gp_init.c \ src/gp_socket.c \ + src/gp_workers.c \ src/gssproxy.c gssproxy_LDADD = \ $(GSS_PROXY_LIBS) diff --git a/proxy/configure.ac b/proxy/configure.ac index 0ac439a..009a5ea 100644 --- a/proxy/configure.ac +++ b/proxy/configure.ac @@ -73,6 +73,7 @@ m4_include([external/sizes.m4]) m4_include([external/selinux.m4]) m4_include([external/libkeyutils.m4]) m4_include([external/systemd.m4]) +m4_include([external/ax_pthread.m4]) PKG_CHECK_MODULES([VERTO], [libverto >= 0.2.2], [have_libverto=1], [have_libverto=]) if test x$have_libverto = x; then @@ -88,6 +89,12 @@ AC_CHECK_HEADERS([iniparser.h], AC_SUBST(INI_LIBS) +AX_PTHREAD(,[AC_MSG_ERROR([Could not find Pthreads support])]) + +LIBS="$PTHREAD_LIBS $LIBS" +LIBS="$PTHREAD_CFLAGS $CFLAGS" +CC="$PTHREAD_CC" + WITH_INITSCRIPT if test x$initscript = xsystemd; then WITH_SYSTEMD_UNIT_DIR diff --git a/proxy/external/ax_pthread.m4 b/proxy/external/ax_pthread.m4 new file mode 100644 index 0000000..e20a388 --- /dev/null +++ b/proxy/external/ax_pthread.m4 @@ -0,0 +1,309 @@ +# =========================================================================== +# http://www.gnu.org/software/autoconf-archive/ax_pthread.html +# =========================================================================== +# +# SYNOPSIS +# +# AX_PTHREAD([ACTION-IF-FOUND[, ACTION-IF-NOT-FOUND]]) +# +# DESCRIPTION +# +# This macro figures out how to build C programs using POSIX threads. It +# sets the PTHREAD_LIBS output variable to the threads library and linker +# flags, and the PTHREAD_CFLAGS output variable to any special C compiler +# flags that are needed. (The user can also force certain compiler +# flags/libs to be tested by setting these environment variables.) +# +# Also sets PTHREAD_CC to any special C compiler that is needed for +# multi-threaded programs (defaults to the value of CC otherwise). (This +# is necessary on AIX to use the special cc_r compiler alias.) +# +# NOTE: You are assumed to not only compile your program with these flags, +# but also link it with them as well. e.g. you should link with +# $PTHREAD_CC $CFLAGS $PTHREAD_CFLAGS $LDFLAGS ... $PTHREAD_LIBS $LIBS +# +# If you are only building threads programs, you may wish to use these +# variables in your default LIBS, CFLAGS, and CC: +# +# LIBS="$PTHREAD_LIBS $LIBS" +# CFLAGS="$CFLAGS $PTHREAD_CFLAGS" +# CC="$PTHREAD_CC" +# +# In addition, if the PTHREAD_CREATE_JOINABLE thread-attribute constant +# has a nonstandard name, defines PTHREAD_CREATE_JOINABLE to that name +# (e.g. PTHREAD_CREATE_UNDETACHED on AIX). +# +# Also HAVE_PTHREAD_PRIO_INHERIT is defined if pthread is found and the +# PTHREAD_PRIO_INHERIT symbol is defined when compiling with +# PTHREAD_CFLAGS. +# +# ACTION-IF-FOUND is a list of shell commands to run if a threads library +# is found, and ACTION-IF-NOT-FOUND is a list of commands to run it if it +# is not found. If ACTION-IF-FOUND is not specified, the default action +# will define HAVE_PTHREAD. +# +# Please let the authors know if this macro fails on any platform, or if +# you have any other suggestions or comments. This macro was based on work +# by SGJ on autoconf scripts for FFTW (http://www.fftw.org/) (with help +# from M. Frigo), as well as ac_pthread and hb_pthread macros posted by +# Alejandro Forero Cuervo to the autoconf macro repository. We are also +# grateful for the helpful feedback of numerous users. +# +# Updated for Autoconf 2.68 by Daniel Richard G. +# +# LICENSE +# +# Copyright (c) 2008 Steven G. Johnson <stevenj@alum.mit.edu> +# Copyright (c) 2011 Daniel Richard G. <skunk@iSKUNK.ORG> +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the +# Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General +# Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program. If not, see <http://www.gnu.org/licenses/>. +# +# As a special exception, the respective Autoconf Macro's copyright owner +# gives unlimited permission to copy, distribute and modify the configure +# scripts that are the output of Autoconf when processing the Macro. You +# need not follow the terms of the GNU General Public License when using +# or distributing such scripts, even though portions of the text of the +# Macro appear in them. The GNU General Public License (GPL) does govern +# all other use of the material that constitutes the Autoconf Macro. +# +# This special exception to the GPL applies to versions of the Autoconf +# Macro released by the Autoconf Archive. When you make and distribute a +# modified version of the Autoconf Macro, you may extend this special +# exception to the GPL to apply to your modified version as well. + +#serial 17 + +AU_ALIAS([ACX_PTHREAD], [AX_PTHREAD]) +AC_DEFUN([AX_PTHREAD], [ +AC_REQUIRE([AC_CANONICAL_HOST]) +AC_LANG_PUSH([C]) +ax_pthread_ok=no + +# We used to check for pthread.h first, but this fails if pthread.h +# requires special compiler flags (e.g. on True64 or Sequent). +# It gets checked for in the link test anyway. + +# First of all, check if the user has set any of the PTHREAD_LIBS, +# etcetera environment variables, and if threads linking works using +# them: +if test x"$PTHREAD_LIBS$PTHREAD_CFLAGS" != x; then + save_CFLAGS="$CFLAGS" + CFLAGS="$CFLAGS $PTHREAD_CFLAGS" + save_LIBS="$LIBS" + LIBS="$PTHREAD_LIBS $LIBS" + AC_MSG_CHECKING([for pthread_join in LIBS=$PTHREAD_LIBS with CFLAGS=$PTHREAD_CFLAGS]) + AC_TRY_LINK_FUNC(pthread_join, ax_pthread_ok=yes) + AC_MSG_RESULT($ax_pthread_ok) + if test x"$ax_pthread_ok" = xno; then + PTHREAD_LIBS="" + PTHREAD_CFLAGS="" + fi + LIBS="$save_LIBS" + CFLAGS="$save_CFLAGS" +fi + +# We must check for the threads library under a number of different +# names; the ordering is very important because some systems +# (e.g. DEC) have both -lpthread and -lpthreads, where one of the +# libraries is broken (non-POSIX). + +# Create a list of thread flags to try. Items starting with a "-" are +# C compiler flags, and other items are library names, except for "none" +# which indicates that we try without any flags at all, and "pthread-config" +# which is a program returning the flags for the Pth emulation library. + +ax_pthread_flags="pthreads none -Kthread -kthread lthread -pthread -pthreads -mthreads pthread --thread-safe -mt pthread-config" + +# The ordering *is* (sometimes) important. Some notes on the +# individual items follow: + +# pthreads: AIX (must check this before -lpthread) +# none: in case threads are in libc; should be tried before -Kthread and +# other compiler flags to prevent continual compiler warnings +# -Kthread: Sequent (threads in libc, but -Kthread needed for pthread.h) +# -kthread: FreeBSD kernel threads (preferred to -pthread since SMP-able) +# lthread: LinuxThreads port on FreeBSD (also preferred to -pthread) +# -pthread: Linux/gcc (kernel threads), BSD/gcc (userland threads) +# -pthreads: Solaris/gcc +# -mthreads: Mingw32/gcc, Lynx/gcc +# -mt: Sun Workshop C (may only link SunOS threads [-lthread], but it +# doesn't hurt to check since this sometimes defines pthreads too; +# also defines -D_REENTRANT) +# ... -mt is also the pthreads flag for HP/aCC +# pthread: Linux, etcetera +# --thread-safe: KAI C++ +# pthread-config: use pthread-config program (for GNU Pth library) + +case "${host_cpu}-${host_os}" in + *solaris*) + + # On Solaris (at least, for some versions), libc contains stubbed + # (non-functional) versions of the pthreads routines, so link-based + # tests will erroneously succeed. (We need to link with -pthreads/-mt/ + # -lpthread.) (The stubs are missing pthread_cleanup_push, or rather + # a function called by this macro, so we could check for that, but + # who knows whether they'll stub that too in a future libc.) So, + # we'll just look for -pthreads and -lpthread first: + + ax_pthread_flags="-pthreads pthread -mt -pthread $ax_pthread_flags" + ;; + + *-darwin*) + ax_pthread_flags="-pthread $ax_pthread_flags" + ;; +esac + +if test x"$ax_pthread_ok" = xno; then +for flag in $ax_pthread_flags; do + + case $flag in + none) + AC_MSG_CHECKING([whether pthreads work without any flags]) + ;; + + -*) + AC_MSG_CHECKING([whether pthreads work with $flag]) + PTHREAD_CFLAGS="$flag" + ;; + + pthread-config) + AC_CHECK_PROG(ax_pthread_config, pthread-config, yes, no) + if test x"$ax_pthread_config" = xno; then continue; fi + PTHREAD_CFLAGS="`pthread-config --cflags`" + PTHREAD_LIBS="`pthread-config --ldflags` `pthread-config --libs`" + ;; + + *) + AC_MSG_CHECKING([for the pthreads library -l$flag]) + PTHREAD_LIBS="-l$flag" + ;; + esac + + save_LIBS="$LIBS" + save_CFLAGS="$CFLAGS" + LIBS="$PTHREAD_LIBS $LIBS" + CFLAGS="$CFLAGS $PTHREAD_CFLAGS" + + # Check for various functions. We must include pthread.h, + # since some functions may be macros. (On the Sequent, we + # need a special flag -Kthread to make this header compile.) + # We check for pthread_join because it is in -lpthread on IRIX + # while pthread_create is in libc. We check for pthread_attr_init + # due to DEC craziness with -lpthreads. We check for + # pthread_cleanup_push because it is one of the few pthread + # functions on Solaris that doesn't have a non-functional libc stub. + # We try pthread_create on general principles. + AC_LINK_IFELSE([AC_LANG_PROGRAM([#include <pthread.h> + static void routine(void *a) { a = 0; } + static void *start_routine(void *a) { return a; }], + [pthread_t th; pthread_attr_t attr; + pthread_create(&th, 0, start_routine, 0); + pthread_join(th, 0); + pthread_attr_init(&attr); + pthread_cleanup_push(routine, 0); + pthread_cleanup_pop(0) /* ; */])], + [ax_pthread_ok=yes], + []) + + LIBS="$save_LIBS" + CFLAGS="$save_CFLAGS" + + AC_MSG_RESULT($ax_pthread_ok) + if test "x$ax_pthread_ok" = xyes; then + break; + fi + + PTHREAD_LIBS="" + PTHREAD_CFLAGS="" +done +fi + +# Various other checks: +if test "x$ax_pthread_ok" = xyes; then + save_LIBS="$LIBS" + LIBS="$PTHREAD_LIBS $LIBS" + save_CFLAGS="$CFLAGS" + CFLAGS="$CFLAGS $PTHREAD_CFLAGS" + + # Detect AIX lossage: JOINABLE attribute is called UNDETACHED. + AC_MSG_CHECKING([for joinable pthread attribute]) + attr_name=unknown + for attr in PTHREAD_CREATE_JOINABLE PTHREAD_CREATE_UNDETACHED; do + AC_LINK_IFELSE([AC_LANG_PROGRAM([#include <pthread.h>], + [int attr = $attr; return attr /* ; */])], + [attr_name=$attr; break], + []) + done + AC_MSG_RESULT($attr_name) + if test "$attr_name" != PTHREAD_CREATE_JOINABLE; then + AC_DEFINE_UNQUOTED(PTHREAD_CREATE_JOINABLE, $attr_name, + [Define to necessary symbol if this constant + uses a non-standard name on your system.]) + fi + + AC_MSG_CHECKING([if more special flags are required for pthreads]) + flag=no + case "${host_cpu}-${host_os}" in + *-aix* | *-freebsd* | *-darwin*) flag="-D_THREAD_SAFE";; + *-osf* | *-hpux*) flag="-D_REENTRANT";; + *solaris*) + if test "$GCC" = "yes"; then + flag="-D_REENTRANT" + else + flag="-mt -D_REENTRANT" + fi + ;; + esac + AC_MSG_RESULT(${flag}) + if test "x$flag" != xno; then + PTHREAD_CFLAGS="$flag $PTHREAD_CFLAGS" + fi + + AC_CACHE_CHECK([for PTHREAD_PRIO_INHERIT], + ax_cv_PTHREAD_PRIO_INHERIT, [ + AC_LINK_IFELSE([ + AC_LANG_PROGRAM([[#include <pthread.h>]], [[int i = PTHREAD_PRIO_INHERIT;]])], + [ax_cv_PTHREAD_PRIO_INHERIT=yes], + [ax_cv_PTHREAD_PRIO_INHERIT=no]) + ]) + AS_IF([test "x$ax_cv_PTHREAD_PRIO_INHERIT" = "xyes"], + AC_DEFINE([HAVE_PTHREAD_PRIO_INHERIT], 1, [Have PTHREAD_PRIO_INHERIT.])) + + LIBS="$save_LIBS" + CFLAGS="$save_CFLAGS" + + # More AIX lossage: must compile with xlc_r or cc_r + if test x"$GCC" != xyes; then + AC_CHECK_PROGS(PTHREAD_CC, xlc_r cc_r, ${CC}) + else + PTHREAD_CC=$CC + fi +else + PTHREAD_CC="$CC" +fi + +AC_SUBST(PTHREAD_LIBS) +AC_SUBST(PTHREAD_CFLAGS) +AC_SUBST(PTHREAD_CC) + +# Finally, execute ACTION-IF-FOUND/ACTION-IF-NOT-FOUND: +if test x"$ax_pthread_ok" = xyes; then + ifelse([$1],,AC_DEFINE(HAVE_PTHREAD,1,[Define if you have POSIX threads libraries and header files.]),[$1]) + : +else + ax_pthread_ok=no + $2 +fi +AC_LANG_POP +])dnl AX_PTHREAD diff --git a/proxy/src/gp_config.c b/proxy/src/gp_config.c index 552539d..c0b4b47 100644 --- a/proxy/src/gp_config.c +++ b/proxy/src/gp_config.c @@ -61,6 +61,8 @@ int load_config(struct gp_config *cfg) } } + cfg->num_workers = iniparser_getint(d, "gssproxy:worker threads", 0); + done: iniparser_freedict(d); return ret; diff --git a/proxy/src/gp_socket.c b/proxy/src/gp_socket.c index 8b360ed..3c80142 100644 --- a/proxy/src/gp_socket.c +++ b/proxy/src/gp_socket.c @@ -233,6 +233,7 @@ static void gp_socket_read(verto_ctx *vctx, verto_ev *ev) uint32_t size; bool header = false; size_t rn; + int ret; int fd; fd = verto_get_fd(ev); @@ -299,10 +300,18 @@ static void gp_socket_read(verto_ctx *vctx, verto_ev *ev) rbuf->pos += rn; if (rbuf->pos == rbuf->size) { - /* got all data hand over packet */ - /* TODO */ - ret = ENOENT; - goto done; + /* got all data, hand over packet */ + ret = gp_query_new(rbuf->conn->gpctx->workers, rbuf->conn, + rbuf->data, rbuf->size); + if (ret != 0) { + /* internal error, not much we can do */ + goto done; + } + + /* we successfully handed over the data */ + rbuf->data = NULL; + gp_buffer_free(rbuf); + return; } ret = EAGAIN; diff --git a/proxy/src/gp_utils.h b/proxy/src/gp_utils.h index 09f3f06..47d766d 100644 --- a/proxy/src/gp_utils.h +++ b/proxy/src/gp_utils.h @@ -34,13 +34,17 @@ #define _(STRING) gettext(STRING) struct gp_config { - char *config_file; - bool daemonize; - char *socket_name; + char *config_file; /* gssproxy configuration file */ + bool daemonize; /* let gssproxy daemonize */ + char *socket_name; /* the socket name to use for */ + int num_workers; /* number of worker threads */ }; +struct gp_workers; + struct gssproxy_ctx { struct gp_config *config; + struct gp_workers *workers; }; struct gp_conn; @@ -60,4 +64,10 @@ void gp_conn_free(struct gp_conn *conn); void gp_socket_send_data(verto_ctx *vctx, struct gp_conn *conn, uint8_t *buffer, size_t buflen); +/* from gp_workers.c */ +struct gp_workers *gp_workers_init(verto_ctx *vctx, struct gp_config *cfg); +void gp_workers_free(struct gp_workers *w); +int gp_query_new(struct gp_workers *w, struct gp_conn *conn, + uint8_t *buffer, size_t buflen); + #endif /* _SRV_UTILS_H_ */ diff --git a/proxy/src/gp_workers.c b/proxy/src/gp_workers.c new file mode 100644 index 0000000..5ccf20f --- /dev/null +++ b/proxy/src/gp_workers.c @@ -0,0 +1,398 @@ +/* + GSS-PROXY + + Copyright (C) 2011 Red Hat, Inc. + Copyright (C) 2011 Simo Sorce <simo.sorce@redhat.com> + + Permission is hereby granted, free of charge, to any person obtaining a + copy of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom the + Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + DEALINGS IN THE SOFTWARE. +*/ + +#include "config.h" +#include <pthread.h> +#include <stdint.h> +#include <stdlib.h> +#include <unistd.h> +#include <string.h> +#include <fcntl.h> +#include <syslog.h> +#include <errno.h> +#include "gp_utils.h" + +#define DEFAULT_WORKER_THREADS_NUM 5 + +#define GP_QUERY_IN 0 +#define GP_QUERY_OUT 1 +#define GP_QUERY_ERR 2 + +struct gp_query { + struct gp_query *next; + + struct gp_conn *conn; + uint8_t *buffer; + size_t buflen; + + int status; +}; + +struct gp_thread { + struct gp_workers *pool; + pthread_t tid; + + /* if query is assigned, then the thread is busy */ + struct gp_query *query; + pthread_mutex_t cond_mutex; + pthread_cond_t cond_wakeup; +}; + +struct gp_workers { + pthread_mutex_t lock; + bool shutdown; + struct gp_query *wait_list; + struct gp_query *reply_list; + struct gp_thread *threads; + int num_threads; + int sig_pipe[2]; +}; + +static void *gp_worker_main(void *pvt); +static void gp_handle_query(struct gp_workers *w, struct gp_query *q); +static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev); + +/** DISPATCHER FUNCTIONS **/ + +struct gp_workers *gp_workers_init(verto_ctx *vctx, struct gp_config *cfg) +{ + struct gp_workers *w; + pthread_attr_t attr; + verto_ev *ev; + int vflags; + int ret; + int i; + + w = calloc(1, sizeof(struct gp_workers)); + if (!w) { + return NULL; + } + + /* init global queue mutex */ + ret = pthread_mutex_init(&w->lock, NULL); + if (ret) { + free(w); + return NULL; + } + + if (cfg->num_workers > 0) { + w->num_threads = cfg->num_workers; + } else { + w->num_threads = DEFAULT_WORKER_THREADS_NUM; + } + + w->threads = calloc(w->num_threads, sizeof(struct gp_thread)); + if (!w->threads) { + ret = -1; + goto done; + } + + /* make thread joinable (portability) */ + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + /* init all workers */ + for (i = 0; i < w->num_threads; i++) { + ret = pthread_cond_init(&w->threads[i].cond_wakeup, NULL); + if (ret) { + goto done; + } + ret = pthread_mutex_init(&w->threads[i].cond_mutex, NULL); + if (ret) { + goto done; + } + ret = pthread_create(&w->threads[i].tid, &attr, + gp_worker_main, &w->threads[i]); + if (ret) { + goto done; + } + w->threads[i].pool = w; + } + + /* add wakeup pipe, so that threads can hand back replies to the + * dispatcher */ + ret = pipe2(w->sig_pipe, O_NONBLOCK | O_CLOEXEC); + if (ret == -1) { + goto done; + } + + vflags = VERTO_EV_FLAG_PERSIST | VERTO_EV_FLAG_IO_READ; + ev = verto_add_io(vctx, vflags, gp_handle_reply, w->sig_pipe[0]); + if (!ev) { + ret = -1; + goto done; + } + verto_set_private(ev, w, NULL); + + ret = 0; + +done: + if (ret) { + gp_workers_free(w); + } + return w; +} + +void gp_workers_free(struct gp_workers *w) +{ + int ret; + int i; + void *retval; + + ret = pthread_mutex_lock(&w->lock); + if (ret) { + syslog(LOG_CRIT, "Couldn't get mutex!"); + return; + } + + w->shutdown = true; + + ret = pthread_mutex_unlock(&w->lock); + if (ret) { + syslog(LOG_CRIT, "Can't release mutex?!"); + return; + } + + if (w->threads) { + for (i = 0; i < w->num_threads; i++) { + /* wake up threads, then join them */ + /* ======> COND_MUTEX */ + pthread_mutex_lock(&w->threads[i].cond_mutex); + pthread_cond_signal(&w->threads[i].cond_wakeup); + /* <====== COND_MUTEX */ + pthread_mutex_unlock(&w->threads[i].cond_mutex); + + ret = pthread_join(w->threads[i].tid, &retval); + } + + free(w->threads); + w->threads = NULL; + } + + ret = pthread_mutex_destroy(&w->lock); + if (ret) { + syslog(LOG_CRIT, "Failed to destroy mutex?!"); + return; + } + + free(w); +} + +static void gp_query_assign(struct gp_workers *w, struct gp_query *q) +{ + int i; + /* then either find a free thread or queue in the wait list */ + + for (i = 0; q != NULL && i < w->num_threads; i++) { + if (w->threads[i].query != NULL) continue; + + /* ======> COND_MUTEX */ + pthread_mutex_lock(&w->threads[i].cond_mutex); + + if (w->threads[i].query == NULL) { + /* hand over the query */ + w->threads[i].query = q; + q = NULL; + pthread_cond_signal(&w->threads[i].cond_wakeup); + } + + /* <====== COND_MUTEX */ + pthread_mutex_unlock(&w->threads[i].cond_mutex); + } + + if (q) { + /* all threads are busy, store in wait list */ + + /* only the dispatcher handles wait_list + * so we do not need to lock around it */ + q->next = w->wait_list; + w->wait_list = q; + q = NULL; + } +} + +static void gp_query_free(struct gp_query *q, bool free_buffer) +{ + if (!q) { + return; + } + + if (free_buffer) { + free(q->buffer); + } + + free(q); +} + +int gp_query_new(struct gp_workers *w, struct gp_conn *conn, + uint8_t *buffer, size_t buflen) +{ + struct gp_query *q; + + /* create query struct */ + q = calloc(1, sizeof(struct gp_query)); + if (!q) { + return ENOMEM; + } + + q->conn = conn; + q->buffer = buffer; + q->buflen = buflen; + + gp_query_assign(w, q); + + return 0; +} + +static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev) +{ + struct gp_workers *w; + struct gp_query *q = NULL; + char dummy; + int ret; + + w = verto_get_private(ev); + + /* first read out the dummy so the pipe doesn't get clogged */ + ret = read(w->sig_pipe[0], &dummy, 1); + if (ret) { + /* ignore errors */ + } + + /* grab a query reply if any */ + if (w->reply_list) { + /* ======> POOL LOCK */ + pthread_mutex_lock(&w->lock); + + if (w->reply_list != NULL) { + q = w->reply_list; + w->reply_list = q->next; + } + + /* <====== POOL LOCK */ + pthread_mutex_unlock(&w->lock); + } + + if (q) { + switch (q->status) { + case GP_QUERY_IN: + /* ?! fallback and kill client conn */ + case GP_QUERY_ERR: + gp_conn_free(q->conn); + gp_query_free(q, true); + break; + + case GP_QUERY_OUT: + gp_socket_send_data(vctx, q->conn, q->buffer, q->buflen); + gp_query_free(q, false); + break; + } + } + + /* while we are at it, check if there is anything in the wait list + * we need to process, as one thread just got free :-) */ + + q = NULL; + + if (w->wait_list) { + /* only the dispatcher handles wait_list + * so we do not need to lock around it */ + if (w->wait_list) { + q = w->wait_list; + w->wait_list = q->next; + q->next = NULL; + } + } + + if (q) { + gp_query_assign(w, q); + } +} + + +/** WORKER THREADS **/ + +static void *gp_worker_main(void *pvt) +{ + struct gp_thread *t = (struct gp_thread *)pvt; + struct gp_query *q = NULL; + char dummy = 0; + int ret; + + while (!t->pool->shutdown) { + + /* wait for next query */ + if (t->query == NULL) { + /* ======> COND_MUTEX */ + pthread_mutex_lock(&t->cond_mutex); + while (t->query == NULL) { + pthread_cond_wait(&t->cond_wakeup, &t->cond_mutex); + if (t->pool->shutdown) { + pthread_exit(NULL); + } + } + + /* grab the query off the shared pointer */ + q = t->query; + t->query = NULL; + + /* <====== COND_MUTEX */ + pthread_mutex_unlock(&t->cond_mutex); + } + + /* handle the client request */ + gp_handle_query(t->pool, q); + + /* now get lock on main queue, to play with the reply list */ + /* ======> POOL LOCK */ + pthread_mutex_lock(&t->pool->lock); + + /* put back query so that dispatcher can send reply */ + q->next = t->pool->reply_list; + t->pool->reply_list = q; + + /* <====== POOL LOCK */ + pthread_mutex_unlock(&t->pool->lock); + + /* and wake up dispatcher so it will handle it */ + ret = write(t->pool->sig_pipe[1], &dummy, 1); + if (ret == -1) { + syslog(LOG_ERR, "Failed to signal dispatcher!"); + } + } + + pthread_exit(NULL); +} + +static void gp_handle_query(struct gp_workers *w, struct gp_query *q) +{ + /* TODO */ + + free(q->buffer); + q->buffer = strdup("WHATS UP?"); + q->buflen = strlen(q->buffer); + q->status = GP_QUERY_OUT; +} + diff --git a/proxy/src/gssproxy.c b/proxy/src/gssproxy.c index ba11fa2..9bfad08 100644 --- a/proxy/src/gssproxy.c +++ b/proxy/src/gssproxy.c @@ -107,8 +107,15 @@ int main(int argc, const char *argv[]) } verto_set_private(ev, gpctx, NULL); + gpctx->workers = gp_workers_init(vctx, gpctx->config); + if (!gpctx->workers) { + exit(EXIT_FAILURE); + } + verto_run(vctx); + gp_workers_free(gpctx->workers); + fini_server(); poptFreeContext(pc); |