summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimo Sorce <simo@redhat.com>2012-01-17 19:19:44 -0500
committerSimo Sorce <simo@redhat.com>2012-01-17 23:51:17 -0500
commit1d62ecb4261c30c8312f765f81ad9b4c75334a33 (patch)
tree0b3ac8467a000d0f3d59b1ff756346f7d3fce992
parent6e78f9028693fa17bbdc89dfd64111c76c2c9981 (diff)
downloadgss-proxy-1d62ecb4261c30c8312f765f81ad9b4c75334a33.tar.gz
gss-proxy-1d62ecb4261c30c8312f765f81ad9b4c75334a33.tar.xz
gss-proxy-1d62ecb4261c30c8312f765f81ad9b4c75334a33.zip
Add worker threads
-rw-r--r--proxy/Makefile.am1
-rw-r--r--proxy/configure.ac7
-rw-r--r--proxy/external/ax_pthread.m4309
-rw-r--r--proxy/src/gp_config.c2
-rw-r--r--proxy/src/gp_socket.c17
-rw-r--r--proxy/src/gp_utils.h16
-rw-r--r--proxy/src/gp_workers.c398
-rw-r--r--proxy/src/gssproxy.c7
8 files changed, 750 insertions, 7 deletions
diff --git a/proxy/Makefile.am b/proxy/Makefile.am
index d1e4940..29ecfa1 100644
--- a/proxy/Makefile.am
+++ b/proxy/Makefile.am
@@ -85,6 +85,7 @@ gssproxy_SOURCES = \
src/gp_config.c \
src/gp_init.c \
src/gp_socket.c \
+ src/gp_workers.c \
src/gssproxy.c
gssproxy_LDADD = \
$(GSS_PROXY_LIBS)
diff --git a/proxy/configure.ac b/proxy/configure.ac
index 0ac439a..009a5ea 100644
--- a/proxy/configure.ac
+++ b/proxy/configure.ac
@@ -73,6 +73,7 @@ m4_include([external/sizes.m4])
m4_include([external/selinux.m4])
m4_include([external/libkeyutils.m4])
m4_include([external/systemd.m4])
+m4_include([external/ax_pthread.m4])
PKG_CHECK_MODULES([VERTO], [libverto >= 0.2.2], [have_libverto=1], [have_libverto=])
if test x$have_libverto = x; then
@@ -88,6 +89,12 @@ AC_CHECK_HEADERS([iniparser.h],
AC_SUBST(INI_LIBS)
+AX_PTHREAD(,[AC_MSG_ERROR([Could not find Pthreads support])])
+
+LIBS="$PTHREAD_LIBS $LIBS"
+LIBS="$PTHREAD_CFLAGS $CFLAGS"
+CC="$PTHREAD_CC"
+
WITH_INITSCRIPT
if test x$initscript = xsystemd; then
WITH_SYSTEMD_UNIT_DIR
diff --git a/proxy/external/ax_pthread.m4 b/proxy/external/ax_pthread.m4
new file mode 100644
index 0000000..e20a388
--- /dev/null
+++ b/proxy/external/ax_pthread.m4
@@ -0,0 +1,309 @@
+# ===========================================================================
+# http://www.gnu.org/software/autoconf-archive/ax_pthread.html
+# ===========================================================================
+#
+# SYNOPSIS
+#
+# AX_PTHREAD([ACTION-IF-FOUND[, ACTION-IF-NOT-FOUND]])
+#
+# DESCRIPTION
+#
+# This macro figures out how to build C programs using POSIX threads. It
+# sets the PTHREAD_LIBS output variable to the threads library and linker
+# flags, and the PTHREAD_CFLAGS output variable to any special C compiler
+# flags that are needed. (The user can also force certain compiler
+# flags/libs to be tested by setting these environment variables.)
+#
+# Also sets PTHREAD_CC to any special C compiler that is needed for
+# multi-threaded programs (defaults to the value of CC otherwise). (This
+# is necessary on AIX to use the special cc_r compiler alias.)
+#
+# NOTE: You are assumed to not only compile your program with these flags,
+# but also link it with them as well. e.g. you should link with
+# $PTHREAD_CC $CFLAGS $PTHREAD_CFLAGS $LDFLAGS ... $PTHREAD_LIBS $LIBS
+#
+# If you are only building threads programs, you may wish to use these
+# variables in your default LIBS, CFLAGS, and CC:
+#
+# LIBS="$PTHREAD_LIBS $LIBS"
+# CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
+# CC="$PTHREAD_CC"
+#
+# In addition, if the PTHREAD_CREATE_JOINABLE thread-attribute constant
+# has a nonstandard name, defines PTHREAD_CREATE_JOINABLE to that name
+# (e.g. PTHREAD_CREATE_UNDETACHED on AIX).
+#
+# Also HAVE_PTHREAD_PRIO_INHERIT is defined if pthread is found and the
+# PTHREAD_PRIO_INHERIT symbol is defined when compiling with
+# PTHREAD_CFLAGS.
+#
+# ACTION-IF-FOUND is a list of shell commands to run if a threads library
+# is found, and ACTION-IF-NOT-FOUND is a list of commands to run it if it
+# is not found. If ACTION-IF-FOUND is not specified, the default action
+# will define HAVE_PTHREAD.
+#
+# Please let the authors know if this macro fails on any platform, or if
+# you have any other suggestions or comments. This macro was based on work
+# by SGJ on autoconf scripts for FFTW (http://www.fftw.org/) (with help
+# from M. Frigo), as well as ac_pthread and hb_pthread macros posted by
+# Alejandro Forero Cuervo to the autoconf macro repository. We are also
+# grateful for the helpful feedback of numerous users.
+#
+# Updated for Autoconf 2.68 by Daniel Richard G.
+#
+# LICENSE
+#
+# Copyright (c) 2008 Steven G. Johnson <stevenj@alum.mit.edu>
+# Copyright (c) 2011 Daniel Richard G. <skunk@iSKUNK.ORG>
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# As a special exception, the respective Autoconf Macro's copyright owner
+# gives unlimited permission to copy, distribute and modify the configure
+# scripts that are the output of Autoconf when processing the Macro. You
+# need not follow the terms of the GNU General Public License when using
+# or distributing such scripts, even though portions of the text of the
+# Macro appear in them. The GNU General Public License (GPL) does govern
+# all other use of the material that constitutes the Autoconf Macro.
+#
+# This special exception to the GPL applies to versions of the Autoconf
+# Macro released by the Autoconf Archive. When you make and distribute a
+# modified version of the Autoconf Macro, you may extend this special
+# exception to the GPL to apply to your modified version as well.
+
+#serial 17
+
+AU_ALIAS([ACX_PTHREAD], [AX_PTHREAD])
+AC_DEFUN([AX_PTHREAD], [
+AC_REQUIRE([AC_CANONICAL_HOST])
+AC_LANG_PUSH([C])
+ax_pthread_ok=no
+
+# We used to check for pthread.h first, but this fails if pthread.h
+# requires special compiler flags (e.g. on True64 or Sequent).
+# It gets checked for in the link test anyway.
+
+# First of all, check if the user has set any of the PTHREAD_LIBS,
+# etcetera environment variables, and if threads linking works using
+# them:
+if test x"$PTHREAD_LIBS$PTHREAD_CFLAGS" != x; then
+ save_CFLAGS="$CFLAGS"
+ CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
+ save_LIBS="$LIBS"
+ LIBS="$PTHREAD_LIBS $LIBS"
+ AC_MSG_CHECKING([for pthread_join in LIBS=$PTHREAD_LIBS with CFLAGS=$PTHREAD_CFLAGS])
+ AC_TRY_LINK_FUNC(pthread_join, ax_pthread_ok=yes)
+ AC_MSG_RESULT($ax_pthread_ok)
+ if test x"$ax_pthread_ok" = xno; then
+ PTHREAD_LIBS=""
+ PTHREAD_CFLAGS=""
+ fi
+ LIBS="$save_LIBS"
+ CFLAGS="$save_CFLAGS"
+fi
+
+# We must check for the threads library under a number of different
+# names; the ordering is very important because some systems
+# (e.g. DEC) have both -lpthread and -lpthreads, where one of the
+# libraries is broken (non-POSIX).
+
+# Create a list of thread flags to try. Items starting with a "-" are
+# C compiler flags, and other items are library names, except for "none"
+# which indicates that we try without any flags at all, and "pthread-config"
+# which is a program returning the flags for the Pth emulation library.
+
+ax_pthread_flags="pthreads none -Kthread -kthread lthread -pthread -pthreads -mthreads pthread --thread-safe -mt pthread-config"
+
+# The ordering *is* (sometimes) important. Some notes on the
+# individual items follow:
+
+# pthreads: AIX (must check this before -lpthread)
+# none: in case threads are in libc; should be tried before -Kthread and
+# other compiler flags to prevent continual compiler warnings
+# -Kthread: Sequent (threads in libc, but -Kthread needed for pthread.h)
+# -kthread: FreeBSD kernel threads (preferred to -pthread since SMP-able)
+# lthread: LinuxThreads port on FreeBSD (also preferred to -pthread)
+# -pthread: Linux/gcc (kernel threads), BSD/gcc (userland threads)
+# -pthreads: Solaris/gcc
+# -mthreads: Mingw32/gcc, Lynx/gcc
+# -mt: Sun Workshop C (may only link SunOS threads [-lthread], but it
+# doesn't hurt to check since this sometimes defines pthreads too;
+# also defines -D_REENTRANT)
+# ... -mt is also the pthreads flag for HP/aCC
+# pthread: Linux, etcetera
+# --thread-safe: KAI C++
+# pthread-config: use pthread-config program (for GNU Pth library)
+
+case "${host_cpu}-${host_os}" in
+ *solaris*)
+
+ # On Solaris (at least, for some versions), libc contains stubbed
+ # (non-functional) versions of the pthreads routines, so link-based
+ # tests will erroneously succeed. (We need to link with -pthreads/-mt/
+ # -lpthread.) (The stubs are missing pthread_cleanup_push, or rather
+ # a function called by this macro, so we could check for that, but
+ # who knows whether they'll stub that too in a future libc.) So,
+ # we'll just look for -pthreads and -lpthread first:
+
+ ax_pthread_flags="-pthreads pthread -mt -pthread $ax_pthread_flags"
+ ;;
+
+ *-darwin*)
+ ax_pthread_flags="-pthread $ax_pthread_flags"
+ ;;
+esac
+
+if test x"$ax_pthread_ok" = xno; then
+for flag in $ax_pthread_flags; do
+
+ case $flag in
+ none)
+ AC_MSG_CHECKING([whether pthreads work without any flags])
+ ;;
+
+ -*)
+ AC_MSG_CHECKING([whether pthreads work with $flag])
+ PTHREAD_CFLAGS="$flag"
+ ;;
+
+ pthread-config)
+ AC_CHECK_PROG(ax_pthread_config, pthread-config, yes, no)
+ if test x"$ax_pthread_config" = xno; then continue; fi
+ PTHREAD_CFLAGS="`pthread-config --cflags`"
+ PTHREAD_LIBS="`pthread-config --ldflags` `pthread-config --libs`"
+ ;;
+
+ *)
+ AC_MSG_CHECKING([for the pthreads library -l$flag])
+ PTHREAD_LIBS="-l$flag"
+ ;;
+ esac
+
+ save_LIBS="$LIBS"
+ save_CFLAGS="$CFLAGS"
+ LIBS="$PTHREAD_LIBS $LIBS"
+ CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
+
+ # Check for various functions. We must include pthread.h,
+ # since some functions may be macros. (On the Sequent, we
+ # need a special flag -Kthread to make this header compile.)
+ # We check for pthread_join because it is in -lpthread on IRIX
+ # while pthread_create is in libc. We check for pthread_attr_init
+ # due to DEC craziness with -lpthreads. We check for
+ # pthread_cleanup_push because it is one of the few pthread
+ # functions on Solaris that doesn't have a non-functional libc stub.
+ # We try pthread_create on general principles.
+ AC_LINK_IFELSE([AC_LANG_PROGRAM([#include <pthread.h>
+ static void routine(void *a) { a = 0; }
+ static void *start_routine(void *a) { return a; }],
+ [pthread_t th; pthread_attr_t attr;
+ pthread_create(&th, 0, start_routine, 0);
+ pthread_join(th, 0);
+ pthread_attr_init(&attr);
+ pthread_cleanup_push(routine, 0);
+ pthread_cleanup_pop(0) /* ; */])],
+ [ax_pthread_ok=yes],
+ [])
+
+ LIBS="$save_LIBS"
+ CFLAGS="$save_CFLAGS"
+
+ AC_MSG_RESULT($ax_pthread_ok)
+ if test "x$ax_pthread_ok" = xyes; then
+ break;
+ fi
+
+ PTHREAD_LIBS=""
+ PTHREAD_CFLAGS=""
+done
+fi
+
+# Various other checks:
+if test "x$ax_pthread_ok" = xyes; then
+ save_LIBS="$LIBS"
+ LIBS="$PTHREAD_LIBS $LIBS"
+ save_CFLAGS="$CFLAGS"
+ CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
+
+ # Detect AIX lossage: JOINABLE attribute is called UNDETACHED.
+ AC_MSG_CHECKING([for joinable pthread attribute])
+ attr_name=unknown
+ for attr in PTHREAD_CREATE_JOINABLE PTHREAD_CREATE_UNDETACHED; do
+ AC_LINK_IFELSE([AC_LANG_PROGRAM([#include <pthread.h>],
+ [int attr = $attr; return attr /* ; */])],
+ [attr_name=$attr; break],
+ [])
+ done
+ AC_MSG_RESULT($attr_name)
+ if test "$attr_name" != PTHREAD_CREATE_JOINABLE; then
+ AC_DEFINE_UNQUOTED(PTHREAD_CREATE_JOINABLE, $attr_name,
+ [Define to necessary symbol if this constant
+ uses a non-standard name on your system.])
+ fi
+
+ AC_MSG_CHECKING([if more special flags are required for pthreads])
+ flag=no
+ case "${host_cpu}-${host_os}" in
+ *-aix* | *-freebsd* | *-darwin*) flag="-D_THREAD_SAFE";;
+ *-osf* | *-hpux*) flag="-D_REENTRANT";;
+ *solaris*)
+ if test "$GCC" = "yes"; then
+ flag="-D_REENTRANT"
+ else
+ flag="-mt -D_REENTRANT"
+ fi
+ ;;
+ esac
+ AC_MSG_RESULT(${flag})
+ if test "x$flag" != xno; then
+ PTHREAD_CFLAGS="$flag $PTHREAD_CFLAGS"
+ fi
+
+ AC_CACHE_CHECK([for PTHREAD_PRIO_INHERIT],
+ ax_cv_PTHREAD_PRIO_INHERIT, [
+ AC_LINK_IFELSE([
+ AC_LANG_PROGRAM([[#include <pthread.h>]], [[int i = PTHREAD_PRIO_INHERIT;]])],
+ [ax_cv_PTHREAD_PRIO_INHERIT=yes],
+ [ax_cv_PTHREAD_PRIO_INHERIT=no])
+ ])
+ AS_IF([test "x$ax_cv_PTHREAD_PRIO_INHERIT" = "xyes"],
+ AC_DEFINE([HAVE_PTHREAD_PRIO_INHERIT], 1, [Have PTHREAD_PRIO_INHERIT.]))
+
+ LIBS="$save_LIBS"
+ CFLAGS="$save_CFLAGS"
+
+ # More AIX lossage: must compile with xlc_r or cc_r
+ if test x"$GCC" != xyes; then
+ AC_CHECK_PROGS(PTHREAD_CC, xlc_r cc_r, ${CC})
+ else
+ PTHREAD_CC=$CC
+ fi
+else
+ PTHREAD_CC="$CC"
+fi
+
+AC_SUBST(PTHREAD_LIBS)
+AC_SUBST(PTHREAD_CFLAGS)
+AC_SUBST(PTHREAD_CC)
+
+# Finally, execute ACTION-IF-FOUND/ACTION-IF-NOT-FOUND:
+if test x"$ax_pthread_ok" = xyes; then
+ ifelse([$1],,AC_DEFINE(HAVE_PTHREAD,1,[Define if you have POSIX threads libraries and header files.]),[$1])
+ :
+else
+ ax_pthread_ok=no
+ $2
+fi
+AC_LANG_POP
+])dnl AX_PTHREAD
diff --git a/proxy/src/gp_config.c b/proxy/src/gp_config.c
index 552539d..c0b4b47 100644
--- a/proxy/src/gp_config.c
+++ b/proxy/src/gp_config.c
@@ -61,6 +61,8 @@ int load_config(struct gp_config *cfg)
}
}
+ cfg->num_workers = iniparser_getint(d, "gssproxy:worker threads", 0);
+
done:
iniparser_freedict(d);
return ret;
diff --git a/proxy/src/gp_socket.c b/proxy/src/gp_socket.c
index 8b360ed..3c80142 100644
--- a/proxy/src/gp_socket.c
+++ b/proxy/src/gp_socket.c
@@ -233,6 +233,7 @@ static void gp_socket_read(verto_ctx *vctx, verto_ev *ev)
uint32_t size;
bool header = false;
size_t rn;
+ int ret;
int fd;
fd = verto_get_fd(ev);
@@ -299,10 +300,18 @@ static void gp_socket_read(verto_ctx *vctx, verto_ev *ev)
rbuf->pos += rn;
if (rbuf->pos == rbuf->size) {
- /* got all data hand over packet */
- /* TODO */
- ret = ENOENT;
- goto done;
+ /* got all data, hand over packet */
+ ret = gp_query_new(rbuf->conn->gpctx->workers, rbuf->conn,
+ rbuf->data, rbuf->size);
+ if (ret != 0) {
+ /* internal error, not much we can do */
+ goto done;
+ }
+
+ /* we successfully handed over the data */
+ rbuf->data = NULL;
+ gp_buffer_free(rbuf);
+ return;
}
ret = EAGAIN;
diff --git a/proxy/src/gp_utils.h b/proxy/src/gp_utils.h
index 09f3f06..47d766d 100644
--- a/proxy/src/gp_utils.h
+++ b/proxy/src/gp_utils.h
@@ -34,13 +34,17 @@
#define _(STRING) gettext(STRING)
struct gp_config {
- char *config_file;
- bool daemonize;
- char *socket_name;
+ char *config_file; /* gssproxy configuration file */
+ bool daemonize; /* let gssproxy daemonize */
+ char *socket_name; /* the socket name to use for */
+ int num_workers; /* number of worker threads */
};
+struct gp_workers;
+
struct gssproxy_ctx {
struct gp_config *config;
+ struct gp_workers *workers;
};
struct gp_conn;
@@ -60,4 +64,10 @@ void gp_conn_free(struct gp_conn *conn);
void gp_socket_send_data(verto_ctx *vctx, struct gp_conn *conn,
uint8_t *buffer, size_t buflen);
+/* from gp_workers.c */
+struct gp_workers *gp_workers_init(verto_ctx *vctx, struct gp_config *cfg);
+void gp_workers_free(struct gp_workers *w);
+int gp_query_new(struct gp_workers *w, struct gp_conn *conn,
+ uint8_t *buffer, size_t buflen);
+
#endif /* _SRV_UTILS_H_ */
diff --git a/proxy/src/gp_workers.c b/proxy/src/gp_workers.c
new file mode 100644
index 0000000..5ccf20f
--- /dev/null
+++ b/proxy/src/gp_workers.c
@@ -0,0 +1,398 @@
+/*
+ GSS-PROXY
+
+ Copyright (C) 2011 Red Hat, Inc.
+ Copyright (C) 2011 Simo Sorce <simo.sorce@redhat.com>
+
+ Permission is hereby granted, free of charge, to any person obtaining a
+ copy of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom the
+ Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ DEALINGS IN THE SOFTWARE.
+*/
+
+#include "config.h"
+#include <pthread.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <fcntl.h>
+#include <syslog.h>
+#include <errno.h>
+#include "gp_utils.h"
+
+#define DEFAULT_WORKER_THREADS_NUM 5
+
+#define GP_QUERY_IN 0
+#define GP_QUERY_OUT 1
+#define GP_QUERY_ERR 2
+
+struct gp_query {
+ struct gp_query *next;
+
+ struct gp_conn *conn;
+ uint8_t *buffer;
+ size_t buflen;
+
+ int status;
+};
+
+struct gp_thread {
+ struct gp_workers *pool;
+ pthread_t tid;
+
+ /* if query is assigned, then the thread is busy */
+ struct gp_query *query;
+ pthread_mutex_t cond_mutex;
+ pthread_cond_t cond_wakeup;
+};
+
+struct gp_workers {
+ pthread_mutex_t lock;
+ bool shutdown;
+ struct gp_query *wait_list;
+ struct gp_query *reply_list;
+ struct gp_thread *threads;
+ int num_threads;
+ int sig_pipe[2];
+};
+
+static void *gp_worker_main(void *pvt);
+static void gp_handle_query(struct gp_workers *w, struct gp_query *q);
+static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev);
+
+/** DISPATCHER FUNCTIONS **/
+
+struct gp_workers *gp_workers_init(verto_ctx *vctx, struct gp_config *cfg)
+{
+ struct gp_workers *w;
+ pthread_attr_t attr;
+ verto_ev *ev;
+ int vflags;
+ int ret;
+ int i;
+
+ w = calloc(1, sizeof(struct gp_workers));
+ if (!w) {
+ return NULL;
+ }
+
+ /* init global queue mutex */
+ ret = pthread_mutex_init(&w->lock, NULL);
+ if (ret) {
+ free(w);
+ return NULL;
+ }
+
+ if (cfg->num_workers > 0) {
+ w->num_threads = cfg->num_workers;
+ } else {
+ w->num_threads = DEFAULT_WORKER_THREADS_NUM;
+ }
+
+ w->threads = calloc(w->num_threads, sizeof(struct gp_thread));
+ if (!w->threads) {
+ ret = -1;
+ goto done;
+ }
+
+ /* make thread joinable (portability) */
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+ /* init all workers */
+ for (i = 0; i < w->num_threads; i++) {
+ ret = pthread_cond_init(&w->threads[i].cond_wakeup, NULL);
+ if (ret) {
+ goto done;
+ }
+ ret = pthread_mutex_init(&w->threads[i].cond_mutex, NULL);
+ if (ret) {
+ goto done;
+ }
+ ret = pthread_create(&w->threads[i].tid, &attr,
+ gp_worker_main, &w->threads[i]);
+ if (ret) {
+ goto done;
+ }
+ w->threads[i].pool = w;
+ }
+
+ /* add wakeup pipe, so that threads can hand back replies to the
+ * dispatcher */
+ ret = pipe2(w->sig_pipe, O_NONBLOCK | O_CLOEXEC);
+ if (ret == -1) {
+ goto done;
+ }
+
+ vflags = VERTO_EV_FLAG_PERSIST | VERTO_EV_FLAG_IO_READ;
+ ev = verto_add_io(vctx, vflags, gp_handle_reply, w->sig_pipe[0]);
+ if (!ev) {
+ ret = -1;
+ goto done;
+ }
+ verto_set_private(ev, w, NULL);
+
+ ret = 0;
+
+done:
+ if (ret) {
+ gp_workers_free(w);
+ }
+ return w;
+}
+
+void gp_workers_free(struct gp_workers *w)
+{
+ int ret;
+ int i;
+ void *retval;
+
+ ret = pthread_mutex_lock(&w->lock);
+ if (ret) {
+ syslog(LOG_CRIT, "Couldn't get mutex!");
+ return;
+ }
+
+ w->shutdown = true;
+
+ ret = pthread_mutex_unlock(&w->lock);
+ if (ret) {
+ syslog(LOG_CRIT, "Can't release mutex?!");
+ return;
+ }
+
+ if (w->threads) {
+ for (i = 0; i < w->num_threads; i++) {
+ /* wake up threads, then join them */
+ /* ======> COND_MUTEX */
+ pthread_mutex_lock(&w->threads[i].cond_mutex);
+ pthread_cond_signal(&w->threads[i].cond_wakeup);
+ /* <====== COND_MUTEX */
+ pthread_mutex_unlock(&w->threads[i].cond_mutex);
+
+ ret = pthread_join(w->threads[i].tid, &retval);
+ }
+
+ free(w->threads);
+ w->threads = NULL;
+ }
+
+ ret = pthread_mutex_destroy(&w->lock);
+ if (ret) {
+ syslog(LOG_CRIT, "Failed to destroy mutex?!");
+ return;
+ }
+
+ free(w);
+}
+
+static void gp_query_assign(struct gp_workers *w, struct gp_query *q)
+{
+ int i;
+ /* then either find a free thread or queue in the wait list */
+
+ for (i = 0; q != NULL && i < w->num_threads; i++) {
+ if (w->threads[i].query != NULL) continue;
+
+ /* ======> COND_MUTEX */
+ pthread_mutex_lock(&w->threads[i].cond_mutex);
+
+ if (w->threads[i].query == NULL) {
+ /* hand over the query */
+ w->threads[i].query = q;
+ q = NULL;
+ pthread_cond_signal(&w->threads[i].cond_wakeup);
+ }
+
+ /* <====== COND_MUTEX */
+ pthread_mutex_unlock(&w->threads[i].cond_mutex);
+ }
+
+ if (q) {
+ /* all threads are busy, store in wait list */
+
+ /* only the dispatcher handles wait_list
+ * so we do not need to lock around it */
+ q->next = w->wait_list;
+ w->wait_list = q;
+ q = NULL;
+ }
+}
+
+static void gp_query_free(struct gp_query *q, bool free_buffer)
+{
+ if (!q) {
+ return;
+ }
+
+ if (free_buffer) {
+ free(q->buffer);
+ }
+
+ free(q);
+}
+
+int gp_query_new(struct gp_workers *w, struct gp_conn *conn,
+ uint8_t *buffer, size_t buflen)
+{
+ struct gp_query *q;
+
+ /* create query struct */
+ q = calloc(1, sizeof(struct gp_query));
+ if (!q) {
+ return ENOMEM;
+ }
+
+ q->conn = conn;
+ q->buffer = buffer;
+ q->buflen = buflen;
+
+ gp_query_assign(w, q);
+
+ return 0;
+}
+
+static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev)
+{
+ struct gp_workers *w;
+ struct gp_query *q = NULL;
+ char dummy;
+ int ret;
+
+ w = verto_get_private(ev);
+
+ /* first read out the dummy so the pipe doesn't get clogged */
+ ret = read(w->sig_pipe[0], &dummy, 1);
+ if (ret) {
+ /* ignore errors */
+ }
+
+ /* grab a query reply if any */
+ if (w->reply_list) {
+ /* ======> POOL LOCK */
+ pthread_mutex_lock(&w->lock);
+
+ if (w->reply_list != NULL) {
+ q = w->reply_list;
+ w->reply_list = q->next;
+ }
+
+ /* <====== POOL LOCK */
+ pthread_mutex_unlock(&w->lock);
+ }
+
+ if (q) {
+ switch (q->status) {
+ case GP_QUERY_IN:
+ /* ?! fallback and kill client conn */
+ case GP_QUERY_ERR:
+ gp_conn_free(q->conn);
+ gp_query_free(q, true);
+ break;
+
+ case GP_QUERY_OUT:
+ gp_socket_send_data(vctx, q->conn, q->buffer, q->buflen);
+ gp_query_free(q, false);
+ break;
+ }
+ }
+
+ /* while we are at it, check if there is anything in the wait list
+ * we need to process, as one thread just got free :-) */
+
+ q = NULL;
+
+ if (w->wait_list) {
+ /* only the dispatcher handles wait_list
+ * so we do not need to lock around it */
+ if (w->wait_list) {
+ q = w->wait_list;
+ w->wait_list = q->next;
+ q->next = NULL;
+ }
+ }
+
+ if (q) {
+ gp_query_assign(w, q);
+ }
+}
+
+
+/** WORKER THREADS **/
+
+static void *gp_worker_main(void *pvt)
+{
+ struct gp_thread *t = (struct gp_thread *)pvt;
+ struct gp_query *q = NULL;
+ char dummy = 0;
+ int ret;
+
+ while (!t->pool->shutdown) {
+
+ /* wait for next query */
+ if (t->query == NULL) {
+ /* ======> COND_MUTEX */
+ pthread_mutex_lock(&t->cond_mutex);
+ while (t->query == NULL) {
+ pthread_cond_wait(&t->cond_wakeup, &t->cond_mutex);
+ if (t->pool->shutdown) {
+ pthread_exit(NULL);
+ }
+ }
+
+ /* grab the query off the shared pointer */
+ q = t->query;
+ t->query = NULL;
+
+ /* <====== COND_MUTEX */
+ pthread_mutex_unlock(&t->cond_mutex);
+ }
+
+ /* handle the client request */
+ gp_handle_query(t->pool, q);
+
+ /* now get lock on main queue, to play with the reply list */
+ /* ======> POOL LOCK */
+ pthread_mutex_lock(&t->pool->lock);
+
+ /* put back query so that dispatcher can send reply */
+ q->next = t->pool->reply_list;
+ t->pool->reply_list = q;
+
+ /* <====== POOL LOCK */
+ pthread_mutex_unlock(&t->pool->lock);
+
+ /* and wake up dispatcher so it will handle it */
+ ret = write(t->pool->sig_pipe[1], &dummy, 1);
+ if (ret == -1) {
+ syslog(LOG_ERR, "Failed to signal dispatcher!");
+ }
+ }
+
+ pthread_exit(NULL);
+}
+
+static void gp_handle_query(struct gp_workers *w, struct gp_query *q)
+{
+ /* TODO */
+
+ free(q->buffer);
+ q->buffer = strdup("WHATS UP?");
+ q->buflen = strlen(q->buffer);
+ q->status = GP_QUERY_OUT;
+}
+
diff --git a/proxy/src/gssproxy.c b/proxy/src/gssproxy.c
index ba11fa2..9bfad08 100644
--- a/proxy/src/gssproxy.c
+++ b/proxy/src/gssproxy.c
@@ -107,8 +107,15 @@ int main(int argc, const char *argv[])
}
verto_set_private(ev, gpctx, NULL);
+ gpctx->workers = gp_workers_init(vctx, gpctx->config);
+ if (!gpctx->workers) {
+ exit(EXIT_FAILURE);
+ }
+
verto_run(vctx);
+ gp_workers_free(gpctx->workers);
+
fini_server();
poptFreeContext(pc);