summaryrefslogtreecommitdiffstats
path: root/proxy/src
diff options
context:
space:
mode:
Diffstat (limited to 'proxy/src')
-rw-r--r--proxy/src/gp_config.c2
-rw-r--r--proxy/src/gp_socket.c17
-rw-r--r--proxy/src/gp_utils.h16
-rw-r--r--proxy/src/gp_workers.c398
-rw-r--r--proxy/src/gssproxy.c7
5 files changed, 433 insertions, 7 deletions
diff --git a/proxy/src/gp_config.c b/proxy/src/gp_config.c
index 552539d..c0b4b47 100644
--- a/proxy/src/gp_config.c
+++ b/proxy/src/gp_config.c
@@ -61,6 +61,8 @@ int load_config(struct gp_config *cfg)
}
}
+ cfg->num_workers = iniparser_getint(d, "gssproxy:worker threads", 0);
+
done:
iniparser_freedict(d);
return ret;
diff --git a/proxy/src/gp_socket.c b/proxy/src/gp_socket.c
index 8b360ed..3c80142 100644
--- a/proxy/src/gp_socket.c
+++ b/proxy/src/gp_socket.c
@@ -233,6 +233,7 @@ static void gp_socket_read(verto_ctx *vctx, verto_ev *ev)
uint32_t size;
bool header = false;
size_t rn;
+ int ret;
int fd;
fd = verto_get_fd(ev);
@@ -299,10 +300,18 @@ static void gp_socket_read(verto_ctx *vctx, verto_ev *ev)
rbuf->pos += rn;
if (rbuf->pos == rbuf->size) {
- /* got all data hand over packet */
- /* TODO */
- ret = ENOENT;
- goto done;
+ /* got all data, hand over packet */
+ ret = gp_query_new(rbuf->conn->gpctx->workers, rbuf->conn,
+ rbuf->data, rbuf->size);
+ if (ret != 0) {
+ /* internal error, not much we can do */
+ goto done;
+ }
+
+ /* we successfully handed over the data */
+ rbuf->data = NULL;
+ gp_buffer_free(rbuf);
+ return;
}
ret = EAGAIN;
diff --git a/proxy/src/gp_utils.h b/proxy/src/gp_utils.h
index 09f3f06..47d766d 100644
--- a/proxy/src/gp_utils.h
+++ b/proxy/src/gp_utils.h
@@ -34,13 +34,17 @@
#define _(STRING) gettext(STRING)
struct gp_config {
- char *config_file;
- bool daemonize;
- char *socket_name;
+ char *config_file; /* gssproxy configuration file */
+ bool daemonize; /* let gssproxy daemonize */
+ char *socket_name; /* the socket name to use for */
+ int num_workers; /* number of worker threads */
};
+struct gp_workers;
+
struct gssproxy_ctx {
struct gp_config *config;
+ struct gp_workers *workers;
};
struct gp_conn;
@@ -60,4 +64,10 @@ void gp_conn_free(struct gp_conn *conn);
void gp_socket_send_data(verto_ctx *vctx, struct gp_conn *conn,
uint8_t *buffer, size_t buflen);
+/* from gp_workers.c */
+struct gp_workers *gp_workers_init(verto_ctx *vctx, struct gp_config *cfg);
+void gp_workers_free(struct gp_workers *w);
+int gp_query_new(struct gp_workers *w, struct gp_conn *conn,
+ uint8_t *buffer, size_t buflen);
+
#endif /* _SRV_UTILS_H_ */
diff --git a/proxy/src/gp_workers.c b/proxy/src/gp_workers.c
new file mode 100644
index 0000000..5ccf20f
--- /dev/null
+++ b/proxy/src/gp_workers.c
@@ -0,0 +1,398 @@
+/*
+ GSS-PROXY
+
+ Copyright (C) 2011 Red Hat, Inc.
+ Copyright (C) 2011 Simo Sorce <simo.sorce@redhat.com>
+
+ Permission is hereby granted, free of charge, to any person obtaining a
+ copy of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom the
+ Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ DEALINGS IN THE SOFTWARE.
+*/
+
+#include "config.h"
+#include <pthread.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <fcntl.h>
+#include <syslog.h>
+#include <errno.h>
+#include "gp_utils.h"
+
+#define DEFAULT_WORKER_THREADS_NUM 5
+
+#define GP_QUERY_IN 0
+#define GP_QUERY_OUT 1
+#define GP_QUERY_ERR 2
+
+struct gp_query {
+ struct gp_query *next;
+
+ struct gp_conn *conn;
+ uint8_t *buffer;
+ size_t buflen;
+
+ int status;
+};
+
+struct gp_thread {
+ struct gp_workers *pool;
+ pthread_t tid;
+
+ /* if query is assigned, then the thread is busy */
+ struct gp_query *query;
+ pthread_mutex_t cond_mutex;
+ pthread_cond_t cond_wakeup;
+};
+
+struct gp_workers {
+ pthread_mutex_t lock;
+ bool shutdown;
+ struct gp_query *wait_list;
+ struct gp_query *reply_list;
+ struct gp_thread *threads;
+ int num_threads;
+ int sig_pipe[2];
+};
+
+static void *gp_worker_main(void *pvt);
+static void gp_handle_query(struct gp_workers *w, struct gp_query *q);
+static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev);
+
+/** DISPATCHER FUNCTIONS **/
+
+struct gp_workers *gp_workers_init(verto_ctx *vctx, struct gp_config *cfg)
+{
+ struct gp_workers *w;
+ pthread_attr_t attr;
+ verto_ev *ev;
+ int vflags;
+ int ret;
+ int i;
+
+ w = calloc(1, sizeof(struct gp_workers));
+ if (!w) {
+ return NULL;
+ }
+
+ /* init global queue mutex */
+ ret = pthread_mutex_init(&w->lock, NULL);
+ if (ret) {
+ free(w);
+ return NULL;
+ }
+
+ if (cfg->num_workers > 0) {
+ w->num_threads = cfg->num_workers;
+ } else {
+ w->num_threads = DEFAULT_WORKER_THREADS_NUM;
+ }
+
+ w->threads = calloc(w->num_threads, sizeof(struct gp_thread));
+ if (!w->threads) {
+ ret = -1;
+ goto done;
+ }
+
+ /* make thread joinable (portability) */
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+ /* init all workers */
+ for (i = 0; i < w->num_threads; i++) {
+ ret = pthread_cond_init(&w->threads[i].cond_wakeup, NULL);
+ if (ret) {
+ goto done;
+ }
+ ret = pthread_mutex_init(&w->threads[i].cond_mutex, NULL);
+ if (ret) {
+ goto done;
+ }
+ ret = pthread_create(&w->threads[i].tid, &attr,
+ gp_worker_main, &w->threads[i]);
+ if (ret) {
+ goto done;
+ }
+ w->threads[i].pool = w;
+ }
+
+ /* add wakeup pipe, so that threads can hand back replies to the
+ * dispatcher */
+ ret = pipe2(w->sig_pipe, O_NONBLOCK | O_CLOEXEC);
+ if (ret == -1) {
+ goto done;
+ }
+
+ vflags = VERTO_EV_FLAG_PERSIST | VERTO_EV_FLAG_IO_READ;
+ ev = verto_add_io(vctx, vflags, gp_handle_reply, w->sig_pipe[0]);
+ if (!ev) {
+ ret = -1;
+ goto done;
+ }
+ verto_set_private(ev, w, NULL);
+
+ ret = 0;
+
+done:
+ if (ret) {
+ gp_workers_free(w);
+ }
+ return w;
+}
+
+void gp_workers_free(struct gp_workers *w)
+{
+ int ret;
+ int i;
+ void *retval;
+
+ ret = pthread_mutex_lock(&w->lock);
+ if (ret) {
+ syslog(LOG_CRIT, "Couldn't get mutex!");
+ return;
+ }
+
+ w->shutdown = true;
+
+ ret = pthread_mutex_unlock(&w->lock);
+ if (ret) {
+ syslog(LOG_CRIT, "Can't release mutex?!");
+ return;
+ }
+
+ if (w->threads) {
+ for (i = 0; i < w->num_threads; i++) {
+ /* wake up threads, then join them */
+ /* ======> COND_MUTEX */
+ pthread_mutex_lock(&w->threads[i].cond_mutex);
+ pthread_cond_signal(&w->threads[i].cond_wakeup);
+ /* <====== COND_MUTEX */
+ pthread_mutex_unlock(&w->threads[i].cond_mutex);
+
+ ret = pthread_join(w->threads[i].tid, &retval);
+ }
+
+ free(w->threads);
+ w->threads = NULL;
+ }
+
+ ret = pthread_mutex_destroy(&w->lock);
+ if (ret) {
+ syslog(LOG_CRIT, "Failed to destroy mutex?!");
+ return;
+ }
+
+ free(w);
+}
+
+static void gp_query_assign(struct gp_workers *w, struct gp_query *q)
+{
+ int i;
+ /* then either find a free thread or queue in the wait list */
+
+ for (i = 0; q != NULL && i < w->num_threads; i++) {
+ if (w->threads[i].query != NULL) continue;
+
+ /* ======> COND_MUTEX */
+ pthread_mutex_lock(&w->threads[i].cond_mutex);
+
+ if (w->threads[i].query == NULL) {
+ /* hand over the query */
+ w->threads[i].query = q;
+ q = NULL;
+ pthread_cond_signal(&w->threads[i].cond_wakeup);
+ }
+
+ /* <====== COND_MUTEX */
+ pthread_mutex_unlock(&w->threads[i].cond_mutex);
+ }
+
+ if (q) {
+ /* all threads are busy, store in wait list */
+
+ /* only the dispatcher handles wait_list
+ * so we do not need to lock around it */
+ q->next = w->wait_list;
+ w->wait_list = q;
+ q = NULL;
+ }
+}
+
+static void gp_query_free(struct gp_query *q, bool free_buffer)
+{
+ if (!q) {
+ return;
+ }
+
+ if (free_buffer) {
+ free(q->buffer);
+ }
+
+ free(q);
+}
+
+int gp_query_new(struct gp_workers *w, struct gp_conn *conn,
+ uint8_t *buffer, size_t buflen)
+{
+ struct gp_query *q;
+
+ /* create query struct */
+ q = calloc(1, sizeof(struct gp_query));
+ if (!q) {
+ return ENOMEM;
+ }
+
+ q->conn = conn;
+ q->buffer = buffer;
+ q->buflen = buflen;
+
+ gp_query_assign(w, q);
+
+ return 0;
+}
+
+static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev)
+{
+ struct gp_workers *w;
+ struct gp_query *q = NULL;
+ char dummy;
+ int ret;
+
+ w = verto_get_private(ev);
+
+ /* first read out the dummy so the pipe doesn't get clogged */
+ ret = read(w->sig_pipe[0], &dummy, 1);
+ if (ret) {
+ /* ignore errors */
+ }
+
+ /* grab a query reply if any */
+ if (w->reply_list) {
+ /* ======> POOL LOCK */
+ pthread_mutex_lock(&w->lock);
+
+ if (w->reply_list != NULL) {
+ q = w->reply_list;
+ w->reply_list = q->next;
+ }
+
+ /* <====== POOL LOCK */
+ pthread_mutex_unlock(&w->lock);
+ }
+
+ if (q) {
+ switch (q->status) {
+ case GP_QUERY_IN:
+ /* ?! fallback and kill client conn */
+ case GP_QUERY_ERR:
+ gp_conn_free(q->conn);
+ gp_query_free(q, true);
+ break;
+
+ case GP_QUERY_OUT:
+ gp_socket_send_data(vctx, q->conn, q->buffer, q->buflen);
+ gp_query_free(q, false);
+ break;
+ }
+ }
+
+ /* while we are at it, check if there is anything in the wait list
+ * we need to process, as one thread just got free :-) */
+
+ q = NULL;
+
+ if (w->wait_list) {
+ /* only the dispatcher handles wait_list
+ * so we do not need to lock around it */
+ if (w->wait_list) {
+ q = w->wait_list;
+ w->wait_list = q->next;
+ q->next = NULL;
+ }
+ }
+
+ if (q) {
+ gp_query_assign(w, q);
+ }
+}
+
+
+/** WORKER THREADS **/
+
+static void *gp_worker_main(void *pvt)
+{
+ struct gp_thread *t = (struct gp_thread *)pvt;
+ struct gp_query *q = NULL;
+ char dummy = 0;
+ int ret;
+
+ while (!t->pool->shutdown) {
+
+ /* wait for next query */
+ if (t->query == NULL) {
+ /* ======> COND_MUTEX */
+ pthread_mutex_lock(&t->cond_mutex);
+ while (t->query == NULL) {
+ pthread_cond_wait(&t->cond_wakeup, &t->cond_mutex);
+ if (t->pool->shutdown) {
+ pthread_exit(NULL);
+ }
+ }
+
+ /* grab the query off the shared pointer */
+ q = t->query;
+ t->query = NULL;
+
+ /* <====== COND_MUTEX */
+ pthread_mutex_unlock(&t->cond_mutex);
+ }
+
+ /* handle the client request */
+ gp_handle_query(t->pool, q);
+
+ /* now get lock on main queue, to play with the reply list */
+ /* ======> POOL LOCK */
+ pthread_mutex_lock(&t->pool->lock);
+
+ /* put back query so that dispatcher can send reply */
+ q->next = t->pool->reply_list;
+ t->pool->reply_list = q;
+
+ /* <====== POOL LOCK */
+ pthread_mutex_unlock(&t->pool->lock);
+
+ /* and wake up dispatcher so it will handle it */
+ ret = write(t->pool->sig_pipe[1], &dummy, 1);
+ if (ret == -1) {
+ syslog(LOG_ERR, "Failed to signal dispatcher!");
+ }
+ }
+
+ pthread_exit(NULL);
+}
+
+static void gp_handle_query(struct gp_workers *w, struct gp_query *q)
+{
+ /* TODO */
+
+ free(q->buffer);
+ q->buffer = strdup("WHATS UP?");
+ q->buflen = strlen(q->buffer);
+ q->status = GP_QUERY_OUT;
+}
+
diff --git a/proxy/src/gssproxy.c b/proxy/src/gssproxy.c
index ba11fa2..9bfad08 100644
--- a/proxy/src/gssproxy.c
+++ b/proxy/src/gssproxy.c
@@ -107,8 +107,15 @@ int main(int argc, const char *argv[])
}
verto_set_private(ev, gpctx, NULL);
+ gpctx->workers = gp_workers_init(vctx, gpctx->config);
+ if (!gpctx->workers) {
+ exit(EXIT_FAILURE);
+ }
+
verto_run(vctx);
+ gp_workers_free(gpctx->workers);
+
fini_server();
poptFreeContext(pc);