class Consul::Async::JSONEndpoint
Endpoint (aka URL) of a remote API that might be called
Attributes
conf[R]
default_value[R]
enforce_json_200[R]
last_result[R]
query_params[R]
queue[R]
start_time[R]
stats[R]
url[R]
Public Class Methods
new(conf, url, default_value, enforce_json_200 = true, query_params = {})
click to toggle source
# File lib/consul/async/json_endpoint.rb, line 96
# Builds the endpoint, registers the statistics callbacks and starts polling.
#
# @param conf configuration object; must respond to +create(url)+ (whose
#   result is stored as the endpoint configuration) and to +debug+
# @param url the URL of the remote API
# @param default_value value serialised with +to_json+ to seed the initial
#   (fake) result that is exposed until a real response arrives
# @param enforce_json_200 stored as-is; consulted by +fetch+ when deciding
#   whether a response is treated as an error
# @param query_params pairs that +build_request+ copies into the request
def initialize(conf, url, default_value, enforce_json_200 = true, query_params = {})
  @conf = conf.create(url)
  @default_value = default_value
  @url = url
  @queue = EM::Queue.new
  @s_callbacks = []
  @e_callbacks = []
  @enforce_json_200 = enforce_json_200
  @start_time = Time.now.utc
  @consecutive_errors = 0
  @query_params = query_params
  @stopping = false
  # Explicitly not ready until the first successful response flips this flag.
  @ready = false
  @stats = EndPointStats.new
  @last_result = JSONResult.new(default_value.to_json, false, HttpResponse.new(nil), stats, 1, fake: true)
  # The statistics callbacks are registered first, so they run before any
  # callback added later.
  on_response { |result| @stats.on_response result }
  on_error { |http| @stats.on_error http }
  _enable_network_debug if conf.debug && conf.debug[:network]
  # fetch installs the queue consumer; pushing a token then triggers the
  # first request.
  fetch
  queue << Object.new
end
Public Instance Methods
_enable_network_debug()
click to toggle source
# File lib/consul/async/json_endpoint.rb, line 117
# Registers one response callback and one error callback that trace network
# activity through +warn+.
#
# The response trace shows whether the payload changed, the success/error
# counters, the padded size and throughput strings, the padded URL and the
# delay before the next request. The error trace shows the URL and the
# inspected error.
def _enable_network_debug
  on_response do |result|
    counters = result.stats
    # The two tags have the same width so that the trace columns line up.
    change_tag = result.modified? ? '[MODFIED]' : '[NO DIFF]'
    counts = "[s:#{counters.successes},err:#{counters.errors}]"
    volume = "[#{counters.body_bytes_human.ljust(8)}][#{counters.bytes_per_sec_human.ljust(9)}]"
    warn "[DBUG][ OK ]#{change_tag}#{counts}#{volume} #{url.ljust(48)}, next in #{result.retry_in} s"
  end
  on_error do |http|
    warn "[ERROR]: #{url}: #{http.error.inspect}"
  end
end
on_error(&block)
click to toggle source
# File lib/consul/async/json_endpoint.rb, line 132
# Registers a callback to be invoked when a request fails.
#
# @yield [http_result] the block is stored and later called by the error
#   handling path with the wrapped failed response
# @return [Array] the list of registered error callbacks
# @raise [ArgumentError] when called without a block; storing nil would only
#   fail later, while the callbacks are being dispatched
def on_error(&block)
  raise ArgumentError, 'on_error requires a block' unless block

  @e_callbacks << block
end
on_response(&block)
click to toggle source
# File lib/consul/async/json_endpoint.rb, line 128
# Registers a callback to be invoked with each successful result.
#
# @yield [result] the block is stored and later called by the fetch loop
#   with the result built from the response
# @return [Array] the list of registered success callbacks
# @raise [ArgumentError] when called without a block; storing nil would only
#   fail later, while the callbacks are being dispatched
def on_response(&block)
  raise ArgumentError, 'on_response requires a block' unless block

  @s_callbacks << block
end
ready?()
click to toggle source
# File lib/consul/async/json_endpoint.rb, line 136
# Tells whether at least one response has been processed successfully.
#
# @return [Boolean] always +true+ or +false+, never +nil+, even before the
#   ready flag has been assigned for the first time
def ready?
  @ready ? true : false
end
terminate()
click to toggle source
# File lib/consul/async/json_endpoint.rb, line 140
# Marks the endpoint as stopping. The fetch loop checks this flag before
# scheduling its next request after a success and before handling a
# connection failure.
#
# @return [true]
def terminate
  @stopping = true
end
Private Instance Methods
_compute_retry_in(retry_in)
click to toggle source
# File lib/consul/async/json_endpoint.rb, line 169
# Spreads a delay with random jitter: half of +retry_in+ plus a random offset
# drawn with +rand(retry_in)+ from the shared generator.
#
# NOTE(review): assumes Consul::Async::Utilities.random behaves like a
# ::Random instance, whose #rand rejects zero and negative limits with
# ArgumentError — confirm against Utilities. To stay safe the generator is
# only consulted for strictly positive delays.
#
# @param retry_in [Numeric] base delay
# @return [Numeric] the jittered delay; +retry_in / 2+ when retry_in is not
#   positive
def _compute_retry_in(retry_in)
  jitter = retry_in.positive? ? Consul::Async::Utilities.random.rand(retry_in) : 0
  retry_in / 2 + jitter
end
_handle_error(http) { || ... }
click to toggle source
# File lib/consul/async/json_endpoint.rb, line 173
# Handles a failed request: logs it, bumps the consecutive error counter,
# schedules a retry and notifies the registered error callbacks.
#
# The base delay is +conf.retry_duration + 2**@consecutive_errors+, capped at
# 600 and then jittered by +_compute_retry_in+.
#
# @param http the failed request object; must respond to +error+
# @yield called with no arguments when the retry timer fires, just before the
#   next request is queued (callers use it for logging)
def _handle_error(http)
  retry_in = _compute_retry_in([600, conf.retry_duration + 2**@consecutive_errors].min)
  ::Consul::Async::Debug.puts_error "[#{url}] - #{http.error} - Retry in #{retry_in}s #{stats.body_bytes_human}"
  @consecutive_errors += 1
  http_result = HttpResponse.new(http)
  EventMachine.add_timer(retry_in) do
    # Once the endpoint is stopping, no retry may be queued; this also covers
    # a stop requested while the timer was pending.
    next if @stopping

    yield
    queue.push(Object.new)
  end
  @e_callbacks.each { |c| c.call(http_result) }
end
build_request()
click to toggle source
# File lib/consul/async/json_endpoint.rb, line 146
# Builds the options hash handed to the HTTP client for one request.
#
# The hash always carries an 'Accept: application/json' header, the URL, the
# keepalive flag and a +:callback+ entry. A JSON body (with its Content-Type
# header) is added when +conf.json_body+ is set, 'accept-encoding: identity'
# is added unless gzip compression is enabled, and every pair of
# +conf.headers+ is copied into the headers. The configured query parameters
# are copied under +:query+; that key is only present when there is at least
# one parameter.
#
# @return [Hash] the request options
def build_request
  request = {
    head: { 'Accept' => 'application/json' },
    url: url,
    keepalive: true,
    # NOTE(review): on_response only accepts a block, so this Method object
    # cannot be invoked with a response argument — confirm the HTTP client
    # really uses this entry.
    callback: method(:on_response)
  }
  if conf.json_body
    request[:body] = conf.json_body.to_json
    request[:head]['Content-Type'] = 'application/json'
  end
  request[:head]['accept-encoding'] = 'identity' unless conf.enable_gzip_compression
  conf.headers.each do |name, value|
    request[:head][name] = value
  end
  # The :query hash does not exist up front; create it on the first pair.
  @query_params.each_pair do |name, value|
    (request[:query] ||= {})[name] = value
  end
  request
end
fetch()
click to toggle source
# File lib/consul/async/json_endpoint.rb, line 185
# Installs the polling loop: each token popped from the queue triggers one
# HTTP request, and the consumer re-registers itself after every request.
#
# On a response that is not rejected, the error counter is reset, the next
# request is scheduled (unless the endpoint is stopping), the new result is
# stored, the endpoint becomes ready and the success callbacks run. Rejected
# responses and connection failures go through +_handle_error+.
def fetch
  connection_options = {
    connect_timeout: 5, # default connection setup timeout
    inactivity_timeout: 60 # default connection inactivity (post-setup) timeout
  }
  unless conf.tls_cert_chain.nil?
    connection_options[:tls] = {
      cert_chain_file: conf.tls_cert_chain,
      private_key_file: conf.tls_private_key,
      verify_peer: conf.tls_verify_peer
    }
  end
  client = EventMachine::HttpRequest.new(conf.url, connection_options)

  poll = proc do
    verb = conf.request_method.to_sym
    http = client.send(verb, build_request)

    http.callback do
      # NOTE(review): as written, a response is rejected only when it is BOTH
      # outside 2xx AND not declared as 'application/json' — confirm that the
      # conjunction is intended.
      if enforce_json_200 &&
         !(200..299).cover?(http.response_header.status) &&
         http.response_header['Content-Type'] != 'application/json'
        _handle_error(http) do
          warn "[RETRY][#{url}] (#{@consecutive_errors} errors)" if (@consecutive_errors % 10) == 1
        end
      else
        @consecutive_errors = 0
        http_result = HttpResponse.new(http)
        payload = http_result.response.freeze
        changed = @last_result.fake? || @last_result.data != payload
        base_delay = changed ? conf.min_duration : conf.retry_on_non_diff
        # Never schedule the next request sooner than 0.1.
        delay = [_compute_retry_in(base_delay), 0.1].max
        # The timer is armed before the callbacks run; it only queues a token.
        EventMachine.add_timer(delay) { queue.push(Object.new) } unless @stopping
        @last_result = JSONResult.new(payload, changed, http_result, stats, delay, fake: false)
        @ready = true
        fresh = @last_result
        @s_callbacks.each { |listener| listener.call(fresh) }
      end
    end

    http.errback do
      next if @stopping

      _handle_error(http) do
        # Only one failure out of ten is reported here.
        next unless (@consecutive_errors % 10) == 1

        reason = http.error
        if Gem.win_platform? && http.error.include?('unable to create new socket: Too many open files')
          reason += "\n *** Windows does not support more than 2048 watches, watch less endpoints ***"
        end
        ::Consul::Async::Debug.puts_error "[RETRY][#{url}] (#{@consecutive_errors} errors) due to #{reason}"
      end
    end

    # Wait for the next token.
    queue.pop(&poll)
  end

  queue.pop(&poll)
end