class Elasticsearch::Manager::ESManager
Attributes
leader[RW]
members[RW]
nodes[RW]
state[RW]
Public Class Methods
new(cluster_host = 'localhost', port = 9200)
click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 11 def initialize(cluster_host = 'localhost', port = 9200) @client = Elasticsearch::Client::ESClient.new(cluster_host, port) @state = nil @leader = nil @nodes = nil @members = nil end
Public Instance Methods
cluster_green?()
click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 19 def cluster_green? @client.green? end
cluster_health()
click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 41 def cluster_health health = @client.health Health.new.extend(Health::Representer).from_hash(health) end
cluster_members!()
click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 33 def cluster_members! @state = cluster_state @nodes = state.nodes @nodes.sort! { |a,b| a.id <=> b.id } @leader = @nodes.select { |n| n.master }[0].ip @members = @nodes.map { |n| n.ip } end
cluster_stable?()
click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 27 def cluster_stable? health = cluster_health moving = [health.relocating_shards, health.initializing_shards, health.unassigned_shards] cluster_green? && moving.all? { |x| x == 0 } end
cluster_state()
click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 46 def cluster_state state = @client.state ClusterState.new.extend(ClusterState::Representer).from_hash(state) end
cluster_status()
click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 23 def cluster_status @client.status end
disable_routing()
click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 51 def disable_routing ret = @client.routing(true) ret['transient']['cluster']['routing']['allocation']['enable'] == 'none' end
enable_routing()
click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 56 def enable_routing ret = @client.routing(false) ret['transient']['cluster']['routing']['allocation']['enable'] == 'all' end
list_node_ips()
click to toggle source
# File lib/elasticsearch/manager/nodes.rb, line 5 def list_node_ips puts "#{@leader} -- master" @members.each do |m| unless m == @leader puts m end end end
restart_node(node_ip, timeout, sleep_interval)
click to toggle source
# File lib/elasticsearch/manager/rollingrestart.rb, line 30 def restart_node(node_ip, timeout, sleep_interval) puts "\nRestarting Elasticsearch on node: #{node_ip}" # Pull the current node's state n = @state.nodes.select { |n| n.ip == node_ip }[0] raise ClusterSettingsUpdateError, "Could not disable shard routing prior to restarting node: #{node_ip}".colorize(:red) unless disable_routing Net::SSH.start(node_ip, ENV['USER']) do |ssh| ssh.exec 'sudo service elasticsearch restart' end puts "Elasticsearch restarted on node: #{node_ip}" begin wait_for_node_available(node_ip, timeout, sleep_interval) puts "Node back up!".colorize(:green) rescue Timeout::Error raise NodeAvailableTimeout, "Node did not become available after waiting #{timeout} seconds...".colorize(:red) end # Make sure the cluster is willing to concurrently recover as many # shards per node as this node happens to have. raise ClusterSettingsUpdateError, "Could not update node_concurrent_recoveries prior to restarting node: #{node_ip}".colorize(:red) unless set_concurrent_recoveries(n.count_started_shards + 1) raise ClusterSettingsUpdateError, "Could not re-enable shard routing following restart of node: #{node_ip}".colorize(:red) unless enable_routing begin wait_for_stable(timeout, sleep_interval) puts "Cluster stabilized!".colorize(:green) rescue Timeout::Error raise StabalizationTimeout, "Cluster not re-stabilize after waiting #{timeout} seconds...".colorize(:red) end end
rolling_restart(timeout = 600, sleep_interval = 30)
click to toggle source
# File lib/elasticsearch/manager/rollingrestart.rb, line 14 def rolling_restart(timeout = 600, sleep_interval = 30) highline = HighLine.new @members.each do |m| unless m == @leader unless highline.agree('Continue with rolling restart of cluster? (y/n) ') raise UserRequestedStop, "Stopping rolling restart at user request!".colorize(:red) end restart_node(m, timeout, sleep_interval) end end unless highline.agree("\nRestarting current cluster master, continue? (y/n) ") raise UserRequestedStop, "Stopping rolling restart at user request before restarting master node!".colorize(:red) end restart_node(@leader, timeout, sleep_interval) end
set_concurrent_recoveries(num_recoveries)
click to toggle source
# File lib/elasticsearch/manager/manager.rb, line 61 def set_concurrent_recoveries(num_recoveries) ret = @client.node_concurrent_recoveries(num_recoveries) # Elasticache seems to return integer settings as strings when setting them... ret['transient']['cluster']['routing']['allocation']['node_concurrent_recoveries'] == num_recoveries.to_s end
Protected Instance Methods
wait_for_node_available(node_ip, timeout = 600, sleep_interval = 30)
click to toggle source
# File lib/elasticsearch/manager/rollingrestart.rb, line 74 def wait_for_node_available(node_ip, timeout = 600, sleep_interval = 30) Timeout.timeout(timeout) do state = cluster_state while !state.nodes.map { |n| n.ip }.include?(node_ip) puts "Waiting for node to become available...".colorize(:yellow) sleep(sleep_interval) state = cluster_state end end end
wait_for_stable(timeout = 600, sleep_interval = 30)
click to toggle source
# File lib/elasticsearch/manager/rollingrestart.rb, line 65 def wait_for_stable(timeout = 600, sleep_interval = 30) Timeout.timeout(timeout) do while !cluster_stable? puts "Waiting for cluster to stabilize...".colorize(:yellow) sleep(sleep_interval) end end end