class Consul::Async::EndPointsManager
This class keeps references to all endpoints (aka datasources) registered for all templates. This allows reusing those endpoints as well as listing and garbage-collecting them. It is also the main object visible from ERB files: it contains all the methods available to template writers.
Attributes
Public Class Methods
# File lib/consul/async/consul_template.rb, line 95
# Prepare the manager state shared by every template.
#
# consul_configuration:: Consul settings; also provides fail_fast_errors and
#                        max_consecutive_errors_on_endpoint (10 when unset).
# vault_configuration::  Vault settings; token/token_renew decide whether token
#                        renewal is set up.
# templates::            stored as-is in @templates.
# trim_mode::            stored as-is; handed to ERB when rendering.
def initialize(consul_configuration, vault_configuration, templates, trim_mode = nil)
  # Values received from the caller.
  @consul_conf = consul_configuration
  @vault_conf = vault_configuration
  @templates = templates
  @trim_mode = trim_mode

  # Lifecycle and statistics bookkeeping.
  @running = true
  @endpoints = {}
  @iteration = 1
  @start_time = Time.now.utc
  @last_debug_time = 0
  @net_info = {
    success: 0,
    errors: 0,
    bytes_read: 0,
    changes: 0,
    network_bytes: 0
  }

  # Rendering context; replaced for the duration of each render call.
  @context = {
    current_erb_path: nil,
    template_info: {
      'source_root' => nil,
      'source' => nil,
      'destination' => nil,
      'was_rendered_once' => false
    },
    params: {}
  }

  # Error tolerance applied to Consul endpoints.
  @max_consecutive_errors_on_endpoint = consul_configuration.max_consecutive_errors_on_endpoint || 10
  @fail_fast_errors = consul_configuration.fail_fast_errors

  # Collaborators receive the manager itself, so they are built last.
  @coordinate = Coordinate.new(self)
  @remote_resource = RemoteResource.new(self)

  # Setup token renewal
  vault_setup_token_renew if !@vault_conf.token.nil? && @vault_conf.token_renew
end
Public Instance Methods
www.consul.io/api/agent.html#list-members
# File lib/consul/async/consul_template.rb, line 210
# List the members known by the agent (/v1/agent/members).
#
# wan::   when truthy, adds the 'wan' query parameter.
# agent:: forwarded to the endpoint and to create_if_missing.
def agent_members(wan: false, agent: nil)
  members_path = '/v1/agent/members'
  filters = wan ? { 'wan' => true } : {}
  create_if_missing(members_path, filters, agent: agent) do
    ConsulTemplateMembers.new(ConsulEndpoint.new(consul_conf, members_path, true, filters, '[]', agent))
  end
end
www.consul.io/api/agent.html#view-metrics
# File lib/consul/async/consul_template.rb, line 202
# Read the agent metrics (/v1/agent/metrics).
#
# agent:: forwarded to the endpoint and to create_if_missing.
def agent_metrics(agent: nil)
  metrics_path = '/v1/agent/metrics'
  no_params = {}
  # JSON document used as the endpoint's default value.
  empty_metrics = '{"Gauges":[], "Points":[], "Member":{}, "Counters":[], "Samples":{}}'
  create_if_missing(metrics_path, no_params, agent: agent) do
    ConsulAgentMetrics.new(ConsulEndpoint.new(consul_conf, metrics_path, true, no_params, empty_metrics, agent))
  end
end
www.consul.io/api/agent.html#read-configuration
# File lib/consul/async/consul_template.rb, line 194
# Read the agent's own configuration (/v1/agent/self).
#
# agent:: forwarded to the endpoint and to create_if_missing.
def agent_self(agent: nil)
  self_path = '/v1/agent/self'
  no_params = {}
  # JSON document used as the endpoint's default value.
  empty_self = '{"Config":{}, "Coord":{}, "Member":{}, "Meta":{}, "Stats":{}}'
  create_if_missing(self_path, no_params, agent: agent) do
    ConsulAgentSelf.new(ConsulEndpoint.new(consul_conf, self_path, true, no_params, empty_self, agent))
  end
end
www.consul.io/api/health.html#list-checks-for-node
# File lib/consul/async/consul_template.rb, line 155
# List the health checks of a node (/v1/health/node/<name>).
#
# name::    node name; must not be nil.
# dc::      added as the :dc query parameter when truthy.
# passing:: added as the :passing query parameter when truthy.
# agent::   forwarded to the endpoint and to create_if_missing.
#
# Raises RuntimeError when name is nil.
def checks_for_node(name, dc: nil, passing: false, agent: nil)
  # The message used to talk about a "service", copied from checks_for_service.
  raise 'You must specify a name for a node' if name.nil?

  path = "/v1/health/node/#{name}"
  query_params = {}
  query_params[:dc] = dc if dc
  query_params[:passing] = passing if passing
  create_if_missing(path, query_params, agent: agent) do
    ConsulTemplateChecks.new(ConsulEndpoint.new(consul_conf, path, true, query_params, '[]', agent))
  end
end
www.consul.io/api/health.html#list-checks-for-service
# File lib/consul/async/consul_template.rb, line 144
# List the health checks of a service (/v1/health/checks/<name>).
#
# name::    service name; must not be nil.
# dc::      added as the :dc query parameter when truthy.
# passing:: added as the :passing query parameter when truthy.
# agent::   forwarded to the endpoint and to create_if_missing.
#
# Raises RuntimeError when name is nil.
def checks_for_service(name, dc: nil, passing: false, agent: nil)
  raise 'You must specify a name for a service' if name.nil?

  checks_path = "/v1/health/checks/#{name}"
  # Keep only the filters that were actually requested.
  filters = { dc: dc, passing: passing }.select { |_key, value| value }
  create_if_missing(checks_path, filters, agent: agent) do
    ConsulTemplateChecks.new(ConsulEndpoint.new(consul_conf, checks_path, true, filters, '[]', agent))
  end
end
www.consul.io/api-docs/health#list-checks-in-state Supported in Consul
1.7+
# File lib/consul/async/consul_template.rb, line 167
# List the checks in a given state (/v1/health/state/<check_state>).
#
# check_state:: one of 'any', 'critical', 'passing', 'warning'.
# dc::          added as the :dc query parameter when truthy.
# agent::       forwarded to the endpoint and to create_if_missing.
#
# Raises RuntimeError when check_state is not one of the accepted states.
def checks_in_state(check_state, dc: nil, agent: nil)
  allowed = %w[any critical passing warning]
  unless allowed.include?(check_state)
    raise "checks_in_state('#{check_state}'...) must be one of #{allowed}"
  end

  state_path = "/v1/health/state/#{check_state}"
  filters = dc ? { dc: dc } : {}
  create_if_missing(state_path, filters, agent: agent) do
    ConsulTemplateChecks.new(ConsulEndpoint.new(consul_conf, state_path, true, filters, '[]', agent))
  end
end
# File lib/consul/async/consul_template.rb, line 398
# Return the endpoint registered for the given request, creating and
# registering it (by yielding) the first time it is requested.
#
# path::          request path, used to build the endpoint id.
# query_params::  hash of query parameters, used to build the endpoint id.
# fail_fast_errors:: when truthy, terminate if the endpoint fails before any
#                    successful call.
# max_consecutive_errors_on_endpoint:: terminate once the endpoint reports more
#                    consecutive errors than this.
# agent::         optional agent; part of the endpoint id so that the same
#                 path queried on two agents yields two distinct endpoints.
# endpoint_id::   explicit cache key; computed from agent, path and query
#                 parameters when nil.
#
# The block must return an object responding to +endpoint+ and +_seen_at+.
# Returns the cached or newly created object.
def create_if_missing(path, query_params, fail_fast_errors: @fail_fast_errors,
                      max_consecutive_errors_on_endpoint: @max_consecutive_errors_on_endpoint,
                      agent: nil, endpoint_id: nil)
  # The agent is always part of the key, exactly once. It used to be added
  # once per query parameter, hence not at all when there were none.
  endpoint_id ||= "#{agent}#{path}#{query_params.map { |k, v| "&#{k}=#{v}" }.join}"
  tpl = @endpoints[endpoint_id]
  unless tpl
    tpl = yield
    ::Consul::Async::Debug.print_debug "path #{path.ljust(64)} #{query_params.inspect}\r"
    @endpoints[endpoint_id] = tpl
    # Aggregate network statistics for every successful response.
    tpl.endpoint.on_response do |result|
      @net_info[:success] += 1
      @net_info[:bytes_read] += result.data.bytesize
      @net_info[:changes] += 1 if result.modified?
      @net_info[:network_bytes] += result.http.response_header['Content-Length'].to_i
    end
    # Count errors and stop everything when the error policy is exceeded.
    tpl.endpoint.on_error do |_err|
      @net_info[:errors] += 1
      if tpl.endpoint.stats.successes.zero? && fail_fast_errors
        ::Consul::Async::Debug.puts_error "Endpoint #{path} is failing at first call with fail fast activated, terminating..."
        terminate
      end
      if tpl.endpoint.stats.consecutive_errors > max_consecutive_errors_on_endpoint
        ::Consul::Async::Debug.puts_error "Endpoint #{path} has too many consecutive errors: #{tpl.endpoint.stats.consecutive_errors}, terminating..."
        terminate
      end
    end
  end
  # Record that the endpoint is used by the current iteration.
  tpl._seen_at(@iteration)
  tpl
end
www.consul.io/api/catalog.html#list-datacenters
# File lib/consul/async/consul_template.rb, line 242
# List the known datacenters (/v1/catalog/datacenters).
#
# agent:: forwarded to the endpoint and to create_if_missing.
def datacenters(agent: nil)
  dc_path = '/v1/catalog/datacenters'
  no_params = {}
  create_if_missing(dc_path, no_params, agent: agent) do
    ConsulTemplateDatacenters.new(ConsulEndpoint.new(consul_conf, dc_path, true, no_params, '[]', agent))
  end
end
# File lib/consul/async/consul_template.rb, line 303
# Extract the location suffix (what follows the "(erb)" marker) from an error
# raised while evaluating a template.
#
# e:: the exception to inspect.
#
# Returns the message without its leading "(erb)" when the message starts with
# "(erb):", else the first backtrace line starting with "(erb):" without that
# prefix, else nil.
def find_line(e)
  erb_marker = '(erb):'
  message = e.message
  return message.dup[5..-1] if message.start_with?(erb_marker)

  # An exception that was never raised has no backtrace (nil); this helper is
  # used while reporting errors, so it must not raise in that case.
  frame = (e.backtrace || []).find { |line| line.start_with?(erb_marker) }
  frame && frame[5..-1]
end
www.consul.io/api/kv.html#read-key
# File lib/consul/async/consul_template.rb, line 249
# Read one or several keys from the KV store (/v1/kv/<name>).
#
# name::    key (or prefix); interpolated into the path, nil gives "/v1/kv/".
# dc::      added as the :dc query parameter when truthy.
# keys::    added as the :keys query parameter when truthy.
# recurse:: added as the :recurse query parameter when truthy.
# agent::   forwarded to the endpoint and to create_if_missing.
def kv(name = nil, dc: nil, keys: nil, recurse: false, agent: nil)
  kv_path = "/v1/kv/#{name}"
  # Keep only the options that were actually requested.
  filters = { dc: dc, recurse: recurse, keys: keys }.select { |_key, value| value }
  create_if_missing(kv_path, filters, agent: agent) do
    ConsulTemplateKV.new(ConsulEndpoint.new(consul_conf, kv_path, true, filters, '[]', agent), name)
  end
end
www.consul.io/api/catalog.html#list-services-for-node
# File lib/consul/async/consul_template.rb, line 186
# List the services of a node (/v1/catalog/node/<name_or_id>).
#
# name_or_id:: interpolated into the request path.
# dc::         added as the :dc query parameter when truthy.
# agent::      forwarded to the endpoint and to create_if_missing.
def node(name_or_id, dc: nil, agent: nil)
  node_path = "/v1/catalog/node/#{name_or_id}"
  filters = dc ? { dc: dc } : {}
  create_if_missing(node_path, filters, agent: agent) do
    ConsulTemplateNodes.new(ConsulEndpoint.new(consul_conf, node_path, true, filters, '{}', agent))
  end
end
www.consul.io/api/catalog.html#list-nodes
# File lib/consul/async/consul_template.rb, line 178
# List the nodes of the catalog (/v1/catalog/nodes).
#
# dc::    added as the :dc query parameter when truthy.
# agent:: forwarded to the endpoint and to create_if_missing.
def nodes(dc: nil, agent: nil)
  nodes_path = '/v1/catalog/nodes'
  filters = dc ? { dc: dc } : {}
  create_if_missing(nodes_path, filters, agent: agent) do
    ConsulTemplateNodes.new(ConsulEndpoint.new(consul_conf, nodes_path, true, filters, '[]', agent))
  end
end
Return a param of template
# File lib/consul/async/consul_template.rb, line 219
# Return a parameter of the template currently being rendered.
#
# key::           parameter name; looked up as given, then as a Symbol.
# default_value:: returned when the parameter is absent or nil.
#
# An explicit +false+ value is returned as-is; only nil falls back to the
# next lookup and finally to default_value.
def param(key, default_value = nil)
  params = @context[:params]
  value = params[key]
  # Only convert the key when the direct lookup found nothing.
  value = params[key.to_sym] if value.nil?
  value.nil? ? default_value : value
end
# File lib/consul/async/consul_template.rb, line 312
# Evaluate an ERB template with this object as evaluation context.
#
# tpl::           ERB source to evaluate.
# tpl_file_path:: path identifying the template; frozen, stored as
#                 'source' in the template info and used in error messages.
# params::        parameters exposed to the template.
# current_template_info:: hash merged with the 'source' entry; nil is treated
#                 as an empty hash.
#
# Returns the rendered String.
# Raises InvalidTemplateException when evaluation raises a StandardError and
# SyntaxErrorInTemplate when the template has a syntax error.
#
# NOTE: local variable names are kept stable because the template is evaluated
# with this method's binding and can therefore see them.
def render(tpl, tpl_file_path, params = {}, current_template_info: nil)
  # Ugly, but allow to use render_file well to support stack of calls
  old_value = @context
  tpl_info = (current_template_info || {}).merge('source' => tpl_file_path.freeze)
  @context = {
    current_erb_path: tpl_file_path,
    params: params,
    template_info: tpl_info
  }
  # Keyword form: the positional safe_level/trim_mode arguments are deprecated.
  result = ERB.new(tpl, trim_mode: @trim_mode).result(binding)
  raise "Result is not a string :='#{result}' for #{tpl_file_path}" unless result.is_a?(String)

  result
rescue StandardError => e
  e2 = InvalidTemplateException.new e
  raise e2, "[TEMPLATE EVALUATION ERROR] #{tpl_file_path}#{find_line(e)}: #{e.message}"
rescue SyntaxError => e
  e2 = SyntaxErrorInTemplate.new e
  raise e2, "[TEMPLATE SYNTAX ERROR] #{tpl_file_path}#{find_line(e)}: #{e.message}"
ensure
  # Always restore the caller's context, including when the template failed,
  # so a failing nested render does not leak its context to the outer one.
  @context = old_value
end
render a relative file with the given params accessible from template
# File lib/consul/async/consul_template.rb, line 287
# Render another template file, resolved relatively to the directory of the
# template currently being rendered.
#
# path::   path of the file, relative to the current template.
# params:: parameters made accessible to the rendered template.
#
# Raises RuntimeError when the resolved file does not exist.
def render_file(path, params = {})
  base_dir = File.dirname(@context[:current_erb_path])
  resolved = File.expand_path(path, base_dir)
  unless File.exist?(resolved)
    raise "render_file ERROR: #{path} is resolved as #{resolved}, but the file does not exists"
  end

  content = File.read(resolved)
  render(content, resolved, params, current_template_info: template_info)
end
render a sub template from a string template
# File lib/consul/async/consul_template.rb, line 295
# Render a sub-template given as a String.
#
# template_content:: ERB source; nothing is rendered (nil returned) when falsy.
# params::           parameters made accessible to the rendered template.
#
# The template is identified by a pseudo path built from the SHA1 of its
# content, located in the directory of the template being rendered.
def render_from_string(template_content, params = {})
  return unless template_content

  digest = Digest::SHA1.hexdigest(template_content)
  current_dir = File.dirname(@context[:current_erb_path])
  pseudo_path = File.expand_path(":memory:sha1:#{digest}", current_dir)
  render(template_content, pseudo_path, params, current_template_info: template_info)
end
# File lib/consul/async/consul_template.rb, line 272
# Read a secret from Vault.
#
# path::      secret path; prefixed with "/v1/", repeated slashes collapsed.
# post_data:: when truthy the request uses POST, otherwise GET.
# agent::     forwarded to the endpoint and to create_if_missing.
#
# Raises RuntimeError when no vault token is configured.
def secret(path = '', post_data = nil, agent: nil)
  raise "You need to provide a vault token to use 'secret' keyword" if vault_conf.token.nil?

  path = "/v1/#{path}".squeeze('/')
  query_params = {}
  # NOTE(review): post_data only selects the verb here; it is not handed to
  # VaultEndpoint in this call - confirm against VaultEndpoint's signature.
  http_verb = post_data ? 'POST' : 'GET'
  error_policy = {
    fail_fast_errors: vault_conf.fail_fast_errors,
    max_consecutive_errors_on_endpoint: vault_conf.max_consecutive_errors_on_endpoint
  }
  create_if_missing(path, query_params, **error_policy, agent: agent) do
    vault_endpoint = VaultEndpoint.new(vault_conf, path, http_verb, true, query_params,
                                       JSON.generate(data: {}), agent: agent)
    ConsulTemplateVaultSecret.new(vault_endpoint)
  end
end
# File lib/consul/async/consul_template.rb, line 259
# List the secrets available under a Vault path (GET with list=true).
#
# path::  secret path; prefixed with "/v1/", repeated slashes collapsed.
# agent:: forwarded to the endpoint and to create_if_missing.
#
# Raises RuntimeError when no vault token is configured.
def secrets(path = '', agent: nil)
  raise "You need to provide a vault token to use 'secret' keyword" if vault_conf.token.nil?

  path = "/v1/#{path}".squeeze('/')
  query_params = { list: 'true' }
  error_policy = {
    fail_fast_errors: vault_conf.fail_fast_errors,
    max_consecutive_errors_on_endpoint: vault_conf.max_consecutive_errors_on_endpoint
  }
  create_if_missing(path, query_params, **error_policy, agent: agent) do
    vault_endpoint = VaultEndpoint.new(vault_conf, path, 'GET', true, query_params,
                                       JSON.generate(data: { keys: [] }), agent: agent)
    ConsulTemplateVaultSecretList.new(vault_endpoint)
  end
end
www.consul.io/api/health.html#list-nodes-for-service
# File lib/consul/async/consul_template.rb, line 132
# List the nodes providing a service (/v1/health/service/<name>).
#
# name::    service name; must not be nil.
# dc::      added as the :dc query parameter when truthy.
# passing:: added as the :passing query parameter when truthy.
# tag::     added as the :tag query parameter when truthy.
# agent::   forwarded to the endpoint and to create_if_missing.
#
# Raises RuntimeError when name is nil.
def service(name, dc: nil, passing: false, tag: nil, agent: nil)
  raise 'You must specify a name for a service' if name.nil?

  service_path = "/v1/health/service/#{name}"
  # Keep only the filters that were actually requested.
  filters = { dc: dc, passing: passing, tag: tag }.select { |_key, value| value }
  create_if_missing(service_path, filters, agent: agent) do
    ConsulTemplateService.new(ConsulEndpoint.new(consul_conf, service_path, true, filters, '[]', agent))
  end
end
www.consul.io/api/catalog.html#list-services
# File lib/consul/async/consul_template.rb, line 232
# List the services of the catalog (/v1/catalog/services).
#
# dc::    added as the :dc query parameter when truthy.
# tag::   added as the :tag query parameter when truthy.
# agent:: forwarded to the endpoint and to create_if_missing.
def services(dc: nil, tag: nil, agent: nil)
  services_path = '/v1/catalog/services'
  # Tag filtering is performed on client side
  filters = { dc: dc, tag: tag }.select { |_key, value| value }
  create_if_missing(services_path, filters, agent: agent) do
    ConsulTemplateServices.new(ConsulEndpoint.new(consul_conf, services_path, true, filters, '{}', agent))
  end
end
Get information about current template
# File lib/consul/async/consul_template.rb, line 227
# Information about the template currently being rendered, as stored under
# :template_info in the rendering context.
def template_info
  current_context = @context
  current_context[:template_info]
end
# File lib/consul/async/consul_template.rb, line 384
# Stop the manager: mark it as not running, terminate the endpoint of every
# registered entry, then forget all of them.
def terminate
  @running = false
  @endpoints.each_value { |registered| registered.endpoint.terminate }
  @endpoints = {}
end
# File lib/consul/async/consul_template.rb, line 392
# Create the Vault endpoint that POSTs to the token self-renewal path.
# Returns the created VaultEndpoint.
def vault_setup_token_renew
  ::Consul::Async::Debug.print_debug 'Setting up vault token renewal'
  renew_path = 'v1/auth/token/renew-self'
  VaultEndpoint.new(vault_conf, renew_path, :POST, {}, {})
end
# File lib/consul/async/consul_template.rb, line 334
# Render a template and write the result to +file+ when every endpoint used
# by the template has data and the output changed.
#
# file::          destination path; written through "<file>.tmp" then renamed.
# tpl::           ERB source of the template.
# last_result::   previously rendered output, used to detect changes.
# tpl_file_path:: path identifying the template (handed to render).
# params::        parameters handed to render.
# current_template_info:: template info handed to render.
#
# Returns [ready, changed, data]:
# * [false, false, nil] while endpoints are missing data or the render is nil;
# * [true, data != last_result, data] otherwise.
def write(file, tpl, last_result, tpl_file_path, params = {}, current_template_info: {})
  # The iteration is the number of seconds elapsed since start.
  @iteration = Time.now.utc - @start_time
  data = render(tpl, tpl_file_path, params, current_template_info: current_template_info)
  not_ready = []
  ready = 0
  to_cleanup = []
  @endpoints.each_pair do |endpoint_key, endpt|
    if endpt.ready?
      ready += 1
    else
      # We consider only the endpoints useful with current iteration
      not_ready << endpoint_key unless endpt.seen_at < @iteration
    end
    # Endpoints not requested for more than 60 seconds are garbage collected.
    to_cleanup << endpoint_key if (@iteration - endpt.seen_at) > 60
  end
  if !not_ready.empty? || data.nil?
    # Report progress at most once per second.
    if @iteration - @last_debug_time > 1
      @last_debug_time = @iteration
      if data.nil?
        ::Consul::Async::Debug.print_info "Waiting for Template #{tpl_file_path} to not return nil, consider it not ready...\r"
      else
        ::Consul::Async::Debug.print_info "Waiting for data from #{not_ready.count}/#{not_ready.count + ready} endpoints: #{not_ready[0..2]}...\r"
      end
    end
    return [false, false, nil]
  end
  # Clean up as soon as there is at least one stale endpoint. The previous
  # `count > 1` test left a lone stale endpoint registered and running.
  unless to_cleanup.empty?
    ::Consul::Async::Debug.puts_info "Cleaned up #{to_cleanup.count} endpoints: #{to_cleanup}"
    to_cleanup.each do |to_remove|
      x = @endpoints.delete(to_remove)
      x.endpoint.terminate
    end
  end
  if last_result != data
    ::Consul::Async::Debug.print_info "Write #{Utilities.bytes_to_h data.bytesize} bytes to #{file}, "\
                                      "netinfo=#{@net_info} aka "\
                                      "#{Utilities.bytes_to_h((@net_info[:network_bytes] / (Time.now.utc - @start_time)).round(1))}/s ...\r"
    tmp_file = "#{file}.tmp"
    begin
      # Write to a temporary file, then rename it over the destination.
      File.open(tmp_file, 'w') do |f|
        f.write data
      end
      File.rename(tmp_file, file)
    rescue StandardError => e
      ::Consul::Async::Debug.puts_error "Failed writting #{Utilities.bytes_to_h data.bytesize} bytes to #{file}: #{e.class}, message: #{e.inspect}"
    end
  end
  [true, data != last_result, data]
end