class Elasticsearch::Embedded::Cluster

Class used to manage a local cluster of elasticsearch nodes

Attributes

additional_options[RW]

Options for cluster

cluster_name[RW]

Options for cluster

downloader[RW]

Options for downloader

nodes[RW]

Options for cluster

persistent[RW]

Options for cluster

port[RW]

Options for cluster

timeout[RW]

Options for cluster

verbose[RW]

Options for cluster

version[RW]

Options for downloader

working_dir[RW]

Options for downloader

Public Class Methods

new() click to toggle source

Assign default values to options

# File lib/elasticsearch/embedded/cluster.rb, line 22
# Populate the options with their default values. Options not assigned here
# (persistent, verbose, additional_options, downloader) start out unset (nil).
def initialize
  # Cluster identity and topology
  @cluster_name = 'elasticsearch_test'
  @nodes = 1
  @port = 9250
  @timeout = 30
  # Which elasticsearch release the downloader fetches, and where it unpacks it
  @version = Downloader::DEFAULT_VERSION
  @working_dir = Downloader::TEMPORARY_PATH
  # Bookkeeping of spawned processes, guarded by a dedicated mutex
  @pids_lock = Mutex.new
  @pids = []
end

Public Instance Methods

apply_development_template!() click to toggle source

Used for persistent clusters; otherwise the cluster would not reach green state because of unassigned replicas

# File lib/elasticsearch/embedded/cluster.rb, line 102
# Install an index template that matches every index ('*') and forces one
# shard with zero replicas, so the cluster can reach green state without
# unassigned replicas.
#
# @return [Net::HTTPResponse] response of the PUT request
def apply_development_template!
  index_settings = { number_of_shards: 1, number_of_replicas: 0 }
  template = { template: '*', settings: index_settings }
  payload = JSON.dump(template)
  http_object.put('/_template/development_template', payload)
end
delete_all_indices!() click to toggle source

Remove all indices in the cluster

@return [Array<Net::HTTPResponse>] raw http responses

# File lib/elasticsearch/embedded/cluster.rb, line 89
# Delete every index in the cluster by targeting the special `_all` name.
#
# @return [Array<Net::HTTPResponse>] raw http responses
def delete_all_indices!
  delete_index!(:_all)
end
delete_index!(*args) click to toggle source

Remove the indices given as args

@param [Array<String,Symbol>] args list of indices to delete

@return [Array<Net::HTTPResponse>] raw http responses

# File lib/elasticsearch/embedded/cluster.rb, line 97
# Issue one DELETE request per index name.
#
# @param args [Array<String, Symbol>] names of the indices to delete
# @return [Array<Net::HTTPResponse>] raw http responses, in argument order
def delete_index!(*args)
  args.map do |index_name|
    delete_request = Net::HTTP::Delete.new("/#{index_name}")
    http_object.request(delete_request)
  end
end
ensure_started!() click to toggle source

Start server unless it’s running

# File lib/elasticsearch/embedded/cluster.rb, line 73
# Start the cluster only when no matching cluster is already answering.
#
# @return [nil, Object] nil when already running, otherwise the result of #start
def ensure_started!
  return if running?

  start
end
pids() click to toggle source

Thread safe access to all spawned process pids

# File lib/elasticsearch/embedded/cluster.rb, line 68
# Thread-safe snapshot of all spawned process pids.
#
# A copy is returned rather than the internal array. Other code (for example
# wait_or_kill) removes entries from @pids under the lock, and iterating the
# live array while it shrinks skips elements. It would also race with
# concurrent writers.
#
# @return [Array<Integer>] copy of the tracked pids
def pids
  @pids_lock.synchronize { @pids.dup }
end
running?() click to toggle source

Returns true when started cluster is running

@return Boolean

# File lib/elasticsearch/embedded/cluster.rb, line 80
# Tell whether the cluster described by this object is up.
#
# The health lookup is capped at 0.25 seconds. Any error or timeout counts as
# "not running".
#
# @return [Boolean] true when the health endpoint reports the configured
#   cluster name and node count
def running?
  health = begin
    Timeout.timeout(0.25) { __get_cluster_health }
  rescue StandardError
    nil
  end
  return false unless health

  same_name = health['cluster_name'] == cluster_name
  same_name && health['number_of_nodes'] == nodes
end
start() click to toggle source

Start an elasticsearch cluster and return immediately

# File lib/elasticsearch/embedded/cluster.rb, line 34
# Fetch the configured elasticsearch release through Downloader.download,
# launch the nodes and return without waiting for the processes to exit.
# Persistent clusters additionally get the development template applied.
def start
  @downloader = Downloader.download(version: version, path: working_dir)
  start_cluster
  return unless persistent

  apply_development_template!
end
start_and_wait!() click to toggle source

Start an elasticsearch cluster and block until all spawned child processes have exited. Before the cluster is started, a signal handler is registered that stops the cluster on INT, TERM and QUIT.

# File lib/elasticsearch/embedded/cluster.rb, line 42
# Launch the cluster and block until every child process has exited.
# The INT/TERM/QUIT handler is installed before starting, so signals that
# arrive while the cluster is starting are handled as well.
#
# @return [Array] the result of Process.waitall
def start_and_wait!
  register_shutdown_handler
  start
  Process.waitall
end
stop() click to toggle source

Stop the cluster and return after all child processes are dead

# File lib/elasticsearch/embedded/cluster.rb, line 52
# Stop the cluster and return once its processes are gone.
#
# First asks the cluster to shut itself down through the `_shutdown` API and
# waits up to 2 seconds for the child processes. If any of that fails (HTTP
# error, timeout, ...), every pid still tracked is terminated individually
# through wait_or_kill.
def stop
  if @pids_lock.locked?
    logger.warn 'Cluster is still starting, wait until startup is complete before sending shutdown command'
  end
  @pids_lock.synchronize do
    http_object.post('/_shutdown', nil)
    logger.debug 'Cluster stopped succesfully using shutdown api'
    Timeout.timeout(2) { Process.waitall }
    # Reset running pids reader
    @pids = []
  end
rescue StandardError
  # Take one snapshot. wait_or_kill removes each pid from @pids, so iterating
  # the live array while it shrinks would skip every other entry.
  remaining = pids.dup
  logger.warn "Following processes are still alive #{remaining}, kill them with signals"
  remaining.each { |pid| wait_or_kill(pid) }
end

Private Instance Methods

__get_cluster_health() click to toggle source

Tries to load cluster health information

@api private

# File lib/elasticsearch/embedded/cluster.rb, line 296
# Fetch and decode /_cluster/health.
#
# @api private
# @return [Object, nil] the parsed JSON document (a Hash for a normal
#   response), or nil if the request or the parsing fails
def __get_cluster_health
  response = http_object.get('/_cluster/health')
  JSON.parse(response.body)
rescue StandardError
  nil
end
__print_cluster_info() click to toggle source

Log information about the cluster (name, status, node count and one line per node) through logger

@api private

# File lib/elasticsearch/embedded/cluster.rb, line 276
# Log a summary of the cluster: name, status and node count, then one line
# per node. The elected master node is marked with '+', every other node
# with '-'.
#
# @api private
# @return [Hash] the node-id => node-info map from /_nodes
def __print_cluster_info
  fetch_json = ->(path) { JSON.parse(http_object.get(path).body) }
  health = fetch_json.call('/_cluster/health')
  node_doc = fetch_json.call('/_nodes/process,http')
  master_id = fetch_json.call('/_cluster/state')['master_node']
  indent = ''.ljust(12)

  logger.info '-' * 80
  logger.info 'Cluster: '.ljust(12) + health['cluster_name'].to_s
  logger.info 'Status:  '.ljust(12) + health['status'].to_s
  logger.info 'Nodes:   '.ljust(12) + health['number_of_nodes'].to_s

  node_doc['nodes'].each do |node_id, node_info|
    marker = node_id == master_id ? '+' : '-'
    details = "version: #{node_info['version']}, pid: #{node_info['process']['id']}, " \
              "address: #{node_info['http']['bound_address']}"
    logger.info "#{indent}#{marker} #{node_info['name']} | #{details}"
  end
end
__wait_for_status(status='green', timeout = 30) click to toggle source

Blocks the process and waits for the cluster to reach the requested status (“green” by default) with the configured number of nodes.

Logs information about the cluster through logger once the requested status has been reached.

@param status [String] The status to wait for (yellow, green)

@param timeout [Integer] The explicit timeout for the operation, in seconds

@api private

@return Boolean — always true; a Timeout::Error is raised if the status is not reached within timeout seconds

# File lib/elasticsearch/embedded/cluster.rb, line 254
# Block until the cluster reports +status+ with the configured number of
# nodes, then log the cluster summary. Polls /_cluster/health once a second.
#
# @api private
# @param status [String] status to wait for ('yellow' or 'green')
# @param timeout [Numeric] seconds to wait before giving up
# @return [Boolean] always true
# @raise [Timeout::Error] when the status is not reached within +timeout+
def __wait_for_status(status = 'green', timeout = 30)
  Timeout.timeout(timeout) do
    loop do
      response = begin
        JSON.parse(http_object.get("/_cluster/health?wait_for_status=#{status}").body)
      rescue StandardError
        {}
      end
      # A non-object payload is treated like a failed request instead of raising.
      response = {} unless response.is_a?(Hash)

      if response['status'] == status && nodes == response['number_of_nodes'].to_i
        # Break explicitly: success must not depend on what the logging helper
        # happens to return (a falsy return used to keep the loop spinning).
        __print_cluster_info
        break
      end

      logger.debug "Still waiting for #{status} status in #{cluster_name}"
      sleep 1
    end
  end

  true
end
build_command_line(instance_number) click to toggle source

Build command line to launch an instance

# File lib/elasticsearch/embedded/cluster.rb, line 117
# Assemble the shell command that launches node number +instance_number+.
#
# Each node gets its own name, an HTTP port offset from +port+ by
# (instance_number - 1), and its own data/work directories. Output goes to
# /dev/null unless +verbose+ is set.
#
# @param instance_number [Integer] 1-based index of the node
# @return [String] command line passed to Process.spawn
def build_command_line(instance_number)
  options = cluster_options
  http_port = port + (instance_number - 1)
  properties = [
    'es.foreground=yes',
    "es.cluster.name=#{cluster_name}",
    "es.node.name=node-#{instance_number}",
    "es.http.port=#{http_port}",
    "es.gateway.type=#{options[:gateway_type]}",
    "es.index.store.type=#{options[:index_store]}",
    "es.path.data=#{options[:path_data]}-#{instance_number}",
    "es.path.work=#{options[:path_work]}-#{instance_number}",
    'es.network.host=0.0.0.0',
    'es.discovery.zen.ping.multicast.enabled=true',
    'es.script.disable_dynamic=false',
    'es.node.test=true',
    'es.node.bench=true'
  ]

  parts = [downloader.executable]
  parts.concat(properties.map { |property| "-D #{property}" })
  parts << additional_options
  parts << '> /dev/null' unless verbose
  parts.compact.join(' ')
end
cluster_options() click to toggle source

Used as arguments for building command line to launch elasticsearch

# File lib/elasticsearch/embedded/cluster.rb, line 197
# Settings derived from the public options; build_command_line reads the
# gateway, index store and path entries from here.
#
# Persistent clusters use the 'local' gateway and the 'mmapfs' index store,
# and keep their data under the downloader's working directory. Otherwise
# the gateway is 'none', indices use the 'memory' store and the paths live
# under Dir.tmpdir.
#
# @return [Hash{Symbol => Object}]
def cluster_options
  base_dir = persistent ? downloader.working_dir : Dir.tmpdir
  gateway_type, index_store = persistent ? %w[local mmapfs] : %w[none memory]

  {
    port: port,
    nodes: nodes,
    cluster_name: cluster_name,
    timeout: timeout,
    # the executable to run is provided by the downloader
    command: downloader.executable,
    gateway_type: gateway_type,
    index_store: index_store,
    path_data: File.join(base_dir, 'cluster_data'),
    path_work: File.join(base_dir, 'cluster_workdir'),
  }
end
http_object() click to toggle source

Return an http object to make requests

# File lib/elasticsearch/embedded/cluster.rb, line 214
# Lazily build, then reuse, the Net::HTTP client for localhost:+port+.
# The port is read only on the first call; later changes to it are ignored.
def http_object
  return @http if @http

  @http = Net::HTTP.new('localhost', port)
end
launch_instance(instance_number = 1) click to toggle source

Spawn an elasticsearch process and return its pid

# File lib/elasticsearch/embedded/cluster.rb, line 139
# Spawn one elasticsearch node and return the pid of the spawned command.
#
# The process is started in its own process group (pgroup: true) so signals
# delivered to our process group are not propagated to it. It is detached,
# so Ruby reaps it when it exits.
#
# @param instance_number [Integer] 1-based node index
# @return [Integer] pid of the spawned process
def launch_instance(instance_number = 1)
  command = build_command_line(instance_number)
  pid = Process.spawn(command, pgroup: true)
  logger.debug "Launched elasticsearch process with pid #{pid}, detaching it"
  Process.detach pid
  pid
end
nodes_pids() click to toggle source

Returns the pids of the running instances (borrowed from code in Elasticsearch::Extensions::Test::Cluster). These are the pids of the elasticsearch nodes, not of the spawned commands; they differ because elasticsearch is launched through a shell wrapper script.

# File lib/elasticsearch/embedded/cluster.rb, line 150
# Pids of the elasticsearch node processes, as reported by the running
# cluster (borrowed from Elasticsearch::Extensions::Test::Cluster). They
# differ from the pids of the commands we spawned, because elasticsearch is
# launched through a shell wrapper.
#
# Always returns an Array. The old version returned an empty Hash for an
# empty response (which breaks @pids.concat) and raised on error payloads
# without a 'nodes' key.
#
# @return [Array<Integer>] node pids; empty when the cluster cannot be
#   reached or the response carries no node information
def nodes_pids
  response = begin
    JSON.parse(http_object.get('/_nodes/?process').body)
  rescue StandardError
    nil
  end
  node_infos = response.is_a?(Hash) ? response['nodes'] : nil
  return [] unless node_infos.is_a?(Hash)

  # Skip nodes without process info, so nil never ends up in @pids
  node_infos.values.map { |info| info['process'] && info['process']['id'] }.compact
end
register_shutdown_handler() click to toggle source

Register a shutdown proc which handles INT, TERM and QUIT signals

# File lib/elasticsearch/embedded/cluster.rb, line 219
# Trap INT (Ctrl+C), TERM (e.g. foreman) and QUIT so that each of them stops
# the cluster. The trap handler only starts a thread that calls #stop,
# because #stop locks a Mutex, which Ruby does not allow inside trap context.
def register_shutdown_handler
  %i[TERM INT QUIT].each do |signal_name|
    Signal.trap(signal_name) do |signo|
      Thread.new do
        logger.info "Received SIG#{Signal.signame(signo)}, quitting"
        stop
      end
    end
  end
end
start_cluster() click to toggle source
# File lib/elasticsearch/embedded/cluster.rb, line 157
# Launch +nodes+ elasticsearch instances and wait for the cluster to turn
# green. If a matching cluster is already running, only wait for green.
#
# @pids_lock is held for the whole startup, so #stop and #pids block until
# every pid has been recorded, including the node pids reported by the
# cluster.
def start_cluster
  logger.info "Starting ES #{version} cluster with working directory set to #{working_dir}. Process pid is #{$$}"

  if running?
    logger.warn "Elasticsearch cluster already running on port #{port}"
    wait_for_green(timeout)
    return
  end

  @pids_lock.synchronize do
    # Record each spawned pid as soon as it exists
    (1..nodes).each { |number| @pids.push(launch_instance(number)) }
    wait_for_green(timeout)
    # Node pids differ from spawned command pids, so track both
    @pids.concat(nodes_pids)
  end
end
wait_for_green(timeout = 60) click to toggle source

Waits until the cluster is green and prints information

@example Print the information about the default cluster

wait_for_green(timeout) # private instance method of Elasticsearch::Embedded::Cluster, called e.g. from start_cluster

@param (see #__wait_for_status)

@return Boolean

# File lib/elasticsearch/embedded/cluster.rb, line 239
# Block until the cluster is green, then log its summary.
#
# @param timeout [Numeric] seconds to wait, 60 by default
# @return [Boolean] true once green is reached
# @raise [Timeout::Error] if green is not reached in time
def wait_for_green(timeout = 60)
  target_status = 'green'
  __wait_for_status(target_status, timeout)
end
wait_or_kill(pid) click to toggle source
# File lib/elasticsearch/embedded/cluster.rb, line 176
# Terminate one tracked process. Sends SIGTERM and waits up to 2 seconds for
# the process to exit. If it is still running after that, sends SIGKILL.
# In every case the pid is finally removed from @pids under @pids_lock.
#
# NOTE(review): Process.waitpid only works for direct children of this
# process. For pids that are not our children (e.g. the node pids added from
# nodes_pids), and possibly for children already reaped by the Process.detach
# thread started in launch_instance, waitpid raises Errno::ECHILD right after
# the SIGTERM. The "already dead" branch is then logged and no SIGKILL is
# ever sent. Confirm whether escalation is needed for those pids.
#
# @param pid [Integer] process id to terminate
def wait_or_kill(pid)
  begin
    Timeout::timeout(2) do
      Process.kill(:TERM, pid)
      logger.debug "Sent SIGTERM to process #{pid}"
      Process.waitpid(pid)
      logger.info "Process #{pid} exited successfully"
    end
  rescue Errno::ESRCH, Errno::ECHILD
    # No such process or no child => process is already dead
    # (ESRCH comes from Process.kill, ECHILD from Process.waitpid)
    logger.debug "Process with pid #{pid} is already dead"
  rescue Timeout::Error
    logger.info "Process #{pid} still running after 2 seconds, sending SIGKILL to it"
    # Errors are ignored: the process may have exited in the meantime
    Process.kill(:KILL, pid) rescue nil
  ensure
    # Runs on every path, so the pid never stays in the tracked list
    logger.debug "Removing #{pid} from running pids"
    @pids_lock.synchronize { @pids.delete(pid) }
  end
end