class Consul::Async::JSONEndpoint

Endpoint (aka URL) of a remote API that might be called

Attributes

conf[R]
default_value[R]
enforce_json_200[R]
last_result[R]
query_params[R]
queue[R]
start_time[R]
stats[R]
url[R]

Public Class Methods

new(conf, url, default_value, enforce_json_200 = true, query_params = {}) click to toggle source
# File lib/consul/async/json_endpoint.rb, line 96
def initialize(conf, url, default_value, enforce_json_200 = true, query_params = {})
  @conf = conf.create(url)
  @default_value = default_value
  @url = url
  @queue = EM::Queue.new
  @s_callbacks = []
  @e_callbacks = []
  @enforce_json_200 = enforce_json_200
  @start_time = Time.now.utc
  @consecutive_errors = 0
  @query_params = query_params
  @stopping = false
  @stats = EndPointStats.new
  @last_result = JSONResult.new(default_value.to_json, false, HttpResponse.new(nil), stats, 1, fake: true)
  on_response { |result| @stats.on_response result }
  on_error { |http| @stats.on_error http }
  _enable_network_debug if conf.debug && conf.debug[:network]
  fetch
  queue << Object.new
end

Public Instance Methods

_enable_network_debug() click to toggle source
# File lib/consul/async/json_endpoint.rb, line 117
def _enable_network_debug
  on_response do |result|
    stats = result.stats
    warn "[DBUG][ OK ]#{result.modified? ? '[MODFIED]' : '[NO DIFF]'}" \
    "[s:#{stats.successes},err:#{stats.errors}]" \
    "[#{stats.body_bytes_human.ljust(8)}][#{stats.bytes_per_sec_human.ljust(9)}]"\
    " #{url.ljust(48)}, next in #{result.retry_in} s"
  end
  on_error { |http| warn "[ERROR]: #{url}: #{http.error.inspect}" }
end
on_error(&block) click to toggle source
# File lib/consul/async/json_endpoint.rb, line 132
def on_error(&block)
  @e_callbacks << block
end
on_response(&block) click to toggle source
# File lib/consul/async/json_endpoint.rb, line 128
def on_response(&block)
  @s_callbacks << block
end
ready?() click to toggle source
# File lib/consul/async/json_endpoint.rb, line 136
def ready?
  @ready
end
terminate() click to toggle source
# File lib/consul/async/json_endpoint.rb, line 140
def terminate
  @stopping = true
end

Private Instance Methods

_compute_retry_in(retry_in) click to toggle source
# File lib/consul/async/json_endpoint.rb, line 169
def _compute_retry_in(retry_in)
  retry_in / 2 + Consul::Async::Utilities.random.rand(retry_in)
end
_handle_error(http) { || ... } click to toggle source
# File lib/consul/async/json_endpoint.rb, line 173
def _handle_error(http)
  retry_in = _compute_retry_in([600, conf.retry_duration + 2**@consecutive_errors].min)
  ::Consul::Async::Debug.puts_error "[#{url}] - #{http.error} - Retry in #{retry_in}s #{stats.body_bytes_human}"
  @consecutive_errors += 1
  http_result = HttpResponse.new(http)
  EventMachine.add_timer(retry_in) do
    yield
    queue.push(Object.new)
  end
  @e_callbacks.each { |c| c.call(http_result) }
end
build_request() click to toggle source
# File lib/consul/async/json_endpoint.rb, line 146
def build_request
  res = {
    head: {
      'Accept' => 'application/json'
    },
    url: url,
    keepalive: true,
    callback: method(:on_response)
  }
  if conf.json_body
    res[:body] = conf.json_body.to_json
    res[:head]['Content-Type'] = 'application/json'
  end
  res[:head]['accept-encoding'] = 'identity' unless conf.enable_gzip_compression
  conf.headers.map do |k, v|
    res[:head][k] = v
  end
  @query_params.each_pair do |k, v|
    res[:query][k] = v
  end
  res
end
fetch() click to toggle source
# File lib/consul/async/json_endpoint.rb, line 185
def fetch
  options = {
    connect_timeout: 5, # default connection setup timeout
    inactivity_timeout: 60 # default connection inactivity (post-setup) timeout
  }
  unless conf.tls_cert_chain.nil?
    options[:tls] = {
      cert_chain_file: conf.tls_cert_chain,
      private_key_file: conf.tls_private_key,
      verify_peer: conf.tls_verify_peer
    }
  end
  connection = {
    conn: EventMachine::HttpRequest.new(conf.url, options)
  }
  cb = proc do
    request_method = conf.request_method.to_sym
    http = connection[:conn].send(request_method, build_request)
    http.callback do
      if enforce_json_200 && !(200..299).cover?(http.response_header.status) && http.response_header['Content-Type'] != 'application/json'
        _handle_error(http) do
          warn "[RETRY][#{url}] (#{@consecutive_errors} errors)" if (@consecutive_errors % 10) == 1
        end
      else
        @consecutive_errors = 0
        http_result = HttpResponse.new(http)
        new_content = http_result.response.freeze
        modified = @last_result.fake? || @last_result.data != new_content
        retry_in = modified ? conf.min_duration : conf.retry_on_non_diff
        retry_in = _compute_retry_in(retry_in)
        retry_in = 0.1 if retry_in < 0.1
        unless @stopping
          EventMachine.add_timer(retry_in) do
            queue.push(Object.new)
          end
        end
        result = JSONResult.new(new_content, modified, http_result, stats, retry_in, fake: false)
        @last_result = result
        @ready = true
        @s_callbacks.each { |c| c.call(result) }
      end
    end

    http.errback do
      unless @stopping
        _handle_error(http) do
          if (@consecutive_errors % 10) == 1
            add_msg = http.error
            if Gem.win_platform? && http.error.include?('unable to create new socket: Too many open files')
              add_msg += "\n *** Windows does not support more than 2048 watches, watch less endpoints ***"
            end
            ::Consul::Async::Debug.puts_error "[RETRY][#{url}] (#{@consecutive_errors} errors) due to #{add_msg}"
          end
        end
      end
    end
    queue.pop(&cb)
  end
  queue.pop(&cb)
end