class Consul::Async::VaultEndpoint

Endpoint in Vault (a path in Vault)

Attributes

conf[R]
default_value[R]
enforce_json_200[R]
http_method[R]
last_result[R]
path[R]
query_params[R]
queue[R]
start_time[R]
stats[R]

Public Class Methods

new(conf, path, http_method = 'GET', enforce_json_200 = true, query_params = {}, default_value = '{}', post_data = {}, agent: nil) click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 132
# Builds the endpoint, registers the statistics callbacks and starts polling.
#
# @param conf the configuration object; `conf.create(path, agent: agent)` yields the
#   endpoint-specific configuration kept in @conf
# @param path [String] path of the endpoint in Vault
# @param http_method [String] HTTP verb used for every request
# @param enforce_json_200 [Boolean] when true, statuses other than 200/404 are handled as errors
# @param query_params [Hash] query-string parameters added to every request
# @param default_value [String] body used while no response is available
# @param post_data [Hash] stored in @post_data (not used by the request builder shown in this file)
# @param agent forwarded to `conf.create`
def initialize(conf, path, http_method = 'GET', enforce_json_200 = true, query_params = {}, default_value = '{}', post_data = {}, agent: nil)
  @conf = conf.create(path, agent: agent)
  @default_value = default_value
  @path = path
  @http_method = http_method
  @queue = EM::Queue.new
  @x_consul_index = 0
  @s_callbacks = []
  @e_callbacks = []
  @enforce_json_200 = enforce_json_200
  @start_time = Time.now.utc
  @consecutive_errors = 0
  @query_params = query_params
  @post_data = post_data
  @stopping = false
  # Explicitly not ready until the first successful response flips this to true;
  # previously the variable was left unset until then.
  @ready = false
  @stats = EndPointStats.new
  # Placeholder result built from the default value: flagged as unmodified, retry delay of 1.
  @last_result = VaultResult.new(VaultHttpResponse.new(nil, default_value), false, stats, 1)
  on_response { |result| @stats.on_response result }
  on_error { |http| @stats.on_error http }
  # The debug flag is read from the `conf` argument, not from the derived @conf.
  _enable_network_debug if conf.debug && conf.debug[:network]
  # fetch registers the queue consumer; pushing an item triggers the first request.
  fetch
  queue.push 0
end

Public Instance Methods

_enable_network_debug() click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 156
# Registers debugging callbacks: one that prints a one-line summary of every
# response with `warn`, and one that reports request errors through
# ::Consul::Async::Debug.puts_error.
def _enable_network_debug
  on_response do |result|
    # An index below 1 is flagged as a warning.
    health = result.x_consul_index.to_i.positive? ? '[ OK ]' : '[WARN]'
    figures = result.stats
    # Both markers have the same width, which keeps the log columns aligned.
    change = result.modified? ? '[MODFIED]' : '[NO DIFF]'
    counters = "[s:#{figures.successes},err:#{figures.errors}]"
    volume = "[#{figures.body_bytes_human.ljust(8)}][#{figures.bytes_per_sec_human.ljust(9)}]"
    schedule = "idx:#{result.x_consul_index}, next in #{result.retry_in} s"
    warn "[DBUG]#{health}#{change}#{counters}#{volume} #{path.ljust(48)} #{schedule}"
  end
  on_error { |failure| ::Consul::Async::Debug.puts_error "#{path}: #{failure.error}" }
end
on_error(&block) click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 172
# Registers a block to be called with the wrapped HTTP response each time a
# request fails.
#
# @return [Array] the list of registered error callbacks
def on_error(&block)
  @e_callbacks.push(block)
end
on_response(&block) click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 168
# Registers a block to be called with the result of each successful request.
#
# @return [Array] the list of registered response callbacks
def on_response(&block)
  @s_callbacks.push(block)
end
ready?() click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 176
# Tells whether at least one successful response has been processed.
#
# @return [Boolean] always true or false; an unset flag is reported as false
#   rather than nil
def ready?
  @ready ? true : false
end
terminate() click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 180
# Asks the endpoint to stop polling by raising the @stopping flag.
# fetch consults this flag before scheduling the next refresh after a
# successful response and before handling a failure reported by the errback.
def terminate
  @stopping = true
end

Private Instance Methods

_get_errors(http) click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 210
# Extracts human-readable error messages from a failed request.
#
# Always returns a non-empty Array, so callers can safely `join` the result:
# a scalar 'errors' payload is wrapped, and a missing, null or empty one
# falls back to the generic message.
#
# @param http [#error, #json] the failed request or its response wrapper
# @return [Array] the transport error when there is one, otherwise the entries
#   of the 'errors' key of the JSON body, otherwise ['unknown error']
def _get_errors(http)
  return [http.error] if http.error

  payload = http.json
  # The body may be nil or a non-Hash JSON document; only look up 'errors' on hash-like bodies.
  if payload.respond_to?(:key?) && payload.key?('errors')
    reported = Array(payload['errors'])
    return reported unless reported.empty?
  end
  ['unknown error']
end
_handle_error(http) { || ... } click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 219
# Logs a failed request, notifies the error callbacks and, unless the endpoint
# is stopping, schedules a retry.
#
# The retry delay is conf.retry_duration plus 2 to the power of the number of
# consecutive errors seen so far, capped at conf.max_retry_duration.
#
# @param http [#response_header, #error] the failed request or its response wrapper
# @yield called when the retry timer fires, just before a new fetch is queued
def _handle_error(http)
  retry_in = [conf.max_retry_duration, conf.retry_duration + 2**@consecutive_errors].min
  ::Consul::Async::Debug.puts_error "[#{path}][#{http_method}] Code: #{http.response_header.status} #{Array(_get_errors(http)).join(' - ')} - Retry in #{retry_in}s"
  @consecutive_errors += 1
  http_result = VaultHttpResponse.new(http, default_value)
  # Do not queue another request once termination has been requested; otherwise
  # an endpoint that keeps failing would keep polling forever.
  unless @stopping
    EventMachine.add_timer(retry_in) do
      yield
      queue.push(0)
    end
  end
  @e_callbacks.each { |c| c.call(http_result) }
end
build_request() click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 186
# Assembles the option hash passed to the HTTP client for a single request.
#
# @return [Hash] the JSON Accept header and Vault token, a copy of the query
#   parameters, the endpoint path, the keepalive flag and a callback entry
def build_request
  headers = { 'Accept' => 'application/json', 'X-Vault-Token' => conf.token }
  # The request body is currently not attached; the serialisation of @post_data
  # is left disabled below.
  # if @post_data
  #   res[:body] = JSON.generate(@post_data)
  # end
  {
    head: headers,
    # Copy the parameters into a fresh hash so the stored ones are never shared.
    query: @query_params.each_pair.to_h,
    path: path,
    keepalive: true,
    callback: method(:on_response)
  }
end
fetch() click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 231
# Sets up the polling loop for this endpoint.
#
# Creates the HTTP connection, then registers a consumer on the queue: every
# item popped from the queue triggers one request, and the consumer re-registers
# itself afterwards. New items are pushed by the timers scheduled after each
# response (here) or after each failure (in _handle_error).
def fetch
  options = {
    connect_timeout: 5, # default connection setup timeout
    inactivity_timeout: 1 # default connection inactivity (post-setup) timeout
  }
  # TLS client settings are only supplied when a certificate chain is configured.
  unless conf.tls_cert_chain.nil?
    options[:tls] = {
      cert_chain_file: conf.tls_cert_chain,
      private_key_file: conf.tls_private_key,
      verify_peer: conf.tls_verify_peer
    }
  end
  connection = EventMachine::HttpRequest.new(conf.base_url, options)
  cb = proc do |_|
    http = connection.send(http_method.downcase, build_request) # Under the hood: c.send('get', {stuff}) === c.get({stuff})
    http.callback do
      http_result = VaultHttpResponse.new(http.dup.freeze, default_value)
      # With enforce_json_200, any status other than 200 or 404 is handled as an error;
      # the block given to _handle_error replaces the connection before the retry.
      # Unlike the success branch and the errback below, this branch does not
      # consult @stopping before calling _handle_error.
      if enforce_json_200 && ![200, 404].include?(http.response_header.status)
        _handle_error(http_result) { connection = EventMachine::HttpRequest.new(conf.base_url, options) }
      else
        @consecutive_errors = 0
        modified = @last_result.nil? ? true : @last_result.data != http_result.response # Leaving it to do stats with this later
        # Next refresh: lease duration scaled by the configured factor, capped at
        # max_retry_duration, then raised to at least min_duration (applied last, so it wins).
        retry_in = get_lease_duration(http_result) * conf.lease_duration_factor
        retry_in = [retry_in, conf.max_retry_duration].min
        retry_in = [retry_in, conf.min_duration].max
        result = VaultResult.new(http_result, modified, stats, retry_in)
        # No further refresh is scheduled once terminate has been called.
        unless @stopping
          EventMachine.add_timer(retry_in) do
            queue.push(0)
          end
        end
        @last_result = result
        @ready = true
        @s_callbacks.each { |c| c.call(result) }
      end
    end

    # Failures reported through the errback are ignored once terminate has been called.
    http.errback do
      _handle_error(http) { connection = EventMachine::HttpRequest.new(conf.base_url, options) } unless @stopping
    end
    # Re-register the consumer so the next queued item triggers another request.
    queue.pop(&cb)
  end
  # Initial registration of the consumer.
  queue.pop(&cb)
end
get_lease_duration(result) click to toggle source
# File lib/consul/async/vault_endpoint.rb, line 206
# Reads the lease duration announced in a response.
#
# @param result [#json] response wrapper exposing the parsed JSON body
# @return the 'lease_duration' entry of the body, or conf.min_duration when
#   that entry is absent, nil or false
def get_lease_duration(result)
  announced = result.json['lease_duration']
  return announced if announced

  conf.min_duration
end