class Mongo::Cluster

Represents a group of servers on the server side, either as a single server, a replica set, or a single or multiple mongos.

@since 2.0.0

Constants

IDLE_WRITE_PERIOD_SECONDS

How often an idle primary writes a no-op to the oplog.

@since 2.4.0

MAX_READ_RETRIES

The default number of mongos read retries.

@since 2.1.1

MAX_WRITE_RETRIES

The default number of mongos write retries.

@since 2.4.2

READ_RETRY_INTERVAL

The default mongos read retry interval, in seconds.

@since 2.1.1

Attributes

app_metadata[R]

@return [ Mongo::Cluster::AppMetadata ] The application metadata, used for connection handshakes.

@since 2.4.0

monitoring[R]

@return [ Monitoring ] The monitoring.

options[R]

@return [ Hash ] The options hash.

topology[R]

@return [ Object ] The cluster topology.

Public Class Methods

create(client) click to toggle source

Create a cluster for the provided client, for use when we don't want the client's original cluster instance to be the same.

@api private

@example Create a cluster for the client.

Cluster.create(client)

@param [ Client ] client The client to create on.

@return [ Cluster ] The cluster.

@since 2.0.0

# File lib/mongo/cluster.rb, line 424
# Build a fresh cluster for the client and install it on the client.
#
# The new cluster is seeded with the string form of the client's current
# cluster addresses, a duplicate of the client's monitoring object and the
# client's options.
#
# @api private
#
# @param [ Client ] client The client to create on.
#
# @return [ Cluster ] The newly created cluster, which is also stored in the
#   client's @cluster instance variable.
def self.create(client)
  seeds = client.cluster.addresses.map { |address| address.to_s }
  monitoring = client.instance_variable_get(:@monitoring).dup
  replacement = Cluster.new(seeds, monitoring, client.options)
  client.instance_variable_set(:@cluster, replacement)
end
finalize(pools, cursor_reaper) click to toggle source

Build the finalizer for the cluster, to be run at garbage collection. The finalizer kills outstanding cursors, stops the cursor reaper and disconnects all the scoped connection pools.

@example Finalize the cluster.

Cluster.finalize(pools, cursor_reaper)

@param [ Hash<Address, Server::ConnectionPool> ] pools The connection pools.

@param [ CursorReaper ] cursor_reaper The cursor reaper.

@return [ Proc ] The Finalizer.

@since 2.2.0

# File lib/mongo/cluster.rb, line 197
# Build the finalizer used when a cluster is garbage collected.
#
# Nothing is executed until the returned proc is called. It is deliberately a
# plain (non-lambda) proc so that it tolerates being called with an argument.
#
# @param [ Hash<Address, Server::ConnectionPool> ] pools The connection pools.
# @param [ CursorReaper ] cursor_reaper The cursor reaper.
#
# @return [ Proc ] The finalizer.
def self.finalize(pools, cursor_reaper)
  proc do
    begin
      cursor_reaper.kill_cursors
    rescue
      # Best effort only: a failure to kill cursors must not prevent the
      # remaining cleanup from running.
    end
    cursor_reaper.stop!
    pools.values.each(&:disconnect!)
  end
end
new(seeds, monitoring, options = Options::Redacted.new) click to toggle source

Instantiate the new cluster.

@api private

@example Instantiate the cluster.

Mongo::Cluster.new(["127.0.0.1:27017"], monitoring)

@note Cluster should never be directly instantiated outside of a Client.

@param [ Array<String> ] seeds The addresses of the configured servers.

@param [ Monitoring ] monitoring The monitoring.

@param [ Hash ] options The options.

@since 2.0.0

# File lib/mongo/cluster.rb, line 152
# Instantiate the cluster: set up internal state, announce the topology,
# subscribe to server events, add a server per seed, start the cursor reaper
# and register a finalizer.
#
# @api private
#
# @param [ Array<String> ] seeds The addresses of the configured servers.
# @param [ Monitoring ] monitoring The monitoring.
# @param [ Hash ] options The options.
def initialize(seeds, monitoring, options = Options::Redacted.new)
  @addresses = []
  @servers = []
  @monitoring = monitoring
  @event_listeners = Event::Listeners.new
  # The passed-in options object itself is frozen; no copy is made.
  @options = options.freeze
  # NOTE(review): `||=` only differs from `=` if @app_metadata were already
  # set before initialize runs - confirm whether plain assignment was meant.
  @app_metadata ||= AppMetadata.new(self)
  # Two separate locks: one for @addresses/@servers, one for the pools hash.
  @update_lock = Mutex.new
  @pool_lock = Mutex.new
  @topology = Topology.initial(seeds, monitoring, options)

  publish_sdam_event(
    Monitoring::TOPOLOGY_OPENING,
    Monitoring::Event::TopologyOpening.new(@topology)
  )

  # Listeners are registered before any server is added below.
  subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self))
  subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self))
  subscribe_to(Event::MEMBER_DISCOVERED, Event::MemberDiscovered.new(self))

  seeds.each{ |seed| add(seed) }

  # Only published when more than one server ended up in @servers. The same
  # topology object is passed for both arguments.
  # NOTE(review): presumably (previous, new) - confirm against the event class.
  publish_sdam_event(
    Monitoring::TOPOLOGY_CHANGED,
    Monitoring::Event::TopologyChanged.new(@topology, @topology)
  ) if @servers.size > 1

  @cursor_reaper = CursorReaper.new
  @cursor_reaper.run!

  # The finalizer is built by the class-level method from the pools hash and
  # the cursor reaper only.
  ObjectSpace.define_finalizer(self, self.class.finalize(pools, @cursor_reaper))
end

Public Instance Methods

==(other) click to toggle source

Determine if this cluster of servers is equal to another object. Checks the servers currently in the cluster, not what was configured.

@example Is the cluster equal to the object?

cluster == other

@param [ Object ] other The object to compare to.

@return [ true, false ] If the objects are equal.

@since 2.0.0

# File lib/mongo/cluster.rb, line 81
# Compare this cluster with another object.
#
# Two clusters are equal when both their current addresses and their options
# are equal; anything that is not a Cluster is never equal.
#
# @param [ Object ] other The object to compare to.
#
# @return [ true, false ] If the objects are equal.
def ==(other)
  other.is_a?(Cluster) && addresses == other.addresses && options == other.options
end
add(host) click to toggle source

Add a server to the cluster with the provided address. Useful in auto-discovery of new servers when an existing server executes an ismaster and potentially non-configured servers were included.

@example Add the server for the address to the cluster.

cluster.add('127.0.0.1:27018')

@param [ String ] host The address of the server to add.

@return [ Server, nil ] The newly added server, or nil if the address is already present or adding it is not allowed.

@since 2.0.0

# File lib/mongo/cluster.rb, line 98
# Add a server for the given address unless the address is already known or
# the topology does not allow the addition.
#
# @param [ String ] host The address of the server to add.
#
# @return [ Server, nil ] The newly added server, or nil when nothing was
#   added.
def add(host)
  address = Address.new(host, options)
  return if addresses.include?(address)
  return unless addition_allowed?(address)

  # The address is recorded before the server is built; each list is updated
  # under the update lock.
  @update_lock.synchronize { @addresses << address }
  new_server = Server.new(address, self, @monitoring, event_listeners, options)
  @update_lock.synchronize { @servers << new_server }
  new_server
end
add_hosts(description) click to toggle source

Add hosts in a description to the cluster.

@example Add hosts in a description to the cluster.

cluster.add_hosts(description)

@param [ Mongo::Server::Description ] description The description.

@since 2.0.6

# File lib/mongo/cluster.rb, line 389
# Add every host listed in the description, provided the topology agrees
# that hosts should be added for it.
#
# @param [ Mongo::Server::Description ] description The description.
#
# @return [ Object, nil ] The description's server list when hosts were
#   processed, nil otherwise.
def add_hosts(description)
  return unless topology.add_hosts?(description, servers_list)

  description.servers.each { |host| add(host) }
end
addresses() click to toggle source

The addresses in the cluster.

@example Get the addresses in the cluster.

cluster.addresses

@return [ Array<Mongo::Address> ] The addresses.

@since 2.0.6

# File lib/mongo/cluster.rb, line 441
# The addresses currently in the cluster.
#
# Delegates to the private addresses_list helper.
#
# @return [ Array<Mongo::Address> ] The addresses.
def addresses
  addresses_list
end
disconnect!() click to toggle source

Disconnect all servers.

@example Disconnect the cluster's servers.

cluster.disconnect!

@return [ true ] Always true.

@since 2.1.0

# File lib/mongo/cluster.rb, line 361
# Disconnect all servers.
#
# Cursor killing is attempted first and any StandardError it raises is
# ignored; the cursor reaper is then stopped and every server disconnected.
#
# @return [ true ] Always true.
def disconnect!
  begin
    @cursor_reaper.kill_cursors
  rescue
    # Best effort only: continue with the disconnect regardless.
  end
  @cursor_reaper.stop!
  @servers.each(&:disconnect!)
  true
end
elect_primary!(description) click to toggle source

Elect a primary server from the description that has just changed to a primary.

@example Elect a primary server.

cluster.elect_primary!(description)

@param [ Server::Description ] description The newly elected primary.

@return [ Topology ] The cluster topology.

@since 2.0.0

# File lib/mongo/cluster.rb, line 245
# Ask the current topology to elect a primary for the description and store
# whatever it returns as the cluster's topology.
#
# @param [ Server::Description ] description The newly elected primary.
#
# @return [ Topology ] The cluster topology.
def elect_primary!(description)
  updated = topology.elect_primary(description, servers_list)
  @topology = updated
end
has_readable_server?(server_selector = nil) click to toggle source

Determine if the cluster would select a readable server for the provided read preference.

@example Is a readable server present?

cluster.has_readable_server?(server_selector)

@param [ ServerSelector ] server_selector The server selector.

@return [ true, false ] If a readable server is present.

@since 2.4.0

# File lib/mongo/cluster.rb, line 122
# Determine if the cluster would select a readable server for the provided
# server selector. The decision is delegated to the topology.
#
# @param [ ServerSelector ] server_selector The server selector.
#
# @return [ true, false ] If a readable server is present.
def has_readable_server?(server_selector = nil)
  current_topology = topology
  current_topology.has_readable_server?(self, server_selector)
end
has_writable_server?() click to toggle source

Determine if the cluster would select a writable server.

@example Is a writable server present?

cluster.has_writable_server?

@return [ true, false ] If a writable server is present.

@since 2.4.0

# File lib/mongo/cluster.rb, line 134
# Determine if the cluster would select a writable server. The decision is
# delegated to the topology.
#
# @return [ true, false ] If a writable server is present.
def has_writable_server?
  current_topology = topology
  current_topology.has_writable_server?(self)
end
inspect() click to toggle source

Get the nicer formatted string for use in inspection.

@example Inspect the cluster.

cluster.inspect

@return [ String ] The cluster inspection.

@since 2.0.0

# File lib/mongo/cluster.rb, line 215
# Get a formatted string for use in inspection, showing the object id, the
# candidate servers and the topology's display name.
#
# @return [ String ] The cluster inspection.
def inspect
  candidates = servers
  topology_name = topology.display_name
  "#<Mongo::Cluster:0x#{object_id} servers=#{candidates} topology=#{topology_name}>"
end
max_read_retries() click to toggle source

Get the maximum number of times the cluster can retry a read operation on a mongos.

@example Get the max read retries.

cluster.max_read_retries

@return [ Integer ] The maximum retries.

@since 2.1.1

# File lib/mongo/cluster.rb, line 258
# Get the maximum number of times the cluster can retry a read operation on
# a mongos: the :max_read_retries option when it is set to a truthy value,
# otherwise MAX_READ_RETRIES.
#
# @return [ Integer ] The maximum retries.
def max_read_retries
  configured = options[:max_read_retries]
  configured || MAX_READ_RETRIES
end
next_primary(ping = true) click to toggle source

Get the next primary server we can send an operation to.

@example Get the next primary server.

cluster.next_primary

@param [ true, false ] ping Whether to ping the server before selection.

@return [ Mongo::Server ] A primary server.

@since 2.0.0

# File lib/mongo/cluster.rb, line 229
# Get the next primary server we can send an operation to.
#
# The primary server selector is created on first use and cached on the
# cluster for later calls.
#
# @param [ true, false ] ping Whether to ping the server before selection.
#
# @return [ Mongo::Server ] A primary server.
def next_primary(ping = true)
  selector = (@primary_selector ||= ServerSelector.get(ServerSelector::PRIMARY))
  selector.select_server(self, ping)
end
pool(server) click to toggle source

Get the scoped connection pool for the server.

@example Get the connection pool.

cluster.pool(server)

@param [ Server ] server The server.

@return [ Server::ConnectionPool ] The connection pool.

@since 2.2.0

# File lib/mongo/cluster.rb, line 272
# Get the scoped connection pool for the server, creating and caching it
# under the server's address on first request. The lookup and creation run
# under the pool lock.
#
# @param [ Server ] server The server.
#
# @return [ Server::ConnectionPool ] The connection pool.
def pool(server)
  key = server.address
  @pool_lock.synchronize { pools[key] ||= Server::ConnectionPool.get(server) }
end
read_retry_interval() click to toggle source

Get the interval, in seconds, in which a mongos read operation is retried.

@example Get the read retry interval.

cluster.read_retry_interval

@return [ Float ] The interval.

@since 2.1.1

# File lib/mongo/cluster.rb, line 287
# Get the interval, in seconds, in which a mongos read operation is retried:
# the :read_retry_interval option when it is set to a truthy value,
# otherwise READ_RETRY_INTERVAL.
#
# @return [ Float ] The interval.
def read_retry_interval
  configured = options[:read_retry_interval]
  configured || READ_RETRY_INTERVAL
end
reconnect!() click to toggle source

Reconnect all servers.

@example Reconnect the cluster's servers.

cluster.reconnect!

@return [ true ] Always true.

@since 2.1.0

# File lib/mongo/cluster.rb, line 375
# Reconnect all servers.
#
# Scans the cluster, reconnects each candidate server and restarts the
# cursor reaper.
#
# @return [ true ] Always true, regardless of what restarting the cursor
#   reaper returns.
def reconnect!
  scan!
  servers.each(&:reconnect!)
  @cursor_reaper.restart!
  # Return an explicit true: chaining with `and true` would leak a falsy
  # restart! result to the caller.
  true
end
remove(host) click to toggle source

Remove the server from the cluster for the provided address, if it exists.

@example Remove the server from the cluster.

cluster.remove('127.0.0.1:27017')

@param [ String ] host The host/port or socket address.

@since 2.0.0

# File lib/mongo/cluster.rb, line 313
# Remove the server with the provided address from the cluster, if present.
#
# Matching servers are dropped from the server list and disconnected, a
# SERVER_CLOSED event is published for the address (whether or not a server
# matched), and the address is dropped from the address list.
#
# @param [ String ] host The host/port or socket address.
#
# @return [ Array, nil ] The address list when an address was removed from
#   it, nil when the address was not in the list.
def remove(host)
  address = Address.new(host)
  removed_servers = @servers.select { |candidate| candidate.address == address }
  @update_lock.synchronize { @servers -= removed_servers }
  removed_servers.each(&:disconnect!)
  closed_event = Monitoring::Event::ServerClosed.new(address, topology)
  publish_sdam_event(Monitoring::SERVER_CLOSED, closed_event)
  @update_lock.synchronize { @addresses.reject! { |known| known == address } }
end
remove_hosts(description) click to toggle source

Remove hosts in a description from the cluster.

@example Remove hosts in a description from the cluster.

cluster.remove_hosts(description)

@param [ Mongo::Server::Description ] description The description.

@since 2.0.6

# File lib/mongo/cluster.rb, line 403
# Remove from the cluster every known server that the topology says should
# be removed for the description. Nothing happens unless the topology first
# agrees that hosts should be removed for it.
#
# @param [ Mongo::Server::Description ] description The description.
#
# @return [ Array, nil ] The list of servers that was examined, or nil when
#   no removal was attempted.
def remove_hosts(description)
  return unless topology.remove_hosts?(description)

  servers_list.each do |server|
    next unless topology.remove_server?(description, server)

    remove(server.address.to_s)
  end
end
scan!() click to toggle source

Force a scan of all known servers in the cluster.

@example Force a full cluster scan.

cluster.scan!

@note This operation is done synchronously. If servers in the cluster are down or slow to respond this can potentially be a slow operation.

@return [ true ] Always true.

@since 2.0.0

# File lib/mongo/cluster.rb, line 336
# Force a scan of all known servers in the cluster, one after another.
#
# @return [ true ] Always true.
def scan!
  servers_list.each(&:scan!)
  true
end
servers() click to toggle source

Get a list of server candidates from the cluster that can have operations executed on them.

@example Get the server candidates for an operation.

cluster.servers

@return [ Array<Server> ] The candidate servers.

@since 2.0.0

# File lib/mongo/cluster.rb, line 349
# Get a list of server candidates from the cluster that can have operations
# executed on them. Nil entries are dropped both before the known servers
# are handed to the topology and from the topology's answer.
#
# @return [ Array<Server> ] The candidate servers.
def servers
  candidates = topology.servers(servers_list.compact)
  candidates.compact
end
standalone_discovered() click to toggle source

Notify the cluster that a standalone server was discovered so that the topology can be updated accordingly.

@example Notify the cluster that a standalone server was discovered.

cluster.standalone_discovered

@return [ Topology ] The cluster topology.

@since 2.0.6

# File lib/mongo/cluster.rb, line 300
# Notify the cluster that a standalone server was discovered: the topology
# is replaced by whatever the current topology returns for the discovery.
#
# @return [ Topology ] The cluster topology.
def standalone_discovered
  updated = topology.standalone_discovered
  @topology = updated
end

Private Instance Methods

addition_allowed?(address) click to toggle source
# File lib/mongo/cluster.rb, line 451
# Whether a server with this address may be added: always when the topology
# is not single, otherwise only when the address is the direct connection.
#
# @param [ Address ] address The candidate address.
#
# @return [ true, false ] If the address may be added.
def addition_allowed?(address)
  return true unless @topology.single?

  direct_connection?(address)
end
addresses_list() click to toggle source
# File lib/mongo/cluster.rb, line 467
# Take a copy of the address list while holding the update lock, so callers
# get a new array rather than the internal one.
#
# @return [ Array ] A new array holding the current addresses.
def addresses_list
  @update_lock.synchronize { @addresses.dup }
end
direct_connection?(address) click to toggle source
# File lib/mongo/cluster.rb, line 447
# Whether the address' seed equals the topology's seed.
#
# @param [ Address ] address The address to check.
#
# @return [ true, false ] If the seeds match.
def direct_connection?(address)
  candidate_seed = address.seed
  candidate_seed == @topology.seed
end
pools() click to toggle source
# File lib/mongo/cluster.rb, line 455
# The hash of connection pools, created empty on first access and reused
# afterwards.
#
# @return [ Hash ] The pools hash.
def pools
  return @pools if @pools

  @pools = {}
end
servers_list() click to toggle source
# File lib/mongo/cluster.rb, line 459
# Take a copy of the server list while holding the update lock, so callers
# get a new array rather than the internal one.
#
# @return [ Array ] A new array holding the current servers.
def servers_list
  @update_lock.synchronize { @servers.dup }
end