class Bones::RPC::Cluster

The cluster represents a cluster of MongoDB server nodes, either a single node, a replica set, or a mongos server.

@since 0.0.1

Constants

DOWN_INTERVAL

The default interval that a node would be flagged as “down”.

@since 0.0.1

REFRESH_INTERVAL

The default interval that a node should be refreshed in.

@since 0.0.1

RETRY_INTERVAL

The default time to wait to retry an operation.

@since 0.0.1

Attributes

peers[R]

@!attribute options

@return [ Hash ] The refresh options.

@!attribute peers

@return [ Array<Node> ] The node peers.

@!attribute seeds

@return [ Array<Node> ] The seed nodes.
seeds[R]

@!attribute options

@return [ Hash ] The refresh options.

@!attribute peers

@return [ Array<Node> ] The node peers.

@!attribute seeds

@return [ Array<Node> ] The seed nodes.
session[R]

@!attribute options

@return [ Hash ] The refresh options.

@!attribute peers

@return [ Array<Node> ] The node peers.

@!attribute seeds

@return [ Array<Node> ] The seed nodes.

Public Class Methods

new(session, hosts) click to toggle source

Initialize the new cluster.

@example Initialize the cluster.

Cluster.new([ "localhost:27017" ], down_interval: 20)

@param [ Hash ] options The cluster options.

@option options :down_interval number of seconds to wait before attempting

to reconnect to a down node. (30)

@option options :refresh_interval number of seconds to cache information

about a node. (300)

@option options [ Integer ] :timeout The time in seconds to wait for an

operation to timeout. (5)

@since 0.0.1

# File lib/bones/rpc/cluster.rb, line 79
def initialize(session, hosts)
  @session = session
  @seeds = hosts.map { |host| session.backend.node_class.new(self, Address.new(host)) }
  @peers = []
end

Public Instance Methods

disconnect() click to toggle source

Disconnects all nodes in the cluster. This should only be used in cases where you know you're not going to use the cluster on the thread anymore and need to force the connections to close.

@return [ true ] True if the disconnect succeeded.

@since 0.0.1

# File lib/bones/rpc/cluster.rb, line 43
def disconnect
  nodes.each { |node| node.disconnect } and true
end
down_interval() click to toggle source

Get the interval at which a node should be flagged as down before retrying.

@example Get the down interval, in seconds.

cluster.down_interval

@return [ Integer ] The down interval.

@since 0.0.1

# File lib/bones/rpc/cluster.rb, line 56
def down_interval
  @down_interval ||= options[:down_interval] || DOWN_INTERVAL
end
handle_refresh(node) click to toggle source
# File lib/bones/rpc/cluster.rb, line 60
def handle_refresh(node)
  session.handle_refresh(node)
end
inspect() click to toggle source

Provide a pretty string for cluster inspection.

@example Inspect the cluster.

cluster.inspect

@return [ String ] A nicely formatted string.

@since 0.0.1

# File lib/bones/rpc/cluster.rb, line 93
def inspect
  "#<#{self.class.name}:#{object_id} @seeds=#{seeds.inspect}>"
end
max_retries() click to toggle source

Get the number of times an operation should be retried before raising an error.

@example Get the maximum retries.

cluster.max_retries

@return [ Integer ] The max retries.

@since 0.0.1

# File lib/bones/rpc/cluster.rb, line 106
def max_retries
  @max_retries ||= options[:max_retries] || seeds.size
end
nodes() click to toggle source

Returns the list of available nodes, refreshing 1) any nodes which were down and ready to be checked again and 2) any nodes whose information is out of date. Arbiter nodes are not returned.

@example Get the available nodes.

cluster.nodes

@return [ Array<Node> ] the list of available nodes.

@since 0.0.1

# File lib/bones/rpc/cluster.rb, line 120
def nodes
  # Find the nodes that were down but are ready to be refreshed, or those
  # with stale connection information.
  needs_refresh, available = seeds.partition do |node|
    refreshable?(node)
  end

  # Refresh those nodes.
  available.concat(refresh(needs_refresh))

  # Now return all the nodes that are available and participating in the
  # replica set.
  available.reject{ |node| node.down? }
end
options() click to toggle source
# File lib/bones/rpc/cluster.rb, line 135
def options
  session.options
end
pool_size() click to toggle source
# File lib/bones/rpc/cluster.rb, line 139
def pool_size
  options[:pool_size] || 5
end
refresh(nodes_to_refresh = seeds) click to toggle source

Refreshes information for each of the nodes provided. The node list defaults to the list of all known nodes.

If a node is successfully refreshed, any newly discovered peers will also be refreshed.

@example Refresh the nodes.

cluster.refresh

@param [ Array<Node> ] nodes_to_refresh The nodes to refresh.

@return [ Array<Node> ] the available nodes

@since 0.0.1

# File lib/bones/rpc/cluster.rb, line 157
def refresh(nodes_to_refresh = seeds)
  refreshed_nodes = []
  seen = {}
  # Set up a recursive lambda function for refreshing a node and it's peers.
  refresh_node = ->(node) do
    unless seen[node]
      seen[node] = true
      # Add the node to the global list of known nodes.
      seeds.push(node) unless seeds.include?(node)
      begin
        node.refresh
        # This node is good, so add it to the list of nodes to return.
        refreshed_nodes.push(node) unless refreshed_nodes.include?(node)
      rescue Errors::ConnectionFailure
        # We couldn't connect to the node.
      end
    end
  end

  nodes_to_refresh.each(&refresh_node)
  refreshed_nodes
end
refresh_interval() click to toggle source

Get the interval in which the node list should be refreshed.

@example Get the refresh interval, in seconds.

cluster.refresh_interval

@return [ Integer ] The refresh interval.

@since 0.0.1

# File lib/bones/rpc/cluster.rb, line 188
def refresh_interval
  @refresh_interval ||= options[:refresh_interval] || REFRESH_INTERVAL
end
retry_interval() click to toggle source

Get the operation retry interval - the time to wait before retrying a single operation.

@example Get the retry interval, in seconds.

cluster.retry_interval

@return [ Integer ] The retry interval.

@since 0.0.1

# File lib/bones/rpc/cluster.rb, line 201
def retry_interval
  @retry_interval ||= options[:retry_interval] || RETRY_INTERVAL
end

Private Instance Methods

down_boundary() click to toggle source

Get the boundary where a node that is down would need to be refreshed.

@api private

@example Get the down boundary.

cluster.down_boundary

@return [ Time ] The down boundary.

@since 0.0.1

# File lib/bones/rpc/cluster.rb, line 217
def down_boundary
  Time.new - down_interval
end
initialize_copy(_) click to toggle source

Creating a cloned cluster requires cloning all the seed nodes.

@api prviate

@example Clone the cluster.

cluster.clone

@return [ Cluster ] The cloned cluster.

@since 0.0.1

# File lib/bones/rpc/cluster.rb, line 261
def initialize_copy(_)
  @seeds = seeds.map(&:dup)
end
refresh_boundary() click to toggle source

Get the standard refresh boundary to discover new nodes.

@api private

@example Get the refresh boundary.

cluster.refresh_boundary

@return [ Time ] The refresh boundary.

@since 0.0.1

# File lib/bones/rpc/cluster.rb, line 231
def refresh_boundary
  Time.new - refresh_interval
end
refreshable?(node) click to toggle source

Is the provided node refreshable? This is in the case where the refresh boundary has passed, or the node has been down longer than the down boundary.

@api private

@example Is the node refreshable?

cluster.refreshable?(node)

@param [ Node ] node The Node to check.

@since 0.0.1

# File lib/bones/rpc/cluster.rb, line 247
def refreshable?(node)
  node.down? ? node.down_at < down_boundary : node.needs_refresh?(refresh_boundary)
end