From d4a9e0445a1e423292ea7f3a91103af2a73c9544 Mon Sep 17 00:00:00 2001 From: David Troy Date: Sat, 1 Apr 2006 16:12:19 +0000 Subject: Populating trunk git-svn-id: http://svncommunity.digium.com/svn/astmanproxy/trunk@7 f02b47b9-160a-0410-81a6-dc3441afb0ec --- astmanproxy.c | 664 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 664 insertions(+) create mode 100644 astmanproxy.c (limited to 'astmanproxy.c') diff --git a/astmanproxy.c b/astmanproxy.c new file mode 100644 index 0000000..c0a8470 --- /dev/null +++ b/astmanproxy.c @@ -0,0 +1,664 @@ +/* Asterisk Manager Proxy + Copyright (c) 2005 David C. Troy + + This program is free software, distributed under the terms of + the GNU General Public License. + +*/ + +#include "astmanproxy.h" + +extern int LoadHandlers( void ); +extern void ReadConfig( void ); +extern FILE *OpenLogfile( void ); +extern int SetProcUID( void ); + +extern void *proxyaction_do(char *proxyaction, struct message *m, struct mansession *s); +extern void *ProxyLogin(struct mansession *s); +extern void *ProxyLogoff(struct mansession *s); + +int ConnectAsterisk(struct mansession *s); + +struct proxyconfig pc; +struct mansession *sessions = NULL; +struct iohandler *iohandlers = NULL; + +pthread_mutex_t sessionlock; +pthread_mutex_t serverlock; +pthread_mutex_t loglock; +pthread_mutex_t debuglock; +static int asock = -1; +FILE *proxylog; +int debug = 0; + +void hup(int sig) { + if (proxylog) { + fflush(proxylog); + fclose(proxylog); + } + proxylog = OpenLogfile(); + logmsg("Received HUP -- reopened log"); +} + +void leave(int sig) { + struct mansession *c; + struct message sm, cm; + struct iohandler *io; + struct ast_server *srv; + char iabuf[INET_ADDRSTRLEN]; + + /* Message to send to servers */ + memset(&sm, 0, sizeof(struct message)); + AddHeader(&sm, "Action: Logoff"); + + /* Message to send to clients */ + memset(&cm, 0, sizeof(struct message)); + AddHeader(&cm, PROXY_SHUTDOWN); + + if (debug) + debugmsg("Notifying and closing sessions"); + pthread_mutex_lock (&sessionlock); + while (sessions) { + c = sessions; + sessions = sessions->next; + + if (c->server) { + if (debug) + debugmsg("asterisk@%s: closing session", ast_inet_ntoa(iabuf, sizeof(iabuf), c->sin.sin_addr)); + c->output->write(c, &sm); + logmsg("Shutdown, closed server %s", ast_inet_ntoa(iabuf, sizeof(iabuf), c->sin.sin_addr)); + } else { + if (debug) + debugmsg("client@%s: closing session", ast_inet_ntoa(iabuf, sizeof(iabuf), c->sin.sin_addr)); + c->output->write(c, &cm); + logmsg("Shutdown, closed client %s", ast_inet_ntoa(iabuf, sizeof(iabuf), c->sin.sin_addr)); + } + close(c->fd); + pthread_mutex_destroy(&c->lock); + free(c); + } + pthread_mutex_unlock (&sessionlock); + + /* unload server list */ + while (pc.serverlist) { + srv = pc.serverlist; + pc.serverlist = srv->next; + if (debug) + debugmsg("asterisk@%s: forgetting", srv->ast_host); + free(srv); + } + + if (debug) + debugmsg("Closing listener socket"); + close(asock); + + /* unload io handlers */ + while (iohandlers) { + io = iohandlers; + iohandlers = iohandlers->next; + if (debug) + debugmsg("unloading: %s", io->formatname); + dlclose(io->dlhandle); + free(io); + } + + if(debug) + debugmsg("Done!\n"); + logmsg("Proxy stopped; shutting down."); + + fclose(proxylog); + pthread_mutex_destroy(&sessionlock); + pthread_mutex_destroy(&loglock); + pthread_mutex_destroy(&debuglock); + exit(sig); +} + +void Version( void ) +{ + printf("astmanproxy: Version %s, (C) David C. Troy\n", PROXY_VERSION); + return; +} + +void Usage( void ) +{ + printf("Usage: astmanproxy [-d|-h|-v]\n"); + printf(" -d : Start in Debug Mode\n"); + printf(" -h : Displays this message\n"); + printf(" -v : Displays version information\n"); + printf("Start with no options to run as daemon\n"); + return; +} + +void destroy_session(struct mansession *s) +{ + struct mansession *cur, *prev = NULL; + char iabuf[INET_ADDRSTRLEN]; + + pthread_mutex_lock(&sessionlock); + cur = sessions; + while(cur) { + if (cur == s) + break; + prev = cur; + cur = cur->next; + } + if (cur) { + if (prev) + prev->next = cur->next; + else + sessions = cur->next; + debugmsg("Connection closed: %s", ast_inet_ntoa(iabuf, sizeof(iabuf), s->sin.sin_addr)); + if (s->fd > -1) + close(s->fd); + pthread_mutex_destroy(&s->lock); + free(s); + } else + debugmsg("Trying to delete non-existent session %p?\n", s); + pthread_mutex_unlock(&sessionlock); + + /* If there are no servers and no clients, why are we here? */ + if (!sessions) { + logmsg("Cannot connect to any servers! Leaving!"); + leave(0); + } +} + +int WriteClients(struct message *m) { + struct mansession *c; + char *actionid; + + c = sessions; + while (c) { + if ( !c->server && m->hdrcount>1 ) { + if (c->autofilter && c->actionid) { + actionid = astman_get_header(m, "ActionID"); + if ( !strcmp(actionid, c->actionid) ) { + c->output->write(c, m); + } + } else + c->output->write(c, m); + if ( c->input->autodisconnect && c->input->autodisconnect() ) + close(c->fd); + } + c = c->next; + } + return 1; +} + +int WriteAsterisk(struct message *m) { + int i; + char outstring[MAX_LEN], *dest; + struct mansession *s, *first; + + first = NULL; + dest = NULL; + + s = sessions; + + dest = astman_get_header(m, "Server"); + if (debug && *dest) debugmsg("set destination: %s", dest); + while ( s ) { + if ( s->server && (s->connected > 0) ) { + if ( !first ) + first = s; + if (*dest && !strcasecmp(dest, s->server->ast_host) ) + break; + } + s = s->next; + } + + if (!s) + s = first; + + /* Check for no servers and empty block -- Don't pester Asterisk if it is one*/ + if (!s || !s->server || (!m->hdrcount && !m->headers[0][0]) ) + return 1; + + debugmsg("writing block to %s", s->server->ast_host); + + pthread_mutex_lock(&s->lock); + for (i=0; ihdrcount; i++) { + if (strcasecmp(m->headers[i], "Server:") ) { + sprintf(outstring, "%s\r\n", m->headers[i]); + write(s->fd, outstring, strlen(outstring) ); + } + } + write(s->fd, "\r\n", 2); + pthread_mutex_unlock(&s->lock); + return 1; +} + +void *setactionid(char *actionid, struct message *m, struct mansession *s) +{ + pthread_mutex_lock(&s->lock); + strncpy(s->actionid, actionid, MAX_LEN); + pthread_mutex_unlock(&s->lock); + + return 0; +} + +/* Handles proxy client sessions; closely based on session_do from asterisk's manager.c */ +void *session_do(struct mansession *s) +{ + struct message m; + int res; + char *proxyaction, *actionid, *action, *key; + + if (s->input->onconnect) + s->input->onconnect(s, &m); + + for (;;) { + /* Get a complete message block from input handler */ + memset(&m, 0, sizeof(struct message) ); + res = s->input->read(s, &m); + m.session = s; + + if (res > 0) { + /* Check for anything that requires proxy-side processing */ + if (pc.key && !s->authenticated) { + key = astman_get_header(&m, "ProxyKey"); + if (!strcmp(key, pc.key) ) { + pthread_mutex_lock(&s->lock); + s->authenticated = 1; + pthread_mutex_unlock(&s->lock); + } else + break; + } + + proxyaction = astman_get_header(&m, "ProxyAction"); + actionid = astman_get_header(&m, "ActionID"); + action = astman_get_header(&m, "Action"); + if ( !strcasecmp(action, "Login") ) + ProxyLogin(s); + else if ( !strcasecmp(action, "Logoff") ) + ProxyLogoff(s); + else if ( !(*proxyaction == '\0') ) + proxyaction_do(proxyaction, &m, s); + else { + if ( !(*actionid == '\0') ) + setactionid(actionid, &m, s); + if ( !WriteAsterisk(&m) ) + break; + } + } else if (res < 0) + break; + } + + destroy_session(s); + if (debug) + debugmsg("Exiting session_do thread"); + pthread_exit(NULL); + return NULL; +} + +void *HandleAsterisk(struct mansession *s) +{ + struct message m; + int res,i; + char iabuf[INET_ADDRSTRLEN]; + + if (ConnectAsterisk(s)) + goto leave; + for (;;) { + + debugmsg("asterisk@%s: attempting read...", ast_inet_ntoa(iabuf, sizeof(iabuf), s->sin.sin_addr)); + memset(&m, 0, sizeof(struct message) ); + res = s->input->read(s, &m); + m.session = s; + + if (res > 0) { + if (debug) { + for(i=0; isin.sin_addr), m.headers[i]); + } + } + + if (!s->connected) { + if ( !strcmp("Authentication accepted", astman_get_header(&m, "Message")) ) { + s->connected = 1; + if (debug) + debugmsg("asterisk@%s: connected successfully!", ast_inet_ntoa(iabuf, sizeof(iabuf), s->sin.sin_addr) ); + } + if ( !strcmp("Authentication failed", astman_get_header(&m, "Message")) ) { + s->connected = -1; + } + } + + m.session = s; + AddHeader(&m, "Server: %s", m.session->server->ast_host); + + if (!WriteClients(&m)) + break; + /* TODO: does it make any sense to abort * conn if we cannot write to clients? I don't think so. + Do we even get a return code back that means anything? I don't think so. */ + } else if (res < 0) { + if (debug) + debugmsg("asterisk@%s: Not connected", ast_inet_ntoa(iabuf, sizeof(iabuf), s->sin.sin_addr)); + if ( ConnectAsterisk(s) ) + break; + } + } + +leave: + if (debug) + debugmsg("asterisk@%s: Giving up and exiting thread", ast_inet_ntoa(iabuf, sizeof(iabuf), s->sin.sin_addr) ); + destroy_session(s); + pthread_exit(NULL); + return NULL; +} + +int ConnectAsterisk(struct mansession *s) { + char iabuf[INET_ADDRSTRLEN]; + int r = 1, res = 0; + struct message m; + + /* Don't try to do this if auth has already failed! */ + if (s->connected < 0 ) + return 1; + else + s->connected = 0; + + if (debug) + debugmsg("asterisk@%s: Connecting (u=%s, p=%s)", ast_inet_ntoa(iabuf, sizeof(iabuf), s->sin.sin_addr), + s->server->ast_user, s->server->ast_pass); + + /* Construct auth message just once */ + memset( &m, 0, sizeof(struct message) ); + AddHeader(&m, "Action: Login"); + AddHeader(&m, "Username: %s", s->server->ast_user); + AddHeader(&m, "Secret: %s", s->server->ast_pass); + AddHeader(&m, "Events: %s", s->server->ast_events); + + for ( ;; ) { + if ( connect_nonb(s->fd, (struct sockaddr *) &s->sin, sizeof(s->sin), 2) < 0 ) { + if (errno == EISCONN) { + pthread_mutex_lock(&s->lock); + s->fd = socket(AF_INET, SOCK_STREAM, 0); + pthread_mutex_unlock(&s->lock); + } + if (debug) + debugmsg("asterisk@%s: Connect failed, Retrying (%d) %s", + ast_inet_ntoa(iabuf, sizeof(iabuf), s->sin.sin_addr), r, strerror(errno) ); + if (pc.maxretries && (++r>pc.maxretries) ) { + res = 1; + break; + } else + sleep(pc.retryinterval); + } else { + /* Send login */ + s->output->write(s, &m); + res = 0; + break; + } + } + + return res; +} + +int StartServer(struct ast_server *srv) { + + struct mansession *s; + struct hostent *ast_hostent; + + char iabuf[INET_ADDRSTRLEN]; + pthread_attr_t attr; + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + + ast_hostent = gethostbyname(srv->ast_host); + if (!ast_hostent) { + logmsg("Cannot resolve host %s, cannot add!", srv->ast_host); + debugmsg("Cannot resolve host %s, cannot add!", srv->ast_host); + return 1; + } + + s = malloc(sizeof(struct mansession)); + if ( !s ) { + logmsg("Failed to allocate server session: %s\n", strerror(errno)); + debugmsg("Failed to allocate server session: %s\n", strerror(errno)); + return 1; + } + + memset(s, 0, sizeof(struct mansession)); + SetIOHandlers(s, "standard", "standard"); + s->server = srv; + + bzero((char *) &s->sin,sizeof(s->sin)); + s->sin.sin_family = AF_INET; + memcpy( &s->sin.sin_addr.s_addr, ast_hostent->h_addr, ast_hostent->h_length ); + s->sin.sin_port = htons(atoi(s->server->ast_port)); + s->fd = socket(AF_INET, SOCK_STREAM, 0); + + pthread_mutex_lock(&sessionlock); + s->next = sessions; + sessions = s; + pthread_mutex_unlock(&sessionlock); + + logmsg("Allocated Asterisk server session for %s", ast_inet_ntoa(iabuf, sizeof(iabuf), s->sin.sin_addr)); + if (debug) { + debugmsg("asterisk@%s: Allocated server session", ast_inet_ntoa(iabuf, sizeof(iabuf), s->sin.sin_addr)); + debugmsg("Set %s input format to %s", ast_inet_ntoa(iabuf, sizeof(iabuf), s->sin.sin_addr), s->input->formatname); + debugmsg("Set %s output format to %s", ast_inet_ntoa(iabuf, sizeof(iabuf), s->sin.sin_addr), s->output->formatname); + } + + + if (pthread_create(&s->t, &attr, (void *)HandleAsterisk, s)) + destroy_session(s); + else + debugmsg("launched ast %s thread!", s->server->ast_host); + + pthread_attr_destroy(&attr); + return 0; +} + +int LaunchAsteriskThreads() { + + struct ast_server *srv; + + srv = pc.serverlist; + while (srv) { + StartServer(srv); + srv = srv->next; + } + return 0; +} + +int SetIOHandlers(struct mansession *s, char *ifmt, char *ofmt) +{ + int res = 0; + struct iohandler *io; + + io = iohandlers; + pthread_mutex_lock(&s->lock); + while (io) { + if ( !strcasecmp(io->formatname, ifmt) ) + s->input = io; + + if ( !strcasecmp(io->formatname, ofmt) ) + s->output = io; + + io = io->next; + } + + if (!s->output) { + /* TODO: Output debug that default/first handler was used */ + s->output = iohandlers; + res = 1; + } + + if (!s->input) { + /* TODO: Output debug that default/first handler was used */ + s->input = iohandlers; + res = 1; + } + pthread_mutex_unlock(&s->lock); + + return res; +} + +static void *accept_thread() +{ + int as; + struct sockaddr_in sin; + int sinlen; + struct mansession *s; + struct protoent *p; + int arg = 1; + int flags; + pthread_attr_t attr; + char iabuf[INET_ADDRSTRLEN]; + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + + for (;;) { + sinlen = sizeof(sin); + as = accept(asock, (struct sockaddr *)&sin, &sinlen); + if (as < 0) { + logmsg("Accept returned -1: %s\n", strerror(errno)); + continue; + } + p = (struct protoent *)getprotobyname("tcp"); + if( p ) { + if( setsockopt(as, p->p_proto, TCP_NODELAY, (char *)&arg, sizeof(arg) ) < 0 ) { + logmsg("Failed to set listener tcp connection to TCP_NODELAY mode: %s\n", strerror(errno)); + } + } + s = malloc(sizeof(struct mansession)); + if ( !s ) { + logmsg("Failed to allocate listener session: %s\n", strerror(errno)); + continue; + } + memset(s, 0, sizeof(struct mansession)); + memcpy(&s->sin, &sin, sizeof(sin)); + + /* For safety, make sure socket is non-blocking */ + flags = fcntl(as, F_GETFL); + fcntl(as, F_SETFL, flags | O_NONBLOCK); + + pthread_mutex_init(&s->lock, NULL); + s->fd = as; + SetIOHandlers(s, pc.inputformat, pc.outputformat); + s->autofilter = pc.autofilter; + s->inputcomplete = 0; + s->server = NULL; + + pthread_mutex_lock(&sessionlock); + s->next = sessions; + sessions = s; + pthread_mutex_unlock(&sessionlock); + + logmsg("Connection received from %s", ast_inet_ntoa(iabuf, sizeof(iabuf), s->sin.sin_addr)); + if (debug) { + debugmsg("Connection received from %s", ast_inet_ntoa(iabuf, sizeof(iabuf), s->sin.sin_addr)); + debugmsg("Set %s input format to %s", ast_inet_ntoa(iabuf, sizeof(iabuf), s->sin.sin_addr), s->input->formatname); + debugmsg("Set %s output format to %s", ast_inet_ntoa(iabuf, sizeof(iabuf), s->sin.sin_addr), s->output->formatname); + } + + if (pthread_create(&s->t, &attr, (void *)session_do, s)) + destroy_session(s); + } + pthread_attr_destroy(&attr); + return NULL; +} + +int main(int argc, char *argv[]) +{ + struct sockaddr_in serv_sock_addr, client_sock_addr; + int cli_addrlen; + struct linger lingerstruct; /* for socket reuse */ + int flag; /* for socket reuse */ + pid_t pid; + char i; + + /* Figure out if we are in debug mode, handle other switches */ + while (( i = getopt( argc, argv, "dhv" ) ) != EOF ) + { + switch( i ) { + case 'd': + debug = 1; + break; + case 'h': + Usage(); + exit(0); + case 'v': + Version(); + exit(0); + case '?': + Usage(); + exit(1); + } + } + + + ReadConfig(); + proxylog = OpenLogfile(); + LoadHandlers(); + + if (SetProcUID()) { + fprintf(stderr,"Cannot set user/group! Check proc_user and proc_group config setting!\n"); + exit(1); + } + + /* If we are not in debug mode, then fork to background */ + if (!debug) { + if ( (pid = fork()) < 0) + exit( 1 ); + else if ( pid > 0) + exit( 0 ); + } + + /* Setup signal handlers */ + (void) signal(SIGINT,leave); + (void) signal(SIGHUP,hup); + (void) signal(SIGTERM,leave); + (void) signal(SIGPIPE, SIG_IGN); + + /* Initialize global mutexes */ + pthread_mutex_init(&sessionlock, NULL); + pthread_mutex_init(&loglock, NULL); + pthread_mutex_init(&debuglock, NULL); + + /* Initialize global client/server list */ + sessions = NULL; + LaunchAsteriskThreads(); + + /* Setup listener socket to setup new sessions... */ + if ((asock = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + fprintf(stderr,"Cannot create listener socket!\n"); + exit(1); + } + bzero((char *) &serv_sock_addr, sizeof serv_sock_addr ); + serv_sock_addr.sin_family = AF_INET; + + if ( !strcmp(pc.listen_addr,"*") ) + serv_sock_addr.sin_addr.s_addr = htonl(INADDR_ANY); + else + serv_sock_addr.sin_addr.s_addr = inet_addr( pc.listen_addr); + serv_sock_addr.sin_port = htons((short)pc.listen_port); + + /* Set listener socket re-use options */ + setsockopt(asock, SOL_SOCKET, SO_REUSEADDR, (void *)&flag, sizeof(flag)); + lingerstruct.l_onoff = 1; + lingerstruct.l_linger = 5; + setsockopt(asock, SOL_SOCKET, SO_LINGER, (void *)&lingerstruct, sizeof(lingerstruct)); + + if (bind(asock, (struct sockaddr *) &serv_sock_addr, sizeof serv_sock_addr ) < 0) { + fprintf(stderr,"Cannot bind to listener socket!\n"); + exit(1); + } + + listen(asock, 5); + cli_addrlen = sizeof(client_sock_addr); + if (debug) + debugmsg("Listening for connections"); + logmsg("Proxy Started: Listening for connections"); + + /* Launch listener thread */ + accept_thread(); + + pthread_exit(NULL); + exit(0); +} + -- cgit