diff options
Diffstat (limited to 'ldap/servers/slapd/connection.c')
-rw-r--r-- | ldap/servers/slapd/connection.c | 2485 |
1 files changed, 2485 insertions, 0 deletions
diff --git a/ldap/servers/slapd/connection.c b/ldap/servers/slapd/connection.c new file mode 100644 index 00000000..4cf7869b --- /dev/null +++ b/ldap/servers/slapd/connection.c @@ -0,0 +1,2485 @@ +/** BEGIN COPYRIGHT BLOCK + * Copyright 2001 Sun Microsystems, Inc. + * Portions copyright 1999, 2001-2003 Netscape Communications Corporation. + * All rights reserved. + * END COPYRIGHT BLOCK **/ +#include <stdio.h> +#include <string.h> +#include <sys/types.h> +#ifndef _WIN32 +#include <sys/time.h> +#include <sys/socket.h> +#include <stdlib.h> +#endif +#define TCPLEN_T int +#include <signal.h> +#include "slap.h" +#include "prcvar.h" +#include "prlog.h" /* for PR_ASSERT */ +#include "fe.h" +#include <sasl.h> +#if defined(LINUX) +#include <netinet/tcp.h> /* for TCP_CORK */ +#endif + + +static void connection_threadmain( void ); +static void add_pb( Slapi_PBlock * ); +static Slapi_PBlock *get_pb( void ); +static void connection_add_operation(Connection* conn, Operation *op); +static void connection_free_private_buffer(Connection *conn); +static void op_copy_identity(Connection *conn, Operation *op); +static int is_ber_too_big(const Connection *conn,unsigned long ber_len); +static void log_ber_too_big_error(const Connection *conn, + unsigned long ber_len, unsigned long maxbersize); + +/* + * We maintain a global work queue of Slapi_PBlock's that have not yet + * been handed off to an operation thread. + */ +struct Slapi_PBlock_q +{ + Slapi_PBlock *pb; + struct Slapi_PBlock_q *next_pb; + int pb_fd; +}; + +static struct Slapi_PBlock_q *first_pb= NULL; /* global work queue head */ +static struct Slapi_PBlock_q *last_pb= NULL; /* global work queue tail */ +static PRLock *pb_q_lock=NULL; /* protects first_pb & last_pb */ + +static PRCondVar *op_thread_cv; /* used by operation threads to wait for work */ +static PRLock *op_thread_lock; /* associated with op_thread_cv */ +static int op_shutdown= 0; /* if non-zero, server is shutting down */ + +#define LDAP_SOCKET_IO_BUFFER_SIZE 512 /* Size of the buffer we give to the I/O system for reads */ + + +/* + * We really are done with this connection. Get rid of everything. + * + * Note: this function should be called with conn->c_mutex already locked + * or at a time when multiple threads are not in play that might touch the + * connection structure. + */ +void +connection_done(Connection *conn) +{ + connection_cleanup(conn); + /* free the private content, the buffer has been freed by above connection_cleanup */ + slapi_ch_free((void**)&conn->c_private); + if (NULL != conn->c_sb) + { + ber_sockbuf_free(conn->c_sb); + } + if (NULL != conn->c_mutex) + { + PR_DestroyLock(conn->c_mutex); + } + if (NULL != conn->c_pdumutex) + { + PR_DestroyLock(conn->c_pdumutex); + } +} + +/* + * We're going to be making use of this connection again. + * So, get rid of everything we can't make use of. + * + * Note: this function should be called with conn->c_mutex already locked + * or at a time when multiple threads are not in play that might touch the + * connection structure. + */ +void +connection_cleanup(Connection *conn) +{ + bind_credentials_clear( conn, PR_FALSE /* do not lock conn */, + PR_TRUE /* clear external creds. */ ); + slapi_ch_free((void**)&conn->c_authtype); + + /* Call the plugin extension destructors */ + factory_destroy_extension(connection_type,conn,NULL/*Parent*/,&(conn->c_extension)); + /* + * We hang onto these, since we can reuse them. + * Sockbuf *c_sb; + * PRLock *c_mutex; + * PRLock *c_pdumutex; + * Conn_private *c_private; + */ + +#ifdef _WIN32 + if (conn->c_prfd && (conn->c_flags & CONN_FLAG_SSL)) + { + LDAPDebug( LDAP_DEBUG_CONNS, + "conn=%d fd=%d closed now\n", + conn->c_connid, conn->c_sd,0); + PR_Close(conn->c_prfd); + } + else if (conn->c_sd) + { + LDAPDebug( LDAP_DEBUG_CONNS, + "conn=%d fd=%d closed now\n", + conn->c_connid, conn->c_sd,0); + closesocket(conn->c_sd); + } +#else + if (conn->c_prfd) + { + PR_Close(conn->c_prfd); + } +#endif + + conn->c_sd= SLAPD_INVALID_SOCKET; + conn->c_ldapversion= 0; + + conn->c_isreplication_session = 0; + slapi_ch_free((void**)&conn->cin_addr ); + slapi_ch_free((void**)&conn->cin_destaddr ); + if ( conn->c_domain != NULL ) + { + ber_bvecfree( conn->c_domain ); + conn->c_domain = NULL; + } + /* conn->c_ops= NULL; */ + conn->c_gettingber= 0; + conn->c_currentber= NULL; + conn->c_starttime= 0; + conn->c_connid= 0; + conn->c_opsinitiated= 0; + conn->c_opscompleted= 0; + conn->c_threadnumber= 0; + conn->c_refcnt= 0; + conn->c_idlesince= 0; + conn->c_flags= 0; + conn->c_needpw= 0; + conn->c_prfd= NULL; + /* c_ci stays as it is */ + conn->c_fdi= SLAPD_INVALID_SOCKET_INDEX; + conn->c_next= NULL; + conn->c_prev= NULL; + conn->c_extension= NULL; + /* remove any SASL I/O from the connection */ + sasl_io_cleanup(conn); + sasl_dispose((sasl_conn_t**)&conn->c_sasl_conn); + + /* free the connection socket buffer */ + connection_free_private_buffer(conn); +} + +/* + * Callers of connection_reset() must hold the conn->c_mutex lock. + */ +void +connection_reset(Connection* conn, int ns, PRNetAddr * from, int fromLen, int is_SSL) +{ + char * pTmp = is_SSL ? "SSL " : ""; + TCPLEN_T l_fromLen = (TCPLEN_T)fromLen; + TCPLEN_T addrlen, destaddrlen; + struct sockaddr_in addr, destaddr; + char *str_ip, *str_destip, buf_ip[ 256 ], buf_destip[ 256 ]; + char *str_unknown = "unknown"; + int in_referral_mode = config_check_referral_mode(); + + LDAPDebug( LDAP_DEBUG_CONNS, "new %sconnection on %d\n", pTmp, conn->c_sd, 0 ); + + /* bump our count of connections and update SNMP stats */ + PR_Lock( num_conns_mutex ); + conn->c_connid = num_conns++; + PR_Unlock( num_conns_mutex ); + + if (! in_referral_mode) { + PR_AtomicIncrement(g_get_global_snmp_vars()->ops_tbl.dsConnectionSeq); + PR_AtomicIncrement(g_get_global_snmp_vars()->ops_tbl.dsConnections); + } + + /* get peer address (IP address of this client) */ + addrlen = sizeof( addr ); + memset( &addr, 0, addrlen ); + + if ( ((from->ipv6.ip.pr_s6_addr32[0] != 0) || + (from->ipv6.ip.pr_s6_addr32[1] != 0) || + (from->ipv6.ip.pr_s6_addr32[2] != 0) || + (from->ipv6.ip.pr_s6_addr32[3] != 0)) || + ((conn->c_prfd != NULL) && (PR_GetPeerName( conn->c_prfd, from ) == 0)) ) { + conn->cin_addr = (PRNetAddr *) slapi_ch_malloc( sizeof( PRNetAddr ) ); + memcpy( conn->cin_addr, from, sizeof( PRNetAddr ) ); + + if ( PR_IsNetAddrType( conn->cin_addr, PR_IpAddrV4Mapped ) ) { + PRNetAddr v4addr; + memset( &v4addr, 0, sizeof( v4addr ) ); + v4addr.inet.family = PR_AF_INET; + v4addr.inet.ip = conn->cin_addr->ipv6.ip.pr_s6_addr32[3]; + PR_NetAddrToString( &v4addr, buf_ip, sizeof( buf_ip ) ); + } else { + PR_NetAddrToString( conn->cin_addr, buf_ip, sizeof( buf_ip ) ); + } + buf_ip[ sizeof( buf_ip ) - 1 ] = '\0'; + str_ip = buf_ip; + + } else if ( (conn->c_prfd == NULL) && + (getpeername( conn->c_sd, (struct sockaddr*)&addr, &addrlen ) == 0) ) { + conn->cin_addr = (PRNetAddr *)slapi_ch_malloc( sizeof( PRNetAddr ) ); + + if ( PR_SetNetAddr(PR_IpAddrNull, PR_AF_INET6, addr.sin_port, conn->cin_addr) + != PR_SUCCESS ) { + int oserr = PR_GetError(); + LDAPDebug( LDAP_DEBUG_ANY, "PR_SetNetAddr() failed, " + SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n", + oserr, slapd_pr_strerror(oserr), 0 ); + } else { + PR_ConvertIPv4AddrToIPv6(addr.sin_addr.s_addr, &(conn->cin_addr->ipv6.ip)); + } + + /* copy string equivalent of address into a buffer to use for + * logging since each call to inet_ntoa() returns a pointer to a + * single thread-specific buffer (which prevents us from calling + * inet_ntoa() twice in one call to slapi_log_access()). + */ + str_ip = inet_ntoa( addr.sin_addr ); + strncpy( buf_ip, str_ip, sizeof( buf_ip ) - 1 ); + buf_ip[ sizeof( buf_ip ) - 1 ] = '\0'; + str_ip = buf_ip; + + } else { + str_ip = str_unknown; + } + + + /* + * get destination address (server IP address this client connected to) + */ + destaddrlen = sizeof( destaddr ); + memset( &destaddr, 0, destaddrlen ); + + + if ( conn->c_prfd != NULL ) { + conn->cin_destaddr = (PRNetAddr *) slapi_ch_malloc( sizeof( PRNetAddr ) ); + if (PR_GetSockName( conn->c_prfd, conn->cin_destaddr ) == 0) { + if ( PR_IsNetAddrType( conn->cin_destaddr, PR_IpAddrV4Mapped ) ) { + PRNetAddr v4destaddr; + memset( &v4destaddr, 0, sizeof( v4destaddr ) ); + v4destaddr.inet.family = PR_AF_INET; + v4destaddr.inet.ip = conn->cin_destaddr->ipv6.ip.pr_s6_addr32[3]; + PR_NetAddrToString( &v4destaddr, buf_destip, sizeof( buf_destip ) ); + } else { + PR_NetAddrToString( conn->cin_destaddr, buf_destip, sizeof( buf_destip ) ); + } + buf_destip[ sizeof( buf_destip ) - 1 ] = '\0'; + str_destip = buf_destip; + } else { + str_destip = str_unknown; + } + } else if ( (conn->c_prfd == NULL) && + (getsockname( conn->c_sd, (struct sockaddr*)&destaddr, &destaddrlen ) == 0) ) { + conn->cin_destaddr = (PRNetAddr *)slapi_ch_malloc( sizeof( PRNetAddr ) ); + + if ( PR_SetNetAddr(PR_IpAddrNull, PR_AF_INET6, destaddr.sin_port, conn->cin_destaddr) + != PR_SUCCESS ) { + int oserr = PR_GetError(); + LDAPDebug( LDAP_DEBUG_ANY, "PR_SetNetAddr() failed, " + SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n", + oserr, slapd_pr_strerror(oserr), 0 ); + } else { + PR_ConvertIPv4AddrToIPv6(destaddr.sin_addr.s_addr, &(conn->cin_destaddr->ipv6.ip)); + } + + /* copy string equivalent of address into a buffer to use for + * logging since each call to inet_ntoa() returns a pointer to a + * single thread-specific buffer (which prevents us from calling + * inet_ntoa() twice in one call to slapi_log_access()). + */ + str_destip = inet_ntoa( destaddr.sin_addr ); + strncpy( buf_destip, str_destip, sizeof( buf_destip ) - 1 ); + buf_destip[ sizeof( buf_destip ) - 1 ] = '\0'; + str_destip = buf_destip; + + } else { + str_destip = str_unknown; + } + + + if ( !in_referral_mode ) { + /* create a sasl connection */ + ids_sasl_server_new(conn); + } + + /* log useful stuff to our access log */ + slapi_log_access( LDAP_DEBUG_STATS, + "conn=%d fd=%d slot=%d %sconnection from %s to %s\n", + conn->c_connid, conn->c_sd, ns, pTmp, str_ip, str_destip ); + + /* initialize the remaining connection fields */ + conn->c_ldapversion = LDAP_VERSION3; + conn->c_starttime = current_time(); + conn->c_idlesince = conn->c_starttime; + conn->c_flags = is_SSL ? CONN_FLAG_SSL : 0; + conn->c_authtype = slapi_ch_strdup(SLAPD_AUTH_NONE); +} + +/* Create a pool of threads for handling the operations */ +void +init_op_threads() +{ + int i; + PRErrorCode errorCode; + int max_threads = config_get_threadnumber(); + /* Initialize the locks and cv */ + + if ((pb_q_lock = PR_NewLock()) == NULL ) { + errorCode = PR_GetError(); + LDAPDebug( LDAP_DEBUG_ANY, + "init_op_threads: PR_NewLock failed, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n", + errorCode, slapd_pr_strerror(errorCode), 0 ); + exit(-1); + } + + if ((op_thread_lock = PR_NewLock()) == NULL ) { + errorCode = PR_GetError(); + LDAPDebug( LDAP_DEBUG_ANY, + "init_op_threads: PR_NewLock failed, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n", + errorCode, slapd_pr_strerror(errorCode), 0 ); + exit(-1); + } + + if ((op_thread_cv = PR_NewCondVar( op_thread_lock )) == NULL) { + errorCode = PR_GetError(); + LDAPDebug( LDAP_DEBUG_ANY, "init_op_threads: PR_NewCondVar failed, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n", + errorCode, slapd_pr_strerror(errorCode), 0 ); + exit(-1); + } + + /* start the operation threads */ + for (i=0; i < max_threads; i++) { + PR_SetConcurrency(4); + if (PR_CreateThread (PR_USER_THREAD, + (VFP) (void *) connection_threadmain, NULL, + PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, + PR_UNJOINABLE_THREAD, + SLAPD_DEFAULT_THREAD_STACKSIZE + ) == NULL ) { + int prerr = PR_GetError(); + LDAPDebug( LDAP_DEBUG_ANY, "PR_CreateThread failed, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n", + prerr, slapd_pr_strerror( prerr ), 0 ); + } else { + PR_AtomicIncrement(&active_threads); + } + } +} + +static void +referral_mode_reply(Slapi_PBlock *pb) +{ + struct slapdplugin *plugin; + plugin = (struct slapdplugin *) slapi_ch_calloc(1, sizeof(struct slapdplugin)); + if (plugin!=NULL) + { + struct berval *urls[2], url; + char *refer; + refer = config_get_referral_mode(); + pb->pb_plugin = plugin; + set_db_default_result_handlers(pb); + urls[0] = &url; + urls[1] = NULL; + url.bv_val = refer; + url.bv_len = refer ? strlen(refer) : 0; + slapi_send_ldap_result(pb, LDAP_REFERRAL, NULL, NULL, 0, urls); + slapi_ch_free((void **)&plugin); + slapi_ch_free((void **)&refer); + } +} + +static int +connection_need_new_password(const Connection *conn, const Operation *op, Slapi_PBlock *pb) +{ + int r= 0; + /* + * add tag != LDAP_REQ_SEARCH to allow admin server 3.5 to do + * searches when the user needs to reset + * the pw the first time logon. + * LP: 22 Dec 2000: Removing LDAP_REQ_SEARCH. It's very unlikely that AS 3.5 will + * be used to manage DS5.0 + */ + + if ( conn->c_needpw && op->o_tag != LDAP_REQ_MODIFY && + op->o_tag != LDAP_REQ_BIND && op->o_tag != LDAP_REQ_UNBIND && + op->o_tag != LDAP_REQ_ABANDON ) + { + add_pwd_control ( pb, LDAP_CONTROL_PWEXPIRED, 0); + slapi_log_access( LDAP_DEBUG_STATS, "conn=%d op=%d %s\n", + pb->pb_conn->c_connid, pb->pb_op->o_opid, + "need new password" ); + send_ldap_result( pb, LDAP_UNWILLING_TO_PERFORM, + NULL, NULL, 0, NULL ); + r= 1; + } + return r; +} + + +static void +connection_dispatch_operation(Connection *conn, Operation *op, Slapi_PBlock *pb) +{ + /* Copy the Connection DN into the operation struct */ + op_copy_identity( conn, op ); + + /* process the operation */ + + switch ( op->o_tag ) { + case LDAP_REQ_BIND: + operation_set_type(op,SLAPI_OPERATION_BIND); + do_bind( pb ); + break; + + case LDAP_REQ_UNBIND: + operation_set_type(op,SLAPI_OPERATION_UNBIND); + do_unbind( pb ); + break; + + case LDAP_REQ_ADD: + operation_set_type(op,SLAPI_OPERATION_ADD); + do_add( pb ); + break; + + case LDAP_REQ_DELETE: + operation_set_type(op,SLAPI_OPERATION_DELETE); + do_delete( pb ); + break; + + case LDAP_REQ_MODRDN: + operation_set_type(op,SLAPI_OPERATION_MODRDN); + do_modrdn( pb ); + break; + + case LDAP_REQ_MODIFY: + operation_set_type(op,SLAPI_OPERATION_MODIFY); + do_modify( pb ); + break; + + case LDAP_REQ_COMPARE: + operation_set_type(op,SLAPI_OPERATION_COMPARE); + do_compare( pb ); + break; + + case LDAP_REQ_SEARCH: + operation_set_type(op,SLAPI_OPERATION_SEARCH); + + + /* On Linux we can use TCP_CORK to get us 5-10% speed benefit when one entry is returned */ + /* Nagle needs to be turned _off_, the default is off on linux, in daemon.c */ +#if defined(LINUX) + { + int i = 1; + int ret = 0; + /* Set TCP_CORK here */ + ret = setsockopt(conn->c_sd,IPPROTO_TCP,TCP_CORK,&i,sizeof(i)); + if (ret < 0) { + LDAPDebug(LDAP_DEBUG_ANY, "Failed to set TCP_CORK on connection %d\n",conn->c_connid, 0, 0); + } +#endif + + do_search( pb ); + +#if defined(LINUX) + /* Clear TCP_CORK to flush any unsent data */ + i = 0; + ret = setsockopt(conn->c_sd,IPPROTO_TCP,TCP_CORK,&i,sizeof(i)); + if (ret < 0) { + LDAPDebug(LDAP_DEBUG_ANY, "Failed to clear TCP_CORK on connection %d\n",conn->c_connid, 0, 0); + } + } +#endif + break; + + /* for some strange reason, the console is using this old obsolete + * value for ABANDON so we have to support it until the console + * get fixed + * otherwise the console has VERY BAD performances when a fair amount + * of entries are created in the DIT + */ + case LDAP_REQ_ABANDON_30: + case LDAP_REQ_ABANDON: + operation_set_type(op,SLAPI_OPERATION_ABANDON); + do_abandon( pb ); + break; + + case LDAP_REQ_EXTENDED: + operation_set_type(op,SLAPI_OPERATION_EXTENDED); + do_extended( pb ); + break; + + default: + LDAPDebug( LDAP_DEBUG_ANY, + "ignoring unknown LDAP request (conn=%d, tag=0x%lx)\n", + conn->c_connid, op->o_tag, 0 ); + break; + } +} + +/* this function should be called under c_mutex */ +int connection_release_nolock (Connection *conn) +{ + if (conn->c_refcnt <= 0) + { + slapi_log_error(SLAPI_LOG_FATAL, "connection", + "conn=%d fd=%d Attempt to release connection that is not aquired\n", + conn->c_connid, conn->c_sd); + PR_ASSERT (PR_FALSE); + return -1; + } + else + { + conn->c_refcnt--; + + return 0; + } +} + +/* this function should be called under c_mutex */ +int connection_acquire_nolock (Connection *conn) +{ + /* connection in the closing state can't be acquired */ + if (conn->c_flags & CONN_FLAG_CLOSING) + { + /* This may happen while other threads are still working on this connection */ + slapi_log_error(SLAPI_LOG_FATAL, "connection", + "conn=%d fd=%d Attempt to acquire connection in the closing state\n", + conn->c_connid, conn->c_sd); + return -1; + } + else + { + conn->c_refcnt++; + return 0; + } +} + +/* returns non-0 if connection can be reused and 0 otherwise */ +int connection_is_free (Connection *conn) +{ + int rc; + + PR_Lock(conn->c_mutex); + rc = conn->c_sd == SLAPD_INVALID_SOCKET && conn->c_refcnt == 0 && + !(conn->c_flags & CONN_FLAG_CLOSING); + PR_Unlock(conn->c_mutex); + + return rc; +} + +int connection_is_active_nolock (Connection *conn) +{ + return (conn->c_sd != SLAPD_INVALID_SOCKET) && + !(conn->c_flags & CONN_FLAG_CLOSING); +} + +/* returns non-0 if this is an active connection meaning it is in use + and not in the closing mode */ + +#if defined LDAP_IOCP +/* + * IO Completion ports are currently only available on NT. + */ + +typedef enum {read_data, write_data, new_connection} work_type; +static int wait_on_new_work(Connection **ppConn, work_type *type); +static int issue_new_read(Connection *conn); +static int finished_chomping(Connection *conn); +static int read_the_data(Connection *op, int *process_op); +static int is_new_operation(Connection *conn); +static int process_operation(Connection *conn, Operation *op); +static int connection_operation_new(Connection *conn, Operation **ppOp); +Operation *get_current_op(Connection *conn); +static int handle_read_data(Connection *conn,Operation **op, + int * connection_referenced); + +static void inc_op_count(Connection* conn) +{ + PR_AtomicIncrement(&conn->c_opscompleted); + PR_AtomicIncrement(&ops_completed); +} + +static int connection_increment_reference(Connection *conn) +{ + int rc = 0; + PR_Lock( conn->c_mutex ); + rc = connection_acquire_nolock (conn); + PR_Unlock( conn->c_mutex ); + return rc; +} + +static void connection_decrement_reference(Connection *conn) +{ + PR_Lock( conn->c_mutex ); + connection_release_nolock (conn); + PR_Unlock( conn->c_mutex ); +} + +static void +connection_threadmain() +{ + /* + * OK, so this is the thread main routine for the thread pool. + * This is the general idea : wait on the i/o completion port. + * then get some data. There are three cases here: + * 1) This is the first piece of data read for a new LDAP op. + * 2) This is a subsequent, but not final, piece of data read in the current LDAP op on this connection + * 3) This is the last piece of the current LDAP op on the current connection. + * Note that these cases are NOT exclusive ! In particular, all three can occur for the same read. + * based on detecting these cases, we end up doing one or more of the following things: + * a) Create new structures for a new op. + * b) Read data into the BER buffer for the op. + * c) Press on to service the operation request (note that the results are currently written + * synchronously. + * We always queue a new read on the socket too. + * (Note, we need to make sure we don't issue the new read operation until we've copied + * the data from the existing one. Otherwise we'd open ourselves to getting OOO data.) + * + * The intention is that this code will be clean enough to be used for the UNIX build, + * once we fake up I/O completion ports with select and another thread. + */ + + Connection *conn = NULL; + Operation *op = NULL; + int return_value = -1; + int abandon_connection = 0; + work_type command = 0; + int connection_referenced = 0; + + /* Don't ask me, and I will tell you no lies */ +#if defined( OSF1 ) || defined( hpux ) || defined( LINUX ) + /* Arrange to ignore SIGPIPE signals. */ + SIGNAL( SIGPIPE, SIG_IGN ); +#endif + + while (1) { + + abandon_connection = 1; /* we start off assuming that we'll fail somewhere */ + conn = NULL; /* just make sure we don't step on an old connection by mistake */ + op = NULL; /* Same goes for the operation */ + + return_value = wait_on_new_work(&conn,&command); + if( op_shutdown ) + break; + if (0 == return_value) { + connection_referenced = 0; /* No outstanding ref count on connection if wait for work returned OK */ + switch (command) { + case read_data: + return_value = handle_read_data(conn,&op,&connection_referenced); + if (0 == return_value) + { + abandon_connection = 0; + } + break; + case write_data: + /* NYI, but we need to go and find the state for the connection, find the operation + * which queued the write, and then get whatever data we need to write, then write it ! */ + break; + case new_connection: + /* NYI, but this would consist of the same stuff which is currently in daemon.c. + * On NT, we'd use AcceptEx() */ + break; + default: + break; + } + finished_chomping(conn); + } else { + PR_SetError(PR_IO_ERROR, return_value); + connection_referenced = 1; /* There is an outstanding refcnt on the conn, so we get to close the right one ! */ + } + + /* If anything went wrong with the connection above, such that we need to + * disconnect it, we'll know here and shoot it in the foot. + */ + if ( (NULL != conn) && abandon_connection) { + disconnect_server(conn, conn->c_connid, op ? op->o_opid : -1, SLAPD_DISCONNECT_ABORT, 0 ); + if (connection_referenced) { + connection_decrement_reference(conn); + } + } + } + PR_AtomicDecrement(&active_threads); +} + +static int handle_read_data(Connection *conn,Operation **op, + int * connection_referenced) +{ + int return_value = 0; + int process_op = 0; /* Do we or do we not process a complete operation now ? */ + + if (is_new_operation(conn)) { + return_value = connection_operation_new(conn,op); + } else { + *op = get_current_op(conn); + } + + /* if connection is closing */ + if (return_value != 0) { + LDAPDebug(LDAP_DEBUG_CONNS, + "handle_read_data returns as conn %d closing, fd=%d\n", + conn->c_connid,conn->c_sd,0); + return return_value; + } + + return_value = read_the_data(conn,&process_op); + + if (0 == return_value) { + if (0 != process_op) + return_value = process_operation(conn,*op); + } + else + *connection_referenced = 1; + + return return_value; +} + +/* Function which does the work involved in servicing an LDAP operation. */ +static int process_operation(Connection *conn, Operation *op) +{ + Slapi_PBlock *pb = NULL; + unsigned long len, tag; + long msgid; + int return_value = 0; + int destroy_content = 1; + + + pb = (Slapi_PBlock *) slapi_ch_calloc( 1, sizeof(Slapi_PBlock) ); + pb->pb_conn = conn; + pb->pb_op = op; + /* destroy operation content when done */ + slapi_pblock_set (pb, SLAPI_DESTROY_CONTENT, &destroy_content); + + if (! config_check_referral_mode()) { + PR_AtomicIncrement(&ops_initiated); + PR_AtomicIncrement(g_get_global_snmp_vars()->ops_tbl.dsInOps); + } + + if ( (tag = ber_get_int( op->o_ber, &msgid )) + != LDAP_TAG_MSGID ) { + /* log, close and send error */ + LDAPDebug( LDAP_DEBUG_ANY, + "conn=%d unable to read tag for incoming request\n", conn->c_connid, 0, 0 ); + return_value = -1; + goto done; + } + op->o_msgid = msgid; + + tag = ber_peek_tag( op->o_ber, &len ); + switch ( tag ) { + case LBER_ERROR: + case LDAP_TAG_LDAPDN: /* optional username, for CLDAP */ + /* log, close and send error */ + LDAPDebug( LDAP_DEBUG_ANY, + "conn=%d ber_peek_tag returns 0x%lx\n", conn->c_connid, tag, 0 ); + return_value = -1; + goto done; + default: + break; + } + op->o_tag = tag; + + /* are we in referral-only mode? */ + if (config_check_referral_mode() && tag != LDAP_REQ_UNBIND) + { + referral_mode_reply(pb); + goto done; + } + + /* check if new password is required */ + if(connection_need_new_password(conn, op, pb)) + { + goto done; + } + + /* if this is a bulk import, only "add" and "import done (extop)" are + * allowed */ + if (conn->c_flags & CONN_FLAG_IMPORT) { + if ((tag != LDAP_REQ_ADD) && (tag != LDAP_REQ_EXTENDED)) { + /* no cookie for you. */ + LDAPDebug(LDAP_DEBUG_ANY, "Attempted operation %d from " + "within bulk import\n", tag, 0, 0); + slapi_send_ldap_result(pb, LDAP_PROTOCOL_ERROR, NULL, NULL, + 0, NULL); + return_value = -1; + goto done; + } + } + + /* + * Call the do_<operation> function to process this request. + */ + connection_dispatch_operation(conn, op, pb); + +done: + + /* If we're here, it means that we successfully completed an operation , so bump the counts */ + inc_op_count(conn); + + if ( !( pb->pb_op->o_flags & OP_FLAG_PS )) { + /* + * If not a persistent search, remove the operation + * from this connection's list. + */ + PR_Lock( conn->c_mutex ); + connection_remove_operation( conn, op ); + PR_Unlock( conn->c_mutex ); + + /* destroying the pblock will cause destruction of the operation + * so this must happen before releasing the connection + */ + slapi_pblock_destroy( pb ); + + PR_Lock( conn->c_mutex ); + if (connection_release_nolock (conn) != 0) + { + return_value = -1; + } + PR_Unlock( conn->c_mutex ); + + } + return return_value; +} + +/* Helper functions for the code above: */ + + +struct Conn_private { + /* First the platform-dependent part */ +#ifdef _WIN32 + OVERLAPPED c_overlapped; + DWORD c_buffer_size; + char *c_buffer; + DWORD c_number_of_async_bytes_read; + DWORD c_buffer_offset; +#else +#endif + /* Now the platform independent part */ + Operation *c_current_op; + int c_flags; +}; + +static void connection_free_private_buffer(Connection *conn) +{ +#ifdef _WIN32 + if (NULL != conn->c_private) { + slapi_ch_free( (void**)&conn->c_private->c_buffer); + } +#else +#endif +} + +#define FLAG_CONN_HAD_SOME 1 /* Set when we've read the first piece of data already, means we don't need to allocate a new op */ +#define FLAG_CONN_COMPLETE 2 /* Set when we've read all of an LDAP operation request, means we can proceed to process it */ + + +/* Little helper functions */ + +Operation *get_current_op(Connection *conn) +{ + Operation *return_op = conn->c_private->c_current_op; + PR_ASSERT(NULL != return_op); + return return_op; +} + +static int is_new_operation(Connection *conn) +{ + if (0 == conn->c_private->c_flags) { + return 1; + } else { + return 0; + } +} + +/* Called when a new operation comes in on a connection */ +static int connection_operation_new(Connection *conn, Operation **ppOp) +{ + /* we need to make a new operation structure and chain it onto the connection */ + Operation *temp_op = NULL; + int rc; + + PR_Lock( conn->c_mutex ); + if (connection_is_active_nolock(conn) == 0) { + LDAPDebug(LDAP_DEBUG_CONNS, + "not creating a new operation when conn %d closing\n", + conn->c_connid,0,0); + PR_Unlock( conn->c_mutex ); + return -1; + } + temp_op = operation_new( plugin_build_operation_action_bitmap( 0, + plugin_get_server_plg() )); + connection_add_operation( conn, temp_op); + rc = connection_acquire_nolock (conn); + PR_Unlock( conn->c_mutex ); + /* Stash the op pointer in the connection structure for later use */ + PR_ASSERT(NULL == conn->c_private->c_current_op); + conn->c_private->c_current_op = temp_op; + *ppOp = temp_op; + return rc; +} + +/* Call this to tell the select thread to put us back into the read-ready signal set */ +static int add_to_select_set(Connection *conn) +{ + conn->c_gettingber = 0; + signal_listner(); + return 0; +} + +static int remove_from_select_set(Connection *conn) +{ + conn->c_gettingber = 1; + return 0; +} + +/* Helper functions from here on are platform-dependent */ +/* First the NT ones */ + +#ifdef _WIN32 + +static HANDLE completion_port = INVALID_HANDLE_VALUE; +#define COMPKEY_DIE ((DWORD) -1L) /* used to kill off workers */ + +static int push_back_data(Connection *conn, size_t offset, size_t length); + +/* Called when we've read from the completion queue, so there's data + * waiting for us to pickup. We're told: the number of bytes read, the + * address of the buffer, the state of this connection (new op, middle of op). + */ +static int read_the_data(Connection *conn, int *process_op) +{ + Conn_private *priv = conn->c_private; + Operation *op = NULL; + DWORD Bytes_Read = 0; + char *Buffer = NULL; + int tag = 0; + int return_value = -1; + unsigned long ber_len = 0; + unsigned long Bytes_Scanned = 0; + + op = priv->c_current_op; + Bytes_Read = priv->c_number_of_async_bytes_read; + Buffer = priv->c_buffer + priv->c_buffer_offset; + + PR_ASSERT(NULL != op->o_ber); + + /* Is this an SSL connection ? */ + if (0 == (conn->c_flags & CONN_FLAG_SSL)) { + /* Not SSL */ + + if (! config_check_referral_mode()) { + /* Update stats */ + PR_Lock( op_thread_lock ); + (*(g_get_global_snmp_vars()->ops_tbl.dsBytesRecv)) += Bytes_Read; + PR_Unlock( op_thread_lock ); + } + + /* We need to read the data into the BER buffer */ + /* This can return a tag pr LBER_DEFAULT, indicating some error condition */ + tag = ber_get_next_buffer_ext( Buffer, Bytes_Read, &ber_len, op->o_ber, &Bytes_Scanned, conn->c_sb ); + if(LBER_DEFAULT == tag) + { + if (0 == Bytes_Scanned) + { + /* Means we encountered an error---eg the client sent us pure crap--- + a bunch of bytes which we took to be a tag, length, then we ran off the + end of the buffer. The next time we get here, we'll be returned LBER_DEFAULT + This means that everything we've seen up till now is useless because it wasn't + an LDAP message. + So, we toss it away ! */ + if (errno == EMSGSIZE) { + log_ber_too_big_error(conn, ber_len, 0); + } + PR_Lock( conn->c_mutex ); + connection_remove_operation( conn, op ); + operation_free(&op, conn); + priv->c_current_op = NULL; + PR_Unlock( conn->c_mutex ); + return -1; /* Abandon Connection */ + } + } + if (is_ber_too_big(conn,ber_len)) + { + PR_Lock( conn->c_mutex ); + connection_remove_operation( conn, op ); + operation_free(&op, conn); + priv->c_current_op = NULL; + PR_Unlock( conn->c_mutex ); + return -1; /* Abandon Connection */ + } + + /* We set the flag to indicate that we'er in the middle of an op */ + priv->c_flags |= FLAG_CONN_HAD_SOME; + + /* Then we decide whether this is the last read for the current op */ + /* and set the flag accordingly */ + if (LBER_DEFAULT != tag) { /* we received a complete message */ + if (LDAP_TAG_MESSAGE == tag) { /* looks like an LDAP message */ + /* It's time to process this operation */ + *process_op = 1; + priv->c_current_op = NULL; + priv->c_flags = 0; + } else { + /* + * We received a non-LDAP message. Log and close connection. + */ + LDAPDebug( LDAP_DEBUG_ANY, + "conn=%d received a non-LDAP message" + " (tag 0x%lx, expected 0x%lx)\n", + conn->c_connid, tag, LDAP_TAG_MESSAGE ); + PR_Lock( conn->c_mutex ); + connection_remove_operation( conn, op ); + operation_free(&op, conn); + priv->c_current_op = NULL; + PR_Unlock( conn->c_mutex ); + return -1; /* Abandon Connection */ + } + } + + /* Finally, mark whether there's the beginning of another operation remaining in the buffer */ + /* If there is, queue up another I/O completion request on the port to get it handled OK */ + /* If not, issue a new read on the socket. */ + if (Bytes_Scanned != Bytes_Read) { + if (connection_increment_reference(conn) == -1) { + LDAPDebug(LDAP_DEBUG_CONNS, + "could not acquire lock in issue_new_read as conn %d closing fd=%d\n", + conn->c_connid,conn->c_sd,0); + /* XXX how to handle this error? */ + /* MAB: 25 Jan 01: let's try like this and pray this won't leak... */ + /* GB : this should be OK because an error here + * means some other thread decided to close the + * connection, which mean a fatal error happened + * in that case just forget about the remaining + * data and return + */ + return (0); + } + if ((return_value = push_back_data(conn,priv->c_overlapped.Offset + Bytes_Scanned, + Bytes_Read-Bytes_Scanned)) == -1) { + /* MAB: 25 jan 01 we need to decrement the conn refcnt before leaving... Otherwise, + * this thread will unbalance the ref_cnt inc and dec for this connection + * and the result is that the connection is never closed and instead is kept + * forever an never released -> this was causing a fd starvation on NT + */ + connection_decrement_reference(conn); + LDAPDebug(LDAP_DEBUG_CONNS, + "push_back_data failed: closing conn %d fd=%d\n", + conn->c_connid,conn->c_sd,0); + } + } else { + priv->c_overlapped.Offset = 0; + return_value = issue_new_read(conn); + } + } else { + /* SSL */ + if ( (tag = ber_get_next( conn->c_sb, &ber_len, op->o_ber )) + != LDAP_TAG_MESSAGE ) { + return( -1 ); + } + if(is_ber_too_big(conn,ber_len)) + { + return( -1 ); + } + /* Put this connection back into the read-ready signal state */ + /* priv->c_flags |= FLAG_CONN_COMPLETE; Redundant now */ + /* It's time to process this operation */ + *process_op = 1; + priv->c_current_op = NULL; + priv->c_flags = 0; + return_value = 0; + add_to_select_set(conn); + } + + return return_value; +} + +int push_back_data(Connection *conn, size_t offset, size_t length) +{ + /* Use PostQueuedCompletionStatus() to push the data back up the pipe */ + BOOL return_bool = FALSE; + + conn->c_private->c_overlapped.Offset = offset; + return_bool = PostQueuedCompletionStatus(completion_port,length,(DWORD)conn,&conn->c_private->c_overlapped); + + if (return_bool) { + return 0; + } else { + return -1; + } +} + +/* This function issues a new read operation on the connection. + * Called once we've finished reading everything from the buffer. + * VMS crusties will notice the similarity to $QIO. + */ +int issue_new_read(Connection *conn) +{ + BOOL return_bool = FALSE; + HANDLE socket = INVALID_HANDLE_VALUE; + void **buffer = NULL; + DWORD bytes_read = 0; + DWORD buffer_size = 0; + OVERLAPPED *overlapped = NULL; + + PR_ASSERT(NULL != conn); + socket = (HANDLE)conn->c_sd; + PR_ASSERT(NULL != socket); + + /* here we make sure that we have a buffer allocated */ + buffer = &conn->c_private->c_buffer; + if (NULL == *buffer) { + *buffer = (void*)slapi_ch_malloc(LDAP_SOCKET_IO_BUFFER_SIZE); + if (NULL == *buffer) { + /* memory allocation failure */ + return -1; + } + conn->c_private->c_buffer_size = LDAP_SOCKET_IO_BUFFER_SIZE; + } + + buffer_size = conn->c_private->c_buffer_size; + overlapped = &conn->c_private->c_overlapped; + + if (connection_increment_reference(conn) == -1) { + LDAPDebug(LDAP_DEBUG_CONNS, + "could not acquire lock in issue_new_read as conn %d closing fd=%d\n", + conn->c_connid,conn->c_sd,0); + /* This means that the connection is closing */ + return -1; + } + return_bool = ReadFile(socket,*buffer,buffer_size,&bytes_read,overlapped); + if ( !return_bool && ERROR_IO_PENDING != GetLastError( ) ) { + /* This means that the connection is shot for some reason */ + connection_decrement_reference(conn); + return -1; + } else { + /* Our work is done, i/o read now queued */ + return 0; + } +} + +static int wait_on_new_work(Connection **ppConn, work_type *type) +{ + /* Here, we wait on the I/O completion port for new data */ + /* because we're not sure whether the completion port has been created yet, + * we wait 'till it has been. + */ + Connection *temp_conn = NULL; + DWORD Bytes_Received = 0; + OVERLAPPED *pOverlapped = NULL; + BOOL return_bool = FALSE; + + *type = read_data; + + while ( (INVALID_HANDLE_VALUE == completion_port) && (!op_shutdown) ) { + Sleep(100); + } + while (1) { + if (op_shutdown) { + return EINTR; + } + return_bool = GetQueuedCompletionStatus(completion_port,&Bytes_Received,(DWORD*)&temp_conn,&pOverlapped,INFINITE); + if ((unsigned long)temp_conn == COMPKEY_DIE ) { + continue; /* kill this worker */ + } + if (TRUE == return_bool) { + /* we successfully completed the I/O operation */ + /* set the connection pointer the caller gave us to the one from the port */ + PR_ASSERT(NULL != pOverlapped); + PR_ASSERT(NULL != temp_conn); + *ppConn = temp_conn; + /* store the # bytes read in the connection structure */ + (*ppConn)->c_private->c_number_of_async_bytes_read = Bytes_Received; + (*ppConn)->c_private->c_buffer_offset = (*ppConn)->c_private->c_overlapped.Offset; + if( Bytes_Received == 0 ) + { + /* 0 bytes received from a completed overlapped I/O + operation means the socket's been closed. */ + break; + } + (*ppConn)->c_idlesince = current_time(); + /* If we exit here, everything is OK */ + connection_decrement_reference(temp_conn); + return 0; + } + if ( (FALSE == return_bool) && (NULL == pOverlapped) ) { + /* we timed out */ + /* slapi_log_error( SLAPI_LOG_FATAL, "connection", + "GetQueuedCompletionStatus call timed out\n");*/ + continue; + } + if ( (FALSE == return_bool) && (NULL != pOverlapped)) { + /* signifies some sort of i/o error, most likely an abortive close */ + /* slapi_log_error( SLAPI_LOG_FATAL, "connection", + "GetQueuedCompletionStatus call failed; error - %ld\n", GetLastError());*/ + if (NULL != temp_conn) { + /* If we were told the connection, return it--otherwise we can't tell which connection to close */ + *ppConn = temp_conn; + } + break; + } + } + return EPIPE; /* we failed to read for some reason */ +} + +int connection_new_private(Connection *conn) +{ + /* first add to the completion port */ + DWORD threads = 10; /* DBDB hackhack */ + HANDLE socket = INVALID_HANDLE_VALUE; + HANDLE return_port = NULL; + Conn_private *priv = NULL; + int return_value = -1; + + PR_ASSERT(NULL != conn); + + socket = (HANDLE) conn->c_sd; + + /* make the private data if it isn't already there */ + + if (NULL == conn->c_private) { + Conn_private *new_private = (Conn_private *)slapi_ch_malloc(sizeof(Conn_private)); + if (NULL == new_private) { + /* memory allocation failed */ + return -1; + } + conn->c_private = new_private; + ZeroMemory(conn->c_private,sizeof(Conn_private)); + } + priv = conn->c_private; + /* Make sure the private structure is cleared */ + /* Note: you must modify this code if the contents + * of the structure are changed---we can't simply + * zero the structure because we want to preserve the + * buffer. IMPORTANT---here we reuse the I/O buffer + * from before. This is deliberate, to avoid mallocing again */ + ZeroMemory(&(priv->c_overlapped),sizeof(OVERLAPPED)); + priv->c_number_of_async_bytes_read = 0; + priv->c_buffer_offset = 0; + priv->c_flags = 0; + priv->c_current_op = NULL; + + + if (INVALID_HANDLE_VALUE == completion_port) { + /* completion port not yet setup, we need to make it */ + completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0); + if (NULL == completion_port) { + LDAPDebug(LDAP_DEBUG_ANY,"Failed to create master I/O completion port\n",0,0,0); + return -1; + } + } + /* If the connection is SSL, don't do the right thing */ + if (0 == (conn->c_flags & CONN_FLAG_SSL)) { + return_port = CreateIoCompletionPort(socket,completion_port,(DWORD)conn,0); + if (NULL == return_port) { + LDAPDebug(LDAP_DEBUG_ANY,"Failed to associate socket with I/O completion port, fd=%d,GetLastError = %d\n",socket,GetLastError(),0); + return -1; + } + /* Now queue the initial read on this connection */ + return_value = issue_new_read(conn); + } else { + return_value = 0; + } + + return return_value; +} + +/* If all is well, this only gets called for SSL connections */ +int connection_activity(Connection *conn) +{ + /* First check that this really is an SSL connection */ + if (0 == (conn->c_flags & CONN_FLAG_SSL)) { + return -1; + } + /* Now, the plan here is to push something up the IOCP pipe */ + /* We need to fake something up so that the code which pulls + * it off the queue does the right thing. Here's what we do: + * We just call PostQueuedCompletionStatus like normal. + * The connection is marked as SSL, and it is this that the + * reading code notices. Simple ! + */ + /* Also, we need to participate in the signaling protocol to the select thread */ + remove_from_select_set(conn); + /* We hold the lock already, increment the reference count, which will + be decremented in wait_for_new_work(). */ + if (connection_acquire_nolock (conn) == -1) { + LDAPDebug(LDAP_DEBUG_CONNS, + "could not acquire lock in connection_activity as conn %d closing fd=%d\n", + conn->c_connid,conn->c_sd,0); + /* XXX how to handle this error? */ + /* MAB: 25 Jan 01: let's return on error and pray this won't leak */ + return (-1); + } + return push_back_data(conn, 0, 1); +} + +static int finished_chomping(Connection *conn) +{ + /* On NT we don't need to do anything here */ + return 0; +} + +#else /* WIN32/UNIX */ + +/* + * This is where the UNIX Helper functions would be if IO + * Completion Ports were supported on UNIX. + */ + +#endif /* WIN32/UNIX */ + +#else /* LDAP_IOCP */ + +/* + * IO Completion Ports are not available on this platform. + */ + +static int counter= 0; /* JCM Dumb Name */ + +/* The connection private structure for UNIX turbo mode */ +struct Conn_private +{ + int turbo_flag; /* set if we are currently in turbo mode */ + int previous_op_count; /* the operation counter value last time we sampled it, used to compute operation rate */ + int operation_rate; /* rate (ops/sample period) at which this connection has been processing operations */ + time_t previous_count_check_time; /* The wall clock time we last sampled the operation count */ + size_t c_buffer_size; /* size of the socket read buffer */ + char *c_buffer; /* pointer to the socket read buffer */ + size_t c_buffer_bytes; /* number of bytes currently stored in the buffer */ + size_t c_buffer_offset; /* offset to the location of new data in the buffer */ +}; + +int +connection_new_private(Connection *conn) +{ + if (NULL == conn->c_private) { + Conn_private *new_private = (Conn_private *)slapi_ch_calloc(1,sizeof(Conn_private)); + if (NULL == new_private) { + /* memory allocation failed */ + return -1; + } + conn->c_private = new_private; + } + + /* The c_buffer is supposed to be NULL here, cleaned by connection_cleanup, + double check to avoid memory leak */ + if (NULL == conn->c_private->c_buffer) { + conn->c_private->c_buffer = (char*)slapi_ch_malloc(LDAP_SOCKET_IO_BUFFER_SIZE); + if (NULL == conn->c_private->c_buffer) { + /* memory allocation failure */ + return -1; + } + conn->c_private->c_buffer_size = LDAP_SOCKET_IO_BUFFER_SIZE; + } + + /* + * Clear the private structure, preserving the buffer and length in + * case we are reusing the buffer. + */ + { + char *c_buffer = conn->c_private->c_buffer; + size_t c_buffer_size = conn->c_private->c_buffer_size;; + + memset( conn->c_private, 0, sizeof(Conn_private)); + conn->c_private->c_buffer = c_buffer; + conn->c_private->c_buffer_size = c_buffer_size; + } + + return 0; +} + +static void +connection_free_private_buffer(Connection *conn) +{ + if (NULL != conn->c_private) { + slapi_ch_free((void*)&(conn->c_private->c_buffer)); + } +} + +/* + * Turbo Mode: + * Turbo Connection Mode is designed to more efficiently + * serve a small number of highly active connections performing + * mainly search operations. It is only used on UNIX---completion + * ports on NT make it unnecessary. + * A connection can be in turbo mode, or not in turbo mode. + * For non-turbo mode, the code path is the same as was before: + * worker threads wait on a condition variable for work. + * When they awake they consult the operation queue for + * something to do, read the operation from the connection's socket, + * perform the operation and go back to waiting on the condition variable. + * In Turbo Mode, a worker thread becomes associated with a connection. + * It then waits not on the condition variable, but directly on read ready + * state on the connection's socket. When new data arrives, it decodes + * the operation and executes it, and then goes back to read another + * operation from the same socket, or block waiting on new data. + * The read is done non-blocking, wait in poll with a timeout. + * + * There is a mechanism to ensure that only the most active + * connections are in turbo mode at any time. If this were not + * the case we could starve out some client operation requests + * due to waiting on I/O in many turbo threads at the same time. + * + * Each worker thread periodically (every 10 seconds) examines + * the activity level for the connection it is processing. + * This applies regardless of whether the connection is + * currently in turbo mode or not. Activity is measured as + * the number of operations initiated since the last check was done. + * The N connections with the highest activity level are allowed + * to enter turbo mode. If the current connection is in the top N, + * then we decide to enter turbo mode. If the current connection + * is no longer in the top N, then we leave turbo mode. + * The decision to enter or leave turbo mode is taken under + * the connection mutex, preventing race conditions where + * more than one thread can change the turbo state of a connection + * concurrently. + */ + + +/* Connection status values returned by + connection_wait_for_new_pb(), connection_read_operation(), etc. */ + +#define CONN_FOUND_WORK_TO_DO 0 +#define CONN_SHUTDOWN 1 +#define CONN_NOWORK 2 +#define CONN_DONE 3 +#define CONN_TIMEDOUT 4 + +#define CONN_TURBO_TIMEOUT_INTERVAL 1000 /* milliseconds */ +#define CONN_TURBO_CHECK_INTERVAL 5 /* seconds */ +#define CONN_TURBO_PERCENTILE 50 /* proportion of threads allowed to be in turbo mode */ +#define CONN_TURBO_HYSTERESIS 0 /* avoid flip flopping in and out of turbo mode */ + +int connection_wait_for_new_pb(Slapi_PBlock **ppb, PRIntervalTime interval) +{ + int ret = CONN_FOUND_WORK_TO_DO; + + PR_Lock( op_thread_lock ); + + /* While there is no operation to do... */ + while( counter < 1) { + /* Check if we should shutdown. */ + if (op_shutdown) { + PR_Unlock( op_thread_lock ); + return CONN_SHUTDOWN; + } + PR_WaitCondVar( op_thread_cv, interval); + } + + /* There is some work to do. */ + + counter--; + PR_Unlock( op_thread_lock ); + + /* Get the next operation from the work queue. */ + + *ppb = get_pb(); + if (*ppb == NULL) { + LDAPDebug( LDAP_DEBUG_ANY, "pb is null \n", 0, 0, 0 ); + PR_Lock( op_thread_lock ); + counter++; + PR_Unlock( op_thread_lock ); + ret = CONN_NOWORK; + } + return ret; +} + +void connection_make_new_pb(Slapi_PBlock **ppb, Connection *conn) +{ + /* In the classic case, the pb is made in connection_activity() and then + queued. get_pb() dequeues it. So we can just make it ourselves here */ + + /* *ppb = (Slapi_PBlock *) slapi_ch_calloc( 1, sizeof(Slapi_PBlock) ); */ + *ppb = slapi_pblock_new(); + (*ppb)->pb_conn = conn; + (*ppb)->pb_op = operation_new( plugin_build_operation_action_bitmap( 0, + plugin_get_server_plg() )); + connection_add_operation( conn, (*ppb)->pb_op ); +} + + +/* + * Utility function called by connection_read_operation(). This is a + * small wrapper on top of libldap's ber_get_next_buffer_ext(). + */ +static int +get_next_from_buffer( void *buffer, size_t buffer_size, unsigned long *lenp, + unsigned long *tagp, BerElement *ber, Connection *conn ) +{ + PRErrorCode err = 0; + PRInt32 syserr = 0; + unsigned long bytes_scanned = 0; + + *lenp = 0; + *tagp = ber_get_next_buffer_ext( buffer, buffer_size, lenp, ber, + &bytes_scanned, conn->c_sb ); + if (LBER_DEFAULT == *tagp && 0 == bytes_scanned) { + if (errno == EMSGSIZE) { + log_ber_too_big_error(conn, *lenp, 0); + err = SLAPD_DISCONNECT_BER_TOO_BIG; + } else { + syserr = errno; + } + /* Bad stuff happened, like the client sent us some junk */ + LDAPDebug( LDAP_DEBUG_CONNS, + "ber_get_next failed for connection %d\n", conn->c_connid, 0, 0 ); + /* reset private buffer */ + conn->c_private->c_buffer_bytes = conn->c_private->c_buffer_offset = 0; + + /* drop connection */ + disconnect_server( conn, conn->c_connid, -1, err, syserr ); + return -1; + } + + /* success, or need to wait for more data */ + conn->c_private->c_buffer_offset += bytes_scanned; + return 0; +} + +/* Either read read data into the connection buffer, or fail with err set */ +static int +connection_read_ldap_data(Connection *conn, PRInt32 *err) +{ + int ret = 0; + /* Is SASL encryption enabled on this connection ? */ + if (conn->c_sasl_io) { + /* If so, call the SASL I/O layer */ + ret = sasl_recv_connection(conn,conn->c_private->c_buffer, conn->c_private->c_buffer_size,err); + } else + { + /* Otherwise, just call PRRecv() */ + ret = PR_Recv(conn->c_prfd,conn->c_private->c_buffer,conn->c_private->c_buffer_size,0,PR_INTERVAL_NO_WAIT); + if (ret < 0) { + *err = PR_GetError(); + } + } + return ret; +} + +/* Upon returning from this function, we have either: + 1. Read a PDU successfully. + 2. Detected some error condition with the connection which requires closing it. + 3. In Turbo mode, we Timed out without seeing any data. + + We also handle the case where we read ahead beyond the current PDU + by buffering the data and setting the 'remaining_data' flag. + + */ +int connection_read_operation(Connection *conn, Operation *op, unsigned long *tag, int *remaining_data) +{ + unsigned long len = 0; + int ret = 0; + int waits_done = 0; + long msgid; + int new_operation = 1; /* Are we doing the first I/O read for a new operation ? */ + char *buffer = conn->c_private->c_buffer; + PRErrorCode err = 0; + PRInt32 syserr = 0; + + /* + * if the socket is still valid, get the ber element + * waiting for us on this connection. timeout is handled + * in the low-level [secure_]read_function. + */ + if ( (conn->c_sd == SLAPD_INVALID_SOCKET) || + (conn->c_flags & CONN_FLAG_CLOSING) ) { + return CONN_DONE; + } + + /* See if we should enable SASL I/O for this connection */ + if (conn->c_enable_sasl_io) { + ret = sasl_io_setup(conn); + if (ret) { + LDAPDebug( LDAP_DEBUG_ANY, + "conn=%d unable to enable SASL I/O\n", conn->c_connid, 0, 0 ); + disconnect_server( conn, conn->c_connid, -1, SLAPD_DISCONNECT_BAD_BER_TAG, EPROTO ); + return CONN_DONE; + } + } + + *tag = LBER_DEFAULT; + /* First check to see if we have buffered data from "before" */ + if (conn->c_private->c_buffer_bytes - conn->c_private->c_buffer_offset) { + /* If so, use that data first */ + if ( 0 != get_next_from_buffer( buffer + + conn->c_private->c_buffer_offset, + conn->c_private->c_buffer_bytes + - conn->c_private->c_buffer_offset, + &len, tag, op->o_ber, conn )) { + return CONN_DONE; + } + new_operation = 0; + } + /* If we still haven't seen a complete PDU, read from the network */ + while (*tag == LBER_DEFAULT) { + int ioblocktimeout_waits = config_get_ioblocktimeout() / CONN_TURBO_TIMEOUT_INTERVAL; + /* We should never get here with data remaining in the buffer */ + PR_ASSERT( !new_operation || 0 == (conn->c_private->c_buffer_bytes - conn->c_private->c_buffer_offset) ); + /* We make a non-blocking read call */ + /* ret = PR_Recv(conn->c_prfd,conn->c_private->c_buffer,conn->c_private->c_buffer_size,0,PR_INTERVAL_NO_WAIT); */ + ret = connection_read_ldap_data(conn,&err); + if (ret <= 0) { + if (0 == ret) { + /* Connection is closed */ + PR_Lock( conn->c_mutex ); + disconnect_server_nomutex( conn, conn->c_connid, -1, SLAPD_DISCONNECT_BAD_BER_TAG, 0 ); + conn->c_gettingber = 0; + PR_Unlock( conn->c_mutex ); + signal_listner(); + return CONN_DONE; + } + /* err = PR_GetError(); */ + /* If we would block, we need to poll for a while */ + if ( SLAPD_PR_WOULD_BLOCK_ERROR( err ) ) { + struct PRPollDesc pr_pd; + PRIntervalTime timeout = PR_MillisecondsToInterval(CONN_TURBO_TIMEOUT_INTERVAL); + pr_pd.fd = (PRFileDesc *)conn->c_prfd; + pr_pd.in_flags = PR_POLL_READ; + pr_pd.out_flags = 0; + ret = PR_Poll(&pr_pd, 1, timeout); + waits_done++; + /* Did we time out ? */ + if (0 == ret) { + /* We timed out, should the server shutdown ? */ + if (op_shutdown) { + return CONN_SHUTDOWN; + } + /* We timed out, is this the first read in a PDU ? */ + if (new_operation) { + /* If so, we return */ + return CONN_TIMEDOUT; + } else { + /* Otherwise we loop, unless we exceeded the ioblock timeout */ + if (waits_done > ioblocktimeout_waits) { + LDAPDebug( LDAP_DEBUG_CONNS,"ioblock timeout expired on connection %d\n", conn->c_connid, 0, 0 ); + disconnect_server( conn, conn->c_connid, -1, + SLAPD_DISCONNECT_IO_TIMEOUT, 0 ); + return CONN_DONE; + } else { + + /* The turbo mode may cause threads starvation. + Do a yield here to reduce the starving. + */ + PR_Sleep(PR_INTERVAL_NO_WAIT); + + continue; + } + } + } + if (-1 == ret) { + /* PR_Poll call failed */ + err = PR_GetError(); + syserr = PR_GetOSError(); + LDAPDebug( LDAP_DEBUG_ANY, + "PR_Poll for connection %d returns %d (%s)\n", conn->c_connid, err, slapd_pr_strerror( err ) ); + /* If this happens we should close the connection */ + disconnect_server( conn, conn->c_connid, -1, err, syserr ); + return CONN_DONE; + } + } else { + /* Some other error, typically meaning bad stuff */ + syserr = PR_GetOSError(); + LDAPDebug( LDAP_DEBUG_CONNS, + "PR_Recv for connection %d returns %d (%s)\n", conn->c_connid, err, slapd_pr_strerror( err ) ); + /* If this happens we should close the connection */ + disconnect_server( conn, conn->c_connid, -1, err, syserr ); + return CONN_DONE; + } + } else { + /* We read some data off the network, do something with it */ + conn->c_private->c_buffer_bytes = ret; + conn->c_private->c_buffer_offset = 0; + + if ( get_next_from_buffer( buffer, + conn->c_private->c_buffer_bytes + - conn->c_private->c_buffer_offset, + &len, tag, op->o_ber, conn ) != 0 ) { + return CONN_DONE; + } + + new_operation = 0; + ret = 0; + waits_done = 0; /* got some data: reset counter */ + } + } + /* If there is remaining buffered data, set the flag to tell the caller */ + if (conn->c_private->c_buffer_bytes - conn->c_private->c_buffer_offset) { + *remaining_data = 1; + } + + if ( *tag != LDAP_TAG_MESSAGE ) { + /* + * We received a non-LDAP message. Log and close connection. + */ + LDAPDebug( LDAP_DEBUG_ANY, + "conn=%d received a non-LDAP message (tag 0x%lx, expected 0x%lx)\n", + conn->c_connid, *tag, LDAP_TAG_MESSAGE ); + disconnect_server( conn, conn->c_connid, -1, + SLAPD_DISCONNECT_BAD_BER_TAG, EPROTO ); + return CONN_DONE; + } + + if ( (*tag = ber_get_int( op->o_ber, &msgid )) + != LDAP_TAG_MSGID ) { + /* log, close and send error */ + LDAPDebug( LDAP_DEBUG_ANY, + "conn=%d unable to read tag for incoming request\n", conn->c_connid, 0, 0 ); + disconnect_server( conn, conn->c_connid, -1, SLAPD_DISCONNECT_BAD_BER_TAG, EPROTO ); + return CONN_DONE; + } + if(is_ber_too_big(conn,len)) + { + disconnect_server( conn, conn->c_connid, -1, SLAPD_DISCONNECT_BER_TOO_BIG, 0 ); + return CONN_DONE; + } + op->o_msgid = msgid; + + *tag = ber_peek_tag( op->o_ber, &len ); + switch ( *tag ) { + case LBER_ERROR: + case LDAP_TAG_LDAPDN: /* optional username, for CLDAP */ + /* log, close and send error */ + LDAPDebug( LDAP_DEBUG_ANY, + "conn=%d ber_peek_tag returns 0x%lx\n", conn->c_connid, *tag, 0 ); + disconnect_server( conn, conn->c_connid, -1, SLAPD_DISCONNECT_BER_PEEK, EPROTO ); + return CONN_DONE; + default: + break; + } + op->o_tag = *tag; + return ret; +} + +void connection_make_readable(Connection *conn) +{ + PR_Lock( conn->c_mutex ); + conn->c_gettingber = 0; + PR_Unlock( conn->c_mutex ); + signal_listner(); +} + +/* + * Figure out the operation completion rate for this connection + */ +void connection_check_activity_level(Connection *conn) +{ + int current_count = 0; + int delta_count = 0; + PR_Lock( conn->c_mutex ); + /* get the current op count */ + current_count = conn->c_opscompleted; + /* compare to the previous op count */ + delta_count = current_count - conn->c_private->previous_op_count; + /* delta is the rate, store that */ + conn->c_private->operation_rate = delta_count; + /* store current count in the previous count slot */ + conn->c_private->previous_op_count = current_count; + /* update the last checked time */ + conn->c_private->previous_count_check_time = current_time(); + PR_Unlock( conn->c_mutex ); + LDAPDebug(LDAP_DEBUG_CONNS,"conn %d activity level = %d\n",conn->c_connid,delta_count,0); +} + +typedef struct table_iterate_info_struct { + int connection_count; + int rank_count; + int our_rate; +} table_iterate_info; + +int table_iterate_function(Connection *conn, void *arg) +{ + int ret = 0; + table_iterate_info *pinfo = (table_iterate_info*)arg; + pinfo->connection_count++; + if (conn->c_private->operation_rate > pinfo->our_rate) { + pinfo->rank_count++; + } + return ret; +} + +/* + * Scan the list of active connections, evaluate our relative rank + * for connection activity. + */ +void connection_find_our_rank(Connection *conn,int *connection_count, int *our_rank) +{ + int ret = 0; + table_iterate_info info = {0}; + info.our_rate = conn->c_private->operation_rate; + ret = connection_table_iterate_active_connections(the_connection_table, &info, &table_iterate_function); + *connection_count = info.connection_count; + *our_rank = info.rank_count; +} + +/* + * Evaluate the turbo policy for this connection + */ +void connection_enter_leave_turbo(Connection *conn, int *new_turbo_flag) +{ + int current_mode = 0; + int new_mode = 0; + int connection_count = 0; + int our_rank = 0; + int threshold_rank = 0; + PR_Lock(conn->c_mutex); + /* We can already be in turbo mode, or not */ + current_mode = conn->c_private->turbo_flag; + if(conn->c_private->operation_rate == 0) { + /* The connection is ranked by the passed activities. If some other connection have more activity, + increase rank by one. The highest rank is least activity, good candidates to move out of turbo mode. + However, if no activity on all the connections, then every connection gets 0 rank, so none move out. + No bother to do so much calcuation, short-cut to non-turbo mode if no activities in passed interval */ + new_mode = 0; + } else { + connection_find_our_rank(conn,&connection_count, &our_rank); + LDAPDebug(LDAP_DEBUG_CONNS,"conn %d turbo rank = %d out of %d conns\n",conn->c_connid,our_rank,connection_count); + threshold_rank = (int)((double)active_threads * ((double)CONN_TURBO_PERCENTILE / 100.0) ); + + /* adjust threshold_rank according number of connections, + less turbo threads as more connections, + one measure to reduce thread startvation. + */ + if (connection_count > threshold_rank) { + threshold_rank -= (connection_count - threshold_rank) / 5; + } + + if (current_mode) { + /* We're currently in turbo mode */ + /* Policy says that we stay in turbo mode provided + connection activity is still high. + */ + if (our_rank - CONN_TURBO_HYSTERESIS < threshold_rank) { + /* Stay in turbo mode */ + new_mode = 1; + } else { + /* Exit turbo mode */ + new_mode = 0; + } + } else { + /* We're currently not in turbo mode */ + /* Policy says that we go into turbo mode if + recent connection activity is high. + */ + if (our_rank + CONN_TURBO_HYSTERESIS < threshold_rank) { + /* Enter turbo mode */ + new_mode = 1; + } else { + /* Stay out of turbo mode */ + new_mode = 0; + } + } + } + conn->c_private->turbo_flag = new_mode; + PR_Unlock(conn->c_mutex); + if (current_mode != new_mode) { + if (current_mode) { + LDAPDebug(LDAP_DEBUG_CONNS,"conn %d leaving turbo mode\n",conn->c_connid,0,0); + } else { + LDAPDebug(LDAP_DEBUG_CONNS,"conn %d entering turbo mode\n",conn->c_connid,0,0); + } + } + *new_turbo_flag = new_mode; +} + +static void +connection_threadmain() +{ + Slapi_PBlock *pb = NULL; + PRIntervalTime interval = PR_SecondsToInterval(10); + Connection *conn = NULL; + Operation *op; + unsigned long tag = 0; + int need_wakeup; + int thread_turbo_flag = 0; + int ret = 0; + int more_data = 0; + +#if defined( OSF1 ) || defined( hpux ) + /* Arrange to ignore SIGPIPE signals. */ + SIGNAL( SIGPIPE, SIG_IGN ); +#endif + + while (1) { + int is_timedout = 0; + + if( op_shutdown ) { + LDAPDebug( LDAP_DEBUG_TRACE, + "op_thread received shutdown signal\n", 0, 0, 0 ); + PR_AtomicDecrement(&active_threads); + return; + } + + if (!thread_turbo_flag && (NULL == pb) && !more_data) { + /* If more data is left from the previous connection_read_operation, + we should finish the op now. Client might be thinking it's + done sending the request and wait for the response forever. + [blackflag 624234] */ + ret = connection_wait_for_new_pb(&pb,interval); + switch (ret) { + case CONN_NOWORK: + continue; + case CONN_SHUTDOWN: + LDAPDebug( LDAP_DEBUG_TRACE, + "op_thread received shutdown signal\n", 0, 0, 0 ); + PR_AtomicDecrement(&active_threads); + return; + case CONN_FOUND_WORK_TO_DO: + default: + break; + } + } else if (NULL == pb) { + + /* The turbo mode may cause threads starvation. + Do a yield here to reduce the starving + */ + PR_Sleep(PR_INTERVAL_NO_WAIT); + + PR_Lock(conn->c_mutex); + /* Make our own pb in turbo mode */ + connection_make_new_pb(&pb,conn); + PR_Unlock(conn->c_mutex); + if (! config_check_referral_mode()) { + PR_AtomicIncrement(&ops_initiated); + PR_AtomicIncrement(g_get_global_snmp_vars()->ops_tbl.dsInOps); + } + } + /* Once we're here we have a pb */ + conn = pb->pb_conn; + op = pb->pb_op; + + more_data = 0; + ret = connection_read_operation(conn,op,&tag,&more_data); + +#define DB_PERF_TURBO 1 +#if defined(DB_PERF_TURBO) + /* If it's been a while since we last did it ... */ + if (current_time() - conn->c_private->previous_count_check_time > CONN_TURBO_CHECK_INTERVAL) { + int new_turbo_flag = 0; + /* Check the connection's activity level */ + connection_check_activity_level(conn); + /* And if appropriate, change into or out of turbo mode */ + connection_enter_leave_turbo(conn,&new_turbo_flag); + thread_turbo_flag = new_turbo_flag; + } + + /* turn off turbo mode immediately if any pb waiting in global queue */ + if (thread_turbo_flag && (counter > 0)) { + thread_turbo_flag = 0; + LDAPDebug(LDAP_DEBUG_CONNS,"conn %d leaving turbo mode\n",conn->c_connid,0,0); + } +#endif + + switch (ret) { + case CONN_DONE: + /* This means that the connection was closed, so clear turbo mode */ + /*FALLTHROUGH*/ + case CONN_TIMEDOUT: + thread_turbo_flag = 0; + is_timedout = 1; + /* note: + * should call connection_make_readable after the op is removed + * connection_make_readable(conn); + */ + goto done; + case CONN_SHUTDOWN: + LDAPDebug( LDAP_DEBUG_TRACE, + "op_thread received shutdown signal\n", 0, 0, 0 ); + PR_AtomicDecrement(&active_threads); + return; + default: + break; + } + + /* + * Do not put the connection back to the read ready poll list + * if the operation is unbind. Unbind will close the socket. + * Similarly, if we are in turbo mode, don't send the socket + * back to the poll set. + * more_data: [blackflag 624234] + */ + if (tag != LDAP_REQ_UNBIND && (!thread_turbo_flag) && !more_data) { + connection_make_readable(conn); + } + + /* are we in referral-only mode? */ + if (config_check_referral_mode() && tag != LDAP_REQ_UNBIND) { + referral_mode_reply(pb); + goto done; + } + + /* check if new password is required */ + if(connection_need_new_password(conn, op, pb)) { + goto done; + } + + /* if this is a bulk import, only "add" and "import done" + * are allowed */ + if (conn->c_flags & CONN_FLAG_IMPORT) { + if ((tag != LDAP_REQ_ADD) && (tag != LDAP_REQ_EXTENDED)) { + /* no cookie for you. */ + LDAPDebug(LDAP_DEBUG_ANY, "Attempted operation %d " + "from within bulk import\n", + tag, 0, 0); + slapi_send_ldap_result(pb, LDAP_PROTOCOL_ERROR, NULL, + NULL, 0, NULL); + goto done; + } + } + + /* + * Call the do_<operation> function to process this request. + */ + connection_dispatch_operation(conn, op, pb); + +done: + /* + * done with this operation. delete it from the op + * queue for this connection, delete the number of + * threads devoted to this connection, and see if + * there's more work to do right now on this conn. + */ + + /* number of ops on this connection */ + PR_AtomicIncrement(&conn->c_opscompleted); + /* total number of ops for the server */ + PR_AtomicIncrement(&ops_completed); + /* If this op isn't a persistent search, remove it */ + if ( !( pb->pb_op->o_flags & OP_FLAG_PS )) { + /* delete from connection operation queue & decr refcnt */ + PR_Lock( conn->c_mutex ); + connection_remove_operation( conn, op ); + /* destroying the pblock will cause destruction of the operation + * so this must happend before releasing the connection + */ + slapi_pblock_destroy( pb ); + + /* If we're in turbo mode, we keep our reference to the connection + alive */ + if (!thread_turbo_flag && !more_data) { + connection_release_nolock (conn); + } + PR_Unlock( conn->c_mutex ); + } + if (1 == is_timedout && !more_data) + connection_make_readable(conn); + pb = NULL; + + if (!thread_turbo_flag && !more_data) { /* Don't do this in turbo mode */ + PR_Lock( conn->c_mutex ); + /* if the threadnumber of now below the maximum, wakeup + * the listener thread so that we start polling on this + * connection again + */ + /* DBDB I think this code is bogus -- we already signaled the listener above here */ + if (conn->c_threadnumber == config_get_maxthreadsperconn()) + need_wakeup = 1; + else + need_wakeup = 0; + conn->c_threadnumber--; + PR_Unlock( conn->c_mutex ); + + if (need_wakeup) + signal_listner(); + } + + + } /* while (1) */ +} + +/* thread need to hold conn->c_mutex before calling this function */ +int +connection_activity(Connection *conn) +{ + Slapi_PBlock *pb; + + connection_make_new_pb(&pb, conn); + + /* Add pb to the end of the work queue. */ + add_pb( pb ); + + /* Check if exceed the max thread per connection. If so, increment + c_pbwait. Otherwise increment the counter and notify the cond. var. + there is work to do. */ + + if (connection_acquire_nolock (conn) == -1) { + LDAPDebug(LDAP_DEBUG_CONNS, + "could not acquire lock in connection_activity as conn %d closing fd=%d\n", + conn->c_connid,conn->c_sd,0); + /* XXX how to handle this error? */ + /* MAB: 25 Jan 01: let's return on error and pray this won't leak */ + return (-1); + } + conn->c_gettingber = 1; + conn->c_threadnumber++; + PR_Lock( op_thread_lock ); + counter++; + PR_NotifyCondVar( op_thread_cv ); + PR_Unlock( op_thread_lock ); + + if (! config_check_referral_mode()) { + PR_AtomicIncrement(&ops_initiated); + PR_AtomicIncrement(g_get_global_snmp_vars()->ops_tbl.dsInOps); + } + return 0; +} + +/* add_pb(): will add a pb to the end of the global work queue. The work queue + is implemented as a singal link list. */ + +static void +add_pb( Slapi_PBlock *pb) +{ + + struct Slapi_PBlock_q *new_pb=NULL; + + LDAPDebug( LDAP_DEBUG_TRACE, "add_pb \n", 0, 0, 0 ); + + new_pb = (struct Slapi_PBlock_q *) slapi_ch_malloc ( sizeof( struct Slapi_PBlock_q )); + new_pb->pb = pb; + new_pb->next_pb =NULL; + + PR_Lock( pb_q_lock ); + if (last_pb == NULL) { + last_pb = new_pb; + first_pb = new_pb; + } + else { + last_pb->next_pb = new_pb; + last_pb = new_pb; + } + PR_Unlock( pb_q_lock ); +} + +/* get_pb(): will get a pb from the begining of the work queue, return NULL if + the queue is empty.*/ + +static Slapi_PBlock * +get_pb() +{ + + struct Slapi_PBlock_q *tmp = NULL; + Slapi_PBlock *pb; + + LDAPDebug( LDAP_DEBUG_TRACE, "get_pb \n", 0, 0, 0 ); + PR_Lock( pb_q_lock ); + if (first_pb == NULL) { + PR_Unlock( pb_q_lock ); + LDAPDebug( LDAP_DEBUG_ANY, "get_pb: the work queue is empty.\n", + 0, 0, 0 ); + return NULL; + } + + tmp = first_pb; + if ( first_pb == last_pb ) { + last_pb = NULL; + } + first_pb = tmp->next_pb; + PR_Unlock( pb_q_lock ); + + pb = tmp->pb; + /* Free the memory used by the pb found. */ + free ((char *) tmp); + + return (pb); +} +#endif /* LDAP_IOCP */ + + +/* Helper functions common to both varieties of connection code: */ + +/* op_thread_cleanup() : This function is called by daemon thread when it gets + the slapd_shutdown signal. It will set op_shutdown to 1 and notify + all thread waiting on op_thread_cv to terminate. */ + +void +op_thread_cleanup() +{ +#ifdef _WIN32 + int i; + PRIntervalTime interval; + int max_threads = config_get_threadnumber(); + interval = PR_SecondsToInterval(3); +#endif + LDAPDebug( LDAP_DEBUG_ANY, + "slapd shutting down - signaling operation threads\n", 0, 0, 0); + + PR_Lock( op_thread_lock ); + op_shutdown = 1; + PR_NotifyAllCondVar ( op_thread_cv ); + PR_Unlock( op_thread_lock ); +#ifdef _WIN32 + LDAPDebug( LDAP_DEBUG_ANY, + "slapd shutting down - waiting for %d threads to terminate\n", + active_threads, 0, 0 ); + /* kill off each worker waiting on GetQueuedCompletionStatus */ + for ( i = 0; i < max_threads; ++ i ) + { + PostQueuedCompletionStatus( completion_port, 0, COMPKEY_DIE ,0); + } + /* don't sleep: there's no reason to do so here DS_Sleep(interval); */ /* sleep 3 seconds */ +#endif +} + +static void +connection_add_operation(Connection* conn,Operation* op) +{ + Operation **olist= &conn->c_ops; + int id= conn->c_opsinitiated++; + int connid= conn->c_connid; + Operation **tmp; + + /* slapi_ch_stop_recording(); */ + + for ( tmp = olist; *tmp != NULL; tmp = &(*tmp)->o_next ) + ; /* NULL */ + + *tmp= op; + op->o_opid = id; + op->o_connid = connid; + /* Call the plugin extension constructors */ + op->o_extension = factory_create_extension(get_operation_object_type(),op,conn); +} + +/* + * Find an Operation on the Connection, and zap it in the butt. + * Call this function with conn->c_mutex locked. + */ +void +connection_remove_operation( Connection *conn, Operation *op ) +{ + Operation **olist= &conn->c_ops; + Operation **tmp; + + for ( tmp = olist; *tmp != NULL && *tmp != op; tmp = &(*tmp)->o_next ) + ; /* NULL */ + + if ( *tmp == NULL ) + { + LDAPDebug( LDAP_DEBUG_ANY, "connection_remove_operation: can't find op %d for conn %d\n", + op->o_msgid, conn->c_connid, 0 ); + } + else + { + *tmp = (*tmp)->o_next; + } +} + + +/* + * Return a non-zero value if any operations are pending on conn. + * Operation op2ignore is ignored (okay to pass NULL). Typically, op2ignore + * is the caller's op (because the caller wants to check if all other + * ops are done). + * If test_resultsent is non-zero, operations that have already sent + * a result to the client are ignored. + * Call this function with conn->c_mutex locked. + */ +int +connection_operations_pending( Connection *conn, Operation *op2ignore, + int test_resultsent ) +{ + Operation *op; + + PR_ASSERT( conn != NULL ); + + for ( op = conn->c_ops; op != NULL; op = op->o_next ) { + if ( op == op2ignore ) { + continue; + } + if ( !test_resultsent || op->o_status != SLAPI_OP_STATUS_RESULT_SENT ) { + break; + } + } + + return( op != NULL ); +} + + +/* Copy the authorization identity from the connection struct into the + * operation struct. We do this late, because an operation might start + * before authentication is complete, at least on an SSL connection. + * We want each operation to get its authorization identity after the + * SSL software has had its chance to finish the SSL handshake; + * that is, after the first few bytes of the request are received. + * In particular, we want the first request from an LDAPS client + * to have an authorization identity derived from the initial SSL + * handshake. + */ +static void +op_copy_identity(Connection *conn, Operation *op) +{ + size_t dnlen; + size_t typelen; + + PR_Lock( conn->c_mutex ); + dnlen= conn->c_dn ? strlen (conn->c_dn) : 0; + typelen= conn->c_authtype ? strlen (conn->c_authtype) : 0; + + slapi_sdn_done(&op->o_sdn); + slapi_ch_free((void **) &(op->o_authtype)); + if (dnlen <= 0 && typelen <= 0) { + op->o_authtype = NULL; + } else { + char* id = slapi_ch_malloc (typelen + 1); + if (typelen <= 0) + id[dnlen+1] = '\0'; + else + memcpy (id, conn->c_authtype, typelen + 1); + slapi_sdn_set_dn_byval(&op->o_sdn,conn->c_dn); + op->o_authtype = id; + } + /* XXX We should also copy c_client_cert into *op here; it's + * part of the authorization identity. The operation's copy + * (not c_client_cert) should be used for access control. + */ + + /* copy isroot flag as well so root DN privileges are preserved */ + op->o_isroot = conn->c_isroot; + PR_Unlock( conn->c_mutex ); +} + + +static int +is_ber_too_big(const Connection *conn,unsigned long ber_len) +{ + unsigned long maxbersize= config_get_maxbersize(); + if(ber_len>maxbersize) + { + log_ber_too_big_error(conn, ber_len, maxbersize); + return 1; + } + return 0; +} + + +/* + * Pass 0 for maxbersize if you do not have it handy. It is also OK to pass + * 0 for ber_len, in which case a slightly less informative message is + * logged. + */ +static void +log_ber_too_big_error(const Connection *conn, unsigned long ber_len, + unsigned long maxbersize) +{ + if (0 == maxbersize) { + maxbersize= config_get_maxbersize(); + } + if (0 == ber_len) { + slapi_log_error( SLAPI_LOG_FATAL, "connection", + "conn=%d fd=%d Incoming BER Element was too long, max allowable" + " is %ld bytes. Change the nsslapd-maxbersize attribute in" + " cn=config to increase.\n", + conn->c_connid, conn->c_sd, maxbersize ); + } else { + slapi_log_error( SLAPI_LOG_FATAL, "connection", + "conn=%d fd=%d Incoming BER Element was %ld bytes, max allowable" + " is %ld bytes. Change the nsslapd-maxbersize attribute in" + " cn=config to increase.\n", + conn->c_connid, conn->c_sd, ber_len, maxbersize ); + } +} + + +void +disconnect_server( Connection *conn, int opconnid, int opid, PRErrorCode reason, PRInt32 error ) +{ + PR_Lock( conn->c_mutex ); + disconnect_server_nomutex( conn, opconnid, opid, reason, error ); + PR_Unlock( conn->c_mutex ); +} + +static ps_wakeup_all_fn_ptr ps_wakeup_all_fn = NULL; + +/* + * disconnect_server - close a connection. takes the connection to close, + * the connid associated with the operation generating the close (so we + * don't accidentally close a connection that's not ours), and the opid + * of the operation generating the close (for logging purposes). + */ + +void +disconnect_server_nomutex( Connection *conn, int opconnid, int opid, PRErrorCode reason, PRInt32 error ) +{ + if ( ( conn->c_sd != SLAPD_INVALID_SOCKET && + conn->c_connid == opconnid ) && !(conn->c_flags & CONN_FLAG_CLOSING) ) { + + /* + * PR_Close must be called before anything else is done because + * of NSPR problem on NT which requires that the socket on which + * I/O timed out is closed before any other I/O operation is + * attempted by the thread. + * WARNING : As of today the current code does not fulfill the + * requirements above. + */ + + /* Mark that the socket should be closed on this connection. + * We don't want to actually close the socket here, because + * the listener thread could be PR_Polling over it right now. + * The last thread to stop using the connection will do the closing. + */ + conn->c_flags |= CONN_FLAG_CLOSING; + g_decrement_current_conn_count(); + + /* + * Print the error captured above. + */ + if (error && (EPIPE != error) ) { + slapi_log_access( LDAP_DEBUG_STATS, + "conn=%d op=%d fd=%d closed error %d (%s) - %s\n", + conn->c_connid, opid, conn->c_sd, error, + slapd_system_strerror(error), + slapd_pr_strerror(reason)); + } else { + slapi_log_access( LDAP_DEBUG_STATS, + "conn=%d op=%d fd=%d closed - %s\n", + conn->c_connid, opid, conn->c_sd, + slapd_pr_strerror(reason)); + } + + if (! config_check_referral_mode()) { + PR_AtomicDecrement(g_get_global_snmp_vars()->ops_tbl.dsConnections); + } + + conn->c_gettingber = 0; + connection_abandon_operations( conn ); + + if (! config_check_referral_mode()) { + /* + * If any of the outstanding operations on this + * connection were persistent searches, then + * ding all the persistent searches to get them + * to notice that their operations have been abandoned. + */ + int found_ps = 0; + Operation *o; + + for ( o = conn->c_ops; !found_ps && o != NULL; o = o->o_next ) { + if ( o->o_flags & OP_FLAG_PS ) { + found_ps = 1; + } + } + if ( found_ps ) { + if ( NULL == ps_wakeup_all_fn ) { + if ( get_entry_point( ENTRY_POINT_PS_WAKEUP_ALL, + (caddr_t *)(&ps_wakeup_all_fn )) == 0 ) { + (ps_wakeup_all_fn)(); + } + } else { + (ps_wakeup_all_fn)(); + } + } + } + } +} + +void +connection_abandon_operations( Connection *c ) +{ + Operation *op; + for ( op = c->c_ops; op != NULL; op = op->o_next ) + { + /* abandon the operation only if it is not yet + completed (i.e., no result has been sent yet to + the client */ + if ( op->o_status != SLAPI_OP_STATUS_RESULT_SENT ) { + op->o_status = SLAPI_OP_STATUS_ABANDONED; + } + } +} |