class Prefab::ConfigClient

Constants

DEFAULT_CHECKPOINT_FREQ_SEC
DEFAULT_S3CF_BUCKET
RECONNECT_WAIT

Public Class Methods

new(base_client, timeout) click to toggle source
# File lib/prefab/config_client.rb, line 7
# Builds a config client around +base_client+.
#
# base_client:: client object handed to the loader, resolver and interceptor
# timeout::     stored and later passed as the request timeout in #upsert
#
# The initialization write lock is taken here and is only released by
# finish_init! once a checkpoint (or streamed update) has been loaded;
# #get takes the matching read lock. A first checkpoint load is attempted
# before the background checkpointing thread is started.
def initialize(base_client, timeout)
  # Plain state first.
  @base_client = base_client
  @timeout = timeout
  @checkpoint_freq_secs = DEFAULT_CHECKPOINT_FREQ_SEC
  @s3_cloud_front = ENV["PREFAB_S3CF_BUCKET"] || DEFAULT_S3CF_BUCKET

  # Collaborators, created in the same order as before, with the write lock
  # acquired right after the resolver exists.
  @initialization_lock = Concurrent::ReadWriteLock.new
  @config_loader = Prefab::ConfigLoader.new(@base_client)
  @config_resolver = Prefab::ConfigResolver.new(@base_client, @config_loader)
  @initialization_lock.acquire_write_lock
  @cancellable_interceptor = Prefab::CancellableInterceptor.new(@base_client)

  load_checkpoint
  start_checkpointing_thread
end
value_to_delta(key, config_value, namespace = nil) click to toggle source
# File lib/prefab/config_client.rb, line 60
# Wraps a config value in a Prefab::ConfigDelta.
#
# key::          config key
# config_value:: value stored on the delta
# namespace::    optional prefix; when given the delta key is "namespace:key"
#
# Returns the new Prefab::ConfigDelta.
def self.value_to_delta(key, config_value, namespace = nil)
  # compact drops a nil namespace so un-namespaced keys get no leading ":".
  qualified_key = [namespace, key].compact.join(":")
  Prefab::ConfigDelta.new(key: qualified_key, value: config_value)
end

Public Instance Methods

get(prop) click to toggle source
# File lib/prefab/config_client.rb, line 31
# Looks up +prop+ through the config resolver.
#
# The lookup runs under the initialization read lock; the write lock taken in
# the constructor is released by finish_init!.
#
# Returns whatever the resolver returns for +prop+.
def get(prop)
  @initialization_lock.with_read_lock { @config_resolver.get(prop) }
end
reset() click to toggle source
# File lib/prefab/config_client.rb, line 51
# Resets the underlying base client and forgets the current service stub.
#
# Called from the streaming thread after a connection error, before it waits
# and reconnects.
def reset
  @base_client.reset!
  # Drop the stub reference only after the base client reset has succeeded.
  @_stub = nil
end
start_streaming() click to toggle source
# File lib/prefab/config_client.rb, line 26
# Turns streaming on and starts the API connection thread, resuming from the
# config loader's current highwater mark.
#
# Returns whatever start_api_connection_thread returns.
def start_streaming
  @streaming = true
  resume_from = @config_loader.highwater_mark
  start_api_connection_thread(resume_from)
end
to_s() click to toggle source
# File lib/prefab/config_client.rb, line 56
# String form of the client: delegates to the config resolver's #to_s.
def to_s
  @config_resolver.to_s
end
upsert(key, config_value, namespace = nil, previous_key = nil) click to toggle source
# File lib/prefab/config_client.rb, line 37
# Sends an upsert for +key+ to the config service and mirrors the change in
# the local loader and resolver.
#
# key::          config key; must not contain ":"
# config_value:: value to store
# namespace::    optional namespace; must not contain ":"
# previous_key:: optional key being replaced; nil or blank means "none"
#
# Raises RuntimeError when +key+ or +namespace+ contains ":".
# Returns the result of the resolver update.
def upsert(key, config_value, namespace = nil, previous_key = nil)
  raise "Key must not contain ':' set namespaces separately" if key.include? ":"
  raise "Namespace must not contain ':'" if namespace&.include?(":")

  # Plain-Ruby blank check; the previous `present?` call only exists when
  # ActiveSupport is loaded and raised NoMethodError otherwise.
  has_previous_key = previous_key ? !previous_key.to_s.strip.empty? : false

  config_delta = Prefab::ConfigClient.value_to_delta(key, config_value, namespace)
  upsert_req = Prefab::UpsertRequest.new(config_delta: config_delta)
  upsert_req.previous_key = previous_key if has_previous_key

  @base_client.request Prefab::ConfigService, :upsert, req_options: { timeout: @timeout }, params: upsert_req
  @base_client.stats.increment("prefab.config.upsert")

  # Apply the change locally so reads see it without waiting for the stream.
  @config_loader.set(config_delta)
  @config_loader.rm(previous_key) if has_previous_key
  @config_resolver.update
end

Private Instance Methods

finish_init!(source) click to toggle source
# File lib/prefab/config_client.rb, line 145
# Releases the initialization write lock the first time config has been
# loaded, and hands this client to the base client's logger.
#
# source:: label for where the config came from; only used in the log line
#
# Does nothing when the write lock is not currently held.
def finish_init!(source)
  return unless @initialization_lock.write_locked?

  @base_client.log_internal Logger::DEBUG, "Unlocked Config via #{source}"
  @initialization_lock.release_write_lock
  @base_client.log.set_config_client(self)
end
load_checkpoint() click to toggle source

Bootstrap out of the cache: load the checkpoint from the API, falling back to S3 when that fails. The high-watermark of what was loaded is tracked by the config loader.

# File lib/prefab/config_client.rb, line 76
# Loads a checkpoint, preferring the API and falling back to S3.
#
# Any StandardError raised along the way is logged at WARN level and
# swallowed, so callers never see an exception from this method.
def load_checkpoint
  # Nothing more to do when the API load worked.
  return if load_checkpoint_from_config

  @base_client.log_internal Logger::INFO, "Fallback to S3"
  load_checkpoint_from_s3
rescue => e
  @base_client.log_internal Logger::WARN, "Unexpected problem loading checkpoint #{e}"
end
load_checkpoint_from_config() click to toggle source
# File lib/prefab/config_client.rb, line 88
# Fetches all config from the API, starting at the loader's current highwater
# mark, and applies it via load_deltas.
#
# Returns true on success. Any StandardError is logged at WARN level and
# turned into a false return so the caller can fall back to another source.
def load_checkpoint_from_config
  config_req = Prefab::ConfigServicePointer.new(account_id: @base_client.account_id,
                                                start_at_id: @config_loader.highwater_mark)
  resp = stub.get_all_config(config_req)

  # load_deltas already stores every delta, refreshes the resolver and calls
  # finish_init!, so those steps are not repeated here (they used to run twice).
  load_deltas(resp, :api)
  true
rescue => e
  @base_client.log_internal Logger::WARN, "Unexpected problem loading checkpoint #{e}"
  false
end
load_checkpoint_from_s3() click to toggle source
# File lib/prefab/config_client.rb, line 104
# Fetches a checkpoint over HTTP from the configured S3/CloudFront location
# and applies it via load_deltas.
#
# The object path is the API key with every "|" replaced by "/". Any response
# other than 200 is only logged at INFO level.
def load_checkpoint_from_s3
  key_path = @base_client.api_key.tr("|", "/")
  resp = Faraday.get "#{@s3_cloud_front}/#{key_path}"

  unless resp.status == 200
    return @base_client.log_internal(Logger::INFO, "No S3 checkpoint. Response #{resp.status} Plan may not support this.")
  end

  load_deltas(Prefab::ConfigDeltas.decode(resp.body), :s3)
end
load_deltas(deltas, source) click to toggle source
# File lib/prefab/config_client.rb, line 116
# Applies a batch of deltas to the loader and refreshes the resolver.
#
# deltas:: object whose #deltas enumerates the individual deltas
# source:: label for where the batch came from; used in the log line and
#          passed on to finish_init!
def load_deltas(deltas, source)
  deltas.deltas.each { |delta| @config_loader.set(delta) }

  highwater = @config_loader.highwater_mark
  @base_client.log_internal Logger::INFO, "Found checkpoint with highwater id #{highwater} from #{source}"
  @base_client.stats.increment("prefab.config.checkpoint.load")

  @config_resolver.update
  finish_init!(source)
end
start_api_connection_thread(start_at_id) click to toggle source

Set up a streaming connection to the API. Save new config values into the loader.

# File lib/prefab/config_client.rb, line 155
# Starts the background thread that streams config updates from the API and
# stores each received delta in the loader.
#
# start_at_id:: id sent to the API as the point to resume from
#
# While @streaming is true the thread keeps (re)opening the stream. On an
# error it logs, resets the connection and waits RECONNECT_WAIT seconds
# before retrying. Returns the started Thread.
def start_api_connection_thread(start_at_id)
  config_req = Prefab::ConfigServicePointer.new(account_id: @base_client.account_id,
                                                start_at_id: start_at_id)
  @base_client.log_internal Logger::DEBUG, "start api connection thread #{start_at_id}"
  @base_client.stats.increment("prefab.config.api.start")

  @api_connection_thread = Thread.new do
    # On process exit, stop looping and cancel the in-flight call.
    at_exit do
      @streaming = false
      @cancellable_interceptor.cancel
    end

    while @streaming
      begin
        stub.get_config(config_req).each do |r|
          r.deltas.each { |delta| @config_loader.set(delta) }
          @config_resolver.update
          finish_init!(:streaming)
        end
      rescue => e
        if @streaming
          # Not every StandardError has #code; calling it unguarded raised
          # NoMethodError here and killed the thread. Code 1 is presumably the
          # gRPC CANCELLED status -- TODO confirm.
          cancelled = e.respond_to?(:code) && e.code == 1
          level = cancelled ? Logger::DEBUG : Logger::INFO
          @base_client.log_internal level, ("config client encountered #{e.message} pausing #{RECONNECT_WAIT}")
          reset
          sleep(RECONNECT_WAIT)
        end
      end
    end
  end
end
start_checkpointing_thread() click to toggle source

Starts a background thread that reloads the checkpoint in a loop, sleeping between loads.

# File lib/prefab/config_client.rb, line 127
# Starts a background thread that reloads the checkpoint in a loop, aiming for
# one load every @checkpoint_freq_secs seconds.
#
# Errors raised inside an iteration are logged at INFO level and the loop
# continues. Returns the started Thread.
def start_checkpointing_thread
  Thread.new do
    loop do
      begin
        # Take the timestamp BEFORE loading so the time spent loading is
        # deducted from the sleep; it used to be taken afterwards, which made
        # the elapsed time effectively zero.
        started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        load_checkpoint
        elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at

        remaining = @checkpoint_freq_secs - elapsed
        sleep(remaining) if remaining > 0
      rescue StandardError => exn
        @base_client.log_internal Logger::INFO, "Issue Checkpointing #{exn.message}"
      end
    end
  end
end
stub() click to toggle source
# File lib/prefab/config_client.rb, line 67
# Returns the config service stub, building it on first use.
#
# The stub is cached in @_stub and rebuilt after #reset clears it. It used to
# be re-created on every call, which made the cache variable pointless.
# NOTE(review): assumes one stub instance can be shared by the checkpoint and
# streaming threads -- confirm against the gRPC client documentation.
def stub
  @_stub ||= Prefab::ConfigService::Stub.new(nil,
                                             nil,
                                             channel_override: @base_client.channel,
                                             interceptors: [@base_client.interceptor, @cancellable_interceptor])
end