class Elasticsearch::Drain::CLI
Attributes
active_nodes[RW]
drainer[R]
Public Instance Methods
adjusted_min_size(nodes)
click to toggle source
# File lib/elasticsearch/drain/cli.rb, line 116 def adjusted_min_size(nodes) min_size = drainer.asg.min_size desired_capacity = drainer.asg.desired_capacity desired_min_size = if (desired_capacity - nodes.length) >= min_size # Removing the nodes won't violate the min_size # Reduce the asg min_size proportionally (min_size - nodes.length) <= 0 ? 0 : (min_size - nodes.length) else # Removing the nodes will result in the min_size being violated (desired_capacity - nodes.length) <= 0 ? 0 : (desired_capacity - nodes.length) end desired_min_size end
asg()
click to toggle source
rubocop:enable Metrics/LineLength
# File lib/elasticsearch/drain/cli.rb, line 20 def asg # rubocop:disable Metrics/MethodLength @drainer = Elasticsearch::Drain.new(options[:host], options[:asg], options[:region]) ensure_cluster_healthy @active_nodes = drainer.active_nodes_in_asg # If :nodes are specified, :number has no effect if options[:nodes] say "Nodes #{options[:nodes].join(', ')} have been specified, the --number option has no effect" number_to_drain = nil currently_draining_nodes = nil else number_to_drain = options[:number] currently_draining_nodes = drainer.cluster.currently_draining('_id') end # If a node or nodes are specified, only drain the requested node(s) @active_nodes = active_nodes.find_all do |n| instance_id = drainer.asg.instance(n.ipaddress).instance_id options[:nodes].include?(instance_id) end if options[:nodes] do_exit { say_status 'Complete', 'Nothing to do', :green } if active_nodes.empty? say_status 'Found Nodes', "AutoScalingGroup: #{instances(active_nodes)}", :magenta until active_nodes.empty? ensure_cluster_healthy nodes = active_nodes # If there are nodes in cluster settings "transient.cluster.routing.allocation.exclude" # test if those nodes are still in the ASG. If so, work on them first unless nodes are # specified. if currently_draining_nodes nodes_to_drain = active_nodes.find_all { |n| currently_draining_nodes.split(',').include?(n.id) } # If the list of nodes_to_drain isn't empty, we want to set nodes to the list of nodes # we've already been working on. unless nodes_to_drain.empty? nodes = nodes_to_drain say_status 'Active Nodes', "Resuming drain process on #{instances(nodes)}", :magenta end # We should only process currently_draining_nodes once currently_draining_nodes = nil end # If we specify a number but DON'T specify nodes, sample the active_nodes. if number_to_drain nodes = nodes.sample(number_to_drain.to_i) say_status 'Active Nodes', "Sampled #{number_to_drain} nodes and got #{instances(nodes)}", :magenta end @active_nodes = nodes unless options[:continue] drain_nodes(nodes) deleted_nodes = remove_nodes(nodes) # Remove the drained nodes from the list of active_nodes deleted_nodes.each do |deleted_node| active_nodes.delete_if { |n| n.id == deleted_node.id } end unless active_nodes.empty? say_status 'Drain Nodes', "#{active_nodes.length} nodes remaining", :green sleep_time = wait_sleep_time say_status 'Waiting', "Sleeping for #{sleep_time} seconds before the next iteration", :green sleep sleep_time end end say_status 'Complete', 'Draining nodes complete!', :green end
do_exit(code = 0, &block)
click to toggle source
# File lib/elasticsearch/drain/cli.rb, line 106 def do_exit(code = 0, &block) block.call exit code end
drain_nodes(nodes)
click to toggle source
# File lib/elasticsearch/drain/cli.rb, line 129 def drain_nodes(nodes) drainer.asg.min_size = adjusted_min_size(nodes) nodes_to_drain = nodes.map(&:id).join(',') say_status 'Drain Nodes', "Draining nodes: #{nodes_to_drain}", :magenta drainer.cluster.drain_nodes(nodes_to_drain, '_id') end
ensure_cluster_healthy()
click to toggle source
# File lib/elasticsearch/drain/cli.rb, line 98 def ensure_cluster_healthy if drainer.cluster.healthy? say_status 'Cluster Health', 'Cluster is healthy', :green else do_exit(1) { say_status 'Cluster Health', 'Cluster is unhealthy', :red } end end
instances(nodes)
click to toggle source
# File lib/elasticsearch/drain/cli.rb, line 111 def instances(nodes) instances = nodes.map(&:ipaddress) instances.join(' ') end
remove_node(instance)
click to toggle source
# File lib/elasticsearch/drain/cli.rb, line 170 def remove_node(instance) # rubocop:disable Metrics/MethodLength instance_id = drainer.asg.instance(instance.ipaddress).instance_id instance.instance_id = instance_id say_status( 'Removing Node', "Removing #{instance.ipaddress} from Elasticsearch cluster and #{drainer.asg.asg} AutoScalingGroup", :magenta ) sleep 5 unless instance.in_recovery? node = "#{instance.instance_id}(#{instance.ipaddress})" ensure_cluster_healthy say_status 'ASG Remove Node', "Removing node: #{node} from AutoScalingGroup: #{drainer.asg.asg}", :magenta drainer.asg.detach_instance(instance.instance_id) sleep 2 ensure_cluster_healthy say_status 'Terminate Instance', "Terminating instance: #{node}", :magenta instance.terminate rescue Errors::NodeNotFound false end
remove_nodes(nodes)
click to toggle source
# File lib/elasticsearch/drain/cli.rb, line 146 def remove_nodes(nodes) # rubocop:disable Metrics/MethodLength deleted_nodes = [] while nodes.length > 0 sleep_time = wait_sleep_time nodes.each do |instance| instance = drainer.nodes.filter_nodes([instance.ipaddress], true).first unless instance.nil? || instance == 0 if instance.bytes_stored > 0 say_status 'Drain Status', "Node #{instance.ipaddress} has #{instance.bytes_stored} bytes to move", :blue sleep sleep_time else next unless remove_node(instance) deleted_nodes.push(nodes.find { |n| n.ipaddress == instance.ipaddress }) nodes.delete_if { |n| n.ipaddress == instance.ipaddress } break if nodes.length < 1 say_status 'Waiting', 'Sleeping for 1 minute before removing the next node', :green sleep 60 end end end end deleted_nodes end
wait_sleep_time()
click to toggle source
# File lib/elasticsearch/drain/cli.rb, line 136 def wait_sleep_time ips = active_nodes.map(&:ipaddress) bytes = drainer.nodes.filter_nodes(ips).map(&:bytes_stored) sleep_time = 10 sleep_time = 30 if bytes.any? { |b| b >= 100_000 } sleep_time = 60 if bytes.any? { |b| b >= 1_000_000 } sleep_time = 120 if bytes.any? { |b| b >= 10_000_000_000 } sleep_time end