module Elasticsearch::Extensions::Test::Cluster

A convenience Ruby class for starting and stopping a separate testing in-memory cluster, to not depend on – and not mess up – <localhost:9200>.

@example Start a cluster with default configuration

require 'elasticsearch/extensions/test/cluster'
Elasticsearch::Extensions::Test::Cluster.start

@see #start #start @see #stop #stop

Public Instance Methods

__get_cluster_health(port=9250) click to toggle source

Tries to load cluster health information

@api private

# File lib/elasticsearch/extensions/test/cluster.rb, line 272
def __get_cluster_health(port=9250)
  uri = URI("http://localhost:#{port}/_cluster/health")
  if response = Net::HTTP.get(uri) rescue nil
    return JSON.parse(response)
  end
end
__print_cluster_info(port) click to toggle source

Print information about the cluster on STDOUT

@api private

# File lib/elasticsearch/extensions/test/cluster.rb, line 246
def __print_cluster_info(port)
  health = JSON.parse(Net::HTTP.get(URI("http://localhost:#{port}/_cluster/health")))
  nodes  = JSON.parse(Net::HTTP.get(URI("http://localhost:#{port}/_nodes/process,http")))
  master = JSON.parse(Net::HTTP.get(URI("http://localhost:#{port}/_cluster/state")))['master_node']

  puts "\n",
        ('-'*80).ansi(:faint),
       'Cluster: '.ljust(20).ansi(:faint) + health['cluster_name'].to_s.ansi(:faint),
       'Status:  '.ljust(20).ansi(:faint) + health['status'].to_s.ansi(:faint),
       'Nodes:   '.ljust(20).ansi(:faint) + health['number_of_nodes'].to_s.ansi(:faint)

  nodes['nodes'].each do |id, info|
    m = id == master ? '*' : '+'
    puts ''.ljust(20) +
         "#{m} ".ansi(:faint) +
         "#{info['name'].ansi(:bold)} ".ansi(:faint) +
         "| version: #{info['version'] rescue 'N/A'}, ".ansi(:faint) +
         "pid: #{info['process']['id'] rescue 'N/A'}, ".ansi(:faint) +
         "address: #{info['http']['bound_address'] rescue 'N/A'}".ansi(:faint)
  end
end
__wait_for_status(status='green', port=9250, timeout=30) click to toggle source

Blocks the process and waits for the cluster to be in a “green” state.

Prints information about the cluster on STDOUT if the cluster is available.

@param status [String] The status to wait for (yellow, green) @param port [Integer] The port on which the cluster is reachable @param timeout [Integer] The explicit timeout for the operation

@api private

@return Boolean

# File lib/elasticsearch/extensions/test/cluster.rb, line 216
def __wait_for_status(status='green', port=9250, timeout=30)
  uri = URI("http://localhost:#{port}/_cluster/health?wait_for_status=#{status}")

  Timeout::timeout(timeout) do
    loop do
      response = begin
        JSON.parse(Net::HTTP.get(uri))
      rescue Exception => e
        puts e.inspect if ENV['DEBUG']
        nil
      end

      puts response.inspect if ENV['DEBUG']

      if response && response['status'] == status && ( @@number_of_nodes.nil? || @@number_of_nodes == response['number_of_nodes'].to_i  )
        __print_cluster_info(port) and break
      end

      print '.'.ansi(:faint)
      sleep 1
    end
  end

  return true
end
running?(arguments={}) click to toggle source

Returns true when a specific test node is running within the cluster.

@option arguments [Integer] :on The port on which the node is running. @option arguments [String] :as The cluster name.

@return Boolean

# File lib/elasticsearch/extensions/test/cluster.rb, line 180
def running?(arguments={})
  port         = arguments[:on] || (ENV['TEST_CLUSTER_PORT'] || 9250).to_i
  cluster_name = arguments[:as] ||  ENV['TEST_CLUSTER_NAME'] || 'elasticsearch_test'

  if cluster_health = Timeout::timeout(0.25) { __get_cluster_health(port) } rescue nil
    return cluster_health['cluster_name']    == cluster_name &&                     cluster_health['number_of_nodes'] == @@number_of_nodes
  end
  return false
end
start(arguments={}) click to toggle source

Starts a cluster

Launches the specified number of nodes in test-suitable configuration by default and prints information about the cluster – unless this specific cluster is running already.

Use the {Cluster#stop #stop} command with the same arguments to stop this cluster.

@option arguments [String] :command Elasticsearch command (default: `elasticsearch`). @option arguments [Integer] :nodes Number of desired nodes (default: 2). @option arguments [String] :cluster_name Cluster name (default: `elasticsearch_test`). @option arguments [String] :port Starting port number; will be auto-incremented (default: 9250). @option arguments [Integer] :timeout Timeout when starting the cluster (default: 30).

You can also use environment variables to set these options.

@example Start a cluster with default configuration (2 nodes, in-memory, etc)

Elasticsearch::Extensions::Test::Cluster.start

@example Start a cluster with a custom configuration

Elasticsearch::Extensions::Test::Cluster.start \
  cluster_name: 'my-cluster',
  nodes: 3,
  node_name: 'my-node',
  port: 9350

@example Start a cluster with a different Elasticsearch version

Elasticsearch::Extensions::Test::Cluster.start \
  command: "/usr/local/Cellar/elasticsearch/1.0.0.Beta2/bin/elasticsearch"

@return Boolean @see #stop #stop

# File lib/elasticsearch/extensions/test/cluster.rb, line 67
def start(arguments={})
  @@number_of_nodes = (ENV['TEST_CLUSTER_NODES'] || arguments[:nodes] || 2).to_i

  arguments[:command]      ||= ENV['TEST_CLUSTER_COMMAND'] || 'elasticsearch'
  arguments[:port]         ||= (ENV['TEST_CLUSTER_PORT'] || 9250).to_i
  arguments[:cluster_name] ||= ENV['TEST_CLUSTER_NAME'] || 'elasticsearch_test'
  arguments[:gateway_type] ||= 'none'
  arguments[:index_store]  ||= 'memory'
  arguments[:path_data]    ||= ENV['TEST_CLUSTER_DATA'] || '/tmp'
  arguments[:es_params]    ||= ENV['TEST_CLUSTER_PARAMS'] || ''
  arguments[:path_work]    ||= '/tmp'
  arguments[:node_name]    ||= 'node'
  arguments[:timeout]      ||= (ENV['TEST_CLUSTER_TIMEOUT'] || 30).to_i

  if running? :on => arguments[:port], :as => arguments[:cluster_name]
    print "[!] Elasticsearch cluster already running".ansi(:red)
    wait_for_green(arguments[:port], arguments[:timeout])
    return false
  end

  print "Starting ".ansi(:faint) +
        @@number_of_nodes.to_s.ansi(:bold, :faint) +
        " Elasticsearch nodes..".ansi(:faint)

  pids = []

  @@number_of_nodes.times do |n|
    n += 1
    pid = Process.spawn <<-COMMAND
      #{arguments[:command]} \
        -D es.foreground=yes \
        -D es.cluster.name=#{arguments[:cluster_name]} \
        -D es.node.name=#{arguments[:node_name]}-#{n} \
        -D es.http.port=#{arguments[:port].to_i + (n-1)} \
        -D es.gateway.type=#{arguments[:gateway_type]} \
        -D es.index.store.type=#{arguments[:index_store]} \
        -D es.path.data=#{arguments[:path_data]} \
        -D es.path.work=#{arguments[:path_work]} \
        -D es.cluster.routing.allocation.disk.threshold_enabled=false \
        -D es.network.host=0.0.0.0 \
        -D es.discovery.zen.ping.multicast.enabled=true \
        -D es.script.disable_dynamic=false \
        -D es.node.test=true \
        -D es.node.bench=true \
        -D es.logger.level=DEBUG \
        #{arguments[:es_params]} \
        > /dev/null
    COMMAND
    Process.detach pid
    pids << pid
  end

  # Check for proceses running
  if %xps -p #{pids.join(' ')}`.split("\n").size < @@number_of_nodes+1
    STDERR.puts "", "[!!!] Process failed to start (see output above)".ansi(:red)
    exit(1)
  end

  wait_for_green(arguments[:port], arguments[:timeout])
  return true
end
stop(arguments={}) click to toggle source

Stop the cluster.

Fetches the PID numbers from “Nodes Info” API and terminates matching nodes.

@example Stop the default cluster

Elasticsearch::Extensions::Test::Cluster.stop

@example Stop the cluster reachable on specific port

Elasticsearch::Extensions::Test::Cluster.stop port: 9350

@return Boolean @see #start #start

# File lib/elasticsearch/extensions/test/cluster.rb, line 142
def stop(arguments={})
  arguments[:port] ||= (ENV['TEST_CLUSTER_PORT'] || 9250).to_i

  nodes = begin
    JSON.parse(Net::HTTP.get(URI("http://localhost:#{arguments[:port]}/_nodes/?process")))
  rescue Exception => e
    STDERR.puts "[!] Exception raised when stopping the cluster: #{e.inspect}".ansi(:red)
    nil
  end

  return false if nodes.nil? or nodes.empty?

  pids  = nodes['nodes'].map { |id, info| info['process']['id'] }

  unless pids.empty?
    print "Stopping Elasticsearch nodes... ".ansi(:faint)
    pids.each_with_index do |pid, i|
      begin
        print "stopped PID #{pid}. ".ansi(:green) if Process.kill 'KILL', pid
      rescue Exception => e
        print "[#{e.class}] PID #{pid} not found. ".ansi(:red)
      end
    end
    puts
  else
    false
  end

  return pids
end
wait_for_green(port=9250, timeout=60) click to toggle source

Waits until the cluster is green and prints information

@example Print the information about the default cluster

Elasticsearch::Extensions::Test::Cluster.wait_for_green

@param (see #__wait_for_status)

@return Boolean

# File lib/elasticsearch/extensions/test/cluster.rb, line 200
def wait_for_green(port=9250, timeout=60)
  __wait_for_status('green', port, timeout)
end