class Consul::Async::ConsulEndpoint
This class represents a specific path in the Consul
HTTP API. It also stores x_consul_index
and keeps track of updates of the API. So, it basically performs all the optimizations needed to stay up to date with Consul's
internal state.
Attributes
conf[R]
default_value[R]
enforce_json_200[R]
last_result[R]
path[R]
query_params[R]
queue[R]
start_time[R]
stats[R]
x_consul_index[R]
Public Class Methods
new(conf, path, enforce_json_200 = true, query_params = {}, default_value = '[]', agent = nil)
click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 145
# Set up the endpoint state, register the statistics callbacks and start
# the polling loop.
#
# @param conf an object responding to +create(path, agent:)+, +debug+;
#   the value returned by +create+ becomes the endpoint configuration
# @param path the Consul HTTP API path being watched
# @param enforce_json_200 stored as-is; read by +fetch+ when validating replies
# @param query_params extra query-string parameters merged into every request
# @param default_value payload used for the initial placeholder result
# @param agent forwarded to +conf.create+
def initialize(conf, path, enforce_json_200 = true, query_params = {}, default_value = '[]', agent = nil)
  @conf = conf.create(path, agent: agent)
  @default_value = default_value
  @path = path
  @queue = EM::Queue.new
  @x_consul_index = 0
  @s_callbacks = []
  @e_callbacks = []
  @enforce_json_200 = enforce_json_200
  @start_time = Time.now.utc
  @consecutive_errors = 0
  @query_params = query_params
  @stopping = false
  # ready? reads @ready; give it a defined value until the first real result arrives.
  @ready = false
  @stats = EndPointStats.new
  # Placeholder result (fake: true) so that last_result is never nil.
  @last_result = ConsulResult.new(default_value, false, HttpResponse.new(nil), 0, stats, 1, fake: true)
  on_response { |result| @stats.on_response result }
  on_error { |http| @stats.on_error http }
  # Note: this reads the +conf+ argument, not the object built by conf.create.
  _enable_network_debug if conf.debug && conf.debug[:network]
  fetch
  # Prime the queue with index 0 so the consumer registered by fetch fires once.
  queue << 0
end
Public Instance Methods
_enable_network_debug()
click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 167
# Register callbacks that trace every response and every error of this
# endpoint through +warn+.
def _enable_network_debug
  on_response do |res|
    counters = res.stats
    index_tag = res.x_consul_index.to_i.positive? ? '[ OK ]' : '[WARN]'
    # Both tags are 9 characters wide, which keeps the columns aligned.
    diff_tag = res.modified? ? '[MODFIED]' : '[NO DIFF]'
    size_col = counters.body_bytes_human.ljust(8)
    rate_col = counters.bytes_per_sec_human.ljust(9)
    warn "[DBUG]#{index_tag}#{diff_tag}[s:#{counters.successes},err:#{counters.errors}]" \
         "[#{size_col}][#{rate_col}] #{path.ljust(48)} idx:#{res.x_consul_index}, next in #{res.retry_in} s"
  end
  on_error do |http|
    warn "[ERROR]: #{path}: #{http.error.inspect}"
  end
end
on_error(&block)
click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 183
# Register a block to be called with the error response whenever a request
# on this endpoint fails.
#
# @yield [http_result] the wrapped failed HTTP response
# @return [Array] the list of registered error callbacks
# @raise [ArgumentError] when no block is given
def on_error(&block)
  # A nil entry would only blow up later, when callbacks are dispatched.
  raise ArgumentError, 'on_error requires a block' unless block

  @e_callbacks << block
end
on_response(&block)
click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 179
# Register a block to be called with the result of every successful
# request on this endpoint.
#
# @yield [result] the result built from the HTTP response
# @return [Array] the list of registered success callbacks
# @raise [ArgumentError] when no block is given
def on_response(&block)
  # A nil entry would only blow up later, when callbacks are dispatched.
  raise ArgumentError, 'on_response requires a block' unless block

  @s_callbacks << block
end
ready?()
click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 187
# Tell whether at least one real response has been received.
#
# @return [Boolean] true once @ready has been set, false otherwise
#   (never nil, even before the first response)
def ready?
  @ready ? true : false
end
terminate()
click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 191
# Flag the endpoint as stopping; the polling code checks this flag before
# scheduling further work.
#
# @return [true]
def terminate
  @stopping = true
end
Private Instance Methods
_compute_retry_in(retry_in)
click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 224
# Add jitter to a delay: half of the delay plus a random amount drawn from
# the shared random generator with +retry_in+ as the bound.
#
# @param retry_in [Numeric] base delay
# @return [Numeric] jittered delay
def _compute_retry_in(retry_in)
  half = retry_in / 2
  jitter = Consul::Async::Utilities.random.rand(retry_in)
  half + jitter
end
_handle_error(http, consul_index) { || ... }
click to toggle source
rubocop:enable Style/ClassVars
# File lib/consul/async/consul_endpoint.rb, line 234
# Handle a failed request: log it, bump the consecutive error counter,
# schedule a retry and notify the error callbacks.
#
# The retry delay grows with the number of consecutive errors and is capped
# at 600 before jitter is applied. HTTP 429 replies get a delay of at least
# 60 and are counted in the shared _last_429 record.
#
# No retry is scheduled once the endpoint is stopping, so an error received
# after #terminate does not restart the polling loop. Error callbacks are
# still notified.
#
# @param http the failed HTTP request object (responds to +response_header+ and +error+)
# @param consul_index the index that was requested; re-queued for the retry
# @yield called when the retry timer fires, just before the index is re-queued
def _handle_error(http, consul_index)
  retry_in = _compute_retry_in([600, conf.retry_duration + 2**@consecutive_errors].min)
  if http.response_header.status == 429
    last_429 = _last_429
    retry_in = 60 + Consul::Async::Utilities.random.rand(180) if retry_in < 60
    last_429[:time] = Time.now.utc
    last_429[:count] += 1
    # Only log the 1st, 11th, 21st... occurrence to avoid flooding the output.
    if (last_429[:count] % 10) == 1
      if last_429[:count] == 1
        ::Consul::Async::Debug.puts_error "Rate limiting detected on Consul side (HTTP 429)!\n\n" \
          "******************************* CONFIGURATION ISSUE DETECTED *******************************\n" \
          "* Too many simultaneous connections for Consul agent #{conf.base_url}\n" \
          "* You should tune 'limits.http_max_conns_per_client' to a higher value.\n" \
          "* This program will behave badly until you change this.\n" \
          "* See https://www.consul.io/docs/agent/options.html#http_max_conns_per_client for more info\n" \
          "********************************************************************************************\n\n"
      end
      ::Consul::Async::Debug.puts_error "[#{path}] Too many conns to #{conf.base_url}, errors=#{last_429[:count]} - Retry in #{retry_in}s #{stats.body_bytes_human}"
    end
  else
    ::Consul::Async::Debug.puts_error "[#{path}] X-Consul-Index:#{consul_index} - #{http.error} - Retry in #{retry_in}s #{stats.body_bytes_human}"
  end
  @consecutive_errors += 1
  http_result = HttpResponse.new(http)
  unless @stopping
    EventMachine.add_timer(retry_in) do
      yield
      queue.push(consul_index)
    end
  end
  @e_callbacks.each { |c| c.call(http_result) }
end
_last_429()
click to toggle source
rubocop:disable Style/ClassVars
# File lib/consul/async/consul_endpoint.rb, line 229
# Lazily created record of HTTP 429 replies, kept in a class variable and
# therefore shared by every endpoint instance.
#
# @return [Hash] the shared record, created as { count: 0 } on first use
def _last_429
  return @@_last_429 if defined?(@@_last_429) && @@_last_429

  @@_last_429 = { count: 0 }
end
build_request(consul_index)
click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 197
# Build the options hash passed to the HTTP client for one request.
#
# @param consul_index the index sent both as the X-Consul-Index header and
#   as the +index+ query parameter
# @return [Hash] request options with :head, :path, :query, :keepalive and :callback
def build_request(consul_index)
  headers = {
    'Accept' => 'application/json',
    'X-Consul-Index' => consul_index,
    'X-Consul-Token' => conf.token
  }
  query = { wait: "#{conf.wait_duration}s", index: consul_index, stale: 'stale' }
  request = { head: headers, path: path, query: query, keepalive: true, callback: method(:on_response) }
  # Ask for an uncompressed body unless gzip compression is enabled.
  headers['accept-encoding'] = 'identity' unless conf.enable_gzip_compression
  # Endpoint-specific parameters win over the defaults above.
  @query_params.each_pair { |name, value| query[name] = value }
  request
end
fetch()
click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 265
# Install the polling loop for this endpoint.
#
# A single HTTP connection object is created, then a consumer is attached to
# +queue+: every index pushed on the queue triggers one GET built by
# build_request. Depending on the outcome, the response handlers push the
# next index back on the queue through a timer (directly on success, via
# _handle_error on failure), which keeps the loop going.
def fetch
  options = {
    connect_timeout: 5, # default connection setup timeout
    inactivity_timeout: conf.wait_duration + 1 + (conf.wait_duration / 16) # default connection inactivity (post-setup) timeout
  }
  # TLS client settings are only passed when a certificate chain is configured.
  unless conf.tls_cert_chain.nil?
    options[:tls] = {
      cert_chain_file: conf.tls_cert_chain,
      private_key_file: conf.tls_private_key,
      verify_peer: conf.tls_verify_peer
    }
  end
  connection = {
    conn: EventMachine::HttpRequest.new(conf.base_url, options)
  }
  # Queue consumer: invoked with each index popped from the queue.
  cb = proc do |consul_index|
    http = connection[:conn].get(build_request(consul_index))
    http.callback do
      # Dirty hack, but contrary to other path, when key is not present, Consul returns 404
      is_kv_empty = path.start_with?('/v1/kv') && http.response_header.status == 404
      # Treated as an error only when BOTH the status is not 200 and the
      # Content-Type is not exactly 'application/json' (and enforcement is on).
      if !is_kv_empty && enforce_json_200 && http.response_header.status != 200 && http.response_header['Content-Type'] != 'application/json'
        _handle_error(http, consul_index) do
          warn "[RETRY][#{path}] (#{@consecutive_errors} errors)" if (@consecutive_errors % 10) == 1
        end
      else
        n_consul_index = find_x_consul_index(http)
        @x_consul_index = n_consul_index.to_i if n_consul_index
        @consecutive_errors = 0
        # A missing KV key is reported with default_value instead of the 404 body.
        http_result = if is_kv_empty
                        HttpResponse.new(http, default_value)
                      else
                        HttpResponse.new(http)
                      end
        new_content = http_result.response.freeze
        # The initial placeholder result always counts as a modification.
        modified = @last_result.fake? || @last_result.data != new_content
        if n_consul_index.nil?
          # No X-Consul-Index header in the reply: reuse the requested index
          # and use the dedicated "missing index" delays.
          retry_in = modified ? conf.missing_index_retry_time_on_diff : conf.missing_index_retry_time_on_unchanged
          n_consul_index = consul_index
        else
          retry_in = modified ? conf.min_duration : conf.retry_on_non_diff
          # Enough time has already elapsed since the last update: retry almost immediately.
          retry_in = 0.1 if retry_in < (Time.now - @last_result.last_update)
        end
        retry_in = _compute_retry_in(retry_in)
        retry_in = 0.1 if retry_in < 0.1
        # Do not schedule the next request once terminate has been called.
        unless @stopping
          EventMachine.add_timer(retry_in) do
            queue.push(n_consul_index)
          end
        end
        result = ConsulResult.new(new_content, modified, http_result, n_consul_index, stats, retry_in, fake: false)
        @last_result = result
        @ready = true
        @s_callbacks.each { |c| c.call(result) }
      end
    end

    http.errback do
      # Connection-level failures are ignored entirely when stopping.
      unless @stopping
        _handle_error(http, consul_index) do
          # Only log the 1st, 11th, 21st... consecutive error.
          if (@consecutive_errors % 10) == 1
            add_msg = http.error
            if Gem.win_platform? && http.error.include?('unable to create new socket: Too many open files')
              add_msg += "\n *** Windows does not support more than 2048 watches, watch less endpoints ***"
            end
            ::Consul::Async::Debug.puts_error "[RETRY][#{path}] (#{@consecutive_errors} errors) due to #{add_msg}"
          end
        end
      end
    end
    # Re-register the consumer for the next index pushed on the queue.
    queue.pop(&cb)
  end
  # Register the consumer for the first index.
  queue.pop(&cb)
end
find_x_consul_index(http)
click to toggle source
# File lib/consul/async/consul_endpoint.rb, line 220
# Read the X-Consul-Index header from an HTTP response.
#
# @param http an object exposing +response_header+ with +[]+ access
# @return the header value, or whatever +[]+ returns when it is absent
def find_x_consul_index(http)
  headers = http.response_header
  headers['X-Consul-Index']
end