class Bones::RPC::Node

Represents a client to a node in a server cluster.

@since 0.0.1

Attributes

address[R]

@!attribute address

@return [ Address ] The address.

@!attribute down_at

@return [ Time ] The time the node was marked as down.

@!attribute latency

@return [ Integer ] The latency in milliseconds.

@!attribute options

@return [ Hash ] The node options.

@!attribute refreshed_at

@return [ Time ] The last time the node did a refresh.
cluster[R]

@!attribute address

@return [ Address ] The address.

@!attribute down_at

@return [ Time ] The time the node was marked as down.

@!attribute latency

@return [ Integer ] The latency in milliseconds.

@!attribute options

@return [ Hash ] The node options.

@!attribute refreshed_at

@return [ Time ] The last time the node did a refresh.
down_at[R]

@!attribute address

@return [ Address ] The address.

@!attribute down_at

@return [ Time ] The time the node was marked as down.

@!attribute latency

@return [ Integer ] The latency in milliseconds.

@!attribute options

@return [ Hash ] The node options.

@!attribute refreshed_at

@return [ Time ] The last time the node did a refresh.
latency[R]

@!attribute address

@return [ Address ] The address.

@!attribute down_at

@return [ Time ] The time the node was marked as down.

@!attribute latency

@return [ Integer ] The latency in milliseconds.

@!attribute options

@return [ Hash ] The node options.

@!attribute refreshed_at

@return [ Time ] The last time the node did a refresh.
refreshed_at[R]

@!attribute address

@return [ Address ] The address.

@!attribute down_at

@return [ Time ] The time the node was marked as down.

@!attribute latency

@return [ Integer ] The latency in milliseconds.

@!attribute options

@return [ Hash ] The node options.

@!attribute refreshed_at

@return [ Time ] The last time the node did a refresh.

Public Class Methods

new(cluster, address) click to toggle source
# File lib/bones/rpc/node.rb, line 168
def initialize(cluster, address)
  @cluster = cluster
  @address = address
  @connection = cluster.session.backend.connection_class.new(current_actor)
  @down_at = nil
  @refreshed_at = nil
  @latency = nil
  @instrumenter = @cluster.options[:instrumenter] || Instrumentable::Log
  @registry = Node::Registry.new
  @request_id = 0
  @synack_id = 0
  @address.resolve(current_actor)
end

Public Instance Methods

==(other) click to toggle source

Is this node equal to another?

@example Is the node equal to another.

node == other

@param [ Node ] other The other node.

@return [ true, false ] If the addresses are equal.

@since 0.0.1

# File lib/bones/rpc/node.rb, line 39
def ==(other)
  return false unless other.is_a?(Node)
  address.resolved == other.address.resolved
end
Also aliased as: eql?
adapter() click to toggle source
# File lib/bones/rpc/node.rb, line 45
def adapter
  @adapter ||= Adapter.get(options[:adapter] || :json)
end
attach(channel, id, future) click to toggle source
# File lib/bones/rpc/node.rb, line 49
def attach(channel, id, future)
  @registry.set(channel, id, future)
end
cleanup_socket(socket) click to toggle source
# File lib/bones/rpc/node.rb, line 53
def cleanup_socket(socket)
  @registry.flush
end
connect() click to toggle source

Connect the node on the underlying connection.

@example Connect the node.

node.connect

@raise [ Errors::ConnectionFailure ] If connection failed.

@return [ true ] If the connection suceeded.

@since 0.0.1

# File lib/bones/rpc/node.rb, line 67
def connect
  start = Time.now
  connection { |conn| conn.connect }
  @latency = Time.now - start
  @down_at = nil
  true
end
connected?() click to toggle source

Is the node currently connected?

@example Is the node connected?

node.connected?

@return [ true, false ] If the node is connected or not.

@since 0.0.1

# File lib/bones/rpc/node.rb, line 83
def connected?
  connection { |conn| conn.alive? }
end
connection() { |connection| ... } click to toggle source
# File lib/bones/rpc/node.rb, line 87
def connection
  if block_given?
    yield @connection
  else
    @connection
  end
end
detach(channel, id) click to toggle source
# File lib/bones/rpc/node.rb, line 95
def detach(channel, id)
  @registry.get(channel, id)
end
disconnect() click to toggle source

Force the node to disconnect from the server.

@example Disconnect the node.

node.disconnect

@return [ true ] If the disconnection succeeded.

@since 0.0.1

# File lib/bones/rpc/node.rb, line 107
def disconnect
  connection { |conn| conn.disconnect }
  true
end
down() click to toggle source

Mark the node as down.

@example Mark the node as down.

node.down

@return [ nil ] Nothing.

@since 0.0.1

# File lib/bones/rpc/node.rb, line 132
def down
  @down_at = Time.new
  @latency = nil
  disconnect if connected?
end
down?() click to toggle source

Is the node down?

@example Is the node down?

node.down?

@return [ Time, nil ] The time the node went down, or nil if up.

@since 0.0.1

# File lib/bones/rpc/node.rb, line 120
def down?
  !!@down_at
end
ensure_connected() { |current_actor| ... } click to toggle source

Yields the block if a connection can be established, retrying when a connection error is raised.

@example Ensure we are connection.

node.ensure_connected do
  #...
end

@raises [ ConnectionFailure ] When a connection cannot be established.

@return [ nil ] nil.

@since 0.0.1

# File lib/bones/rpc/node.rb, line 151
def ensure_connected(&block)
  begin
    connect unless connected?
    yield(current_actor)
  rescue Exception => e
    Failover.get(e).execute(e, current_actor, &block)
  end
end
eql?(other)
Alias for: ==
handle_message(message) click to toggle source
# File lib/bones/rpc/node.rb, line 160
def handle_message(message)
  logging(message) do
    if future = message.get(current_actor)
      message.signal(future)
    end
  end
end
inspect() click to toggle source

Get the node as a nice formatted string.

@example Inspect the node.

node.inspect

@return [ String ] The string inspection.

@since 0.0.1

# File lib/bones/rpc/node.rb, line 190
def inspect
  "<#{self.class.name} resolved_address=#{address.resolved.inspect}>"
end
needs_refresh?(time) click to toggle source

Does the node need to be refreshed?

@example Does the node require refreshing?

node.needs_refresh?(time)

@param [ Time ] time The next referesh time.

@return [ true, false] Whether the node needs to be refreshed.

@since 0.0.1

# File lib/bones/rpc/node.rb, line 204
def needs_refresh?(time)
  !refreshed_at || refreshed_at < time
end
notify(method, params) click to toggle source
# File lib/bones/rpc/node.rb, line 208
def notify(method, params)
  without_future(Protocol::Notify.new(method, params))
end
options() click to toggle source
# File lib/bones/rpc/node.rb, line 212
def options
  cluster.options
end
refresh() click to toggle source

Refresh information about the node, such as it's status in the replica set and it's known peers.

@example Refresh the node.

node.refresh

@raise [ ConnectionFailure ] If the node cannot be reached.

@raise [ ReplicaSetReconfigured ] If the node is no longer a primary node and

refresh was called within an +#ensure_primary+ block.

@return [ nil ] nil.

@since 0.0.1

# File lib/bones/rpc/node.rb, line 230
def refresh
  if address.resolve(current_actor)
    begin
      @refreshed_at = Time.now
      if synchronize.value(refresh_timeout)
        cluster.handle_refresh(current_actor)
      else
        down
      end
    rescue Timeout::Error
      down
    end
  end
end
refresh_timeout() click to toggle source

Get the timeout, in seconds, for this node.

@example Get the timeout in seconds.

node.refresh_timeout

@return [ Integer ] The configured timeout or the default of 5.

@since 0.0.1

# File lib/bones/rpc/node.rb, line 253
def refresh_timeout
  @refresh_timeout ||= (options[:timeout] || 5)
end
registry_empty?() click to toggle source
# File lib/bones/rpc/node.rb, line 257
def registry_empty?
  @registry.empty?
end
request(method, params) click to toggle source
# File lib/bones/rpc/node.rb, line 261
def request(method, params)
  with_future(Protocol::Request.new(next_request_id, method, params))
end
synchronize() click to toggle source
# File lib/bones/rpc/node.rb, line 265
def synchronize
  with_future(Protocol::Synchronize.new(next_synack_id, adapter))
end

Private Instance Methods

logging(message) { || ... } click to toggle source

Yield the block with logging.

@api private

@example Yield with logging.

logging(operations) do
  node.command(ismaster: 1)
end

@param [ Array<Message> ] operations The operations.

@return [ Object ] The result of the yield.

@since 0.0.1

# File lib/bones/rpc/node.rb, line 285
def logging(message)
  instrument(TOPIC, prefix: "  BONES-RPC: #{address.resolved}", ops: [message]) do
    yield if block_given?
  end
end
next_request_id() click to toggle source
# File lib/bones/rpc/node.rb, line 291
def next_request_id
  @request_id += 1
  if @request_id >= 1 << 31
    @request_id = 0
  end
  @request_id
end
next_synack_id() click to toggle source
# File lib/bones/rpc/node.rb, line 299
def next_synack_id
  @synack_id += 1
  if @synack_id >= (1 << 32) - 1
    @synack_id = 0
  end
  @synack_id
end
process(message, future = nil) click to toggle source
# File lib/bones/rpc/node.rb, line 307
def process(message, future = nil)
  logging(message) do
    ensure_connected do
      connection { |conn| conn.write([[message, future]]) }
    end
  end
  return future
rescue Exception => e
  abort(e)
end
with_future(message) click to toggle source
# File lib/bones/rpc/node.rb, line 318
def with_future(message)
  process(message, cluster.session.backend.future_class.new)
end
without_future(message) click to toggle source
# File lib/bones/rpc/node.rb, line 322
def without_future(message)
  process(message, nil)
end