class Elasticsearch::Embedded::Cluster
Class used to manage a local cluster of elasticsearch nodes
Attributes
Options for cluster
Options for cluster
Options for downloader
Options for cluster
Options for cluster
Options for cluster
Options for cluster
Options for cluster
Options for downloader
Options for downloader
Public Class Methods
Assign default values to options
# File lib/elasticsearch/embedded/cluster.rb, line 22 def initialize @nodes = 1 @port = 9250 @version = Downloader::DEFAULT_VERSION @working_dir = Downloader::TEMPORARY_PATH @timeout = 30 @cluster_name = 'elasticsearch_test' @pids = [] @pids_lock = Mutex.new end
Public Instance Methods
Used for persistent clusters, otherwise cluster won’t get green state because of missing replicas
# File lib/elasticsearch/embedded/cluster.rb, line 102 def apply_development_template! development_settings = { template: '*', settings: { number_of_shards: 1, number_of_replicas: 0, } } # Create the template on cluster http_object.put('/_template/development_template', JSON.dump(development_settings)) end
Remove all indices in the cluster
@return [Array<Net::HTTPResponse>] raw http responses
# File lib/elasticsearch/embedded/cluster.rb, line 89 def delete_all_indices! delete_index! :_all end
Remove the indices given as args
@param [Array<String,Symbol>] args list of indices to delet @return [Array<Net::HTTPResponse>] raw http responses
# File lib/elasticsearch/embedded/cluster.rb, line 97 def delete_index!(*args) args.map { |index| http_object.request(Net::HTTP::Delete.new("/#{index}")) } end
Start server unless it’s running
# File lib/elasticsearch/embedded/cluster.rb, line 73 def ensure_started! start unless running? end
Thread safe access to all spawned process pids
# File lib/elasticsearch/embedded/cluster.rb, line 68 def pids @pids_lock.synchronize { @pids } end
Returns true when started cluster is running
@return Boolean
# File lib/elasticsearch/embedded/cluster.rb, line 80 def running? cluster_health = Timeout::timeout(0.25) { __get_cluster_health } rescue nil # Response is present, cluster name is the same and number of nodes is the same !!cluster_health && cluster_health['cluster_name'] == cluster_name && cluster_health['number_of_nodes'] == nodes end
Start an elasticsearch cluster and return immediately
# File lib/elasticsearch/embedded/cluster.rb, line 34 def start @downloader = Downloader.download(version: version, path: working_dir) start_cluster apply_development_template! if persistent end
Start an elasticsearch cluster and wait until running, also register a signal handler to close the cluster on INT, TERM and QUIT signals
# File lib/elasticsearch/embedded/cluster.rb, line 42 def start_and_wait! # register handler before starting cluster register_shutdown_handler # start the cluster start # Wait for all child processes to end then return Process.waitall end
Stop the cluster and return after all child processes are dead
# File lib/elasticsearch/embedded/cluster.rb, line 52 def stop logger.warn 'Cluster is still starting, wait until startup is complete before sending shutdown command' if @pids_lock.locked? @pids_lock.synchronize do http_object.post('/_shutdown', nil) logger.debug 'Cluster stopped succesfully using shutdown api' Timeout.timeout(2) { Process.waitall } # Reset running pids reader @pids = [] end rescue logger.warn "Following processes are still alive #{pids}, kill them with signals" # Send term signal if post request fails to all processes still alive after 2 seconds pids.each { |pid| wait_or_kill(pid) } end
Private Instance Methods
Tries to load cluster health information
@api private
# File lib/elasticsearch/embedded/cluster.rb, line 296 def __get_cluster_health JSON.parse(http_object.get('/_cluster/health').body) rescue nil end
Print information about the cluster on STDOUT
@api private
# File lib/elasticsearch/embedded/cluster.rb, line 276 def __print_cluster_info health = JSON.parse(http_object.get('/_cluster/health').body) nodes = JSON.parse(http_object.get('/_nodes/process,http').body) master = JSON.parse(http_object.get('/_cluster/state').body)['master_node'] logger.info '-'*80 logger.info 'Cluster: '.ljust(12) + health['cluster_name'].to_s logger.info 'Status: '.ljust(12) + health['status'].to_s logger.info 'Nodes: '.ljust(12) + health['number_of_nodes'].to_s nodes['nodes'].each do |id, info| m = id == master ? '+' : '-' logger.info ''.ljust(12) + "#{m} #{info['name']} | version: #{info['version']}, pid: #{info['process']['id']}, address: #{info['http']['bound_address']}" end end
Blocks the process and waits for the cluster to be in a “green” state.
Prints information about the cluster on STDOUT if the cluster is available.
@param status [String] The status to wait for (yellow, green) @param timeout [Integer] The explicit timeout for the operation
@api private
@return Boolean
# File lib/elasticsearch/embedded/cluster.rb, line 254 def __wait_for_status(status='green', timeout = 30) Timeout::timeout(timeout) do loop do response = JSON.parse(http_object.get("/_cluster/health?wait_for_status=#{status}").body) rescue {} # check response and return if ok if response['status'] == status && nodes == response['number_of_nodes'].to_i __print_cluster_info and break end logger.debug "Still waiting for #{status} status in #{cluster_name}" sleep 1 end end true end
Build command line to launch an instance
# File lib/elasticsearch/embedded/cluster.rb, line 117 def build_command_line(instance_number) [ downloader.executable, '-D es.foreground=yes', "-D es.cluster.name=#{cluster_name}", "-D es.node.name=node-#{instance_number}", "-D es.http.port=#{port + (instance_number - 1)}", "-D es.gateway.type=#{cluster_options[:gateway_type]}", "-D es.index.store.type=#{cluster_options[:index_store]}", "-D es.path.data=#{cluster_options[:path_data]}-#{instance_number}", "-D es.path.work=#{cluster_options[:path_work]}-#{instance_number}", '-D es.network.host=0.0.0.0', '-D es.discovery.zen.ping.multicast.enabled=true', '-D es.script.disable_dynamic=false', '-D es.node.test=true', '-D es.node.bench=true', additional_options, verbose ? nil : '> /dev/null' ].compact.join(' ') end
Used as arguments for building command line to launch elasticsearch
# File lib/elasticsearch/embedded/cluster.rb, line 197 def cluster_options { port: port, nodes: nodes, cluster_name: cluster_name, timeout: timeout, # command to run is taken from downloader object command: downloader.executable, # persistency options gateway_type: persistent ? 'local' : 'none', index_store: persistent ? 'mmapfs' : 'memory', path_data: File.join(persistent ? downloader.working_dir : Dir.tmpdir, 'cluster_data'), path_work: File.join(persistent ? downloader.working_dir : Dir.tmpdir, 'cluster_workdir'), } end
Return an http object to make requests
# File lib/elasticsearch/embedded/cluster.rb, line 214 def http_object @http ||= Net::HTTP.new('localhost', port) end
Spawn an elasticsearch process and return its pid
# File lib/elasticsearch/embedded/cluster.rb, line 139 def launch_instance(instance_number = 1) # Start the process within a new process group to avoid signal propagation Process.spawn(build_command_line(instance_number), pgroup: true).tap do |pid| logger.debug "Launched elasticsearch process with pid #{pid}, detaching it" Process.detach pid end end
Return running instances pids, borrowed from code in Elasticsearch::Extensions::Test::Cluster. This method returns elasticsearch nodes pids and not spawned command pids, they are different because of elasticsearch shell wrapper used to launch the daemon
# File lib/elasticsearch/embedded/cluster.rb, line 150 def nodes_pids # Try to fetch node info from running cluster nodes = JSON.parse(http_object.get('/_nodes/?process').body) rescue [] # Fetch pids from returned data nodes.empty? ? nodes : nodes['nodes'].map { |_, info| info['process']['id'] } end
Register a shutdown proc which handles INT, TERM and QUIT signals
# File lib/elasticsearch/embedded/cluster.rb, line 219 def register_shutdown_handler stopper = ->(sig) do Thread.new do logger.info "Received SIG#{Signal.signame(sig)}, quitting" stop end end # Stop cluster on Ctrl+C, TERM (foreman) or QUIT (other) [:TERM, :INT, :QUIT].each { |sig| Signal.trap(sig, &stopper) } end
# File lib/elasticsearch/embedded/cluster.rb, line 157 def start_cluster logger.info "Starting ES #{version} cluster with working directory set to #{working_dir}. Process pid is #{$$}" if running? logger.warn "Elasticsearch cluster already running on port #{port}" wait_for_green(timeout) return end # Launch single node instances of elasticsearch with synchronization @pids_lock.synchronize do 1.upto(nodes).each do |i| @pids << launch_instance(i) end # Wait for cluster green state before releasing lock wait_for_green(timeout) # Add started nodes pids to pid array @pids.concat(nodes_pids) end end
Waits until the cluster is green and prints information
@example Print the information about the default cluster
Elasticsearch::Extensions::Test::Cluster.wait_for_green
@param (see #__wait_for_status)
@return Boolean
# File lib/elasticsearch/embedded/cluster.rb, line 239 def wait_for_green(timeout = 60) __wait_for_status('green', timeout) end
# File lib/elasticsearch/embedded/cluster.rb, line 176 def wait_or_kill(pid) begin Timeout::timeout(2) do Process.kill(:TERM, pid) logger.debug "Sent SIGTERM to process #{pid}" Process.waitpid(pid) logger.info "Process #{pid} exited successfully" end rescue Errno::ESRCH, Errno::ECHILD # No such process or no child => process is already dead logger.debug "Process with pid #{pid} is already dead" rescue Timeout::Error logger.info "Process #{pid} still running after 2 seconds, sending SIGKILL to it" Process.kill(:KILL, pid) rescue nil ensure logger.debug "Removing #{pid} from running pids" @pids_lock.synchronize { @pids.delete(pid) } end end