class Elasticsearch::Extensions::Test::Cluster::Cluster

Constants

COMMANDS

Attributes

arguments[R]

Public Class Methods

new(arguments={}) click to toggle source

Create a new instance of the Cluster class

@option arguments [String] :cluster_name Cluster name (default: `elasticsearch-test-<hostname>`)
@option arguments [Integer] :number_of_nodes Number of desired nodes (default: 2)
@option arguments [String] :command Elasticsearch command (default: `elasticsearch`)
@option arguments [String] :port Starting port number; will be auto-incremented (default: 9250)
@option arguments [String] :node_name The node name (will be appended with a number)
@option arguments [String] :path_data Path to the directory to store data in
@option arguments [String] :path_work Path to the directory with auxiliary files
@option arguments [String] :path_logs Path to the directory with log files
@option arguments [Boolean] :multicast_enabled Whether multicast is enabled (default: true)
@option arguments [Integer] :timeout Timeout when starting the cluster (default: 60)
@option arguments [Integer] :timeout_version Timeout when waiting for `elasticsearch --version` (default: 15)
@option arguments [String] :network_host The host that nodes will bind on and publish to
@option arguments [Boolean] :clear_cluster Wipe out cluster content on startup (default: true)
@option arguments [Boolean] :quiet Disable printing to STDERR (default: false)

You can also use environment variables to set the constructor options (see source).

@see Cluster#start

# File lib/elasticsearch/extensions/test/cluster.rb, line 240
# Create a new instance of the Cluster class.
#
# Every option that was not passed in (i.e. is `nil`) is filled in from the
# corresponding environment variable or, failing that, a built-in default.
# An explicitly passed `false` (e.g. `multicast_enabled: false` or
# `quiet: false`) is kept as is and never replaced by a default.
#
# @param arguments [Hash] Constructor options; the hash is duplicated, so the
#                         caller's object is not modified here
#
# @raise [ArgumentError] When the resulting `cluster_name` is an empty string or a single slash
#
def initialize(arguments={})
  @arguments = arguments.dup

  # The fallbacks are evaluated lazily and in this order. Defaults which need
  # a method call use the block form of `ENV.fetch`, so e.g. the default
  # network host is only computed when neither the argument nor the
  # environment variable provides a value.
  defaults = {
    command:           -> { ENV.fetch('TEST_CLUSTER_COMMAND',   'elasticsearch') },
    port:              -> { ENV.fetch('TEST_CLUSTER_PORT',      9250).to_i },
    cluster_name:      -> { ENV.fetch('TEST_CLUSTER_NAME') { __default_cluster_name }.chomp },
    node_name:         -> { ENV.fetch('TEST_CLUSTER_NODE_NAME', 'node') },
    path_data:         -> { ENV.fetch('TEST_CLUSTER_DATA',      '/tmp/elasticsearch_test') },
    path_work:         -> { ENV.fetch('TEST_CLUSTER_TMP',       '/tmp') },
    path_logs:         -> { ENV.fetch('TEST_CLUSTER_LOGS',      '/tmp/log/elasticsearch') },
    es_params:         -> { ENV.fetch('TEST_CLUSTER_PARAMS',    '') },
    multicast_enabled: -> { ENV.fetch('TEST_CLUSTER_MULTICAST', 'true') },
    timeout:           -> { ENV.fetch('TEST_CLUSTER_TIMEOUT',   60).to_i },
    timeout_version:   -> { ENV.fetch('TEST_CLUSTER_TIMEOUT_VERSION', 15).to_i },
    number_of_nodes:   -> { ENV.fetch('TEST_CLUSTER_NODES',     2).to_i },
    network_host:      -> { ENV.fetch('TEST_CLUSTER_NETWORK_HOST') { __default_network_host } },
    quiet:             -> { !ENV.fetch('QUIET', '').empty? }
  }

  # Only a missing (`nil`) option triggers the fallback; `||=` would also
  # discard an explicit `false`.
  defaults.each do |key, fallback|
    @arguments[key] = fallback.call if @arguments[key].nil?
  end

  @clear_cluster = if @arguments[:clear_cluster].nil?
                     ENV.fetch('TEST_CLUSTER_CLEAR', 'true') != 'false'
                   else
                     !!@arguments[:clear_cluster]
                   end

  # Make sure `cluster_name` is not dangerous
  raise ArgumentError, "The `cluster_name` argument cannot be empty string or a slash" \
    if @arguments[:cluster_name] =~ /^[\/\\]?$/
end

Public Instance Methods

__check_for_running_processes(pids) click to toggle source

Check whether the processes for the given PIDs are running

@api private

# File lib/elasticsearch/extensions/test/cluster.rb, line 677
# Verify with `ps` that the launched processes are alive; logs an error and
# terminates the current process with status 1 otherwise.
#
# @param pids [Array] Process IDs to look up
#
# @api private
#
def __check_for_running_processes(pids)
  # Compared against `number_of_nodes + 1` -- presumably one extra line for
  # the `ps` header; verify against the `ps` output on the target platform.
  listed_lines = `ps -p #{pids.join(' ')}`.split("\n").size
  return unless listed_lines < arguments[:number_of_nodes]+1

  __log "\n[!!!] Process failed to start (see output above)".ansi(:red)
  exit(1)
end
__cluster_info() click to toggle source

Return information about the cluster

@api private

@return String

# File lib/elasticsearch/extensions/test/cluster.rb, line 616
# Build a human-readable, ANSI-decorated summary of the cluster: name, status,
# node count, and one line per node with the master node marked by `*`.
#
# Issues three HTTP requests against the cluster URL (health, nodes info and
# cluster state), in that order.
#
# @api private
#
# @return String
#
def __cluster_info
  fetch_json = lambda { |url| JSON.parse(Net::HTTP.get(URI(url))) }

  health    = fetch_json.call("#{__cluster_url}/_cluster/health")
  # Version 0.90 is addressed with a different Nodes Info URL
  nodes_url = version == '0.90' ? "#{__cluster_url}/_nodes/?process&http" : "#{__cluster_url}/_nodes/process,http"
  nodes     = fetch_json.call(nodes_url)
  master    = fetch_json.call("#{__cluster_url}/_cluster/state")['master_node']

  header_lines = [
    "\n",
    ('-'*80).ansi(:faint),
    'Cluster: '.ljust(20).ansi(:faint) + health['cluster_name'].to_s.ansi(:faint),
    'Status:  '.ljust(20).ansi(:faint) + health['status'].to_s.ansi(:faint),
    'Nodes:   '.ljust(20).ansi(:faint) + health['number_of_nodes'].to_s.ansi(:faint)
  ]

  node_lines = nodes['nodes'].map do |id, info|
    marker = id == master ? '*' : '-'

    # Missing sections in the node info are reported as `N/A`
    ''.ljust(20) +
      "#{marker} ".ansi(:faint) +
      "#{info['name'].ansi(:bold)} ".ansi(:faint) +
      "| version: #{info['version'] rescue 'N/A'}, ".ansi(:faint) +
      "pid: #{info['process']['id'] rescue 'N/A'}, ".ansi(:faint) +
      "address: #{info['http']['bound_address'] rescue 'N/A'}".ansi(:faint)
  end

  (header_lines + node_lines).join("\n")
end
__cluster_url() click to toggle source

Returns the HTTP URL for the cluster based on `:network_host` setting

@api private

@return String

# File lib/elasticsearch/extensions/test/cluster.rb, line 455
# Compose the HTTP base URL of the cluster from the `:network_host` and
# `:port` arguments; the special `_local_` host is addressed as `localhost`.
#
# @api private
#
# @return String
#
def __cluster_url
  host = arguments[:network_host]
  host = 'localhost' if '_local_' == host

  "http://#{host}:#{arguments[:port]}"
end
__command(version, arguments, node_number) click to toggle source

Returns the launch command for a specific version

@api private

@return String

# File lib/elasticsearch/extensions/test/cluster.rb, line 567
# Look up the launch command builder for the given version in `COMMANDS` and
# invoke it.
#
# Note that the passed `arguments` hash is modified in place: the `:dist` key
# is set to the value of `@dist` before the builder is called.
#
# @param version     [String]  Key into `COMMANDS`
# @param arguments   [Hash]    Arguments handed over to the command builder
# @param node_number [Integer] Number of the node being launched
#
# @raise [ArgumentError] When `COMMANDS` has no entry for the version
#
# @api private
#
# @return String
#
def __command(version, arguments, node_number)
  builder = COMMANDS[version]
  raise ArgumentError, "Cannot find command for version [#{version}]" unless builder

  arguments.merge!(dist: @dist)
  builder.call(arguments, node_number)
end
__default_cluster_name() click to toggle source

Returns a reasonably unique cluster name

@api private

@return String

# File lib/elasticsearch/extensions/test/cluster.rb, line 445
# Derive a cluster name from the lower-cased host name of this machine, so
# that different machines get different default names.
#
# @api private
#
# @return String
#
def __default_cluster_name
  hostname = Socket.gethostname.downcase
  "elasticsearch-test-#{hostname}"
end
__default_network_host() click to toggle source

Returns default `:network_host` setting based on the version

@api private

@return String

# File lib/elasticsearch/extensions/test/cluster.rb, line 426
# Pick the default `:network_host` for the detected Elasticsearch version:
# `0.0.0.0` for versions starting with 0 or 1, `_local_` for versions starting
# with 2, 5, 6, 7 or 8.
#
# @raise [RuntimeError] When the version matches none of the known prefixes
#
# @api private
#
# @return String
#
def __default_network_host
  detected = version

  case detected
  when /^0|^1/
    return '0.0.0.0'
  when /^2/, /^5|^6|^7|^8/
    return '_local_'
  end

  raise RuntimeError, "Cannot determine default network host from version [#{detected}]"
end
__determine_version() click to toggle source

Determine Elasticsearch version to be launched

Tries to get the version from the arguments passed; if that is not available, it parses the version number from the `lib/elasticsearch-X.Y.Z.jar` file; if that is not available either, it uses the output of `elasticsearch --version` or `elasticsearch -v`.

@api private

@return String

# File lib/elasticsearch/extensions/test/cluster.rb, line 473
# Determine the Elasticsearch version to be launched and map it to the major
# version key (`0.90`, `1.0`, `2.0`, `5.0`, `6.0`, `7.0`, `8.0`).
#
# The full version is taken from, in this order:
#
# 1. the `:version` argument,
# 2. the name of the first `elasticsearch-X.Y.Z*` entry in the `../lib/`
#    directory relative to the `:command`,
# 3. the output of `<command> --version`, falling back to `<command> -v` when
#    the former does not finish within `:timeout_version` seconds.
#
# As a side effect of the third strategy, `@dist` is set from the `Build:`
# part of the output (or to `nil` when it is not present).
#
# @raise [RuntimeError]  When the version cannot be parsed or is not a known major version
# @raise [Errno::ENOENT] When the launch script cannot be found
#
# @api private
#
# @return String
#
def __determine_version
  path_to_lib = File.dirname(arguments[:command]) + '/../lib/'
  version = if arguments[:version]
    arguments[:version]
  elsif File.exist?(path_to_lib) && (jar = Dir.entries(path_to_lib).find { |f| f =~ /^elasticsearch\-\d/ })
    __log "Determining version from [#{jar}]" if ENV['DEBUG']
    if m = jar.match(/elasticsearch\-(\d+\.\d+\.\d+).*/)
      m[1]
    else
      raise RuntimeError, "Cannot determine Elasticsearch version from jar [#{jar}]"
    end
  else
    __log "[!] Cannot find Elasticsearch .jar from path to command [#{arguments[:command]}], using `#{arguments[:command]} --version`" if ENV['DEBUG']

    unless File.exist? arguments[:command]
      __log "File [#{arguments[:command]}] does not exists, checking full path by `which`: ", :print if ENV['DEBUG']

      begin
        full_path = `which #{arguments[:command]}`.strip
        __log "#{full_path.inspect}\n", :print if ENV['DEBUG']
      rescue StandardError
        # Only regular errors are translated; interrupts and exit requests propagate
        raise RuntimeError, "Cannot determine full path to [#{arguments[:command]}] with 'which'"
      end

      if full_path.empty?
        raise Errno::ENOENT, "Cannot find Elasticsearch launch script from [#{arguments[:command]}] -- did you pass a correct path?"
      end
    end

    output = ''
    io     = nil
    pid    = nil

    begin
      # First, try the new `--version` syntax...
      __log "Running [#{arguments[:command]} --version] to determine version" if ENV['DEBUG']
      io = IO.popen("#{arguments[:command]} --version")
      pid = io.pid

      Timeout::timeout(arguments[:timeout_version]) do
        Process.wait(pid)
        output = io.read
      end

    rescue Timeout::Error
      # ...else, the old `-v` syntax
      __log "Running [#{arguments[:command]} -v] to determine version" if ENV['DEBUG']
      output = `#{arguments[:command]} -v`
    ensure
      if pid
        begin
          Process.kill('INT', pid)
        rescue StandardError
          # Best effort: most likely the process has terminated already
        end
      end
      # `io` is still nil when `IO.popen` itself raised; do not mask that error
      io.close if io && !io.closed?
    end

    STDERR.puts "> #{output}" if ENV['DEBUG']

    if output.empty?
      raise RuntimeError, "Cannot determine Elasticsearch version from [#{arguments[:command]} --version] or [#{arguments[:command]} -v]"
    end

    @dist = output[/Build: ([a-z]+)\//, 1]

    if(m = output.match(/Version: (\d+\.\d+.\d+).*,/))
      m[1]
    else
      raise RuntimeError, "Cannot determine Elasticsearch version from elasticsearch --version output [#{output}]"
    end
  end

  # Reduce the full version to the key of the matching launch command
  case version
    when /^0\.90.*/
      '0.90'
    when /^1\..*/
      '1.0'
    when /^2\..*/
      '2.0'
    when /^5\..*/
      '5.0'
    when /^6\..*/
      '6.0'
    when /^7\..*/
      '7.0'
    when /^8\..*/
      '8.0'
    else
      raise RuntimeError, "Cannot determine major version from [#{version}]"
  end
end
__get_cluster_health(status=nil) click to toggle source

Tries to load cluster health information

@api private

@return Hash,Nil

# File lib/elasticsearch/extensions/test/cluster.rb, line 651
# Tries to load the cluster health information from the Cluster Health API.
#
# Regular errors raised by the HTTP request (connection refused, timeouts of
# the HTTP client, etc.) are treated as "cluster not available" and result in
# `nil`. Interrupts, exit requests and other non-`StandardError` exceptions are
# deliberately *not* rescued, so that an enclosing `Timeout::timeout` or a
# Ctrl+C can stop a caller which polls this method in a loop.
#
# @param status [String,nil] When given, sent as the `wait_for_status` query parameter
#
# @api private
#
# @return Hash,Nil
#
def __get_cluster_health(status=nil)
  uri = URI("#{__cluster_url}/_cluster/health")
  uri.query = "wait_for_status=#{status}" if status

  begin
    response = Net::HTTP.get(uri)
  rescue StandardError => e
    STDERR.puts e.inspect if ENV['DEBUG']
    return nil
  end

  JSON.parse(response)
end
__get_nodes() click to toggle source

Get the information about nodes

@api private

# File lib/elasticsearch/extensions/test/cluster.rb, line 688
# Fetch the process section of the Nodes Info API and return the parsed JSON
# response. Errors from the HTTP request or the JSON parser are not handled
# here.
#
# @api private
#
def __get_nodes
  endpoint = URI("#{__cluster_url}/_nodes/process")
  payload  = Net::HTTP.get(endpoint)

  JSON.parse(payload)
end
__log(message, mode=:puts) click to toggle source

Print to STDERR

# File lib/elasticsearch/extensions/test/cluster.rb, line 694
# Write a message to STDERR, unless the `:quiet` argument is set.
#
# @param message [Object] The message to output
# @param mode    [Symbol] Name of the STDERR method used for the output (`:puts` by default)
#
def __log(message, mode=:puts)
  return if @arguments[:quiet]

  STDERR.__send__(mode, message)
end
__remove_cluster_data() click to toggle source

Remove the data directory

@api private

# File lib/elasticsearch/extensions/test/cluster.rb, line 669
# Recursively delete the directory given in the `:path_data` argument.
#
# @api private
#
def __remove_cluster_data
  data_directory = arguments[:path_data]
  FileUtils.rm_rf(data_directory)
end
__wait_for_status(status='green', timeout=30) click to toggle source

Blocks the process and waits for the cluster to reach the given status (“green” by default) and, when `:number_of_nodes` is set, the expected number of nodes

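Prints a progress dot through `__log` (STDERR) once per second while waiting; information about the cluster itself is printed by the caller (see `start`) once the cluster is available.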

@param status [String] The status to wait for (yellow, green) @param timeout [Integer] The explicit timeout for the operation

@api private

@return Boolean

# File lib/elasticsearch/extensions/test/cluster.rb, line 585
# Poll the cluster health once per second until the cluster reports the
# requested status and -- when `:number_of_nodes` is set -- the expected number
# of nodes. A progress dot is logged for every unsuccessful attempt.
#
# @param status  [String]  The status to wait for (yellow, green)
# @param timeout [Integer] The explicit timeout for the operation
#
# @raise [Timeout::Error] When the state is not reached in time (after logging a message)
#
# @api private
#
# @return Boolean
#
def __wait_for_status(status='green', timeout=30)
  Timeout::timeout(timeout) do
    loop do
      health = __get_cluster_health(status)
      __log health if ENV['DEBUG']

      if health && health['status'] == status
        expected_nodes = arguments[:number_of_nodes]
        break if expected_nodes.nil? || expected_nodes.to_i == health['number_of_nodes'].to_i
      end

      __log '.'.ansi(:faint), :print
      sleep 1
    end
  end

  true
rescue Timeout::Error => e
  failure = "\nTimeout while waiting for cluster status [#{status}]"
  failure += " and [#{arguments[:number_of_nodes]}] nodes" if arguments[:number_of_nodes]
  __log failure.ansi(:red, :bold)
  raise e
end
running?() click to toggle source

Returns true when a cluster with the configured name and number of nodes is already running at the cluster URL

@return Boolean

# File lib/elasticsearch/extensions/test/cluster.rb, line 394
# Check whether the cluster responds within 0.25 seconds and reports both the
# configured `:cluster_name` and `:number_of_nodes`. Any regular error while
# asking (including the timeout) counts as "not running".
#
# @return Boolean
#
def running?
  health = begin
    Timeout::timeout(0.25) { __get_cluster_health }
  rescue StandardError
    nil
  end

  return false unless health

  health['cluster_name']    == arguments[:cluster_name] &&
    health['number_of_nodes'] == arguments[:number_of_nodes]
end
start() click to toggle source

Starts a cluster

Launches the specified number of nodes in a test-suitable configuration and prints information about the cluster – unless this specific cluster is already running.

@example Start a cluster with the default configuration (2 nodes, installed version, etc)

Elasticsearch::Extensions::Test::Cluster::Cluster.new.start

@example Start a cluster with a custom configuration

Elasticsearch::Extensions::Test::Cluster::Cluster.new(
  cluster_name: 'my-cluster',
  number_of_nodes: 3,
  node_name: 'my-node',
  port: 9350
).start

@example Start a cluster with a different Elasticsearch version

Elasticsearch::Extensions::Test::Cluster::Cluster.new(
  command: "/usr/local/Cellar/elasticsearch/1.0.0.Beta2/bin/elasticsearch"
).start

@return Boolean,Array @see Cluster#stop

# File lib/elasticsearch/extensions/test/cluster.rb, line 293
# Launch the configured number of nodes, unless this cluster is already
# running.
#
# The steps are: optionally remove the data directory, spawn and detach one
# process per node (one second apart), check that the processes are alive,
# wait for the green status and log the cluster information.
#
# @return Boolean `false` when the cluster is already running, `true` otherwise
# @see Cluster#stop
#
def start
  if running?
    __log "[!] Elasticsearch cluster already running".ansi(:red)
    return false
  end

  __remove_cluster_data if @clear_cluster

  node_count = arguments[:number_of_nodes]
  noun       = node_count < 2 ? 'node' : 'nodes'

  __log "Starting ".ansi(:faint) + node_count.to_s.ansi(:bold, :faint) +
        " Elasticsearch #{noun}..".ansi(:faint), :print

  __log "\nUsing Elasticsearch version [#{version}]" if ENV['DEBUG']

  pids = node_count.times.map do |index|
    # Node numbers are 1-based
    launch = __command(version, arguments, index + 1)
    # Hide the output of the node unless debugging
    launch += '> /dev/null' unless ENV['DEBUG']

    __log launch.gsub(/ {1,}/, ' ').ansi(:bold) if ENV['DEBUG']

    Process.spawn(launch).tap do |pid|
      Process.detach pid
      sleep 1
    end
  end

  __check_for_running_processes(pids)
  wait_for_green
  __log __cluster_info

  true
end
stop() click to toggle source

Stops the cluster

Fetches the PID numbers from “Nodes Info” API and terminates matching nodes.

@example Stop the default cluster

Elasticsearch::Extensions::Test::Cluster::Cluster.new.stop

@example Stop the cluster reachable on specific port

Elasticsearch::Extensions::Test::Cluster::Cluster.new(port: 9350).stop

@return Boolean,Array @see Cluster#start

# File lib/elasticsearch/extensions/test/cluster.rb, line 342
# Stops the cluster.
#
# Fetches the PID numbers from the "Nodes Info" API and terminates the
# matching processes: every process is first sent `INT` and, when it is still
# alive a second later, `KILL`.
#
# Regular errors while loading the nodes information are logged and result in
# `false`; interrupts and exit requests are not rescued, so the operation can
# be aborted.
#
# @return Boolean,Array `false` when no node information or no PID is
#                       available, the list of PIDs otherwise
# @see Cluster#start
#
def stop
  begin
    nodes = __get_nodes
  rescue StandardError => e
    __log "[!] Exception raised when stopping the cluster: #{e.inspect}".ansi(:red)
    nodes = nil
  end

  return false if nodes.nil? || nodes.empty?

  # A response without the `nodes` section (e.g. an error document) has no PIDs
  pids = (nodes['nodes'] || {}).map { |_id, info| info['process']['id'] }
  return false if pids.empty?

  __log "Stopping Elasticsearch nodes... ".ansi(:faint), :print

  pids.each do |pid|
    ['INT','KILL'].each do |signal|
      begin
        Process.kill signal, pid
      rescue StandardError => e
        __log "[#{e.class}] PID #{pid} not found. ".ansi(:red), :print
      end

      # Give the system some breathing space to finish...
      Kernel.sleep 1

      # Check that pid really is dead
      begin
        # `getpgid` will raise error if pid is dead, so if it returns, try next signal
        Process.getpgid pid
      rescue Errno::ESRCH
        __log "Stopped PID #{pid}".ansi(:green) +
        (ENV['DEBUG'] ? " with #{signal} signal".ansi(:green) : '') +
        ". ".ansi(:green), :print
        break # pid is dead
      end
    end
  end

  __log "\n"

  pids
end
version() click to toggle source

Returns the major version of Elasticsearch

@return String @see __determine_version

# File lib/elasticsearch/extensions/test/cluster.rb, line 415
# Returns the major version of Elasticsearch; the value is determined on the
# first call and cached in `@version` afterwards.
#
# @return String
# @see __determine_version
#
def version
  return @version if @version

  @version = __determine_version
end
wait_for_green() click to toggle source

Waits until the cluster is green and prints information about it

@return Boolean

# File lib/elasticsearch/extensions/test/cluster.rb, line 406
# Waits for the `green` cluster status, using the `:timeout` argument as the
# time limit.
#
# @return Boolean
#
def wait_for_green
  time_limit = arguments[:timeout]
  __wait_for_status('green', time_limit)
end