class Xcflushd::Runner

Constants

DEFAULT_MAX_TERM_WAIT

Maximum time to wait for a graceful shutdown before becoming more aggressive at killing thread pools.

MAX_IDLING_SIGNAL_LATENCY

Because Ruby does not wake us up from sleep when a signal arrives, we sleep in small intervals and check whether we have been signalled after each one.

PRIORITY_SUBSCRIPTION_RETRY_WAIT

Amount of time to wait before retrying the subscription to the priority auth renewal pubsub channel.

Public Class Methods

run(opts = {}) click to toggle source
# File lib/xcflushd/runner.rb, line 23
# Entry point: wires up every collaborator and then blocks in the periodic
# flush loop.
#
# Reads the following keys from +opts+: :max_term_wait, :redis, :backend,
# :secure, :provider_key, :auth_ttl, :threads, :prio_threads and :frequency.
# The :redis and :backend values must respond to +host+ and +port+.
def run(opts = {})
  setup_sighandlers

  @max_term_wait = opts[:max_term_wait] || DEFAULT_MAX_TERM_WAIT
  @logger = Logger.new(STDOUT)

  # Every Redis connection created below shares the same settings.
  redis_opts = {
    host: opts[:redis].host,
    port: opts[:redis].port,
    driver: RUBY_ENGINE == 'jruby' ? :ruby : :hiredis
  }

  storage = Storage.new(Redis.new(**redis_opts), @logger, StorageKeys)

  # Provider keys are deprecated in favor of service tokens and the 3scale
  # client warns about them. We still have to use provider keys, so
  # warn_deprecated is turned off to keep the output free of noise.
  secure = opts[:secure]
  backend = opts[:backend]
  threescale = ThreeScale::Client.new(
    provider_key: opts[:provider_key],
    host: backend.host,
    port: backend.port || (secure ? 443 : 80),
    secure: secure,
    persistent: true,
    warn_deprecated: false
  )

  reporter = Reporter.new(threescale)
  authorizer = Authorizer.new(threescale)

  # The priority auth renewer gets its own pair of connections.
  redis_pub = Redis.new(**redis_opts)
  redis_sub = Redis.new(**redis_opts)

  auth_ttl = opts[:auth_ttl]

  error_handler = FlusherErrorHandler.new(@logger, storage)
  flusher_pool = flusher_threads(opts[:threads])
  @flusher = Flusher.new(reporter, authorizer, storage, auth_ttl,
                         error_handler, @logger, flusher_pool)

  prio_pool = prio_threads(opts[:prio_threads])
  @prio_auth_renewer = PriorityAuthRenewer.new(authorizer, storage,
                                               redis_pub, redis_sub,
                                               auth_ttl, @logger, prio_pool)

  @prio_auth_renewer_thread = start_priority_auth_renewer

  flush_periodically(opts[:frequency])
end

Private Class Methods

flush_periodically(flush_freq) click to toggle source
# File lib/xcflushd/runner.rb, line 94
# Runs @flusher.flush once every +flush_freq+ seconds until @exit is set, and
# always calls +shutdown+ on the way out.
#
# Any StandardError raised during a flush cycle is logged and the loop keeps
# going. Anything else (including signals that were not trapped) is logged as
# fatal and ends the loop.
#
# flush_freq:: number of seconds between the start of two consecutive flushes.
def flush_periodically(flush_freq)
  loop do
    break if @exit

    # Schedule the next flush before running anything that is covered by the
    # StandardError rescue below. Otherwise an early failure would leave
    # next_flush unset and the idle loop would blow up with a NoMethodError,
    # taking the whole flusher down instead of just skipping one cycle.
    flusher_start = Time.now
    next_flush = flusher_start + flush_freq

    begin
      @logger.info('Flushing...')
      @flusher.flush
      flusher_runtime = Time.now - flusher_start
      @logger.info("Flush completed in #{flusher_runtime} seconds")
    rescue StandardError => e
      # Let's make sure that we treat all the standard errors to ensure that
      # the flusher keeps running.
      @logger.error(e)
    end

    loop do
      # sleep in small intervals to check if signalled
      break if @exit
      time_remaining = next_flush - Time.now
      break if time_remaining <= 0
      sleep([MAX_IDLING_SIGNAL_LATENCY, time_remaining].min)
    end
  end
  @logger.info('Exiting')
rescue SignalException => e
  # signm is the name of the signal (e.g. "SIGTERM"). The exception's cause
  # is whatever exception was being handled when it was raised, which is
  # normally nothing and tells the operator nothing about the signal.
  @logger.fatal("Received unhandled signal #{e.signm}, shutting down")
rescue Exception => e
  @logger.fatal("Unhandled exception #{e.class}, shutting down: #{e.cause} - #{e}")
ensure
  shutdown
end
flusher_threads(opts_threads) click to toggle source
# File lib/xcflushd/runner.rb, line 73
# Picks the thread count for the flusher: the +max+ of +opts_threads+ when
# one was given, otherwise whatever Threading.default_threads returns.
def flusher_threads(opts_threads)
  return Threading.default_threads unless opts_threads

  opts_threads.max
end
prio_threads(opts_prio_threads) click to toggle source
# File lib/xcflushd/runner.rb, line 77
# Picks the thread count for the priority auth renewer: the +max+ of
# +opts_prio_threads+ when one was given, otherwise whatever
# Threading.default_threads returns.
def prio_threads(opts_prio_threads)
  if opts_prio_threads
    opts_prio_threads.max
  else
    Threading.default_threads
  end
end
setup_sighandlers() click to toggle source
# File lib/xcflushd/runner.rb, line 178
# Installs the signal handlers: HUP, USR1 and USR2 are ignored, while EXIT,
# TERM and INT merely raise the @exit flag that the worker loops poll.
def setup_sighandlers
  @exit = false

  %w[HUP USR1 USR2].each { |name| Signal.trap(name, "SIG_IGN") }

  request_exit = proc { @exit = true }
  %w[EXIT TERM INT].each { |name| Signal.trap(name, &request_exit) }
end
shutdown() click to toggle source

Shutting down xcflushd

We issue shutdown commands to the thread pools in the auth renewer and the flusher, wait a bit for a graceful termination and then proceed with more drastic ways.

Note that there is no @prio_auth_renewer_thread.join(timeout).

This is because that thread is blocked in the Redis pubsub mechanism. Since that is handled by the Redis gem and there is no way to exit it unless an unhandled exception is raised or an explicit unsubscribe command is issued from within one of the pubsub message handlers, we can't do much to issue an unsubscribe command (it would be issued from an external place and would block on the Redis gem's internal synchronization primitives).

Therefore if we did the join we would be wasting that time once the thread pool is terminated, so we just go ahead and kill the thread right away (in terminate).

# File lib/xcflushd/runner.rb, line 146
# Asks the priority auth renewer and the flusher to shut down, gives them
# until a common deadline (@max_term_wait seconds from now) to finish, and
# finally calls +terminate+ no matter what happened before.
#
# Every individual step runs under with_logged_shutdown so that a failure in
# one task does not prevent the remaining steps from running.
def shutdown
  shutdown_deadline = Time.now + @max_term_wait
  tasks = [@prio_auth_renewer, @flusher]
  tasks.each do |task|
    with_logged_shutdown { task.shutdown }
  end
  tasks.each do |task|
    with_logged_shutdown do
      # The deadline is shared, so a slow first task can use it up entirely.
      # Clamp at zero so later tasks are never given a negative timeout.
      remaining = [shutdown_deadline - Time.now, 0].max
      task.wait_for_termination(remaining)
    end
  end
ensure
  terminate
end
start_priority_auth_renewer() click to toggle source
# File lib/xcflushd/runner.rb, line 81
# Spawns and returns the thread that keeps the priority auth renewer running.
#
# The thread calls @prio_auth_renewer.start over and over until @exit is set.
# When start raises a StandardError the thread waits
# PRIORITY_SUBSCRIPTION_RETRY_WAIT seconds before trying again.
def start_priority_auth_renewer
  Thread.new do
    until @exit
      begin
        @prio_auth_renewer.start
      rescue StandardError
        sleep PRIORITY_SUBSCRIPTION_RETRY_WAIT
      end
    end
  end
end
terminate() click to toggle source
# File lib/xcflushd/runner.rb, line 161
# Calls +terminate+ on the priority auth renewer, the flusher and the renewer
# thread, in that order. Each call runs under with_logged_shutdown.
def terminate
  victims = [@prio_auth_renewer, @flusher, @prio_auth_renewer_thread]
  victims.each { |victim| with_logged_shutdown { victim.terminate } }
end
with_logged_shutdown() { || ... } click to toggle source
# File lib/xcflushd/runner.rb, line 167
# Runs the given block and makes sure nothing it raises escapes.
#
# Whatever the block raises is reported through @logger.error. Should the
# logging itself raise, that second failure is dropped as well.
def with_logged_shutdown
  begin
    yield
  rescue Exception => failure
    begin
      @logger.error("while shutting down: #{failure.class}, cause #{failure.cause} - #{failure}")
    rescue Exception
      # A broken logger must not abort the rest of the shutdown sequence, so
      # this is swallowed on purpose.
    end
  end
end