module Elastics::Client::Cluster

Public Instance Methods

discover_cluster() click to toggle source
# File lib/elastics/client/cluster.rb, line 10
def discover_cluster
  # `nothing` allows not to fetch all unnecessary data
  discovered = request(index: '_nodes', type: '_all', id: 'nothing')['nodes'].
    map { |id, node|
      match = node['http_address'].match(/inet\[.*?\/([\da-f.:]+)/i)
      match && match[1]
    }.compact
  @cluster_mutex.synchronize do
    @hosts.clear.concat(discovered - @dead_hosts.keys)
  end
end

Private Instance Methods

add_dead_host(host, resurrect_at = nil) click to toggle source
# File lib/elastics/client/cluster.rb, line 77
def add_dead_host(host, resurrect_at = nil)
  resurrect_at ||= Time.now.to_i + @resurrect_timeout
  @hosts.delete(host)
  @dead_hosts[host] = resurrect_at
  @resurrect_at ||= resurrect_at
end
http_request(method, path, query, body, params = nil) click to toggle source
Calls superclass method
# File lib/elastics/client/cluster.rb, line 33
def http_request(method, path, query, body, params = nil)
  host = next_cluster_host
  Timeout.timeout(@connect_timeout) do
    super(method, path, query, body, params, host)
  end
rescue Timeout::Error, HTTPClient::ConnectTimeoutError
  add_dead_host(host)
  retry
end
initialize_cluster(defaults) click to toggle source
# File lib/elastics/client/cluster.rb, line 23
def initialize_cluster(defaults)
  @hosts = ThreadSafe::Array.new defaults[:host]
  @dead_hosts = ThreadSafe::Hash.new
  @connect_timeout = defaults[:connect_timeout] || 10
  @resurrect_timeout = defaults[:resurrect_timeout] || 60
  @current_host_n = 0
  @cluster_mutex = Mutex.new
  discover_cluster if defaults[:discover]
end
next_cluster_host() click to toggle source

Very simple implementation. It may skip some hosts on current cycle when other is marked as dead. This should not be a problem. TODO: check Enumerable#cycle for thread-safety and use it if possible

# File lib/elastics/client/cluster.rb, line 46
def next_cluster_host
  if @resurrect_at
    time = Time.now.to_i
    resurrect_cluster(time) if @resurrect_at <= time
  end
  host_n = @current_host_n
  loop do
    host = @hosts[host_n]
    if !host
      raise NoAliveHosts if host_n == 0
      host_n = 0
    else
      @current_host_n = host_n + 1
      return host
    end
  end
end
resurrect_cluster(time = Time.now.to_i) click to toggle source
# File lib/elastics/client/cluster.rb, line 64
def resurrect_cluster(time = Time.now.to_i)
  @cluster_mutex.synchronize do
    @dead_hosts.delete_if do |host, resurrect_at|
      # skip the rest because values are sorted
      if time < resurrect_at
        @resurrect_at = resurrect_at
        break
      end
      @hosts << host
    end
  end
end