class Consul::Async::EndPointsManager

This class keeps references to all endpoints (aka datasources) registered for all templates. This allows reusing those endpoints, as well as listing and garbage-collecting them. It is also the main object visible from ERB files: it contains all the methods available to template writers.

Attributes

consul_conf[R]
coordinate[R]
net_info[R]
remote_resource[R]
running[R]
start_time[R]
templates[R]
vault_conf[R]

Public Class Methods

new(consul_configuration, vault_configuration, templates, trim_mode = nil) click to toggle source
# File lib/consul/async/consul_template.rb, line 95
# Build the manager shared by every template.
#
# consul_configuration - kept as consul_conf; also read for
#                        max_consecutive_errors_on_endpoint (10 is used when
#                        it is nil/false) and fail_fast_errors.
# vault_configuration  - kept as vault_conf; token renewal is set up only
#                        when it has a token and token_renew is truthy.
# templates            - kept as-is and exposed through the templates reader.
# trim_mode            - stored for later use when rendering ERB templates.
def initialize(consul_configuration, vault_configuration, templates, trim_mode = nil)
  # Caller-provided collaborators
  @consul_conf = consul_configuration
  @vault_conf = vault_configuration
  @templates = templates
  @trim_mode = trim_mode

  # Runtime bookkeeping
  @running = true
  @endpoints = {}
  @iteration = 1
  @start_time = Time.now.utc
  @last_debug_time = 0
  @net_info = { success: 0, errors: 0, bytes_read: 0, changes: 0, network_bytes: 0 }

  # Rendering context used until a template is actually rendered
  blank_template_info = {
    'source_root' => nil,
    'source' => nil,
    'destination' => nil,
    'was_rendered_once' => false
  }
  @context = { current_erb_path: nil, template_info: blank_template_info, params: {} }

  # Error policy applied by default to endpoints
  @max_consecutive_errors_on_endpoint = consul_configuration.max_consecutive_errors_on_endpoint || 10
  @fail_fast_errors = consul_configuration.fail_fast_errors

  # Helpers receiving this manager; created once everything above is set
  @coordinate = Coordinate.new(self)
  @remote_resource = RemoteResource.new(self)

  # Setup token renewal
  vault_setup_token_renew if !@vault_conf.token.nil? && @vault_conf.token_renew
end

Public Instance Methods

agent_members(wan: false, agent: nil) click to toggle source

www.consul.io/api/agent.html#list-members

# File lib/consul/async/consul_template.rb, line 210
# Members known by a Consul agent (/v1/agent/members).
#
# wan   - when truthy, the 'wan' query parameter is set to true.
# agent - forwarded to create_if_missing and to the ConsulEndpoint.
#
# Returns the value of create_if_missing for this path and query.
def agent_members(wan: false, agent: nil)
  path = '/v1/agent/members'
  query_params = wan ? { 'wan' => true } : {}
  create_if_missing(path, query_params, agent: agent) do
    endpoint = ConsulEndpoint.new(consul_conf, path, true, query_params, '[]', agent)
    ConsulTemplateMembers.new(endpoint)
  end
end
agent_metrics(agent: nil) click to toggle source

www.consul.io/api/agent.html#view-metrics

# File lib/consul/async/consul_template.rb, line 202
# Metrics exposed by a Consul agent (/v1/agent/metrics).
#
# agent - forwarded to create_if_missing and to the ConsulEndpoint.
#
# Returns the value of create_if_missing; the endpoint is built with an
# empty-metrics JSON document as its default value argument.
def agent_metrics(agent: nil)
  path = '/v1/agent/metrics'
  no_query = {}
  empty_metrics = '{"Gauges":[], "Points":[], "Member":{}, "Counters":[], "Samples":{}}'
  create_if_missing(path, no_query, agent: agent) do
    endpoint = ConsulEndpoint.new(consul_conf, path, true, no_query, empty_metrics, agent)
    ConsulAgentMetrics.new(endpoint)
  end
end
agent_self(agent: nil) click to toggle source

www.consul.io/api/agent.html#read-configuration

# File lib/consul/async/consul_template.rb, line 194
# Configuration and member information of a Consul agent (/v1/agent/self).
#
# agent - forwarded to create_if_missing and to the ConsulEndpoint.
#
# Returns the value of create_if_missing; the endpoint is built with a JSON
# document of empty sections as its default value argument.
def agent_self(agent: nil)
  path = '/v1/agent/self'
  no_query = {}
  empty_self = '{"Config":{}, "Coord":{}, "Member":{}, "Meta":{}, "Stats":{}}'
  create_if_missing(path, no_query, agent: agent) do
    endpoint = ConsulEndpoint.new(consul_conf, path, true, no_query, empty_self, agent)
    ConsulAgentSelf.new(endpoint)
  end
end
checks_for_node(name, dc: nil, passing: false, agent: nil) click to toggle source

www.consul.io/api/health.html#list-checks-for-node

# File lib/consul/async/consul_template.rb, line 155
# Health checks registered on a node (/v1/health/node/<name>).
#
# name    - node name; must not be nil.
# dc      - when set, added as the :dc query parameter.
# passing - when truthy, added as the :passing query parameter.
# agent   - forwarded to create_if_missing and to the ConsulEndpoint.
#
# Returns the value of create_if_missing for this path and query.
# Raises RuntimeError when name is nil.
def checks_for_node(name, dc: nil, passing: false, agent: nil)
  # The argument is a node name, so the message must not talk about a service.
  raise 'You must specify a name for a node' if name.nil?

  path = "/v1/health/node/#{name}"
  query_params = {}
  query_params[:dc] = dc if dc
  query_params[:passing] = passing if passing
  create_if_missing(path, query_params, agent: agent) { ConsulTemplateChecks.new(ConsulEndpoint.new(consul_conf, path, true, query_params, '[]', agent)) }
end
checks_for_service(name, dc: nil, passing: false, agent: nil) click to toggle source

www.consul.io/api/health.html#list-checks-for-service

# File lib/consul/async/consul_template.rb, line 144
# Health checks attached to a service (/v1/health/checks/<name>).
#
# name    - service name; must not be nil.
# dc      - when set, added as the :dc query parameter.
# passing - when truthy, added as the :passing query parameter.
# agent   - forwarded to create_if_missing and to the ConsulEndpoint.
#
# Returns the value of create_if_missing for this path and query.
# Raises RuntimeError when name is nil.
def checks_for_service(name, dc: nil, passing: false, agent: nil)
  raise 'You must specify a name for a service' if name.nil?

  path = "/v1/health/checks/#{name}"
  # Only keep the options that were actually provided, in dc/passing order
  query_params = { dc: dc, passing: passing }.select { |_key, value| value }
  create_if_missing(path, query_params, agent: agent) do
    endpoint = ConsulEndpoint.new(consul_conf, path, true, query_params, '[]', agent)
    ConsulTemplateChecks.new(endpoint)
  end
end
checks_in_state(check_state, dc: nil, agent: nil) click to toggle source

www.consul.io/api-docs/health#list-checks-in-state Supported in Consul 1.7+

# File lib/consul/async/consul_template.rb, line 167
# Health checks currently in a given state (/v1/health/state/<state>).
#
# check_state - one of 'any', 'critical', 'passing' or 'warning'.
# dc          - when set, added as the :dc query parameter.
# agent       - forwarded to create_if_missing and to the ConsulEndpoint.
#
# Returns the value of create_if_missing for this path and query.
# Raises RuntimeError when check_state is not one of the accepted values.
def checks_in_state(check_state, dc: nil, agent: nil)
  accepted_states = %w[any critical passing warning]
  unless accepted_states.include?(check_state)
    raise "checks_in_state('#{check_state}'...) must be one of #{accepted_states}"
  end

  path = "/v1/health/state/#{check_state}"
  query_params = dc ? { dc: dc } : {}
  create_if_missing(path, query_params, agent: agent) do
    endpoint = ConsulEndpoint.new(consul_conf, path, true, query_params, '[]', agent)
    ConsulTemplateChecks.new(endpoint)
  end
end
create_if_missing(path, query_params, fail_fast_errors: @fail_fast_errors, max_consecutive_errors_on_endpoint: @max_consecutive_errors_on_endpoint, agent: nil, endpoint_id: nil) { || ... } click to toggle source
# File lib/consul/async/consul_template.rb, line 398
# Return the endpoint wrapper registered for a request, creating and
# registering it with the given block on first use.
#
# path         - request path, used in the generated id and in log messages.
# query_params - hash of query parameters, used in the generated id.
# fail_fast_errors - when truthy, an error on an endpoint that never
#                succeeded terminates the manager.
# max_consecutive_errors_on_endpoint - the manager is terminated once an
#                endpoint reports more consecutive errors than this.
# agent        - agent the request targets; part of the generated id.
# endpoint_id  - explicit cache key; when nil an id is derived from agent,
#                path and query_params.
#
# The block is only called when no endpoint is registered under the id; it
# must return an object responding to `endpoint` and `_seen_at`.
#
# Returns the (possibly newly created) registered object, after marking it as
# seen at the current iteration.
def create_if_missing(path, query_params, fail_fast_errors: @fail_fast_errors,
                      max_consecutive_errors_on_endpoint: @max_consecutive_errors_on_endpoint,
                      agent: nil, endpoint_id: nil)
  # The agent is always part of the id, exactly once: otherwise the same path
  # queried without parameters on two different agents would share a single
  # endpoint and silently return the first agent's data.
  endpoint_id ||= begin
                    query_suffix = query_params.map { |k, v| "&#{k}=#{v}" }.join
                    "#{agent}#{path}#{query_suffix}"
                  end
  tpl = @endpoints[endpoint_id]
  unless tpl
    tpl = yield
    ::Consul::Async::Debug.print_debug "path #{path.ljust(64)} #{query_params.inspect}\r"
    @endpoints[endpoint_id] = tpl
    # Keep global network statistics up to date for every response
    tpl.endpoint.on_response do |result|
      @net_info[:success] += 1
      @net_info[:bytes_read] += result.data.bytesize
      @net_info[:changes] += 1 if result.modified?
      @net_info[:network_bytes] += result.http.response_header['Content-Length'].to_i
    end
    tpl.endpoint.on_error do |_err|
      @net_info[:errors] += 1
      # Never succeeded and fail fast requested: give up immediately
      if tpl.endpoint.stats.successes.zero? && fail_fast_errors
        ::Consul::Async::Debug.puts_error "Endpoint #{path} is failing at first call with fail fast activated, terminating..."
        terminate
      end
      if tpl.endpoint.stats.consecutive_errors > max_consecutive_errors_on_endpoint
        ::Consul::Async::Debug.puts_error "Endpoint #{path} has too many consecutive errors: #{tpl.endpoint.stats.consecutive_errors}, terminating..."
        terminate
      end
    end
  end
  tpl._seen_at(@iteration)
  tpl
end
datacenters(agent: nil) click to toggle source

www.consul.io/api/catalog.html#list-datacenters

# File lib/consul/async/consul_template.rb, line 242
# List of known datacenters (/v1/catalog/datacenters).
#
# agent - forwarded to create_if_missing and to the ConsulEndpoint.
#
# Returns the value of create_if_missing for this path.
def datacenters(agent: nil)
  path = '/v1/catalog/datacenters'
  no_query = {}
  create_if_missing(path, no_query, agent: agent) do
    endpoint = ConsulEndpoint.new(consul_conf, path, true, no_query, '[]', agent)
    ConsulTemplateDatacenters.new(endpoint)
  end
end
find_line(e) click to toggle source
# File lib/consul/async/consul_template.rb, line 303
# Extract the ERB location suffix from an exception.
#
# Looks for the '(erb):' marker first in the exception message, then in each
# backtrace line, and returns the text starting at the marker's trailing
# colon (i.e. the first 5 characters, '(erb)', are dropped).
#
# Returns the suffix String, or nil when the marker is found nowhere.
def find_line(e)
  marker = '(erb):'
  message = e.message
  return message.dup[5..-1] if message.start_with?(marker)

  frame = e.backtrace.find { |line| line.start_with?(marker) }
  frame ? frame[5..-1] : nil
end
kv(name = nil, dc: nil, keys: nil, recurse: false, agent: nil) click to toggle source

www.consul.io/api/kv.html#read-key

# File lib/consul/async/consul_template.rb, line 249
# Key/value store entries (/v1/kv/<name>).
#
# name    - key or prefix; nil yields the '/v1/kv/' path.
# dc      - when set, added as the :dc query parameter.
# keys    - when truthy, added as the :keys query parameter.
# recurse - when truthy, added as the :recurse query parameter.
# agent   - forwarded to create_if_missing and to the ConsulEndpoint.
#
# Returns the value of create_if_missing for this path and query.
def kv(name = nil, dc: nil, keys: nil, recurse: false, agent: nil)
  path = "/v1/kv/#{name}"
  # Provided options only, in dc/recurse/keys order
  query_params = { dc: dc, recurse: recurse, keys: keys }.select { |_key, value| value }
  create_if_missing(path, query_params, agent: agent) do
    endpoint = ConsulEndpoint.new(consul_conf, path, true, query_params, '[]', agent)
    ConsulTemplateKV.new(endpoint, name)
  end
end
node(name_or_id, dc: nil, agent: nil) click to toggle source

www.consul.io/api/catalog.html#list-services-for-node

# File lib/consul/async/consul_template.rb, line 186
# Catalog entry of a single node (/v1/catalog/node/<name_or_id>).
#
# name_or_id - interpolated as-is at the end of the path.
# dc         - when set, added as the :dc query parameter.
# agent      - forwarded to create_if_missing and to the ConsulEndpoint.
#
# Returns the value of create_if_missing for this path and query.
def node(name_or_id, dc: nil, agent: nil)
  path = "/v1/catalog/node/#{name_or_id}"
  query_params = dc ? { dc: dc } : {}
  create_if_missing(path, query_params, agent: agent) do
    endpoint = ConsulEndpoint.new(consul_conf, path, true, query_params, '{}', agent)
    ConsulTemplateNodes.new(endpoint)
  end
end
nodes(dc: nil, agent: nil) click to toggle source

www.consul.io/api/catalog.html#list-nodes

# File lib/consul/async/consul_template.rb, line 178
# All nodes of the catalog (/v1/catalog/nodes).
#
# dc    - when set, added as the :dc query parameter.
# agent - forwarded to create_if_missing and to the ConsulEndpoint.
#
# Returns the value of create_if_missing for this path and query.
def nodes(dc: nil, agent: nil)
  path = '/v1/catalog/nodes'
  query_params = dc ? { dc: dc } : {}
  create_if_missing(path, query_params, agent: agent) do
    endpoint = ConsulEndpoint.new(consul_conf, path, true, query_params, '[]', agent)
    ConsulTemplateNodes.new(endpoint)
  end
end
param(key, default_value = nil) click to toggle source

Return a param of template

# File lib/consul/async/consul_template.rb, line 219
# Return a parameter of the template currently being rendered.
#
# key           - looked up as given first, then as key.to_sym.
# default_value - returned when neither lookup yields a non-nil value.
#
# An explicit `false` parameter is returned as-is: only a nil (or missing)
# value falls through to the next lookup and finally to default_value.
def param(key, default_value = nil)
  params = @context[:params]
  v = params[key]
  v = params[key.to_sym] if v.nil?
  v.nil? ? default_value : v
end
render(tpl, tpl_file_path, params = {}, current_template_info: nil) click to toggle source
# File lib/consul/async/consul_template.rb, line 312
# Render an ERB template with this object as binding.
#
# tpl           - ERB source as a String.
# tpl_file_path - path reported as 'source' in template_info and in error
#                 messages; it is frozen by this method.
# params        - values exposed to the template through param().
# current_template_info - hash merged with the 'source' entry to build the
#                 template_info of the rendered template; nil is treated as
#                 an empty hash.
#
# The rendering context is swapped for the duration of the call and always
# restored afterwards, even when the template raises, so nested render_file /
# render_from_string calls keep working after a failure.
#
# NOTE: local variable names are visible from templates through `binding`;
# they are intentionally kept stable.
#
# Returns the rendered String.
# Raises InvalidTemplateException when evaluation raises a StandardError and
# SyntaxErrorInTemplate when it raises a SyntaxError.
def render(tpl, tpl_file_path, params = {}, current_template_info: nil)
  # Ugly, but allow to use render_file well to support stack of calls
  old_value = @context
  tpl_info = (current_template_info || {}).merge('source' => tpl_file_path.freeze)
  @context = {
    current_erb_path: tpl_file_path,
    params: params,
    template_info: tpl_info
  }
  # Positional trim_mode is deprecated since Ruby 2.6 and rejected by recent
  # ERB versions; use the keyword whenever ERB#initialize supports it.
  erb = if ERB.instance_method(:initialize).parameters.assoc(:key)
          ERB.new(tpl, trim_mode: @trim_mode)
        else
          ERB.new(tpl, nil, @trim_mode)
        end
  result = erb.result(binding)
  raise "Result is not a string :='#{result}' for #{tpl_file_path}" unless result.is_a?(String)

  result
rescue StandardError => e
  e2 = InvalidTemplateException.new e
  raise e2, "[TEMPLATE EVALUATION ERROR] #{tpl_file_path}#{find_line(e)}: #{e.message}"
rescue SyntaxError => e
  e2 = SyntaxErrorInTemplate.new e
  raise e2, "[TEMPLATE SYNTAX ERROR] #{tpl_file_path}#{find_line(e)}: #{e.message}"
ensure
  # Restore the caller's context on success and on failure alike
  @context = old_value
end
render_file(path, params = {}) click to toggle source

render a relative file with the given params accessible from template

# File lib/consul/async/consul_template.rb, line 287
# Render another template file, resolved relatively to the directory of the
# template currently being rendered.
#
# path   - path of the file, expanded against the current template directory.
# params - values made available to the rendered template.
#
# Returns the result of render for the file content.
# Raises RuntimeError when the resolved file does not exist.
def render_file(path, params = {})
  current_dir = File.dirname(@context[:current_erb_path])
  new_path = File.expand_path(path, current_dir)
  unless File.exist?(new_path)
    raise "render_file ERROR: #{path} is resolved as #{new_path}, but the file does not exists"
  end

  content = File.read(new_path)
  render(content, new_path, params, current_template_info: template_info)
end
render_from_string(template_content, params = {}) click to toggle source

render a sub template from a string template

# File lib/consul/async/consul_template.rb, line 295
# Render a sub template given directly as a String.
#
# template_content - ERB source; nil (or false) makes the method return nil.
# params           - values made available to the rendered template.
#
# The template gets a virtual path made of ':memory:sha1:' followed by the
# SHA1 hex digest of its content, located in the directory of the template
# currently being rendered.
#
# Returns the result of render, or nil when there is no content.
def render_from_string(template_content, params = {})
  return unless template_content

  digest = Digest::SHA1.hexdigest(template_content)
  current_dir = File.dirname(@context[:current_erb_path])
  virtual_path = File.expand_path(":memory:sha1:#{digest}", current_dir)
  render(template_content, virtual_path, params, current_template_info: template_info)
end
secret(path = '', post_data = nil, agent: nil) click to toggle source
# File lib/consul/async/consul_template.rb, line 272
# Read a Vault secret located under /v1/<path>.
#
# path      - secret path; consecutive slashes are collapsed after prefixing
#             it with '/v1/'.
# post_data - when truthy the request uses 'POST', otherwise 'GET'.
# agent     - forwarded to create_if_missing and to the VaultEndpoint.
#
# NOTE(review): post_data only selects the HTTP method here, its content is
# not passed to VaultEndpoint, and GET/POST on the same path end up with the
# same arguments for create_if_missing -- confirm this is intended.
#
# Returns the value of create_if_missing, called with the error policy of
# vault_conf.
# Raises RuntimeError when vault_conf has no token.
def secret(path = '', post_data = nil, agent: nil)
  raise "You need to provide a vault token to use 'secret' keyword" if vault_conf.token.nil?

  vault_path = "/v1/#{path}".gsub(%r{/{2,}}, '/')
  no_query = {}
  http_method = if post_data
                  'POST'
                else
                  'GET'
                end
  error_policy = {
    fail_fast_errors: vault_conf.fail_fast_errors,
    max_consecutive_errors_on_endpoint: vault_conf.max_consecutive_errors_on_endpoint
  }
  create_if_missing(vault_path, no_query, agent: agent, **error_policy) do
    empty_secret = JSON.generate(data: {})
    endpoint = VaultEndpoint.new(vault_conf, vault_path, http_method, true, no_query, empty_secret, agent: agent)
    ConsulTemplateVaultSecret.new(endpoint)
  end
end
secrets(path = '', agent: nil) click to toggle source
# File lib/consul/async/consul_template.rb, line 259
# List the Vault secrets located under /v1/<path>.
#
# path  - path to list; consecutive slashes are collapsed after prefixing it
#         with '/v1/'.
# agent - forwarded to create_if_missing and to the VaultEndpoint.
#
# The request is a 'GET' carrying the `list: 'true'` query parameter.
#
# Returns the value of create_if_missing, called with the error policy of
# vault_conf.
# Raises RuntimeError when vault_conf has no token.
def secrets(path = '', agent: nil)
  # Name the keyword actually used in the template (was reported as 'secret')
  raise "You need to provide a vault token to use 'secrets' keyword" if vault_conf.token.nil?

  path = "/v1/#{path}".gsub(%r{/{2,}}, '/')
  query_params = { list: 'true' }
  create_if_missing(path, query_params,
                    fail_fast_errors: vault_conf.fail_fast_errors,
                    max_consecutive_errors_on_endpoint: vault_conf.max_consecutive_errors_on_endpoint,
                    agent: agent) do
    ConsulTemplateVaultSecretList.new(VaultEndpoint.new(vault_conf, path, 'GET', true, query_params, JSON.generate(data: { keys: [] }), agent: agent))
  end
end
service(name, dc: nil, passing: false, tag: nil, agent: nil) click to toggle source

www.consul.io/api/health.html#list-nodes-for-service

# File lib/consul/async/consul_template.rb, line 132
# Instances of a service with their health (/v1/health/service/<name>).
#
# name    - service name; must not be nil.
# dc      - when set, added as the :dc query parameter.
# passing - when truthy, added as the :passing query parameter.
# tag     - when set, added as the :tag query parameter.
# agent   - forwarded to create_if_missing and to the ConsulEndpoint.
#
# Returns the value of create_if_missing for this path and query.
# Raises RuntimeError when name is nil.
def service(name, dc: nil, passing: false, tag: nil, agent: nil)
  raise 'You must specify a name for a service' if name.nil?

  path = "/v1/health/service/#{name}"
  # Provided options only, in dc/passing/tag order
  query_params = { dc: dc, passing: passing, tag: tag }.select { |_key, value| value }
  create_if_missing(path, query_params, agent: agent) do
    endpoint = ConsulEndpoint.new(consul_conf, path, true, query_params, '[]', agent)
    ConsulTemplateService.new(endpoint)
  end
end
services(dc: nil, tag: nil, agent: nil) click to toggle source

www.consul.io/api/catalog.html#list-services

# File lib/consul/async/consul_template.rb, line 232
# All services of the catalog (/v1/catalog/services).
#
# dc    - when set, added as the :dc query parameter.
# tag   - when set, added as the :tag query parameter.
# agent - forwarded to create_if_missing and to the ConsulEndpoint.
#
# Returns the value of create_if_missing for this path and query.
def services(dc: nil, tag: nil, agent: nil)
  path = '/v1/catalog/services'
  # Tag filtering is performed on client side; the tag is nevertheless part
  # of the query parameters handed to create_if_missing and ConsulEndpoint.
  query_params = { dc: dc, tag: tag }.select { |_key, value| value }
  create_if_missing(path, query_params, agent: agent) do
    endpoint = ConsulEndpoint.new(consul_conf, path, true, query_params, '{}', agent)
    ConsulTemplateServices.new(endpoint)
  end
end
template_info() click to toggle source

Get information about current template

# File lib/consul/async/consul_template.rb, line 227
# Get information about the template currently being rendered.
#
# Returns the :template_info entry of the current rendering context; render
# sets it to the caller-provided info merged with a 'source' key.
def template_info
  @context[:template_info]
end
terminate() click to toggle source
# File lib/consul/async/consul_template.rb, line 384
# Stop the manager: flag it as not running, terminate the endpoint of every
# registered entry, then forget all of them.
def terminate
  @running = false
  @endpoints.each_pair { |_endpoint_id, registered| registered.endpoint.terminate }
  @endpoints = {}
end
vault_setup_token_renew() click to toggle source
# File lib/consul/async/consul_template.rb, line 392
# Create the Vault endpoint used to renew the current token.
#
# Logs a debug message, then builds a VaultEndpoint for a :POST on
# 'v1/auth/token/renew-self' and returns it. The endpoint is not stored in
# @endpoints by this method.
#
# NOTE(review): the other VaultEndpoint.new calls in this file pass
# (conf, path, method, true, query_params, default_value, agent: ...) with a
# String method and a path starting with '/'; here the 4th and 5th arguments
# are both `{}` -- confirm this matches the VaultEndpoint signature.
def vault_setup_token_renew
  path = 'v1/auth/token/renew-self'
  ::Consul::Async::Debug.print_debug 'Setting up vault token renewal'
  VaultEndpoint.new(vault_conf, path, :POST, {}, {})
end
write(file, tpl, last_result, tpl_file_path, params = {}, current_template_info: {}) click to toggle source
# File lib/consul/async/consul_template.rb, line 334
# Render a template and write the result to a file when it is ready and has
# changed.
#
# file          - destination path; data is first written to "<file>.tmp",
#                 which is then renamed to file.
# tpl           - ERB source of the template.
# last_result   - previously rendered data, used to detect changes.
# tpl_file_path - path of the template, passed to render and used in logs.
# params        - values made available to the template.
# current_template_info - passed to render.
#
# Returns [ready, changed, data]:
# * [false, false, nil] while some endpoint seen during this iteration is not
#   ready, or while the template renders to nil;
# * [true, data != last_result, data] otherwise.
#
# Endpoints not seen for more than 60 seconds of iteration time are
# terminated and removed once the template is ready. A failure while writing
# the file is logged, not raised.
def write(file, tpl, last_result, tpl_file_path, params = {}, current_template_info: {})
  # The iteration marker is the number of seconds elapsed since start
  @iteration = Time.now.utc - @start_time
  data = render(tpl, tpl_file_path, params, current_template_info: current_template_info)
  not_ready = []
  ready = 0
  to_cleanup = []
  @endpoints.each_pair do |endpoint_key, endpt|
    if endpt.ready?
      ready += 1
    else
      # We consider only the endpoints usefull with current iteration
      not_ready << endpoint_key unless endpt.seen_at < @iteration
    end
    to_cleanup << endpoint_key if (@iteration - endpt.seen_at) > 60
  end
  if not_ready.count.positive? || data.nil?
    # Throttle the waiting message to at most one per second
    if @iteration - @last_debug_time > 1
      @last_debug_time = @iteration
      if data.nil?
        ::Consul::Async::Debug.print_info "Waiting for Template #{tpl_file_path} to not return nil, consider it not ready...\r"
      else
        ::Consul::Async::Debug.print_info "Waiting for data from #{not_ready.count}/#{not_ready.count + ready} endpoints: #{not_ready[0..2]}...\r"
      end
    end
    return [false, false, nil]
  end
  # Any stale endpoint must be released, including when there is only one
  # (the previous `> 1` test left a lone stale endpoint running forever).
  if to_cleanup.count.positive?
    ::Consul::Async::Debug.puts_info "Cleaned up #{to_cleanup.count} endpoints: #{to_cleanup}"
    to_cleanup.each do |to_remove|
      x = @endpoints.delete(to_remove)
      x.endpoint.terminate
    end
  end
  if last_result != data
    ::Consul::Async::Debug.print_info "Write #{Utilities.bytes_to_h data.bytesize} bytes to #{file}, "\
                 "netinfo=#{@net_info} aka "\
                 "#{Utilities.bytes_to_h((net_info[:network_bytes] / (Time.now.utc - @start_time)).round(1))}/s ...\r"
    # Write to a temporary file then rename, so readers never see partial data
    tmp_file = "#{file}.tmp"
    begin
      File.open(tmp_file, 'w') do |f|
        f.write data
      end
      File.rename(tmp_file, file)
    rescue StandardError => e
      ::Consul::Async::Debug.puts_error "Failed  writting #{Utilities.bytes_to_h data.bytesize} bytes to #{file}: #{e.class}, message: #{e.inspect}"
    end
  end
  [true, data != last_result, data]
end