class Consul::Async::ConsulEndpoint

This class represents a specific path in Consul HTTP API It also stores x_consul_index and keep track on updates of API So, it basically performs all the optimizations to keep updated with Consul internal state.

Attributes

conf[R]
default_value[R]
enforce_json_200[R]
last_result[R]
path[R]
query_params[R]
queue[R]
start_time[R]
stats[R]
x_consul_index[R]

Public Class Methods

new(conf, path, enforce_json_200 = true, query_params = {}, default_value = '[]', agent = nil) click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 145
def initialize(conf, path, enforce_json_200 = true, query_params = {}, default_value = '[]', agent = nil)
  @conf = conf.create(path, agent: agent)
  @default_value = default_value
  @path = path
  @queue = EM::Queue.new
  @x_consul_index = 0
  @s_callbacks = []
  @e_callbacks = []
  @enforce_json_200 = enforce_json_200
  @start_time = Time.now.utc
  @consecutive_errors = 0
  @query_params = query_params
  @stopping = false
  @stats = EndPointStats.new
  @last_result = ConsulResult.new(default_value, false, HttpResponse.new(nil), 0, stats, 1, fake: true)
  on_response { |result| @stats.on_response result }
  on_error { |http| @stats.on_error http }
  _enable_network_debug if conf.debug && conf.debug[:network]
  fetch
  queue << 0
end

Public Instance Methods

_enable_network_debug() click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 167
def _enable_network_debug
  on_response do |result|
    state = result.x_consul_index.to_i < 1 ? '[WARN]' : '[ OK ]'
    stats = result.stats
    warn "[DBUG]#{state}#{result.modified? ? '[MODFIED]' : '[NO DIFF]'}" \
    "[s:#{stats.successes},err:#{stats.errors}]" \
    "[#{stats.body_bytes_human.ljust(8)}][#{stats.bytes_per_sec_human.ljust(9)}]"\
    " #{path.ljust(48)} idx:#{result.x_consul_index}, next in #{result.retry_in} s"
  end
  on_error { |http| warn "[ERROR]: #{path}: #{http.error.inspect}" }
end
on_error(&block) click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 183
def on_error(&block)
  @e_callbacks << block
end
on_response(&block) click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 179
def on_response(&block)
  @s_callbacks << block
end
ready?() click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 187
def ready?
  @ready
end
terminate() click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 191
def terminate
  @stopping = true
end

Private Instance Methods

_compute_retry_in(retry_in) click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 224
def _compute_retry_in(retry_in)
  retry_in / 2 + Consul::Async::Utilities.random.rand(retry_in)
end
_handle_error(http, consul_index) { || ... } click to toggle source

rubocop:enable Style/ClassVars

# File lib/consul/async/consul_endpoint.rb, line 234
def _handle_error(http, consul_index)
  retry_in = _compute_retry_in([600, conf.retry_duration + 2**@consecutive_errors].min)
  if http.response_header.status == 429
    _last_429
    retry_in = 60 + Consul::Async::Utilities.random.rand(180) if retry_in < 60
    _last_429[:time] = Time.now.utc
    _last_429[:count] += 1
    if (_last_429[:count] % 10) == 1
      if _last_429[:count] == 1
        ::Consul::Async::Debug.puts_error "Rate limiting detected on Consul side (HTTP 429)!\n\n" \
                                          "******************************* CONFIGURATION ISSUE DETECTED *******************************\n" \
                                          "* Too many simultaneous connections for Consul agent #{conf.base_url}\n" \
                                          "* You should tune 'limits.http_max_conns_per_client' to a higher value.\n" \
                                          "* This program will behave badly until you change this.\n" \
                                          "* See https://www.consul.io/docs/agent/options.html#http_max_conns_per_client for more info\n" \
                                          "********************************************************************************************\n\n"
      end
      ::Consul::Async::Debug.puts_error "[#{path}] Too many conns to #{conf.base_url}, errors=#{_last_429[:count]} - Retry in #{retry_in}s #{stats.body_bytes_human}"
    end
  else
    ::Consul::Async::Debug.puts_error "[#{path}] X-Consul-Index:#{consul_index} - #{http.error} - Retry in #{retry_in}s #{stats.body_bytes_human}"
  end
  @consecutive_errors += 1
  http_result = HttpResponse.new(http)
  EventMachine.add_timer(retry_in) do
    yield
    queue.push(consul_index)
  end
  @e_callbacks.each { |c| c.call(http_result) }
end
_last_429() click to toggle source

rubocop:disable Style/ClassVars

# File lib/consul/async/consul_endpoint.rb, line 229
def _last_429
  @@_last_429 ||= { count: 0 }
end
build_request(consul_index) click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 197
def build_request(consul_index)
  res = {
    head: {
      'Accept' => 'application/json',
      'X-Consul-Index' => consul_index,
      'X-Consul-Token' => conf.token
    },
    path: path,
    query: {
      wait: "#{conf.wait_duration}s",
      index: consul_index,
      stale: 'stale'
    },
    keepalive: true,
    callback: method(:on_response)
  }
  res[:head]['accept-encoding'] = 'identity' unless conf.enable_gzip_compression
  @query_params.each_pair do |k, v|
    res[:query][k] = v
  end
  res
end
fetch() click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 265
def fetch
  options = {
    connect_timeout: 5, # default connection setup timeout
    inactivity_timeout: conf.wait_duration + 1 + (conf.wait_duration / 16) # default connection inactivity (post-setup) timeout
  }
  unless conf.tls_cert_chain.nil?
    options[:tls] = {
      cert_chain_file: conf.tls_cert_chain,
      private_key_file: conf.tls_private_key,
      verify_peer: conf.tls_verify_peer
    }
  end
  connection = {
    conn: EventMachine::HttpRequest.new(conf.base_url, options)
  }
  cb = proc do |consul_index|
    http = connection[:conn].get(build_request(consul_index))
    http.callback do
      # Dirty hack, but contrary to other path, when key is not present, Consul returns 404
      is_kv_empty = path.start_with?('/v1/kv') && http.response_header.status == 404
      if !is_kv_empty && enforce_json_200 && http.response_header.status != 200 && http.response_header['Content-Type'] != 'application/json'
        _handle_error(http, consul_index) do
          warn "[RETRY][#{path}] (#{@consecutive_errors} errors)" if (@consecutive_errors % 10) == 1
        end
      else
        n_consul_index = find_x_consul_index(http)
        @x_consul_index = n_consul_index.to_i if n_consul_index
        @consecutive_errors = 0
        http_result = if is_kv_empty
                        HttpResponse.new(http, default_value)
                      else
                        HttpResponse.new(http)
                      end
        new_content = http_result.response.freeze
        modified = @last_result.fake? || @last_result.data != new_content
        if n_consul_index.nil?
          retry_in = modified ? conf.missing_index_retry_time_on_diff : conf.missing_index_retry_time_on_unchanged
          n_consul_index = consul_index
        else
          retry_in = modified ? conf.min_duration : conf.retry_on_non_diff
          retry_in = 0.1 if retry_in < (Time.now - @last_result.last_update)
        end
        retry_in = _compute_retry_in(retry_in)
        retry_in = 0.1 if retry_in < 0.1
        unless @stopping
          EventMachine.add_timer(retry_in) do
            queue.push(n_consul_index)
          end
        end
        result = ConsulResult.new(new_content, modified, http_result, n_consul_index, stats, retry_in, fake: false)
        @last_result = result
        @ready = true
        @s_callbacks.each { |c| c.call(result) }
      end
    end

    http.errback do
      unless @stopping
        _handle_error(http, consul_index) do
          if (@consecutive_errors % 10) == 1
            add_msg = http.error
            if Gem.win_platform? && http.error.include?('unable to create new socket: Too many open files')
              add_msg += "\n *** Windows does not support more than 2048 watches, watch less endpoints ***"
            end
            ::Consul::Async::Debug.puts_error "[RETRY][#{path}] (#{@consecutive_errors} errors) due to #{add_msg}"
          end
        end
      end
    end
    queue.pop(&cb)
  end
  queue.pop(&cb)
end
find_x_consul_index(http) click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 220
def find_x_consul_index(http)
  http.response_header['X-Consul-Index']
end