From fcc3ca640bb58d0a7ab57cf135837139f408257c Mon Sep 17 00:00:00 2001 From: Rich Megginson Date: Wed, 1 May 2013 15:39:15 -0600 Subject: [PATCH] do new tcp connection accept in worker thread Each listener socket is now a pseudo Connection object, with the c_flags set to CONN_FLAG_LISTENER. Thus, we can now treat it as the other connection objects. This means we can use the c_gettingber flag (need to rename this!) to turn on or off polling on the listener socket. This is important because after we receive a poll ready on a listener socket, we must not poll on that socket again until the accept() is completed. We use c_gettingber just like with regular sockets, to dynamically add or remove the socket from the poll array. Once the accept() is complete, the c_gettingber flag is set back to 0 to allow another new connection. Instead of calling handle_new_connection directly in the polling thread, just add the new connection request to the pb queue, and call handle_new_connection in a worker thread. Added some locking to connection_table_get_connection because it can now be called from multiple threads, and added locking in a couple of other places that had previously been single threaded. added nsslapd-threaded-accept to cn=config values are 0 (THREAD_ACCEPT_OFF) - default - do the accept for the new connection in the poll thread 1 (THREAD_ACCEPT_ON) - do the accept in a worker thread, pass the read to another worker thread 2 (THREAD_ACCEPT_READ) - do the accept in a worker thread, and do the read in the same worker thread --- ldap/servers/slapd/connection.c | 177 ++++++++++++++++++++++------- ldap/servers/slapd/conntable.c | 47 +++++--- ldap/servers/slapd/daemon.c | 237 +++++++++++++++++++++++++-------------- ldap/servers/slapd/fe.h | 3 + ldap/servers/slapd/libglobs.c | 48 ++++++++- ldap/servers/slapd/proto-slap.h | 2 + ldap/servers/slapd/slap.h | 13 ++ 7 files changed, 380 insertions(+), 147 deletions(-) diff --git a/ldap/servers/slapd/connection.c b/ldap/servers/slapd/connection.c index 1d1adac..bd45369 100644 --- a/ldap/servers/slapd/connection.c +++ b/ldap/servers/slapd/connection.c @@ -52,6 +52,7 @@ #include "slap.h" #include "prcvar.h" #include "prlog.h" /* for PR_ASSERT */ +#include "private/pprio.h" #include "fe.h" #include #if defined(LINUX) @@ -102,12 +103,16 @@ static int op_shutdown= 0; /* if non-zero, server is shutting down */ void connection_done(Connection *conn) { - connection_cleanup(conn); - /* free the private content, the buffer has been freed by above connection_cleanup */ - slapi_ch_free((void**)&conn->c_private); - if (NULL != conn->c_sb) - { - ber_sockbuf_free(conn->c_sb); + if (!CONN_IS_LISTENER(conn->c_flags)) { + connection_cleanup(conn); + /* free the private content, the buffer has been freed by above connection_cleanup */ + slapi_ch_free((void**)&conn->c_private); + if (NULL != conn->c_sb) + { + ber_sockbuf_free(conn->c_sb); + } + /* PAGED_RESULTS */ + pagedresults_cleanup_all(conn, 0); } if (NULL != conn->c_mutex) { @@ -117,8 +122,6 @@ connection_done(Connection *conn) { PR_DestroyLock(conn->c_pdumutex); } - /* PAGED_RESULTS */ - pagedresults_cleanup_all(conn, 0); } /* @@ -1538,6 +1541,7 @@ static int finished_chomping(Connection *conn) */ static int counter= 0; /* JCM Dumb Name */ +static int max_counter=0; /* The connection private structure for UNIX turbo mode */ struct Conn_private @@ -2209,32 +2213,82 @@ connection_threadmain() in connection_activity when the conn is added to the work queue, setup_pr_read_pds won't add the connection prfd to the poll list */ - if (connection_call_io_layer_callbacks(pb->pb_conn)) { + if (pb->pb_conn && !CONN_IS_LISTENER(pb->pb_conn->c_flags) && + connection_call_io_layer_callbacks(pb->pb_conn)) { LDAPDebug0Args( LDAP_DEBUG_ANY, "Error: could not add/remove IO layers from connection\n" ); } default: break; } } else if (NULL == pb) { - + PR_ASSERT(!CONN_IS_LISTENER(conn->c_flags)); /* The turbo mode may cause threads starvation. Do a yield here to reduce the starving */ PR_Sleep(PR_INTERVAL_NO_WAIT); PR_Lock(conn->c_mutex); - /* Make our own pb in turbo mode */ - connection_make_new_pb(&pb,conn); + /* make and setup our own op pb in turbo mode */ if (connection_call_io_layer_callbacks(conn)) { LDAPDebug0Args( LDAP_DEBUG_ANY, "Error: could not add/remove IO layers from connection\n" ); } + if (connection_activity_ext(conn, NULL, &pb)) { + LDAPDebug2Args(LDAP_DEBUG_CONNS, + "error return from connection_activity_ext conn %" NSPRIu64 " closing fd=%d\n", + conn->c_connid,conn->c_sd); + connection_remove_operation( conn, pb->pb_op ); + PR_Unlock(conn->c_mutex); + slapi_pblock_destroy(pb); + pb = NULL; + continue; + } PR_Unlock(conn->c_mutex); - if (! config_check_referral_mode()) { - slapi_counter_increment(ops_initiated); - slapi_counter_increment(g_get_global_snmp_vars()->ops_tbl.dsInOps); + } + /* Once we're here we have a pb */ + /* now see if this is a pb for an op, or a pb for a new connection */ + if (pb && pb->pb_conn && CONN_IS_LISTENER(pb->pb_conn->c_flags)) { + Slapi_PBlock **newpb = NULL; + int threaded_accept = config_get_threaded_accept(); + int rc = 0; + int listenfd = PR_FileDesc2NativeHandle(pb->pb_conn->c_prfd); /* for logging only */ + + PR_ASSERT(!(thread_turbo_flag || more_data)); + /* grab the new connection */ + rc = handle_new_connection(pb->pb_ct, pb->pb_conn, &conn, 1); + /* after this - do not access pb->pb_conn - we have no ref to it any more */ + slapi_pblock_destroy(pb); /* pb for the listen conn */ + pb = NULL; + if (rc) { + LDAPDebug1Arg(LDAP_DEBUG_CONNS, "Error accepting new connection listenfd=%d\n", + listenfd); + continue; + } + PR_Lock(conn->c_mutex); + if (threaded_accept == THREADED_ACCEPT_READ) { + newpb = &pb; /* pass a real pb so connection_activity_ext will create our op */ + /* we will read here so need to do the io layer callbacks */ + if (connection_call_io_layer_callbacks(conn)) { + LDAPDebug0Args( LDAP_DEBUG_ANY, "Error: could not add/remove IO layers from connection\n" ); + } + } + rc = connection_activity_ext(conn, NULL, newpb); + if (rc) { + LDAPDebug2Args(LDAP_DEBUG_CONNS, + "error return from connection_activity_ext conn %" NSPRIu64 " closing fd=%d\n", + conn->c_connid,conn->c_sd); + connection_remove_operation( conn, pb->pb_op ); + slapi_pblock_destroy(pb); + pb = NULL; + PR_Unlock(conn->c_mutex); + continue; + } else if (threaded_accept == THREADED_ACCEPT_READ) { + connection_acquire_nolock(conn); /* we are going to process it here */ + PR_Unlock(conn->c_mutex); + } else { /* we queued the connection for another worker thread */ + PR_Unlock(conn->c_mutex); + continue; } } - /* Once we're here we have a pb */ conn = pb->pb_conn; op = pb->pb_op; @@ -2401,42 +2455,77 @@ done: } /* thread need to hold conn->c_mutex before calling this function */ +/* if conn is given, this function will assume it is setting up the + * next operation on this connection - otherwise, it assumes this is + * a new connection request that needs an accept() + */ int -connection_activity(Connection *conn) +connection_activity_ext(Connection *conn, Connection_Table *ct, Slapi_PBlock **retpb) { - Slapi_PBlock *pb; - - connection_make_new_pb(&pb, conn); - - /* Add pb to the end of the work queue. */ - add_pb( pb ); + Slapi_PBlock *pb = NULL; - /* Check if exceed the max thread per connection. If so, increment - c_pbwait. Otherwise increment the counter and notify the cond. var. - there is work to do. */ - - if (connection_acquire_nolock (conn) == -1) { - LDAPDebug(LDAP_DEBUG_CONNS, - "could not acquire lock in connection_activity as conn %" NSPRIu64 " closing fd=%d\n", - conn->c_connid,conn->c_sd,0); - /* XXX how to handle this error? */ - /* MAB: 25 Jan 01: let's return on error and pray this won't leak */ - return (-1); + if (CONN_IS_LISTENER(conn->c_flags)) { + pb = slapi_pblock_new(); + pb->pb_ct = ct; + pb->pb_conn = conn; + } else { + /* established conn - make a new pb for an operation */ + connection_make_new_pb(&pb, conn); } conn->c_gettingber = 1; - conn->c_threadnumber++; - PR_Lock( op_thread_lock ); - counter++; - PR_NotifyCondVar( op_thread_cv ); - PR_Unlock( op_thread_lock ); - - if (! config_check_referral_mode()) { - slapi_counter_increment(ops_initiated); - slapi_counter_increment(g_get_global_snmp_vars()->ops_tbl.dsInOps); + + /* if pblock was given, just return the pblock to the caller - otherwise, + * add it to the work queue + */ + if ( !retpb ) { + conn->c_threadnumber++; + /* Check if exceed the max thread per connection. If so, increment + c_pbwait. Otherwise increment the counter and notify the cond. var. + there is work to do. */ + /* we are putting the connection on the work queue, so acquire a reference here + * this reference will be released in connection_threadmain + */ + if (connection_acquire_nolock (conn) == -1) { + LDAPDebug(LDAP_DEBUG_CONNS, + "could not acquire lock in connection_activity as conn %" NSPRIu64 " closing fd=%d\n", + conn->c_connid,conn->c_sd,0); + slapi_pblock_destroy(pb); + pb = NULL; + return (-1); + } + + /* Add pb to the end of the work queue. */ + add_pb( pb ); + /* we have to do this in order to + * 1) wake up connection_wait_for_new_pb to read the next thing off the queue + * 2) tell a thread in turbo mode that there are pending items in the queue + */ + PR_Lock( op_thread_lock ); + counter++; + if (counter > max_counter) { + max_counter = counter; + } + PR_NotifyCondVar( op_thread_cv ); + PR_Unlock( op_thread_lock ); + } else { + *retpb = pb; } + pb = NULL; + + if (!CONN_IS_LISTENER(conn->c_flags)) { + if (! config_check_referral_mode()) { + slapi_counter_increment(ops_initiated); + slapi_counter_increment(g_get_global_snmp_vars()->ops_tbl.dsInOps); + } + } /* else not an operation */ return 0; } +int +connection_activity(Connection *conn) +{ + return connection_activity_ext(conn, NULL, NULL); +} /* add_pb(): will add a pb to the end of the global work queue. The work queue is implemented as a singal link list. */ @@ -2515,7 +2604,7 @@ op_thread_cleanup() interval = PR_SecondsToInterval(3); #endif LDAPDebug( LDAP_DEBUG_ANY, - "slapd shutting down - signaling operation threads\n", 0, 0, 0); + "slapd shutting down - signaling operation threads - max work q size %d\n", max_counter, 0, 0); PR_Lock( op_thread_lock ); op_shutdown = 1; diff --git a/ldap/servers/slapd/conntable.c b/ldap/servers/slapd/conntable.c index 7cf9f31..10d7458 100644 --- a/ldap/servers/slapd/conntable.c +++ b/ldap/servers/slapd/conntable.c @@ -141,7 +141,7 @@ connection_table_abandon_all_operations(Connection_Table *ct) int i; for ( i = 0; i < ct->size; i++ ) { - if ( ct->c[i].c_mutex != NULL ) + if ( ( ct->c[i].c_mutex != NULL ) && !CONN_IS_LISTENER(ct->c[i].c_flags) ) { PR_Lock( ct->c[i].c_mutex ); connection_abandon_operations( &ct->c[i] ); @@ -153,19 +153,19 @@ connection_table_abandon_all_operations(Connection_Table *ct) /* Given a file descriptor for a socket, this function will return * a slot in the connection table to use. * - * Note: this function is only called from the slapd_daemon (listener) - * thread, which means it will never be called by two threads at - * the same time. + * This function is called from the accept worker thread so the table + * must be locked * * Returns a Connection on success * Returns NULL on failure */ Connection * -connection_table_get_connection(Connection_Table *ct, int sd) +connection_table_get_connection_ext(Connection_Table *ct, int sd, unsigned int flags) { Connection *c= NULL; int index, count; + PR_Lock(ct->table_mutex); index = sd % ct->size; for( count = 0; count < ct->size; count++, index = (index + 1) % ct->size) { @@ -194,10 +194,8 @@ connection_table_get_connection(Connection_Table *ct, int sd) PR_ASSERT(c->c_extension==NULL); if ( c->c_mutex == NULL ) { - PR_Lock( ct->table_mutex ); c->c_mutex = PR_NewLock(); c->c_pdumutex = PR_NewLock(); - PR_Unlock( ct->table_mutex ); if ( c->c_mutex == NULL || c->c_pdumutex == NULL ) { c->c_mutex = NULL; @@ -207,20 +205,27 @@ connection_table_get_connection(Connection_Table *ct, int sd) } } /* Let's make sure there's no cruft left on there from the last time this connection was used. */ - /* Note: no need to lock c->c_mutex because this function is only - * called by one thread (the slapd_daemon thread), and if we got this - * far then `c' is not being used by any operation threads, etc. - */ - connection_cleanup(c); + /* have to lock c->c_mutex here because we may be called from the listener worker thread */ + if (c && !CONN_IS_LISTENER(flags)) { + PR_Lock(c->c_mutex); + connection_cleanup(c); + PR_Unlock(c->c_mutex); + } } else { /* couldn't find a Connection */ LDAPDebug( LDAP_DEBUG_CONNS, "max open connections reached\n", 0, 0, 0); } + PR_Unlock(ct->table_mutex); return c; } +Connection * +connection_table_get_connection(Connection_Table *ct, int sd) +{ + return connection_table_get_connection_ext(ct, sd, 0); +} /* active connection iteration functions */ Connection* @@ -246,9 +251,11 @@ int connection_table_iterate_active_connections(Connection_Table *ct, void* arg, PR_Lock(ct->table_mutex); current_conn = connection_table_get_first_active_connection (ct); while (current_conn) { - ret = f(current_conn, arg); - if (ret) { - break; + if (!CONN_IS_LISTENER(current_conn->c_flags)) { + ret = f(current_conn, arg); + if (ret) { + break; + } } current_conn = connection_table_get_next_active_connection (ct, current_conn); } @@ -275,7 +282,9 @@ connection_table_dump_active_connections (Connection_Table *ct) c = connection_table_get_first_active_connection (ct); while (c) { - connection_table_dump_active_connection (c); + if (!CONN_IS_LISTENER(c->c_flags)) { + connection_table_dump_active_connection (c); + } c = connection_table_get_next_active_connection (ct, c); } @@ -400,7 +409,7 @@ connection_table_as_entry(Connection_Table *ct, Slapi_Entry *e) PR_Unlock( ct->table_mutex ); PR_Lock( ct->c[i].c_mutex ); - if ( ct->c[i].c_sd != SLAPD_INVALID_SOCKET ) + if ( !CONN_IS_LISTENER(ct->c[i].c_flags) && (ct->c[i].c_sd != SLAPD_INVALID_SOCKET ) ) { char buf2[20]; int lendn = ct->c[i].c_dn ? strlen(ct->c[i].c_dn) : 6; /* "NULLDN" */ @@ -477,7 +486,7 @@ connection_table_dump_activity_to_errors_log(Connection_Table *ct) for ( i = 0; i < ct->size; i++ ) { Connection *c= &(ct->c[i]); - if ( c->c_mutex != NULL ) + if ( !CONN_IS_LISTENER(c->c_flags) && (c->c_mutex != NULL) ) { /* Find the connection we are referring to */ int j= c->c_fdi; @@ -511,7 +520,7 @@ connection_table_dump(Connection_Table *ct) fprintf(file, "=============pid=%d==================\n", getpid()); for ( i = 0; i < ct->size; i++ ) { - if ( (ct->c[i].c_sd == SLAPD_INVALID_SOCKET) && (ct->c[i].c_connid == 0) ) + if ( CONN_IS_LISTENER(ct->c[i].c_flags) || ( (ct->c[i].c_sd == SLAPD_INVALID_SOCKET) && (ct->c[i].c_connid == 0) ) ) { continue; } diff --git a/ldap/servers/slapd/daemon.c b/ldap/servers/slapd/daemon.c index 21fe644..e653ca4 100644 --- a/ldap/servers/slapd/daemon.c +++ b/ldap/servers/slapd/daemon.c @@ -114,6 +114,8 @@ int slapd_wakeup_timer = SLAPD_WAKEUP_TIMER; /* time in ms to wakeup */ short slapd_housekeeping_timer = 10; #endif /* notdef GGOODREPL */ +static PRUint32 num_accepts, num_timeouts, num_errs; + /* Do we support timeout on socket send() ? */ int have_send_timeouts = 0; @@ -130,9 +132,7 @@ void disk_monitoring_stop(); typedef struct listener_info { int idx; /* index of this listener in the ct->fd array */ - PRFileDesc *listenfd; /* the listener fd */ - int secure; - int local; + Connection *listenc; /* the listener Connection */ } listener_info; #define SLAPD_POLL_LISTEN_READY(xxflagsxx) (xxflagsxx & PR_POLL_READ) @@ -283,28 +283,53 @@ syn_scan (int sock) #endif static int -accept_and_configure(int s, PRFileDesc *pr_acceptfd, PRNetAddr *pr_netaddr, - int addrlen, int secure, int local, PRFileDesc **pr_clonefd) +accept_and_configure(Connection *listenc, PRNetAddr *pr_netaddr, + int addrlen, PRFileDesc **pr_clonefd, int ssl, int local) { int ns = 0; + int threaded_accept = config_get_threaded_accept(); - PRIntervalTime pr_timeout = PR_MillisecondsToInterval(slapd_wakeup_timer); + PRIntervalTime pr_timeout = PR_INTERVAL_NO_WAIT; #if !defined( XP_WIN32 ) /* UNIX */ - (*pr_clonefd) = PR_Accept(pr_acceptfd, pr_netaddr, pr_timeout); + if (threaded_accept != THREADED_ACCEPT_OFF) { + PR_Lock( listenc->c_mutex ); + } + (*pr_clonefd) = PR_Accept(listenc->c_prfd, pr_netaddr, pr_timeout); + if (threaded_accept != THREADED_ACCEPT_OFF) { + listenc->c_gettingber = 0; /* can accept new connections again i.e. add listener back to poll list */ + listenc->c_threadnumber--; /* thread is done with this listener conn */ + signal_listner(); /* tell poll we can now accept new connection requests */ + connection_release_nolock (listenc); /* ref was acquired when listenc was queued in connection_activity_ext */ + PR_Unlock( listenc->c_mutex ); + /* after this - do not use listenc any more - we don't have a ref on it or a lock */ + } /* otherwise, we're already in the polling thread */ if( !(*pr_clonefd) ) { PRErrorCode prerr = PR_GetError(); - LDAPDebug( LDAP_DEBUG_ANY, "PR_Accept() failed, " - SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n", - prerr, slapd_pr_strerror(prerr), 0 ); + if (prerr == PR_IO_TIMEOUT_ERROR) { + LDAPDebug2Args( LDAP_DEBUG_CONNS, "PR_Accept() timed out, nothing to accept, " + SLAPI_COMPONENT_NAME_NSPR " error %d (%s) %d\n", + prerr, slapd_pr_strerror(prerr) ); + num_timeouts++; + + } else { + LDAPDebug2Args( LDAP_DEBUG_ANY, "PR_Accept() failed, " + SLAPI_COMPONENT_NAME_NSPR " error %d (%s) %d\n", + prerr, slapd_pr_strerror(prerr) ); + num_errs++; + } return(SLAPD_INVALID_SOCKET); } + num_accepts++; - ns = configure_pr_socket( pr_clonefd, secure, local ); + ns = configure_pr_socket( pr_clonefd, ssl, local ); #else /* Windows */ if( secure ) { - (*pr_clonefd) = PR_Accept(pr_acceptfd, pr_netaddr, pr_timeout); + PR_Lock( listenc->c_mutex ); + (*pr_clonefd) = PR_Accept(listenc->c_prfd, pr_netaddr, pr_timeout); + listenc->c_gettingber = 0; /* done with accept */ + PR_Unlock( listenc->c_mutex ); if( !(*pr_clonefd) ) { PRErrorCode prerr = PR_GetError(); LDAPDebug( LDAP_DEBUG_ANY, "PR_Accept() failed, " @@ -314,7 +339,7 @@ accept_and_configure(int s, PRFileDesc *pr_acceptfd, PRNetAddr *pr_netaddr, /* Bug 613324: Call PR_NT_CancelIo if an error occurs */ if( (prerr == PR_IO_TIMEOUT_ERROR ) || (prerr == PR_PENDING_INTERRUPT_ERROR) ) { - if( (PR_NT_CancelIo( pr_acceptfd )) != PR_SUCCESS) { + if( (PR_NT_CancelIo( listenc->c_prfd )) != PR_SUCCESS) { prerr = PR_GetError(); LDAPDebug( LDAP_DEBUG_ANY, "PR_NT_CancelIo() failed, " @@ -332,7 +357,10 @@ accept_and_configure(int s, PRFileDesc *pr_acceptfd, PRNetAddr *pr_netaddr, struct sockaddr *addr; /* NOT IPv6 enabled */ addr = (struct sockaddr *) slapi_ch_malloc( sizeof(struct sockaddr) ); - ns = accept (s, addr, (TCPLEN_T *)&addrlen); + PR_Lock( listenc->c_mutex ); + ns = accept (listenc->c_sd, addr, (TCPLEN_T *)&addrlen); + listenc->c_gettingber = 0; /* done with accept */ + PR_Unlock( listenc->c_mutex ); if (ns == SLAPD_INVALID_SOCKET) { int oserr = errno; @@ -374,7 +402,6 @@ static void set_timeval_ms(struct timeval *t, int ms); #endif /* GGOODREPL static void handle_timeout( void ); */ static void handle_pr_read_ready(Connection_Table *ct, PRIntn num_poll); -static int handle_new_connection(Connection_Table *ct, int tcps, PRFileDesc *pr_acceptfd, int secure, int local ); #ifdef _WIN32 static void unfurl_banners(Connection_Table *ct,daemon_ports_t *ports, int n_tcps, PRFileDesc *s_tcps); #else @@ -922,19 +949,31 @@ static void handle_listeners(Connection_Table *ct, listener_info *listener_idxs, int n_listeners) { int idx; + int rc; + int threaded_accept = config_get_threaded_accept(); for (idx = 0; idx < n_listeners; ++idx) { int fdidx = listener_idxs[idx].idx; - PRFileDesc *listenfd = listener_idxs[idx].listenfd; - int secure = listener_idxs[idx].secure; - int local = listener_idxs[idx].local; - if (fdidx && listenfd) { + Connection *listenc = listener_idxs[idx].listenc; + if (fdidx && listenc) { if (SLAPD_POLL_LISTEN_READY(ct->fd[fdidx].out_flags)) { - /* accept() the new connection, put it on the active list for handle_pr_read_ready */ - int rc = handle_new_connection(ct, SLAPD_INVALID_SOCKET, listenfd, secure, local); - if (rc) { - LDAPDebug1Arg(LDAP_DEBUG_CONNS, "Error accepting new connection listenfd=%d\n", - PR_FileDesc2NativeHandle(listenfd)); - continue; + if (threaded_accept == THREADED_ACCEPT_OFF) { + /* accept() the new connection, put it on the active list for handle_pr_read_ready */ + rc = handle_new_connection(ct, listenc, NULL, 0); + if (rc) { + LDAPDebug1Arg(LDAP_DEBUG_CONNS, "Error accepting new connection listenfd=%d\n", + PR_FileDesc2NativeHandle(listenc->c_prfd)); + continue; + } + } else { + /* queue the accept for a worker thread */ + PR_Lock(listenc->c_mutex); + rc = connection_activity_ext(listenc, ct, NULL); + PR_Unlock(listenc->c_mutex); + if (rc) { + LDAPDebug1Arg(LDAP_DEBUG_CONNS, "Error queueing new connection listenfd=%d\n", + PR_FileDesc2NativeHandle(listenc->c_prfd)); + continue; + } } } } @@ -1160,7 +1199,6 @@ void slapd_daemon( daemon_ports_t *ports ) int oserr; #endif int select_return = 0; - #ifndef _WIN32 PRErrorCode prerr; #endif @@ -1368,6 +1406,8 @@ void slapd_daemon( daemon_ports_t *ports ) time_shutdown = 1; PR_JoinThread( time_thread_p ); + LDAPDebug( LDAP_DEBUG_ANY, "slapd_daemon finished num_accepts [%d] num_timeouts [%d] num_errs [%d]\n", + num_accepts, num_timeouts, num_errs ); #ifdef _WIN32 WSACleanup(); #else @@ -1588,6 +1628,7 @@ setup_pr_read_pds(Connection_Table *ct, PRFileDesc **n_tcps, PRFileDesc **s_tcps ct->fd[count].fd = NULL; #endif count++; + listen_addr_count = count; /* The fds entry for n_tcps starts with n_tcps and less than n_tcpe */ ct->n_tcps = count; @@ -1595,14 +1636,18 @@ setup_pr_read_pds(Connection_Table *ct, PRFileDesc **n_tcps, PRFileDesc **s_tcps { PRFileDesc **fdesc = NULL; for (fdesc = n_tcps; fdesc && *fdesc; fdesc++, count++) { + Connection *c = NULL; + int lfd = PR_FileDesc2NativeHandle(*fdesc); ct->fd[count].fd = *fdesc; ct->fd[count].in_flags = SLAPD_POLL_FLAGS; ct->fd[count].out_flags = 0; - listener_idxs[n_listeners].listenfd = *fdesc; - listener_idxs[n_listeners].idx = count; - n_listeners++; LDAPDebug( LDAP_DEBUG_HOUSE, "listening for connections on %d\n", socketdesc, 0, 0 ); + c = connection_table_get_connection_ext(ct, lfd, CONN_FLAG_LISTENER); + c->c_sd = lfd; + c->c_prfd = *fdesc; + c->c_flags |= CONN_FLAG_LISTENER; + connection_table_move_connection_on_to_active_list(ct, c); } } else { ct->fd[count].fd = NULL; @@ -1616,15 +1661,18 @@ setup_pr_read_pds(Connection_Table *ct, PRFileDesc **n_tcps, PRFileDesc **s_tcps { PRFileDesc **fdesc = NULL; for (fdesc = s_tcps; fdesc && *fdesc; fdesc++, count++) { + Connection *c = NULL; + int lfd = PR_FileDesc2NativeHandle(*fdesc); ct->fd[count].fd = *fdesc; ct->fd[count].in_flags = SLAPD_POLL_FLAGS; ct->fd[count].out_flags = 0; - listener_idxs[n_listeners].listenfd = *fdesc; - listener_idxs[n_listeners].idx = count; - listener_idxs[n_listeners].secure = 1; - n_listeners++; LDAPDebug( LDAP_DEBUG_HOUSE, "listening for SSL connections on %d\n", socketdesc, 0, 0 ); + c = connection_table_get_connection_ext(ct, lfd, CONN_FLAG_LISTENER); + c->c_sd = lfd; + c->c_prfd = *fdesc; + c->c_flags |= CONN_FLAG_LISTENER|CONN_FLAG_SSL; + connection_table_move_connection_on_to_active_list(ct, c); } } else { ct->fd[count].fd = NULL; @@ -1641,15 +1689,19 @@ setup_pr_read_pds(Connection_Table *ct, PRFileDesc **n_tcps, PRFileDesc **s_tcps { PRFileDesc **fdesc = NULL; for (fdesc = i_unix; fdesc && *fdesc; fdesc++, count++) { + Connection *c = NULL; + int lfd = PR_FileDesc2NativeHandle(*fdesc); ct->fd[count].fd = *fdesc; ct->fd[count].in_flags = SLAPD_POLL_FLAGS; ct->fd[count].out_flags = 0; - listener_idxs[n_listeners].listenfd = *fdesc; - listener_idxs[n_listeners].idx = count; - listener_idxs[n_listeners].local = 1; - n_listeners++; LDAPDebug( LDAP_DEBUG_HOUSE, "listening for LDAPI connections on %d\n", socketdesc, 0, 0 ); + c = connection_table_get_connection_ext(ct, lfd, CONN_FLAG_LISTENER); + c->c_sd = lfd; + c->c_prfd = *fdesc; + c->c_flags |= CONN_FLAG_LISTENER; + c->c_unix_local = 1; + connection_table_move_connection_on_to_active_list(ct, c); } } else { ct->fd[count].fd = NULL; @@ -1660,12 +1712,6 @@ setup_pr_read_pds(Connection_Table *ct, PRFileDesc **n_tcps, PRFileDesc **s_tcps #endif first_time_setup_pr_read_pds = 0; - listen_addr_count = count; - - if (n_listeners < max_listeners) { - listener_idxs[n_listeners].idx = 0; - listener_idxs[n_listeners].listenfd = NULL; - } } /* count is the number of entries we've place in the fds array. @@ -1708,7 +1754,7 @@ setup_pr_read_pds(Connection_Table *ct, PRFileDesc **n_tcps, PRFileDesc **s_tcps { int add_fd = 1; /* check timeout for PAGED RESULTS */ - if (pagedresults_is_timedout_nolock(c)) + if (!CONN_IS_LISTENER(c->c_flags) && pagedresults_is_timedout_nolock(c)) { /* Exceeded the timelimit; disconnect the client */ disconnect_server_nomutex(c, c->c_connid, -1, @@ -1725,6 +1771,11 @@ setup_pr_read_pds(Connection_Table *ct, PRFileDesc **n_tcps, PRFileDesc **s_tcps /* slot i of the connection table is mapped to slot * count of the fds array */ c->c_fdi = count; + if (CONN_IS_LISTENER(c->c_flags)) { + listener_idxs[n_listeners].listenc = c; + listener_idxs[n_listeners].idx = count; + n_listeners++; + } count++; } } @@ -1738,6 +1789,11 @@ setup_pr_read_pds(Connection_Table *ct, PRFileDesc **n_tcps, PRFileDesc **s_tcps c = next; } + if (n_listeners < max_listeners) { + listener_idxs[n_listeners].listenc = NULL; + listener_idxs[n_listeners].idx = 0; + } + if( num_to_read ) (*num_to_read) = count; @@ -1896,7 +1952,6 @@ handle_read_ready(Connection_Table *ct, fd_set *readfds) } #endif /* _WIN32 */ - static void handle_pr_read_ready(Connection_Table *ct, PRIntn num_poll) { @@ -1993,6 +2048,10 @@ handle_pr_read_ready(Connection_Table *ct, PRIntn num_poll) if ( c->c_mutex != NULL ) { PR_Lock( c->c_mutex ); + if ( CONN_IS_LISTENER( c->c_flags ) ) { + PR_Unlock( c->c_mutex ); + continue; /* already handled in slapd_daemon handle_listeners */ + } if ( connection_is_active_nolock (c) && c->c_gettingber == 0 ) { PRInt16 out_flags; @@ -2015,8 +2074,10 @@ handle_pr_read_ready(Connection_Table *ct, PRIntn num_poll) LDAPDebug( LDAP_DEBUG_CONNS, "POLL_FN() says connection on sd %d is bad " "(closing)\n", c->c_sd, 0, 0 ); - disconnect_server_nomutex( c, c->c_connid, -1, - SLAPD_DISCONNECT_POLL, EPIPE ); + if (!CONN_IS_LISTENER(c->c_flags)) { + disconnect_server_nomutex( c, c->c_connid, -1, + SLAPD_DISCONNECT_POLL, EPIPE ); + } } else if ( readready ) { @@ -2025,19 +2086,17 @@ handle_pr_read_ready(Connection_Table *ct, PRIntn num_poll) "read activity on %d\n", c->c_ci, 0, 0 ); c->c_idlesince = curtime; - /* This is where the work happens ! */ - /* MAB: 25 jan 01, error handling added */ - if ((connection_activity( c )) == -1) { + if ((connection_activity_ext( c, ct, NULL )) == -1) { /* This might happen as a result of * trying to acquire a closing connection */ LDAPDebug (LDAP_DEBUG_ANY, - "connection_activity: abandoning conn %" NSPRIu64 " as fd=%d is already closing\n", - c->c_connid,c->c_sd,0); + "connection_activity: abandoning conn %" NSPRIu64 " as fd=%d is already closing\n", + c->c_connid,c->c_sd,0); /* The call disconnect_server should do nothing, * as the connection c should be already set to CLOSING */ disconnect_server_nomutex( c, c->c_connid, -1, - SLAPD_DISCONNECT_POLL, EPIPE ); + SLAPD_DISCONNECT_POLL, EPIPE ); } } else if (( idletimeout = compute_idletimeout( @@ -2594,43 +2653,52 @@ bail: #endif /* ENABLE_AUTOBIND */ #endif /* ENABLE_LDAPI */ -/* NOTE: this routine is not reentrant */ -static int -handle_new_connection(Connection_Table *ct, int tcps, PRFileDesc *pr_acceptfd, int secure, int local) +/* NOTE: this routine may be called at the same time from multiple threads + * listenc->c_mutex must be locked + */ +int +handle_new_connection(Connection_Table *ct, Connection *listenc, Connection **conn, int gettingber) { int ns = 0; - Connection *conn = NULL; /* struct sockaddr_in from;*/ PRNetAddr from; PRFileDesc *pr_clonefd = NULL; + Connection *newconn; + time_t curtime = current_time(); + int secure = listenc->c_flags & CONN_FLAG_SSL; + int local = listenc->c_unix_local; + if (conn) { + *conn = NULL; + } memset(&from, 0, sizeof(from)); /* reset to nulls so we can see what was set */ - if ( (ns = accept_and_configure( tcps, pr_acceptfd, &from, - sizeof(from), secure, local, &pr_clonefd)) == SLAPD_INVALID_SOCKET ) { + if ( (ns = accept_and_configure( listenc, &from, sizeof(from), &pr_clonefd, secure, local)) == SLAPD_INVALID_SOCKET ) { return -1; } + /* after this - do not use listenc any more - we don't have a ref on it or a lock */ /* get a new Connection from the Connection Table */ - conn= connection_table_get_connection(ct,ns); - if(conn==NULL) + newconn= connection_table_get_connection(ct,ns); + if(newconn==NULL) { - PR_Close(pr_acceptfd); return -1; } - PR_Lock( conn->c_mutex ); + PR_Lock( newconn->c_mutex ); #if defined( XP_WIN32 ) if( !secure ) - ber_sockbuf_set_option(conn->c_sb,LBER_SOCKBUF_OPT_DESC,&ns); + ber_sockbuf_set_option(newconn->c_sb,LBER_SOCKBUF_OPT_DESC,&ns); #endif - conn->c_sd = ns; - conn->c_prfd = pr_clonefd; - conn->c_flags &= ~CONN_FLAG_CLOSING; + newconn->c_sd = ns; + newconn->c_prfd = pr_clonefd; + newconn->c_flags &= ~CONN_FLAG_CLOSING; + newconn->c_idlesince = curtime; + newconn->c_gettingber = gettingber; /* Store the fact that this new connection is an SSL connection */ if (secure) { - conn->c_flags |= CONN_FLAG_SSL; + newconn->c_flags |= CONN_FLAG_SSL; } #ifndef _WIN32 @@ -2648,8 +2716,8 @@ handle_new_connection(Connection_Table *ct, int tcps, PRFileDesc *pr_acceptfd, i #endif #if defined(USE_OPENLDAP) - ber_sockbuf_add_io( conn->c_sb, &openldap_sockbuf_io, - LBER_SBIOD_LEVEL_PROVIDER, conn ); + ber_sockbuf_add_io( newconn->c_sb, &openldap_sockbuf_io, + LBER_SBIOD_LEVEL_PROVIDER, newconn ); #else /* !USE_OPENLDAP */ { struct lber_x_ext_io_fns func_pointers; @@ -2663,7 +2731,7 @@ handle_new_connection(Connection_Table *ct, int tcps, PRFileDesc *pr_acceptfd, i #else func_pointers.lbextiofn_socket_arg = (struct lextiof_socket_private *) pr_clonefd; #endif - ber_sockbuf_set_option( conn->c_sb, + ber_sockbuf_set_option( newconn->c_sb, LBER_SOCKBUF_OPT_EXT_IO_FNS, &func_pointers); } #endif /* !USE_OPENLDAP */ @@ -2672,7 +2740,7 @@ handle_new_connection(Connection_Table *ct, int tcps, PRFileDesc *pr_acceptfd, i /* Prepare to handle the client's certificate (if any): */ int rv; - rv = slapd_ssl_handshakeCallback (conn->c_prfd, (void*)handle_handshake_done, conn); + rv = slapd_ssl_handshakeCallback (newconn->c_prfd, (void*)handle_handshake_done, newconn); if (rv < 0) { PRErrorCode prerr = PR_GetError(); @@ -2680,47 +2748,50 @@ handle_new_connection(Connection_Table *ct, int tcps, PRFileDesc *pr_acceptfd, i SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n", rv, prerr, slapd_pr_strerror( prerr )); } - rv = slapd_ssl_badCertHook (conn->c_prfd, (void*)handle_bad_certificate, conn); + rv = slapd_ssl_badCertHook (newconn->c_prfd, (void*)handle_bad_certificate, newconn); if (rv < 0) { PRErrorCode prerr = PR_GetError(); LDAPDebug (LDAP_DEBUG_ANY, "SSL_BadCertHook(%i) %i " SLAPI_COMPONENT_NAME_NSPR " error %d\n", - conn->c_sd, rv, prerr); + newconn->c_sd, rv, prerr); } } - connection_reset(conn, ns, &from, sizeof(from), secure); + connection_reset(newconn, ns, &from, sizeof(from), secure); /* Call the plugin extension constructors */ - conn->c_extension = factory_create_extension(connection_type,conn,NULL /* Parent */); + newconn->c_extension = factory_create_extension(connection_type,newconn,NULL /* Parent */); #if defined(ENABLE_LDAPI) #if !defined( XP_WIN32 ) /* ldapi */ if( local ) { - conn->c_unix_local = 1; - conn->c_local_ssf = config_get_localssf(); - slapd_identify_local_user(conn); + newconn->c_unix_local = 1; + newconn->c_local_ssf = config_get_localssf(); + slapd_identify_local_user(newconn); } #endif #endif /* ENABLE_LDAPI */ - connection_new_private(conn); + connection_new_private(newconn); /* Add this connection slot to the doubly linked list of active connections. This * list is used to find the connections that should be used in the poll call. This * connection will be added directly after slot 0 which serves as the head of the list. * This must be done as the very last thing before we unlock the mutex, because once it * is added to the active list, it is live. */ - if ( conn != NULL && conn->c_next == NULL && conn->c_prev == NULL ) + if ( newconn != NULL && newconn->c_next == NULL && newconn->c_prev == NULL ) { /* Now give the new connection to the connection code */ - connection_table_move_connection_on_to_active_list(the_connection_table,conn); + connection_table_move_connection_on_to_active_list(the_connection_table,newconn); } - PR_Unlock( conn->c_mutex ); + PR_Unlock( newconn->c_mutex ); + if (conn) { + *conn = newconn; + } g_increment_current_conn_count(); diff --git a/ldap/servers/slapd/fe.h b/ldap/servers/slapd/fe.h index 3a985cb..e23340f 100644 --- a/ldap/servers/slapd/fe.h +++ b/ldap/servers/slapd/fe.h @@ -107,6 +107,7 @@ void SVRCORE_DestroyNTUserPinObj(SVRCORENTUserPinObj *obj); * connection.c */ void connection_abandon_operations( Connection *conn ); +int connection_activity_ext( Connection *conn, struct connection_table *ct, Slapi_PBlock **pb ); int connection_activity( Connection *conn ); void init_op_threads(); int connection_new_private(Connection *conn); @@ -152,6 +153,7 @@ extern Connection_Table *the_connection_table; /* JCM - Exported from globals.c Connection_Table *connection_table_new(int table_size); void connection_table_free(Connection_Table *ct); void connection_table_abandon_all_operations(Connection_Table *ct); +Connection *connection_table_get_connection_ext(Connection_Table *ct, int sd, unsigned int flags); Connection *connection_table_get_connection(Connection_Table *ct, int sd); void connection_table_move_connection_out_of_active_list(Connection_Table *ct, Connection *c); void connection_table_move_connection_on_to_active_list(Connection_Table *ct, Connection *c); @@ -180,6 +182,7 @@ int daemon_register_reslimits( void ); PRFileDesc * get_ssl_listener_fd(); int configure_pr_socket( PRFileDesc **pr_socket, int secure, int local ); void configure_ns_socket( int * ns ); +int handle_new_connection(Connection_Table *ct, Connection *listenc, Connection **conn, int gettingber); /* * sasl_io.c diff --git a/ldap/servers/slapd/libglobs.c b/ldap/servers/slapd/libglobs.c index dab5275..5b354e3 100644 --- a/ldap/servers/slapd/libglobs.c +++ b/ldap/servers/slapd/libglobs.c @@ -696,7 +696,11 @@ static struct config_get_and_set { {CONFIG_LISTEN_BACKLOG_SIZE, config_set_listen_backlog_size, NULL, 0, (void**)&global_slapdFrontendConfig.listen_backlog_size, CONFIG_INT, - (ConfigGetFunc)config_get_listen_backlog_size} + (ConfigGetFunc)config_get_listen_backlog_size}, + {CONFIG_THREADED_ACCEPT, config_set_threaded_accept, + NULL, 0, + (void**)&global_slapdFrontendConfig.threaded_accept, + CONFIG_INT, (ConfigGetFunc)config_get_threaded_accept} #ifdef MEMPOOL_EXPERIMENTAL ,{CONFIG_MEMPOOL_SWITCH_ATTRIBUTE, config_set_mempool_switch, NULL, 0, @@ -1098,6 +1102,7 @@ FrontendConfig_init () { cfg->disk_grace_period = 60; /* 1 hour */ cfg->disk_logging_critical = LDAP_OFF; cfg->sasl_max_bufsize = SLAPD_DEFAULT_SASL_MAXBUFSIZE; + cfg->threaded_accept = THREADED_ACCEPT_OFF; cfg->listen_backlog_size = DAEMON_LISTEN_SIZE; #ifdef MEMPOOL_EXPERIMENTAL @@ -6214,6 +6219,47 @@ config_get_listen_backlog_size() return retVal; } +int +config_set_threaded_accept( const char *attrname, char *value, + char *errorbuf, int apply ) +{ + int retVal = LDAP_SUCCESS; + slapdFrontendConfig_t *slapdFrontendConfig = getFrontendConfig(); + int newval; + + if ( config_value_is_null( attrname, value, errorbuf, 0 )) { + return LDAP_OPERATIONS_ERROR; + } + + if ( !apply ) { + return retVal; + } + + newval = atoi(value); + if ((newval < THREADED_ACCEPT_OFF) || (newval > THREADED_ACCEPT_READ)) { + PR_snprintf(errorbuf, SLAPI_DSE_RETURNTEXT_SIZE, "Value [%s] for attribute [%s] is invalid: " + "valid values are %d, %d, %d\n", value, attrname, THREADED_ACCEPT_OFF, + THREADED_ACCEPT_ON, THREADED_ACCEPT_READ); + return LDAP_OPERATIONS_ERROR; + } + CFG_LOCK_WRITE(slapdFrontendConfig); + slapdFrontendConfig->threaded_accept = newval; + CFG_UNLOCK_WRITE(slapdFrontendConfig); + return retVal; +} + +int +config_get_threaded_accept() +{ + slapdFrontendConfig_t *slapdFrontendConfig = getFrontendConfig(); + int retVal; + + CFG_LOCK_READ(slapdFrontendConfig); + retVal = slapdFrontendConfig->threaded_accept; + CFG_UNLOCK_READ(slapdFrontendConfig); + return retVal; +} + /* * This function is intended to be used from the dse code modify callback. It * is "optimized" for that case because it takes a berval** of values, which is diff --git a/ldap/servers/slapd/proto-slap.h b/ldap/servers/slapd/proto-slap.h index 08edeb0..516b340 100644 --- a/ldap/servers/slapd/proto-slap.h +++ b/ldap/servers/slapd/proto-slap.h @@ -549,6 +549,8 @@ int config_get_disk_grace_period(); int config_get_disk_logging_critical(); int config_get_sasl_maxbufsize(); int config_get_listen_backlog_size(void); +int config_get_threaded_accept(); +int config_set_threaded_accept( const char *attrname, char *value, char *errorbuf, int apply ); int is_abspath(const char *); char* rel2abspath( char * ); diff --git a/ldap/servers/slapd/slap.h b/ldap/servers/slapd/slap.h index f1dbbec..e8dfbe3 100644 --- a/ldap/servers/slapd/slap.h +++ b/ldap/servers/slapd/slap.h @@ -1473,6 +1473,11 @@ typedef struct conn { * processing a pagedresults search */ #define CONN_FLAG_PAGEDRESULTS_ABANDONED 512/* pagedresults abandoned */ + +#define CONN_FLAG_LISTENER 1024/* this is a listener connection */ + +#define CONN_IS_LISTENER(zzz) ((zzz) & CONN_FLAG_LISTENER) + #define CONN_GET_SORT_RESULT_CODE (-1) #define START_TLS_OID "1.3.6.1.4.1.1466.20037" @@ -1659,6 +1664,8 @@ typedef struct slapi_pblock { int pb_syntax_filter_normalized; /* the syntax filter types/values are already normalized */ void *pb_syntax_filter_data; /* extra data to pass to a syntax plugin function */ int pb_paged_results_index; /* stash SLAPI_PAGED_RESULTS_INDEX */ + /* for threaded accept - new connection parameters */ + struct connection_table *pb_ct; /* conn table to use for this new connection */ } slapi_pblock; /* index if substrlens */ @@ -2014,6 +2021,11 @@ typedef struct _slapdEntryPoints { #ifndef DAEMON_LISTEN_SIZE #define DAEMON_LISTEN_SIZE 128 #endif +#define CONFIG_THREADED_ACCEPT "nsslapd-threaded-accept" + +#define THREADED_ACCEPT_OFF 0 /* do the accept() in the polling thread */ +#define THREADED_ACCEPT_ON 1 /* do the accept() in a worker thread, pass the read() off to another worker thread */ +#define THREADED_ACCEPT_READ 2 /* do the accept() and read() in the same worker thread */ #ifdef MEMPOOL_EXPERIMENTAL #define CONFIG_MEMPOOL_SWITCH_ATTRIBUTE "nsslapd-mempool" @@ -2250,6 +2262,7 @@ typedef struct _slapdFrontendConfig { int disk_threshold; int disk_grace_period; int disk_logging_critical; + int threaded_accept; } slapdFrontendConfig_t; /* possible values for slapdFrontendConfig_t.schemareplace */ -- 1.7.1