class Consul::Async::VaultEndpoint
Endpoint in vault (a path in Vault)
Attributes
conf[R]
default_value[R]
enforce_json_200[R]
http_method[R]
last_result[R]
path[R]
query_params[R]
queue[R]
start_time[R]
stats[R]
Public Class Methods
new(conf, path, http_method = 'GET', enforce_json_200 = true, query_params = {}, default_value = '{}', post_data = {}, agent: nil)
click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 132 def initialize(conf, path, http_method = 'GET', enforce_json_200 = true, query_params = {}, default_value = '{}', post_data = {}, agent: nil) @conf = conf.create(path, agent: agent) @default_value = default_value @path = path @http_method = http_method @queue = EM::Queue.new @x_consul_index = 0 @s_callbacks = [] @e_callbacks = [] @enforce_json_200 = enforce_json_200 @start_time = Time.now.utc @consecutive_errors = 0 @query_params = query_params @post_data = post_data @stopping = false @stats = EndPointStats.new @last_result = VaultResult.new(VaultHttpResponse.new(nil, default_value), false, stats, 1) on_response { |result| @stats.on_response result } on_error { |http| @stats.on_error http } _enable_network_debug if conf.debug && conf.debug[:network] fetch queue.push 0 end
Public Instance Methods
_enable_network_debug()
click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 156 def _enable_network_debug on_response do |result| state = result.x_consul_index.to_i < 1 ? '[WARN]' : '[ OK ]' stats = result.stats warn "[DBUG]#{state}#{result.modified? ? '[MODFIED]' : '[NO DIFF]'}" \ "[s:#{stats.successes},err:#{stats.errors}]" \ "[#{stats.body_bytes_human.ljust(8)}][#{stats.bytes_per_sec_human.ljust(9)}]"\ " #{path.ljust(48)} idx:#{result.x_consul_index}, next in #{result.retry_in} s" end on_error { |http| ::Consul::Async::Debug.puts_error "#{path}: #{http.error}" } end
on_error(&block)
click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 172 def on_error(&block) @e_callbacks << block end
on_response(&block)
click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 168 def on_response(&block) @s_callbacks << block end
ready?()
click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 176 def ready? @ready end
terminate()
click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 180 def terminate @stopping = true end
Private Instance Methods
_get_errors(http)
click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 210 def _get_errors(http) return [http.error] if http.error unless http.json.nil? return http.json['errors'] if http.json.key?('errors') end ['unknown error'] end
_handle_error(http) { || ... }
click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 219 def _handle_error(http) retry_in = [conf.max_retry_duration, conf.retry_duration + 2**@consecutive_errors].min ::Consul::Async::Debug.puts_error "[#{path}][#{http_method}] Code: #{http.response_header.status} #{_get_errors(http).join(' - ')} - Retry in #{retry_in}s" @consecutive_errors += 1 http_result = VaultHttpResponse.new(http, default_value) EventMachine.add_timer(retry_in) do yield queue.push(0) end @e_callbacks.each { |c| c.call(http_result) } end
build_request()
click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 186 def build_request res = { head: { 'Accept' => 'application/json', 'X-Vault-Token' => conf.token }, query: {}, path: path, keepalive: true, callback: method(:on_response) } # if @post_data # res[:body] = JSON.generate(@post_data) # end @query_params.each_pair do |k, v| res[:query][k] = v end res end
fetch()
click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 231 def fetch options = { connect_timeout: 5, # default connection setup timeout inactivity_timeout: 1 # default connection inactivity (post-setup) timeout } unless conf.tls_cert_chain.nil? options[:tls] = { cert_chain_file: conf.tls_cert_chain, private_key_file: conf.tls_private_key, verify_peer: conf.tls_verify_peer } end connection = EventMachine::HttpRequest.new(conf.base_url, options) cb = proc do |_| http = connection.send(http_method.downcase, build_request) # Under the hood: c.send('get', {stuff}) === c.get({stuff}) http.callback do http_result = VaultHttpResponse.new(http.dup.freeze, default_value) if enforce_json_200 && ![200, 404].include?(http.response_header.status) _handle_error(http_result) { connection = EventMachine::HttpRequest.new(conf.base_url, options) } else @consecutive_errors = 0 modified = @last_result.nil? ? true : @last_result.data != http_result.response # Leaving it do to stats with this later retry_in = get_lease_duration(http_result) * conf.lease_duration_factor retry_in = [retry_in, conf.max_retry_duration].min retry_in = [retry_in, conf.min_duration].max result = VaultResult.new(http_result, modified, stats, retry_in) unless @stopping EventMachine.add_timer(retry_in) do queue.push(0) end end @last_result = result @ready = true @s_callbacks.each { |c| c.call(result) } end end http.errback do _handle_error(http) { connection = EventMachine::HttpRequest.new(conf.base_url, options) } unless @stopping end queue.pop(&cb) end queue.pop(&cb) end
get_lease_duration(result)
click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 206 def get_lease_duration(result) result.json['lease_duration'] || conf.min_duration end