class Elasticsearch::Manager::ESManager

Attributes

leader[RW]
members[RW]
nodes[RW]
state[RW]

Public Class Methods

new(cluster_host = 'localhost', port = 9200) click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 11
# Builds a manager bound to one Elasticsearch endpoint.
#
# Only the client is created here; the cached cluster view (state, leader,
# nodes, members) starts out as nil and nothing is fetched from the cluster.
#
# @param cluster_host [String] host handed to the ESClient constructor
# @param port [Integer] port handed to the ESClient constructor
def initialize(cluster_host = 'localhost', port = 9200)
  @client = Elasticsearch::Client::ESClient.new(cluster_host, port)
  @state = @leader = @nodes = @members = nil
end

Public Instance Methods

cluster_green?() click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 19
# Asks the underlying client whether the cluster is green.
#
# @return the value of `@client.green?` (presumably a boolean — confirm against ESClient)
def cluster_green?
  @client.green?
end
cluster_health() click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 41
# Fetches the raw health payload from the client and loads it into a Health
# object that has been extended with Health::Representer.
#
# @return the result of `from_hash` (presumably the populated Health — confirm
#   against Health::Representer)
def cluster_health
  raw_health = @client.health
  representer = Health.new.extend(Health::Representer)
  representer.from_hash(raw_health)
end
cluster_members!() click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 33
# Refreshes the cached view of the cluster: @state, @nodes, @leader and @members.
#
# The node list is sorted in place by id so members are always listed in a
# deterministic order. @leader is the IP of the node flagged as master.
#
# @return [Array] the IPs of every node, in id order (also stored in @members)
# @raise [RuntimeError] when no node in the fetched state is flagged as master
def cluster_members!
  @state = cluster_state
  @nodes = @state.nodes
  @nodes.sort_by!(&:id)

  master = @nodes.find(&:master)
  # Fail with a readable message instead of NoMethodError on nil when the
  # fetched state contains no master.
  raise 'No master node found in cluster state' unless master

  @leader = master.ip
  @members = @nodes.map(&:ip)
end
cluster_stable?() click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 27
# Decides whether the cluster has settled: it must be green and have no
# relocating, initializing or unassigned shards.
#
# The health report is fetched first, then the green check is made.
#
# @return [Boolean] true when green and all three shard counters equal 0;
#   otherwise a falsy value
def cluster_stable?
  report = cluster_health
  in_flight = [
    report.relocating_shards,
    report.initializing_shards,
    report.unassigned_shards
  ]
  cluster_green? && in_flight.none? { |count| count != 0 }
end
cluster_state() click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 46
# Fetches the raw cluster state from the client and loads it into a
# ClusterState object that has been extended with ClusterState::Representer.
#
# This does not touch the cached @state; it only returns a fresh object.
#
# @return the result of `from_hash` (presumably the populated ClusterState —
#   confirm against ClusterState::Representer)
def cluster_state
  raw_state = @client.state
  representer = ClusterState.new.extend(ClusterState::Representer)
  representer.from_hash(raw_state)
end
cluster_status() click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 23
# Returns the cluster status exactly as reported by the underlying client.
#
# @return the value of `@client.status` (format defined by ESClient — not visible here)
def cluster_status
  @client.status
end
disable_routing() click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 51
# Calls `@client.routing(true)` and confirms that shard allocation is now off.
#
# @return [Boolean] true only when the response reports the transient
#   cluster.routing.allocation.enable setting as 'none'; false for any other
#   value or for a response that lacks that setting
def disable_routing
  response = @client.routing(true)
  # A response without the expected nesting (for example an error payload)
  # means "not applied"; report false so callers can raise their own error
  # instead of hitting NoMethodError on nil.
  return false unless response.respond_to?(:dig)

  response.dig('transient', 'cluster', 'routing', 'allocation', 'enable') == 'none'
end
enable_routing() click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 56
# Calls `@client.routing(false)` and confirms that shard allocation is back on.
#
# @return [Boolean] true only when the response reports the transient
#   cluster.routing.allocation.enable setting as 'all'; false for any other
#   value or for a response that lacks that setting
def enable_routing
  response = @client.routing(false)
  # A response without the expected nesting (for example an error payload)
  # means "not applied"; report false so callers can raise their own error
  # instead of hitting NoMethodError on nil.
  return false unless response.respond_to?(:dig)

  response.dig('transient', 'cluster', 'routing', 'allocation', 'enable') == 'all'
end
list_node_ips() click to toggle source
# File lib/elasticsearch/manager/nodes.rb, line 5
# Prints the cached master IP tagged with "-- master", then every other
# cached member IP on its own line.
#
# Reads @leader and @members only; the cluster is not queried.
#
# @return [Array] the @members array
def list_node_ips
  puts "#{@leader} -- master"
  @members.each { |ip| puts ip unless ip == @leader }
end
restart_node(node_ip, timeout, sleep_interval) click to toggle source
# File lib/elasticsearch/manager/rollingrestart.rb, line 30
# Restarts Elasticsearch on one node and waits for the cluster to recover.
#
# Steps, in order: disable shard routing, restart the service over SSH, wait
# for the node to reappear in the cluster state, set node_concurrent_recoveries
# to the node's started-shard count plus one, re-enable routing, then wait for
# the cluster to stabilise.
#
# @param node_ip [String] IP of the node to restart; must be in the cached @state
# @param timeout [Numeric] seconds allowed for each of the two waits
# @param sleep_interval [Numeric] seconds between polls while waiting
# @raise [ArgumentError] if node_ip is not present in the cached cluster state
# @raise [ClusterSettingsUpdateError] if a cluster settings change is not confirmed
# @raise [NodeAvailableTimeout] if the node does not rejoin within timeout
# @raise [StabalizationTimeout] if the cluster does not stabilise within timeout
def restart_node(node_ip, timeout, sleep_interval)
  # Resolve the node before touching the cluster: an unknown IP would
  # otherwise only fail (NoMethodError on nil) after routing had been disabled
  # and the service restarted.
  node = @state.nodes.find { |candidate| candidate.ip == node_ip }
  raise ArgumentError, "Node #{node_ip} not found in cluster state".colorize(:red) unless node

  puts "\nRestarting Elasticsearch on node: #{node_ip}"

  raise ClusterSettingsUpdateError, "Could not disable shard routing prior to restarting node: #{node_ip}".colorize(:red) unless disable_routing

  Net::SSH.start(node_ip, ENV['USER']) do |ssh|
    ssh.exec 'sudo service elasticsearch restart'
  end
  puts "Elasticsearch restarted on node: #{node_ip}"

  begin
    wait_for_node_available(node_ip, timeout, sleep_interval)
    puts "Node back up!".colorize(:green)
  rescue Timeout::Error
    raise NodeAvailableTimeout, "Node did not become available after waiting #{timeout} seconds...".colorize(:red)
  end

  # Make sure the cluster is willing to concurrently recover as many
  # shards per node as this node happens to have. The count comes from the
  # state cached before the restart.
  raise ClusterSettingsUpdateError, "Could not update node_concurrent_recoveries following restart of node: #{node_ip}".colorize(:red) unless set_concurrent_recoveries(node.count_started_shards + 1)

  raise ClusterSettingsUpdateError, "Could not re-enable shard routing following restart of node: #{node_ip}".colorize(:red) unless enable_routing

  begin
    wait_for_stable(timeout, sleep_interval)
    puts "Cluster stabilized!".colorize(:green)
  rescue Timeout::Error
    raise StabalizationTimeout, "Cluster did not re-stabilize after waiting #{timeout} seconds...".colorize(:red)
  end
end
rolling_restart(timeout = 600, sleep_interval = 30) click to toggle source
# File lib/elasticsearch/manager/rollingrestart.rb, line 14
# Restarts every cached member one at a time, leaving the cached master for last.
#
# The operator is asked to confirm before each non-master node and once more
# before the master; declining at any prompt aborts the run.
#
# @param timeout [Numeric] forwarded to restart_node
# @param sleep_interval [Numeric] forwarded to restart_node
# @return the result of the final restart_node call (the master's)
# @raise [UserRequestedStop] when the operator declines a prompt
def rolling_restart(timeout = 600, sleep_interval = 30)
  prompt = HighLine.new

  @members.each do |member_ip|
    next if member_ip == @leader

    confirmed = prompt.agree('Continue with rolling restart of cluster? (y/n) ')
    raise UserRequestedStop, "Stopping rolling restart at user request!".colorize(:red) unless confirmed

    restart_node(member_ip, timeout, sleep_interval)
  end

  master_confirmed = prompt.agree("\nRestarting current cluster master, continue? (y/n) ")
  raise UserRequestedStop, "Stopping rolling restart at user request before restarting master node!".colorize(:red) unless master_confirmed

  restart_node(@leader, timeout, sleep_interval)
end
set_concurrent_recoveries(num_recoveries) click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 61
# Calls `@client.node_concurrent_recoveries(num_recoveries)` and confirms the
# value was applied.
#
# @param num_recoveries [Integer] desired node_concurrent_recoveries value
# @return [Boolean] true only when the response reports the transient
#   cluster.routing.allocation.node_concurrent_recoveries setting as the
#   requested value; false otherwise, including when the setting is absent
def set_concurrent_recoveries(num_recoveries)
  response = @client.node_concurrent_recoveries(num_recoveries)
  # A response without the expected nesting (for example an error payload)
  # means "not applied"; report false instead of hitting NoMethodError on nil.
  return false unless response.respond_to?(:dig)

  applied = response.dig('transient', 'cluster', 'routing', 'allocation', 'node_concurrent_recoveries')
  # Elasticsearch seems to return integer settings as strings when setting
  # them, so compare string forms; this also accepts a numeric echo.
  !applied.nil? && applied.to_s == num_recoveries.to_s
end

Protected Instance Methods

wait_for_node_available(node_ip, timeout = 600, sleep_interval = 30) click to toggle source
# File lib/elasticsearch/manager/rollingrestart.rb, line 74
# Polls the cluster state until a node with the given IP is listed.
#
# A fresh cluster state is fetched for every check; between checks a notice
# is printed and the method sleeps for sleep_interval seconds. The whole wait
# runs inside Timeout.timeout.
#
# @param node_ip [String] IP to look for among the state's nodes
# @param timeout [Numeric] seconds passed to Timeout.timeout
# @param sleep_interval [Numeric] seconds to sleep between polls
# @return [nil]
# @raise [Timeout::Error] when the node is not listed before the timeout elapses
def wait_for_node_available(node_ip, timeout = 600, sleep_interval = 30)
  Timeout.timeout(timeout) do
    until cluster_state.nodes.map(&:ip).include?(node_ip)
      puts "Waiting for node to become available...".colorize(:yellow)
      sleep(sleep_interval)
    end
  end
end
wait_for_stable(timeout = 600, sleep_interval = 30) click to toggle source
# File lib/elasticsearch/manager/rollingrestart.rb, line 65
# Polls cluster_stable? until it returns a truthy value.
#
# Between checks a notice is printed and the method sleeps for sleep_interval
# seconds. The whole wait runs inside Timeout.timeout.
#
# @param timeout [Numeric] seconds passed to Timeout.timeout
# @param sleep_interval [Numeric] seconds to sleep between polls
# @return [nil]
# @raise [Timeout::Error] when the cluster is not stable before the timeout elapses
def wait_for_stable(timeout = 600, sleep_interval = 30)
  Timeout.timeout(timeout) do
    until cluster_stable?
      puts "Waiting for cluster to stabilize...".colorize(:yellow)
      sleep(sleep_interval)
    end
  end
end