class EM::Voldemort::Cluster

A client for a Voldemort cluster. The cluster is initialized by giving the hostname and port of one of its nodes, and the other nodes are autodiscovered.

TODO: If one node is down, a request should be retried on a replica.

Constants

CLUSTER_INFO_KEY
METADATA_STORE_NAME
RETRY_BOOTSTRAP_PERIOD
STORES_INFO_KEY

Attributes

bootstrap_host[R]
bootstrap_port[R]
cluster_name[R]
logger[R]

Public Class Methods

new(options={}) click to toggle source
# File lib/em-voldemort/cluster.rb, line 16
# Prepares a cluster client that will bootstrap from a single known node.
# No connection is attempted here; the instance only records its settings
# and starts in the :not_started bootstrap state with an empty store cache.
#
# options - Hash of settings:
#           :host   - hostname of the bootstrap node (required)
#           :port   - port of the bootstrap node (required)
#           :logger - logger to use; defaults to a Logger writing to $stdout
#
# Raises RuntimeError when :host or :port is missing (nil or false).
def initialize(options={})
  @bootstrap_host = options[:host]
  raise "#{self.class.name} requires :host" unless @bootstrap_host

  @bootstrap_port = options[:port]
  raise "#{self.class.name} requires :port" unless @bootstrap_port

  @logger = options[:logger] || Logger.new($stdout)
  @bootstrap_state = :not_started
  @stores = {}
end

Public Instance Methods

connect() click to toggle source

Bootstraps the cluster (discovers all cluster nodes and metadata by connecting to the one node that was specified). Calling connect is optional, since it also happens automatically when you start making requests.

# File lib/em-voldemort/cluster.rb, line 27
# Starts bootstrapping the cluster if that has not happened yet, and makes
# sure a periodic retry timer is running while bootstrap is incomplete.
#
# Once bootstrap has completed, this is a no-op. finish_bootstrap cancels the
# retry timer and clears @bootstrap_timer on success, so re-creating the timer
# here on every later call would leave a periodic timer running forever with
# nothing to do.
#
# Returns the deferrable of the bootstrap attempt currently in flight
# (@bootstrap), or nil when no attempt is in flight.
def connect
  unless @bootstrap_state == :complete
    # The timer retries a bootstrap that never started or that failed.
    @bootstrap_timer ||= setup_bootstrap_timer do
      start_bootstrap if [:not_started, :failed].include?(@bootstrap_state)
    end
    start_bootstrap if @bootstrap_state == :not_started
  end
  @bootstrap
end
get(store_name, key, router=nil) click to toggle source

Fetches the value associated with a particular key in a particular store. Returns a deferrable that succeeds with the value, or fails with an exception object.

# File lib/em-voldemort/cluster.rb, line 37
# Looks up +key+ in the store called +store_name+ once the cluster is ready.
#
# store_name - name of the store to query
# key        - key to fetch
# router     - optional object passed to choose_connections to pick the
#              connections for this key
#
# Returns the deferrable created by when_ready. It fails with a ServerError
# when no connection is available; otherwise its outcome is decided by
# get_in_sequence or get_in_parallel.
def get(store_name, key, router=nil)
  when_ready do |deferrable|
    candidates = choose_connections(key, router)

    if candidates.empty?
      deferrable.fail(ServerError.new('No connection can handle the request'))
      next
    end

    if candidates.first.health == :good
      # Preferred node looks healthy: try it first, fall back to the others on failure.
      get_in_sequence(candidates, store_name, key, deferrable)
    else
      # The preferred node is probably down. It is still queried so its connection
      # can notice when the node returns, but the other connections are queried
      # at the same time so the caller is not kept waiting.
      get_in_parallel(candidates, store_name, key, deferrable)
    end
  end
end
store(store_name) click to toggle source

Returns a {Store} object configured for accessing a particular store on the cluster.

# File lib/em-voldemort/cluster.rb, line 55
# Returns the Store object for +store_name+, creating it on first use.
# Stores are cached in @stores under the stringified name, so a symbol and
# a string naming the same store resolve to the same object.
def store(store_name)
  cache_key = store_name.to_s
  @stores[cache_key] || (@stores[cache_key] = Store.new(self, store_name))
end

Private Instance Methods

choose_connections(key, router=nil) click to toggle source
# File lib/em-voldemort/cluster.rb, line 173
# Picks the connections a request for +key+ should be sent to.
#
# With a router, the router is asked for the partitions of the key and each
# partition is mapped to its connection through @node_by_partition;
# partitions with no known connection are dropped. Without a router, up to
# two connections are picked at random from @node_by_id.
#
# Returns an Array of connections (possibly empty).
def choose_connections(key, router=nil)
  return @node_by_id.values.sample(2) unless router

  partitions = router.partitions(key, @node_by_partition)
  partitions.map { |partition| @node_by_partition[partition] }.compact
end
finish_bootstrap() click to toggle source
# File lib/em-voldemort/cluster.rb, line 96
# Wraps up a bootstrap attempt: closes and forgets the bootstrap connection,
# clears @bootstrap, and resolves the deferrable that was stored there.
#
# If the state is :complete, the retry timer is cancelled and the deferrable
# succeeds. In any other state the timer is left alone and the deferrable fails.
def finish_bootstrap
  @bootstrap_conn.close
  @bootstrap_conn = nil
  pending, @bootstrap = @bootstrap, nil

  return pending.fail unless @bootstrap_state == :complete

  @bootstrap_timer.cancel
  @bootstrap_timer = nil
  pending.succeed
end
get_from_connection(connection, store_name, key, deferrable=EM::DefaultDeferrable.new) click to toggle source

Makes a ‘get’ request for a particular key to a particular Voldemort store, using a particular connection. Returns a deferrable that succeeds with the value in the store if successful, or fails with an exception object if not.

# File lib/em-voldemort/cluster.rb, line 226
# Sends a 'get' request for +key+ in +store_name+ over +connection+.
#
# connection - object whose send_request returns a deferrable-like request
# store_name - name of the store to query
# key        - key to fetch
# deferrable - deferrable to resolve; a fresh one is created by default
#
# Returns +deferrable+. It succeeds with the parsed response, fails with the
# ClientError raised while parsing, fails with a ServerError (after logging)
# for any other parsing error, or fails with whatever the request failed with.
def get_from_connection(connection, store_name, key, deferrable=EM::DefaultDeferrable.new)
  pending = connection.send_request(get_request(store_name, key))

  pending.callback do |raw|
    failure = nil
    parsed =
      begin
        get_response(raw)
      rescue ClientError => client_error
        # Client errors are passed on to the caller unchanged.
        failure = client_error
      rescue => unexpected
        # Anything else means the response could not be understood.
        text = "protocol error #{unexpected.class}: #{unexpected.message} while parsing response: #{raw.inspect}"
        logger.error(text)
        failure = ServerError.new(text)
      end

    if failure
      deferrable.fail(failure)
    else
      deferrable.succeed(parsed)
    end
  end

  pending.errback { |reason| deferrable.fail(reason) }
  deferrable
end
get_in_parallel(connections, store_name, key, deferrable) click to toggle source

Makes a ‘get’ request to all given connections in parallel. Succeeds with the first successful response, or fails if all connections’ requests fail.

# File lib/em-voldemort/cluster.rb, line 201
# Issues the same 'get' request on every connection at once and resolves
# +deferrable+ from the combined results.
#
# The first successful response wins. A ClientError from any connection fails
# the request immediately. Other errors are counted, and the request fails
# with the last such error only once every connection has failed.
#
# Raises ArgumentError if +connections+ is empty.
def get_in_parallel(connections, store_name, key, deferrable)
  raise ArgumentError, 'connections must not be empty' if connections.empty?

  failures = 0
  settled = false

  connections.each do |conn|
    attempt = get_from_connection(conn, store_name, key)

    attempt.callback do |value|
      deferrable.succeed(value) unless settled
      settled = true
    end

    attempt.errback do |error|
      # Once an outcome has been reported, later failures are ignored.
      next if settled

      if error.is_a?(ClientError)
        deferrable.fail(error)
        settled = true
      else
        failures += 1
        deferrable.fail(error) if failures == connections.size
      end
    end
  end
end
get_in_sequence(connections, store_name, key, deferrable) click to toggle source

Makes a ‘get’ request to the first connection in the given list of connections. If the request fails with a ServerError, retries the request on the next connection in the list, etc, eventually failing if none of the connections can successfully handle the request.

# File lib/em-voldemort/cluster.rb, line 186
# Tries the 'get' request on the first connection and, if that fails with a
# ServerError, moves on to the remaining connections one at a time.
#
# +deferrable+ succeeds with the first successful response. It fails with the
# error of the last attempt when that error is not a ServerError or when no
# connection is left to try.
#
# Raises ArgumentError if +connections+ is empty.
def get_in_sequence(connections, store_name, key, deferrable)
  raise ArgumentError, 'connections must not be empty' if connections.empty?

  attempt = get_from_connection(connections.first, store_name, key)
  attempt.callback { |value| deferrable.succeed(value) }
  attempt.errback do |error|
    out_of_options = connections.size <= 1
    if out_of_options || !error.is_a?(ServerError)
      deferrable.fail(error)
    else
      get_in_sequence(connections.drop(1), store_name, key, deferrable)
    end
  end
end
parse_cluster_info(cluster_xml) click to toggle source

Parses Voldemort’s cluster.xml configuration file, as obtained in the bootstrap process.

# File lib/em-voldemort/cluster.rb, line 131
# Reads the cluster.xml document fetched during bootstrap and rebuilds the
# node tables from it.
#
# Sets @cluster_name, @node_by_id (node id => Connection) and
# @node_by_partition (partition number => Connection). Every partition from 0
# up to the number of assigned partitions must be owned by exactly one node.
#
# Returns the new bootstrap state: :cluster_info_ok on success, or :failed
# (after logging a warning) if parsing or validation raised an error.
def parse_cluster_info(cluster_xml)
  xml = Nokogiri::XML(cluster_xml)
  @cluster_name = xml.xpath('/cluster/name').text
  @node_by_id = {}
  @node_by_partition = {}

  xml.xpath('/cluster/server').each do |server|
    id = server.xpath('id').text.to_i
    conn = Connection.new(
      :host => server.xpath('host').text,
      :port => server.xpath('socket-port').text.to_i,
      :node_id => id,
      :logger => logger
    )
    @node_by_id[id] = conn

    # Partitions are listed as comma-separated integers.
    owned = server.xpath('partitions').text.split(/\s*,\s*/).map(&:to_i)
    owned.each do |partition|
      raise "duplicate assignment of partition #{partition}" if @node_by_partition[partition]
      @node_by_partition[partition] = conn
    end
  end

  raise 'no partitions defined on cluster' if @node_by_partition.empty?

  # Partition numbers must be contiguous starting at 0; report the first gap.
  gap = (0...@node_by_partition.size).find { |partition| !@node_by_partition[partition] }
  raise "missing node assignment for partition #{gap}" if gap

  @bootstrap_state = :cluster_info_ok
rescue => e
  logger.warn "Error processing cluster.xml: #{e}"
  @bootstrap_state = :failed
end
parse_stores_info(stores_xml) click to toggle source
# File lib/em-voldemort/cluster.rb, line 160
# Reads the stores.xml document fetched during bootstrap and hands each
# <store> element to the matching Store object, creating the Store if it is
# not in @stores yet.
#
# Returns the new bootstrap state: :complete on success, or :failed (after
# logging a warning) if anything raised an error.
def parse_stores_info(stores_xml)
  Nokogiri::XML(stores_xml).xpath('/stores/store').each do |store_node|
    name = store_node.xpath('name').text
    target = (@stores[name] ||= Store.new(self, name))
    target.load_config(store_node)
  end
  @bootstrap_state = :complete
rescue => e
  logger.warn "Error processing stores.xml: #{e}"
  @bootstrap_state = :failed
end
setup_bootstrap_timer() { || ... } click to toggle source
# File lib/em-voldemort/cluster.rb, line 61
# Registers a periodic EventMachine timer that invokes the given block every
# RETRY_BOOTSTRAP_PERIOD. Returns whatever EM.add_periodic_timer returns.
def setup_bootstrap_timer
  interval = RETRY_BOOTSTRAP_PERIOD
  EM.add_periodic_timer(interval) do
    yield
  end
end
start_bootstrap() click to toggle source
# File lib/em-voldemort/cluster.rb, line 65
# Begins a bootstrap attempt against the configured bootstrap node.
#
# Marks the state as :started, opens a dedicated bootstrap connection and a
# fresh @bootstrap deferrable, then fetches the cluster description followed
# by the stores description from the metadata store. Every path ends in
# finish_bootstrap, which resolves @bootstrap according to the final state.
def start_bootstrap
  @bootstrap_state = :started
  @bootstrap_conn = Connection.new(:host => bootstrap_host, :port => bootstrap_port, :logger => logger)
  @bootstrap = EM::DefaultDeferrable.new

  # Shared failure path for both metadata requests.
  give_up = lambda do |message|
    logger.warn message
    @bootstrap_state = :failed
    finish_bootstrap
  end

  cluster_req = get_from_connection(@bootstrap_conn, METADATA_STORE_NAME, CLUSTER_INFO_KEY)

  cluster_req.callback do |cluster_xml|
    unless parse_cluster_info(cluster_xml) == :cluster_info_ok
      # parse_cluster_info has already recorded the failure state.
      finish_bootstrap
      next
    end

    stores_req = get_from_connection(@bootstrap_conn, METADATA_STORE_NAME, STORES_INFO_KEY)
    stores_req.callback do |stores_xml|
      parse_stores_info(stores_xml)
      finish_bootstrap
    end
    stores_req.errback do |error|
      give_up.call("Could not load Voldemort's stores.xml: #{error}")
    end
  end

  cluster_req.errback do |error|
    give_up.call("Could not load Voldemort's cluster.xml: #{error}")
  end
end
when_ready() { |request| ... } click to toggle source

Delays execution of the block until bootstrap has completed. Returns a new deferrable, and passes the same deferrable to the block when it is executed (it’s up to the block to make the deferrable succeed or fail).

# File lib/em-voldemort/cluster.rb, line 113
# Runs the block once the cluster has been bootstrapped.
#
# A new deferrable is created and returned. The same deferrable is yielded to
# the block when (and if) bootstrap completes, and the block is responsible
# for making it succeed or fail. If bootstrap has failed or fails later, the
# deferrable fails with a ServerError and the block is never run.
#
# Raises RuntimeError if the bootstrap state is not one of the known states.
def when_ready(&block)
  connect
  request = EM::DefaultDeferrable.new
  state = @bootstrap_state

  if state == :complete
    yield request
  elsif state == :failed
    request.fail(ServerError.new('Could not bootstrap Voldemort cluster'))
  elsif [:started, :cluster_info_ok].include?(state)
    # Bootstrap still in flight: defer until it resolves either way.
    @bootstrap.callback { yield request }
    @bootstrap.errback { request.fail(ServerError.new('Could not bootstrap Voldemort cluster')) }
  else
    raise "bad bootstrap_state: #{state.inspect}"
  end

  request
end