From 4558a45ccf54ce3b7229858fbe7c2c5e72d9b925 Mon Sep 17 00:00:00 2001 From: Rich Megginson Date: Tue, 19 Feb 2013 19:32:56 -0700 Subject: [PATCH 1/4] Ticket #514 - investigate connection locking https://fedorahosted.org/389/ticket/514 Reviewed by: mreynolds (Thanks!) Branch: master Fix Description: There were two locks involved for every operation - a lock to protect the pblock queue and a lock and condition variable to protect the counter variable. This fix consolidates them into a single lock/cv for the pblock queue and gets rid of the separate lock for the counter. The queue structures have been cleaned up and renamed work_q and work_q_size. The worker threads wait for new work if not shutdown and the work_q is empty. In addition, the timeout interval for the wait for the work_q cv has been changed to "infinite" instead of 10 seconds. Platforms tested: RHEL6 x86_64 Flag Day: no Doc impact: Yes --- ldap/servers/slapd/connection.c | 328 ++++++++++++++++++--------------------- 1 files changed, 149 insertions(+), 179 deletions(-) diff --git a/ldap/servers/slapd/connection.c b/ldap/servers/slapd/connection.c index 31fc543..c68f56b 100644 --- a/ldap/servers/slapd/connection.c +++ b/ldap/servers/slapd/connection.c @@ -84,10 +84,11 @@ struct Slapi_PBlock_q static struct Slapi_PBlock_q *first_pb= NULL; /* global work queue head */ static struct Slapi_PBlock_q *last_pb= NULL; /* global work queue tail */ static PRLock *pb_q_lock=NULL; /* protects first_pb & last_pb */ - -static PRCondVar *op_thread_cv; /* used by operation threads to wait for work */ -static PRLock *op_thread_lock; /* associated with op_thread_cv */ -static int op_shutdown= 0; /* if non-zero, server is shutting down */ +static PRCondVar *pb_q_cv; /* used by operation threads to wait for work - when there is a op pblock in the queue waiting to be processed */ +static PRInt32 pb_q_size; /* size of pb_q */ +static PRInt32 pb_q_size_max; /* high water mark of pb_q_size */ +#define PB_Q_EMPTY (pb_q_size == 0) +static PRInt32 op_shutdown= 0; /* if non-zero, server is shutting down */ #define LDAP_SOCKET_IO_BUFFER_SIZE 512 /* Size of the buffer we give to the I/O system for reads */ @@ -403,38 +404,30 @@ init_op_threads() int max_threads = config_get_threadnumber(); /* Initialize the locks and cv */ - if ((pb_q_lock = PR_NewLock()) == NULL ) { - errorCode = PR_GetError(); - LDAPDebug( LDAP_DEBUG_ANY, - "init_op_threads: PR_NewLock failed, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n", - errorCode, slapd_pr_strerror(errorCode), 0 ); - exit(-1); - } - - if ((op_thread_lock = PR_NewLock()) == NULL ) { - errorCode = PR_GetError(); - LDAPDebug( LDAP_DEBUG_ANY, - "init_op_threads: PR_NewLock failed, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n", - errorCode, slapd_pr_strerror(errorCode), 0 ); - exit(-1); - } - - if ((op_thread_cv = PR_NewCondVar( op_thread_lock )) == NULL) { - errorCode = PR_GetError(); - LDAPDebug( LDAP_DEBUG_ANY, "init_op_threads: PR_NewCondVar failed, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n", - errorCode, slapd_pr_strerror(errorCode), 0 ); - exit(-1); - } + if ((pb_q_lock = PR_NewLock()) == NULL ) { + errorCode = PR_GetError(); + LDAPDebug( LDAP_DEBUG_ANY, + "init_op_threads: PR_NewLock failed for pb_q_lock, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n", + errorCode, slapd_pr_strerror(errorCode), 0 ); + exit(-1); + } + + if ((pb_q_cv = PR_NewCondVar( pb_q_lock )) == NULL) { + errorCode = PR_GetError(); + LDAPDebug( LDAP_DEBUG_ANY, "init_op_threads: PR_NewCondVar failed for pb_q_cv, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n", + errorCode, slapd_pr_strerror(errorCode), 0 ); + exit(-1); + } /* start the operation threads */ for (i=0; i < max_threads; i++) { PR_SetConcurrency(4); if (PR_CreateThread (PR_USER_THREAD, - (VFP) (void *) connection_threadmain, NULL, - PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, - PR_UNJOINABLE_THREAD, - SLAPD_DEFAULT_THREAD_STACKSIZE - ) == NULL ) { + (VFP) (void *) connection_threadmain, NULL, + PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, + PR_UNJOINABLE_THREAD, + SLAPD_DEFAULT_THREAD_STACKSIZE + ) == NULL ) { int prerr = PR_GetError(); LDAPDebug( LDAP_DEBUG_ANY, "PR_CreateThread failed, " SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n", prerr, slapd_pr_strerror( prerr ), 0 ); @@ -1535,8 +1528,6 @@ static int finished_chomping(Connection *conn) * IO Completion Ports are not available on this platform. */ -static int counter= 0; /* JCM Dumb Name */ - /* The connection private structure for UNIX turbo mode */ struct Conn_private { @@ -1700,37 +1691,26 @@ connection_free_private_buffer(Connection *conn) #define CONN_TURBO_PERCENTILE 50 /* proportion of threads allowed to be in turbo mode */ #define CONN_TURBO_HYSTERESIS 0 /* avoid flip flopping in and out of turbo mode */ -int connection_wait_for_new_pb(Slapi_PBlock **ppb, PRIntervalTime interval) +int connection_wait_for_new_pb(Slapi_PBlock **ppb, PRIntervalTime interval) { int ret = CONN_FOUND_WORK_TO_DO; - - PR_Lock( op_thread_lock ); - - /* While there is no operation to do... */ - while( counter < 1) { - /* Check if we should shutdown. */ - if (op_shutdown) { - PR_Unlock( op_thread_lock ); - return CONN_SHUTDOWN; - } - PR_WaitCondVar( op_thread_cv, interval); - } - /* There is some work to do. */ - - counter--; - PR_Unlock( op_thread_lock ); + PR_Lock( pb_q_lock ); - /* Get the next operation from the work queue. */ + while( !op_shutdown && PB_Q_EMPTY ) { + PR_WaitCondVar( pb_q_cv, interval ); + } - *ppb = get_pb(); - if (*ppb == NULL) { - LDAPDebug( LDAP_DEBUG_ANY, "pb is null \n", 0, 0, 0 ); - PR_Lock( op_thread_lock ); - counter++; - PR_Unlock( op_thread_lock ); + if ( op_shutdown ) { + LDAPDebug0Args( LDAP_DEBUG_ANY, "connection_wait_for_new_pb: shutdown\n" ); + ret = CONN_SHUTDOWN; + } else if ( NULL == ( *ppb = get_pb() ) ) { + /* not sure how this can happen */ + LDAPDebug0Args( LDAP_DEBUG_ANY, "connection_wait_for_new_pb: pb is null\n" ); ret = CONN_NOWORK; } + + PR_Unlock( pb_q_lock ); return ret; } @@ -2166,12 +2146,12 @@ static void connection_threadmain() { Slapi_PBlock *pb = NULL; - PRIntervalTime interval = PR_SecondsToInterval(10); + /* wait forever for new pb until one is available or shutdown */ + PRIntervalTime interval = PR_INTERVAL_NO_TIMEOUT; /* PR_SecondsToInterval(10); */ Connection *conn = NULL; Operation *op; ber_tag_t tag = 0; int need_wakeup = 0; - int need_conn_release = 0; int thread_turbo_flag = 0; int ret = 0; int more_data = 0; @@ -2202,6 +2182,7 @@ connection_threadmain() ret = connection_wait_for_new_pb(&pb,interval); switch (ret) { case CONN_NOWORK: + PR_ASSERT(interval != PR_INTERVAL_NO_TIMEOUT); /* this should never happen with PR_INTERVAL_NO_TIMEOUT */ continue; case CONN_SHUTDOWN: LDAPDebug( LDAP_DEBUG_TRACE, @@ -2264,9 +2245,9 @@ connection_threadmain() } /* turn off turbo mode immediately if any pb waiting in global queue */ - if (thread_turbo_flag && (counter > 0)) { + if (thread_turbo_flag && !PB_Q_EMPTY) { thread_turbo_flag = 0; - LDAPDebug(LDAP_DEBUG_CONNS,"conn %" NSPRIu64 " leaving turbo mode\n",conn->c_connid,0,0); + LDAPDebug2Args(LDAP_DEBUG_CONNS,"conn %" NSPRIu64 " leaving turbo mode - pb_q is not empty %d\n",conn->c_connid,pb_q_size); } #endif @@ -2313,13 +2294,16 @@ connection_threadmain() */ replication_connection = conn->c_isreplication_session; if ((tag != LDAP_REQ_UNBIND) && !thread_turbo_flag && !more_data && !replication_connection) { - connection_make_readable(conn); + connection_make_readable_nolock(conn); + /* once the connection is readable, another thread may access conn, + * so need locking from here on */ + signal_listner(); } /* are we in referral-only mode? */ if (config_check_referral_mode() && tag != LDAP_REQ_UNBIND) { - referral_mode_reply(pb); - goto done; + referral_mode_reply(pb); + goto done; } /* check if new password is required */ @@ -2347,6 +2331,21 @@ connection_threadmain() connection_dispatch_operation(conn, op, pb); done: + if (doshutdown) { + PR_Lock(conn->c_mutex); + connection_remove_operation( conn, op ); + /* destroying the pblock will cause destruction of the operation + * so this must happend before releasing the connection + */ + slapi_pblock_destroy( pb ); + pb = NULL; + connection_make_readable_nolock(conn); + conn->c_threadnumber--; + connection_release_nolock(conn); + PR_Unlock(conn->c_mutex); + signal_listner(); + return; + } /* * done with this operation. delete it from the op * queue for this connection, delete the number of @@ -2359,75 +2358,51 @@ done: /* total number of ops for the server */ slapi_counter_increment(ops_completed); /* If this op isn't a persistent search, remove it */ - need_conn_release = 0; - if ( !( pb->pb_op->o_flags & OP_FLAG_PS )) { - /* delete from connection operation queue & decr refcnt */ - PR_Lock( conn->c_mutex ); - connection_remove_operation( conn, op ); - /* destroying the pblock will cause destruction of the operation - * so this must happend before releasing the connection - */ - slapi_pblock_destroy( pb ); - - /* If we're in turbo mode, we keep our reference to the connection - alive */ - if (!thread_turbo_flag && !more_data) { - /* - * Don't release the connection now. - * But note down what to do. - */ - need_conn_release = 1; - } - PR_Unlock( conn->c_mutex ); - } else { /* the ps code acquires a ref to the conn - we need to release ours here */ - PR_Lock( conn->c_mutex ); - connection_release_nolock (conn); - PR_Unlock( conn->c_mutex ); - } - pb = NULL; - if (doshutdown) { - PR_Lock(conn->c_mutex); - connection_make_readable_nolock(conn); - conn->c_threadnumber--; - connection_release_nolock(conn); - PR_Unlock(conn->c_mutex); - signal_listner(); - return; - } - - if (!more_data) { /* no more data in the buffer */ - if (!thread_turbo_flag) { /* Don't do this in turbo mode */ - /* Since we didn't do so earlier, we need to make a - * replication connection readable again here */ - PR_Lock( conn->c_mutex ); - if (replication_connection || (1 == is_timedout)) { - connection_make_readable_nolock(conn); - need_wakeup = 1; - } - /* if the threadnumber of now below the maximum, wakeup - * the listener thread so that we start polling on this - * connection again - */ - if (!need_wakeup) { - if (conn->c_threadnumber == config_get_maxthreadsperconn()) + if ( pb->pb_op->o_flags & OP_FLAG_PS ) { + PR_Lock( conn->c_mutex ); + connection_release_nolock (conn); /* psearch acquires ref to conn - release this one now */ + PR_Unlock( conn->c_mutex ); + } else { + /* delete from connection operation queue & decr refcnt */ + PR_Lock( conn->c_mutex ); + connection_remove_operation( conn, op ); + /* destroying the pblock will cause destruction of the operation + * so this must happend before releasing the connection + */ + slapi_pblock_destroy( pb ); + + /* If we're in turbo mode, we keep our reference to the connection alive */ + if (!more_data) { + if (!thread_turbo_flag) { + /* + * Don't release the connection now. + * But note down what to do. + */ + if (replication_connection || (1 == is_timedout)) { + connection_make_readable_nolock(conn); need_wakeup = 1; - else - need_wakeup = 0; - } - conn->c_threadnumber--; - if (need_conn_release) { + } + if (!need_wakeup) { + if (conn->c_threadnumber == config_get_maxthreadsperconn()) + need_wakeup = 1; + else + need_wakeup = 0; + } + conn->c_threadnumber--; connection_release_nolock(conn); - } - PR_Unlock( conn->c_mutex ); - /* Call signal_listner after releasing the - * connection if required. */ - if (need_wakeup) { + /* Call signal_listner after releasing the + * connection if required. */ + if (need_wakeup) { + signal_listner(); + } + } else if (1 == is_timedout) { + connection_make_readable_nolock(conn); signal_listner(); } - } else if (1 == is_timedout) { - connection_make_readable(conn); } + PR_Unlock( conn->c_mutex ); } + pb = NULL; } /* while (1) */ } @@ -2437,33 +2412,27 @@ connection_activity(Connection *conn) { Slapi_PBlock *pb; - connection_make_new_pb(&pb, conn); - - /* Add pb to the end of the work queue. */ - add_pb( pb ); - - /* Check if exceed the max thread per connection. If so, increment - c_pbwait. Otherwise increment the counter and notify the cond. var. - there is work to do. */ - if (connection_acquire_nolock (conn) == -1) { - LDAPDebug(LDAP_DEBUG_CONNS, - "could not acquire lock in connection_activity as conn %" NSPRIu64 " closing fd=%d\n", - conn->c_connid,conn->c_sd,0); - /* XXX how to handle this error? */ - /* MAB: 25 Jan 01: let's return on error and pray this won't leak */ - return (-1); + LDAPDebug(LDAP_DEBUG_CONNS, + "could not acquire lock in connection_activity as conn %" NSPRIu64 " closing fd=%d\n", + conn->c_connid,conn->c_sd,0); + /* XXX how to handle this error? */ + /* MAB: 25 Jan 01: let's return on error and pray this won't leak */ + return (-1); } + + connection_make_new_pb(&pb, conn); + + /* set these here so setup_pr_read_pds will not add this conn back to the poll array */ conn->c_gettingber = 1; conn->c_threadnumber++; - PR_Lock( op_thread_lock ); - counter++; - PR_NotifyCondVar( op_thread_cv ); - PR_Unlock( op_thread_lock ); - + /* Add pb to the end of the work queue. */ + /* have to do this last - add_pb will signal waiters in connection_wait_for_new_pb */ + add_pb( pb ); + if (! config_check_referral_mode()) { - slapi_counter_increment(ops_initiated); - slapi_counter_increment(g_get_global_snmp_vars()->ops_tbl.dsInOps); + slapi_counter_increment(ops_initiated); + slapi_counter_increment(g_get_global_snmp_vars()->ops_tbl.dsInOps); } return 0; } @@ -2474,7 +2443,6 @@ connection_activity(Connection *conn) static void add_pb( Slapi_PBlock *pb) { - struct Slapi_PBlock_q *new_pb=NULL; LDAPDebug( LDAP_DEBUG_TRACE, "add_pb \n", 0, 0, 0 ); @@ -2492,25 +2460,27 @@ add_pb( Slapi_PBlock *pb) last_pb->next_pb = new_pb; last_pb = new_pb; } + PR_AtomicIncrement( &pb_q_size ); /* increment q size */ + if ( pb_q_size > pb_q_size_max ) { + pb_q_size_max = pb_q_size; + } + PR_NotifyCondVar( pb_q_cv ); /* notify waiters in connection_wait_for_new_pb */ PR_Unlock( pb_q_lock ); } -/* get_pb(): will get a pb from the begining of the work queue, return NULL if - the queue is empty.*/ +/* get_pb(): will get a pb from the beginning of the work queue, return NULL if + the queue is empty. This should only be called from connection_wait_for_new_pb + with the pb_q_lock held */ static Slapi_PBlock * get_pb() { - struct Slapi_PBlock_q *tmp = NULL; Slapi_PBlock *pb; - LDAPDebug( LDAP_DEBUG_TRACE, "get_pb \n", 0, 0, 0 ); - PR_Lock( pb_q_lock ); + LDAPDebug0Args( LDAP_DEBUG_TRACE, "get_pb \n" ); if (first_pb == NULL) { - PR_Unlock( pb_q_lock ); - LDAPDebug( LDAP_DEBUG_ANY, "get_pb: the work queue is empty.\n", - 0, 0, 0 ); + LDAPDebug0Args( LDAP_DEBUG_TRACE, "get_pb: the work queue is empty.\n" ); return NULL; } @@ -2519,11 +2489,11 @@ get_pb() last_pb = NULL; } first_pb = tmp->next_pb; - PR_Unlock( pb_q_lock ); pb = tmp->pb; /* Free the memory used by the pb found. */ slapi_ch_free ((void **)&tmp); + PR_AtomicDecrement( &pb_q_size ); /* decrement q size */ return (pb); } @@ -2540,28 +2510,28 @@ void op_thread_cleanup() { #ifdef _WIN32 - int i; - PRIntervalTime interval; - int max_threads = config_get_threadnumber(); - interval = PR_SecondsToInterval(3); + int i; + PRIntervalTime interval; + int max_threads = config_get_threadnumber(); + interval = PR_SecondsToInterval(3); #endif - LDAPDebug( LDAP_DEBUG_ANY, - "slapd shutting down - signaling operation threads\n", 0, 0, 0); - - PR_Lock( op_thread_lock ); - op_shutdown = 1; - PR_NotifyAllCondVar ( op_thread_cv ); - PR_Unlock( op_thread_lock ); + LDAPDebug( LDAP_DEBUG_ANY, + "slapd shutting down - signaling operation threads\n", 0, 0, 0); + + PR_AtomicIncrement(&op_shutdown); + PR_Lock( pb_q_lock ); + PR_NotifyAllCondVar ( pb_q_cv ); /* tell any thread waiting in connection_wait_for_new_pb to shutdown */ + PR_Unlock( pb_q_lock ); #ifdef _WIN32 - LDAPDebug( LDAP_DEBUG_ANY, - "slapd shutting down - waiting for %d threads to terminate\n", - g_get_active_threadcnt(), 0, 0 ); - /* kill off each worker waiting on GetQueuedCompletionStatus */ - for ( i = 0; i < max_threads; ++ i ) - { - PostQueuedCompletionStatus( completion_port, 0, COMPKEY_DIE ,0); - } - /* don't sleep: there's no reason to do so here DS_Sleep(interval); */ /* sleep 3 seconds */ + LDAPDebug( LDAP_DEBUG_ANY, + "slapd shutting down - waiting for %d threads to terminate\n", + g_get_active_threadcnt(), 0, 0 ); + /* kill off each worker waiting on GetQueuedCompletionStatus */ + for ( i = 0; i < max_threads; ++ i ) + { + PostQueuedCompletionStatus( completion_port, 0, COMPKEY_DIE ,0); + } + /* don't sleep: there's no reason to do so here DS_Sleep(interval); */ /* sleep 3 seconds */ #endif } -- 1.7.1