module OpenSearch::Transport::Transport::Base

@abstract Module with common functionality for transport implementations.

Constants

ACCEPT_ENCODING
CONTENT_TYPE_REGEX
CONTENT_TYPE_STR
DEFAULT_CONTENT_TYPE
DEFAULT_MAX_RETRIES
DEFAULT_PORT
DEFAULT_PROTOCOL
DEFAULT_RELOAD_AFTER
DEFAULT_RESURRECT_AFTER
DEFAULT_SERIALIZER_CLASS
GZIP
GZIP_FIRST_TWO_BYTES
HEX_STRING_DIRECTIVE
RUBY_ENCODING
SANITIZED_PASSWORD
USER_AGENT_REGEX
USER_AGENT_STR

Attributes

connections[R]
counter[R]
hosts[R]
last_request_at[R]
logger[RW]
options[R]
protocol[R]
reload_after[RW]
reload_connections[RW]
resurrect_after[RW]
serializer[RW]
sniffer[RW]
tracer[RW]

Public Class Methods

new(arguments = {}, &block) click to toggle source

Creates a new transport object

@param arguments [Hash] Settings and options for the transport
@param block [Proc] Lambda or Proc which can be evaluated in the context of the "session" object

@option arguments [Array] :hosts An Array of normalized hosts information
@option arguments [Hash] :options A Hash with options (usually passed by {Client})

@see Client#initialize

# File lib/opensearch/transport/transport/base.rb, line 59
# Creates a new transport object.
#
# Stores the hosts and options, builds the connection collection through
# {#__build_connections}, and sets up the serializer, sniffer, request counter
# and the reload/resurrect settings.
#
# @param arguments [Hash] Settings and options for the transport
# @param block [Proc] Kept and handed to {#__build_connection} for every host
#
# @option arguments [Array] :hosts   Host information hashes (defaults to an empty Array)
# @option arguments [Hash]  :options Transport options (defaults to an empty Hash)
def initialize(arguments = {}, &block)
  @state_mutex = Mutex.new

  @hosts   = arguments[:hosts]   || []
  @options = arguments[:options] || {}
  # Later lookups index into these two keys without nil checks.
  @options[:http]            ||= {}
  @options[:retry_on_status] ||= []

  @block       = block
  @compression = !!@options[:compression]
  @connections = __build_connections

  # An explicit serializer instance wins; otherwise instantiate the configured (or default) class.
  @serializer = options[:serializer] || (options[:serializer_class] || DEFAULT_SERIALIZER_CLASS).new(self)
  @protocol   = options[:protocol] || DEFAULT_PROTOCOL

  @logger = options[:logger]
  @tracer = options[:tracer]

  @sniffer         = (options[:sniffer_class] || Sniffer).new(self)
  @counter         = 0
  @counter_mtx     = Mutex.new
  @last_request_at = Time.now

  # `:reload_connections` doubles as an on/off flag and, when an Integer, as the reload interval.
  reload_setting      = options[:reload_connections]
  @reload_connections = reload_setting
  @reload_after       = reload_setting.is_a?(Integer) ? reload_setting : DEFAULT_RELOAD_AFTER

  @resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER
  @retry_on_status = Array(options[:retry_on_status]).map(&:to_i)
end

Public Instance Methods

__build_connection(host, options={}, block=nil) click to toggle source

@abstract Build and return a connection.

A transport implementation *must* implement this method.
See {HTTP::Faraday#__build_connection} for an example.

@return [Connections::Connection] @api private

# File lib/opensearch/transport/transport/base.rb, line 179
# @abstract Build and return a connection.
#
# This base version always raises; concrete transports are expected to
# override it.
#
# @param host    [Hash] Host information
# @param options [Hash] Transport-specific options
# @param block   [Proc] Optional configuration block
#
# @raise [NoMethodError] Always, in this base implementation
# @api private
def __build_connection(host, options = {}, block = nil)
  raise NoMethodError.new("Implement this method in your class")
end
__build_connections() click to toggle source

Builds and returns a collection of connections

The adapters have to implement the {Base#__build_connection} method.

@return [Connections::Collection] @api private

# File lib/opensearch/transport/transport/base.rb, line 156
# Builds and returns a collection of connections.
#
# Every host hash is normalized in place before being passed to
# {#__build_connection}:
#
# * `:protocol` is always (re)assigned from the host's `:scheme`, then the
#   top-level / `:http` scheme options, then `DEFAULT_PROTOCOL`;
# * `:port` is filled in only when missing;
# * `:user` / `:password` are filled from the options only when the host
#   has no user of its own.
#
# A missing `options[:http]` is treated as an empty Hash, so the method also
# works when the options were replaced without that key.
#
# @return [Connections::Collection]
# @api private
def __build_connections
  http_options     = options[:http] || {}
  default_scheme   = options[:scheme]   || http_options[:scheme] || DEFAULT_PROTOCOL
  default_port     = options[:port]     || http_options[:port]   || DEFAULT_PORT
  default_user     = options[:user]     || http_options[:user]
  default_password = options[:password] || http_options[:password]

  built = hosts.map do |host|
    host[:protocol] = host[:scheme] || default_scheme
    host[:port]   ||= default_port

    if default_user && !host[:user]
      host[:user]       = default_user
      host[:password] ||= default_password
    end

    __build_connection(host, (options[:transport_options] || {}), @block)
  end

  Connections::Collection.new(
    :connections    => built,
    :selector_class => options[:selector_class],
    :selector       => options[:selector]
  )
end
__close_connections() click to toggle source

Closes the connections collection

@api private

# File lib/opensearch/transport/transport/base.rb, line 187
# Closes the connections collection.
#
# Intentionally does nothing here: it is a hook point for specific adapters
# that need to close their connections.
#
# @return [nil]
# @api private
def __close_connections
  nil
end
__convert_to_json(o=nil, options={}) click to toggle source

Converts any non-String object to JSON

@api private

# File lib/opensearch/transport/transport/base.rb, line 234
# Converts any non-String object to JSON.
#
# Strings are returned untouched; everything else is handed to the
# serializer's `dump` together with the given options.
#
# @param o       [Object] Value to convert
# @param options [Hash]   Options passed through to `serializer.dump`
#
# @api private
def __convert_to_json(o = nil, options = {})
  return o if o.is_a?(String)

  serializer.dump(o, options)
end
__full_url(host) click to toggle source

Returns a full URL based on information from host

@param host [Hash] Host configuration passed in from {Client}

@api private

# File lib/opensearch/transport/transport/base.rb, line 243
# Returns a full URL based on information from host.
#
# The user and password are CGI-escaped. A host with a user but no password
# produces `user@host` instead of raising on the missing password.
#
# @param host [Hash] Host configuration; reads `:protocol`, `:user`,
#   `:password`, `:host`, `:port` and `:path`
#
# @return [String]
# @api private
def __full_url(host)
  url = "#{host[:protocol]}://"

  if host[:user]
    url += CGI.escape(host[:user])
    url += ":#{CGI.escape(host[:password])}" if host[:password]
    url += '@'
  end

  url += host[:host]
  url += ":#{host[:port]}" if host[:port]
  url += host[:path] if host[:path]
  url
end
__log_response(method, path, params, body, url, response, json, took, duration) click to toggle source

Log request and response information

@api private

# File lib/opensearch/transport/transport/base.rb, line 195
# Log request and response information.
#
# Does nothing when no logger is configured. The password in the URL's
# user-info part is replaced with `SANITIZED_PASSWORD` before logging.
#
# The pattern is anchored to the start of the URL and stops at the first `/`
# or `@`, so a later `@` in the path or query string cannot extend the match
# (which would otherwise leak the password or mangle credential-less URLs).
#
# @param method   [#to_s]  Request method, logged upper-cased
# @param body     [Object] Request body; logged at debug level when present
# @param url      [#to_s]  Request URL
# @param response [#status, #body] Response to log
# @param took     [String] Query time reported by the server
# @param duration [Numeric] Request duration in seconds
# (`path`, `params` and `json` are accepted but not used here.)
#
# @api private
def __log_response(method, path, params, body, url, response, json, took, duration)
  return unless logger

  sanitized_url = url.to_s.sub(%r{\A([^/]*//[^:/@]+):[^/@]*@}) do
    "#{Regexp.last_match(1)}:#{SANITIZED_PASSWORD}@"
  end

  log_info "#{method.to_s.upcase} #{sanitized_url} " \
           "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]"
  log_debug "> #{__convert_to_json(body)}" if body
  log_debug "< #{response.body}"
end
__raise_transport_error(response) click to toggle source

Raise error specific for the HTTP response status or a generic server error

@api private

# File lib/opensearch/transport/transport/base.rb, line 225
# Raise error specific for the HTTP response status or a generic server error.
#
# The error class is looked up in `ERRORS` by the response status, falling
# back to `ServerError`; the message has the form "[status] body".
#
# @param response [#status, #body]
# @raise [ServerError] or the status-specific error class from `ERRORS`
#
# @api private
def __raise_transport_error(response)
  error_class = ERRORS[response.status] || ServerError
  message     = "[#{response.status}] #{response.body}"

  raise error_class, message
end
__rebuild_connections(arguments={}) click to toggle source

Rebuilds the connections collection in the transport.

The method adds new connections from the passed hosts to the collection, and removes all connections not contained in the passed hosts.

@return [Connections::Collection] @api private

# File lib/opensearch/transport/transport/base.rb, line 132
# Rebuilds the connections collection in the transport.
#
# Under the state mutex: replaces the stored hosts and options, calls the
# {#__close_connections} hook, builds a new collection, then removes from the
# current collection every connection missing from the new one and adds every
# connection the current collection does not contain yet.
#
# @param arguments [Hash]
# @option arguments [Array] :hosts   New hosts (defaults to an empty Array)
# @option arguments [Hash]  :options New options (defaults to an empty Hash)
#
# @return [Connections::Collection] The updated, existing collection
# @api private
def __rebuild_connections(arguments = {})
  @state_mutex.synchronize do
    @hosts   = arguments[:hosts]   || []
    @options = arguments[:options] || {}

    __close_connections

    rebuilt = __build_connections
    stale   = @connections.all.reject { |existing| rebuilt.include?(existing) }
    fresh   = rebuilt.reject { |candidate| @connections.all.include?(candidate) }

    @connections.remove(stale)
    @connections.add(fresh)
    @connections
  end
end
__trace(method, path, params, headers, body, url, response, json, took, duration) click to toggle source

Trace the request in the `curl` format

@api private

# File lib/opensearch/transport/transport/base.rb, line 209
# Trace the request in the `curl` format.
#
# Writes a runnable curl command to the tracer at info level, followed by a
# commented response summary and body at debug level. The command always
# targets http://localhost:9200 regardless of the real URL.
#
# Each request header gets its own `-H` flag; joining several headers into a
# single flag would make curl send them as one header.
#
# @param method   [#to_s]  Request method
# @param path     [String] Request path
# @param params   [Hash]   Query parameters, appended after `pretty`
# @param headers  [Hash, nil] Request headers
# @param body     [Object] Request body, dumped as pretty JSON when present
# @param response [#status, #body]
# @param json     [Object, nil] Parsed response; when nil the raw body is traced
# @param duration [Numeric] Request duration in seconds
# (`url` and `took` are accepted but not used here.)
#
# @api private
def __trace(method, path, params, headers, body, url, response, json, took, duration)
  query      = params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}"
  trace_url  = "http://localhost:9200/#{path}?pretty#{query}"
  trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : ''

  header_flags = (headers || {}).map { |name, value| " -H '#{name}: #{value}'" }.join

  tracer.info "curl -X #{method.to_s.upcase}#{header_flags} '#{trace_url}'#{trace_body}\n"
  tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#"
  tracer.debug json ? serializer.dump(json, :pretty => true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n"
end
get_connection(options={}) click to toggle source

Returns a connection from the connection pool by delegating to {Connections::Collection#get_connection}.

Resurrects dead connections if the `resurrect_after` timeout has passed. Increments the counter and performs connection reloading if the `reload_connections` option is set.

@return [Connections::Connection] @see Connections::Collection#get_connection

# File lib/opensearch/transport/transport/base.rb, line 95
# Returns a connection from the connection pool by delegating to the
# collection's `get_connection`.
#
# Before delegating it:
#
# * resurrects dead connections when more than `@resurrect_after` seconds
#   have passed since the last request;
# * increments the request counter and reloads the connections when
#   `reload_connections` is set and the counter hits a multiple of
#   `reload_after`.
#
# The reload check uses the value returned from the synchronized increment
# rather than re-reading the counter, so each caller tests its own count.
#
# @param options [Hash] Passed through to the collection
# @return [Connections::Connection]
def get_connection(options = {})
  resurrect_dead_connections! if Time.now > @last_request_at + @resurrect_after

  request_number = @counter_mtx.synchronize { @counter += 1 }
  reload_connections! if reload_connections && (request_number % reload_after).zero?

  connections.get_connection(options)
end
host_unreachable_exceptions() click to toggle source

@abstract Returns an Array of connection errors specific to the transport implementation.

See {HTTP::Faraday#host_unreachable_exceptions} for an example.

@return [Array]

# File lib/opensearch/transport/transport/base.rb, line 380
# @abstract Returns an Array of connection errors specific to the transport
#   implementation.
#
# The base list contains only `Errno::ECONNREFUSED`.
#
# @return [Array]
def host_unreachable_exceptions
  [::Errno::ECONNREFUSED]
end
perform_request(method, path, params = {}, body = nil, headers = nil, opts = {}, &block) click to toggle source

Performs a request to OpenSearch, while handling logging, tracing, marking dead connections, retrying the request and reloading the connections.

@abstract The transport implementation has to implement this method either in full, or by invoking this method with a block. See {HTTP::Faraday#perform_request} for an example.

@param method [String] Request method
@param path [String] The API endpoint
@param params [Hash] Request parameters (will be serialized by {Connections::Connection#full_url})
@param body [Hash] Request body (will be serialized by the {#serializer})
@param headers [Hash] Request headers
@param opts [Hash] Per-request overrides for the `:reload_on_failure` and `:retry_on_failure` options
@param block [Proc] Code block to evaluate, passed from the implementation

@return [Response] @raise [NoMethodError] If no block is passed @raise [ServerError] If request failed on server @raise [Error] If no connection is available

# File lib/opensearch/transport/transport/base.rb, line 270
# Performs a request, while handling logging, tracing, marking dead
# connections, retrying the request and reloading the connections.
#
# The actual I/O is done by the given block, which is called with the
# selected connection and the full URL and must return the response.
#
# @param method  [String] Request method
# @param path    [String] The API endpoint
# @param params  [Hash]   Request parameters; the `:ignore` key is removed and
#   treated as a list of status codes that must not raise
# @param body    [Hash]   Request body (only used for logging/tracing here)
# @param headers [Hash]   Request headers (not read by this method)
# @param opts    [Hash]   Per-request overrides for `:reload_on_failure` and
#   `:retry_on_failure`
# @param block   [Proc]   Code block performing the request
#
# @return [Response]
# @raise [NoMethodError] If no block is passed
# @raise [ServerError]   If the request failed on the server
# @raise [Error]         If no connection is available
def perform_request(method, path, params = {}, body = nil, headers = nil, opts = {}, &block)
  raise NoMethodError, 'Implement this method in your transport class' unless block_given?

  start = Time.now
  tries = 0
  reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure])

  # The per-request setting wins over the transport-wide one; a literal `true`
  # selects the default retry count. Stays nil when neither is configured.
  max_retries = if opts.key?(:retry_on_failure)
    opts[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure]
  elsif options.key?(:retry_on_failure)
    options[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : options[:retry_on_failure]
  end

  # Work on a copy so removing :ignore does not touch the caller's hash.
  params = params.clone

  ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i }

  begin
    tries     += 1
    connection = get_connection or raise Error.new('Cannot get new connection from pool.')

    if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash)
      params = connection.connection.params.merge(params.to_hash)
    end

    url      = connection.full_url(path, params)

    response = block.call(connection, url)

    connection.healthy! if connection.failures > 0

    # Raise an exception so we can catch it for `retry_on_status`
    __raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)

  rescue OpenSearch::Transport::Transport::ServerError => e
    # Compare as Integer, the same way the status was checked before raising.
    if response && @retry_on_status.include?(response.status.to_i)
      log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}"
      if tries <= (max_retries || DEFAULT_MAX_RETRIES)
        retry
      else
        log_fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries"
        raise e
      end
    else
      raise e
    end

  rescue *host_unreachable_exceptions => e
    log_error "[#{e.class}] #{e.message} #{connection.host.inspect}"

    connection.dead!

    if reload_on_failure and tries < connections.all.size
      log_warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})"
      reload_connections! and retry
    end

    if max_retries
      log_warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}"
      if tries <= max_retries
        retry
      else
        log_fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries"
        raise e
      end
    else
      raise e
    end

  rescue Exception => e
    # Logged and re-raised unchanged; nothing is swallowed here.
    log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})"
    raise e

  end #/begin

  duration = Time.now - start

  error_status = response.status.to_i >= 300
  ignored      = ignore.include?(response.status.to_i)

  if error_status
    __log_response    method, path, params, body, url, response, nil, 'N/A', duration
    __trace  method, path, params, connection.connection.headers, body, url, response, nil, 'N/A', duration if tracer

    # Log the failure and raise only when `ignore` doesn't match the response status
    unless ignored
      log_fatal "[#{response.status}] #{response.body}"
      __raise_transport_error response
    end
  end

  json     = serializer.load(response.body) if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/
  took     = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a'

  __log_response(method, path, params, body, url, response, json, took, duration) unless ignored

  # An ignored error response was already traced above; don't trace it twice.
  __trace  method, path, params, connection.connection.headers, body, url, response, nil, 'N/A', duration if tracer && !error_status

  warnings(response.headers['warning']) if response.headers&.[]('warning')

  Response.new response.status, json || response.body, response.headers
ensure
  @last_request_at = Time.now
end
reload_connections!() click to toggle source

Reloads and replaces the connection collection based on cluster information

@see Sniffer#hosts

# File lib/opensearch/transport/transport/base.rb, line 107
# Reloads and replaces the connection collection based on the hosts reported
# by the sniffer.
#
# A `SnifferTimeoutError` is logged and otherwise ignored, leaving the
# collection as it was.
#
# @return [self]
def reload_connections!
  __rebuild_connections(:hosts => sniffer.hosts, :options => options)
  self
rescue SnifferTimeoutError
  log_error "[SnifferTimeoutError] Timeout when reloading connections."
  self
end
resurrect_dead_connections!() click to toggle source

Tries to “resurrect” all eligible dead connections

@see Connections::Connection#resurrect!

# File lib/opensearch/transport/transport/base.rb, line 120
# Tries to "resurrect" all eligible dead connections by calling `resurrect!`
# on every connection the collection reports as dead.
def resurrect_dead_connections!
  connections.dead.each(&:resurrect!)
end

Private Instance Methods

apply_headers(client, options) click to toggle source
# File lib/opensearch/transport/transport/base.rb, line 418
# Merges the configured request headers into the client's headers.
#
# A content type and user agent given under any key matching the respective
# regex are moved to the canonical header names; missing ones fall back to
# `DEFAULT_CONTENT_TYPE` and {#user_agent_header}. When compression is enabled
# the client also gets the gzip `ACCEPT_ENCODING` header.
#
# The headers are normalized on a copy, so the Hash passed in
# `options[:headers]` is left untouched.
#
# @param client  [#headers] HTTP client whose headers Hash is updated
# @param options [Hash]     Transport options; reads `:headers`
def apply_headers(client, options)
  headers = (options[:headers] || {}).dup
  headers[CONTENT_TYPE_STR] = find_value(headers, CONTENT_TYPE_REGEX) || DEFAULT_CONTENT_TYPE
  headers[USER_AGENT_STR] = find_value(headers, USER_AGENT_REGEX) || user_agent_header(client)
  client.headers[ACCEPT_ENCODING] = GZIP if use_compression?
  client.headers.merge!(headers)
end
decompress_response(body) click to toggle source
# File lib/opensearch/transport/transport/base.rb, line 397
# Returns the gunzipped response body.
#
# The body is returned unchanged when compression is not enabled or when it
# does not look gzipped. The gzip reader is always closed after reading so
# its zlib resources are released promptly.
#
# @param body [String] Raw response body
# @return [String]
def decompress_response(body)
  return body unless use_compression?
  return body unless gzipped?(body)

  io = StringIO.new(body)
  gzip_reader = if RUBY_ENCODING
                  Zlib::GzipReader.new(io, :encoding => 'ASCII-8BIT')
                else
                  Zlib::GzipReader.new(io)
                end

  begin
    gzip_reader.read
  ensure
    gzip_reader.close
  end
end
find_value(hash, regex) click to toggle source
# File lib/opensearch/transport/transport/base.rb, line 426
# Finds the first entry whose downcased key matches the regex, removes it
# from the hash and returns its value.
#
# @param hash  [Hash]   Hash to search; the matching entry is deleted from it
# @param regex [Regexp] Pattern matched against `key.to_s.downcase`
# @return [Object, nil] The removed value, or nil when no key matches
def find_value(hash, regex)
  matched = hash.find { |key, _value| key.to_s.downcase =~ regex }
  return nil unless matched

  name, value = matched
  hash.delete(name)
  value
end
gzipped?(body) click to toggle source
# File lib/opensearch/transport/transport/base.rb, line 410
# Tells whether the body starts with the gzip magic bytes.
#
# The first two bytes are unpacked with `HEX_STRING_DIRECTIVE` and compared
# to `GZIP_FIRST_TWO_BYTES`. A nil body is reported as not gzipped instead of
# raising.
#
# @param body [String, nil]
# @return [Boolean]
def gzipped?(body)
  return false if body.nil?

  body[0..1].unpack(HEX_STRING_DIRECTIVE).first == GZIP_FIRST_TWO_BYTES
end
use_compression?() click to toggle source
# File lib/opensearch/transport/transport/base.rb, line 414
# Returns the value of the `@compression` instance variable, which the
# constructor sets to the boolean form of the `:compression` option.
def use_compression?
  @compression
end
user_agent_header(client) click to toggle source
# File lib/opensearch/transport/transport/base.rb, line 434
# Builds (once) and returns the default User-Agent header value.
#
# The value has the form
# "opensearch-ruby/VERSION (RUBY_VERSION: x.y.z; os cpu)", where the
# "os cpu" part is only added when `RbConfig` reports a host OS. The result
# is memoized in `@user_agent`.
#
# @param client [Object] Not used by this implementation
# @return [String]
def user_agent_header(client)
  return @user_agent if @user_agent

  details = ["RUBY_VERSION: #{RUBY_VERSION}"]
  config  = RbConfig::CONFIG

  if config && config['host_os']
    os_name = config['host_os'].split('_').first[/[a-z]+/i].downcase
    details << "#{os_name} #{config['target_cpu']}"
  end

  @user_agent = "opensearch-ruby/#{VERSION} (#{details.join('; ')})"
end
warnings(warning) click to toggle source
# File lib/opensearch/transport/transport/base.rb, line 444
# Emits the given warning text through `warn`, prefixed with "warning: ".
#
# @param warning [#to_s] Warning text (e.g. the value of a response's
#   `warning` header)
def warnings(warning)
  message = "warning: #{warning}"
  warn(message)
end