class Xcflushd::Flusher
Constants
- WAIT_TIME_REPORT_AUTH
- XcflushdError
Attributes
auth_ttl[R]
error_handler[R]
logger[R]
reporter[R]
storage[R]
thread_pool[R]
Public Class Methods
new(reporter, authorizer, storage, auth_ttl, error_handler, logger, threads)
click to toggle source
# File lib/xcflushd/flusher.rb, line 11 def initialize(reporter, authorizer, storage, auth_ttl, error_handler, logger, threads) @reporter = reporter @authorizer = authorizer @storage = storage @auth_ttl = auth_ttl @error_handler = error_handler @logger = logger @thread_pool = Concurrent::FixedThreadPool.new(threads) end
Public Instance Methods
flush()
click to toggle source
TODO: decide if we want to renew the authorizations every time.
# File lib/xcflushd/flusher.rb, line 34 def flush reports_to_flush = run_and_log_time('Getting the reports from Redis') do reports end run_and_log_time('Reporting to 3scale') { report(reports_to_flush) } # Ideally, we would like to ensure that once we start checking # authorizations, they have taken into account the reports that we just # performed. However, in 3scale, reports are asynchronous and the current # API does not provide a way to know whether a report has already been # processed. # For now, let's just wait a few seconds. This will greatly mitigate the # problem. run_and_log_time('Giving reports some time to be processed') do sleep(WAIT_TIME_REPORT_AUTH) end auths = run_and_log_time('Getting the auths from 3scale') do authorizations(reports_to_flush) end run_and_log_time('Renewing the auths in Redis') { renew(auths) } end
shutdown()
click to toggle source
# File lib/xcflushd/flusher.rb, line 21 def shutdown @thread_pool.shutdown end
terminate()
click to toggle source
# File lib/xcflushd/flusher.rb, line 29 def terminate @thread_pool.kill end
wait_for_termination(secs = nil)
click to toggle source
# File lib/xcflushd/flusher.rb, line 25 def wait_for_termination(secs = nil) @thread_pool.wait_for_termination(secs) end
Private Instance Methods
async_report_tasks(reports)
click to toggle source
# File lib/xcflushd/flusher.rb, line 116 def async_report_tasks(reports) reports.map do |report| task = Concurrent::Future.new(executor: thread_pool) do reporter.report(report[:service_id], report[:credentials], report[:usage]) end [report, task] end.to_h end
renew(authorizations)
click to toggle source
# File lib/xcflushd/flusher.rb, line 103 def renew(authorizations) authorizations.each do |authorization| begin storage.renew_auths(authorization[:service_id], authorization[:credentials], authorization[:auths], auth_ttl) rescue Storage::RenewAuthError => e error_handler.handle_renew_auth_error(e) end end end
report(reports)
click to toggle source
# File lib/xcflushd/flusher.rb, line 68 def report(reports) report_tasks = async_report_tasks(reports) report_tasks.values.each(&:execute) report_tasks.values.each(&:value) # blocks until all finish failed = report_tasks.select { |_report, task| task.rejected? } .map { |report, task| [report, task.reason] } .to_h error_handler.handle_report_errors(failed) unless failed.empty? end
reports()
click to toggle source
# File lib/xcflushd/flusher.rb, line 64 def reports storage.reports_to_flush end
run_and_log_time(action, &blk)
click to toggle source
# File lib/xcflushd/flusher.rb, line 146 def run_and_log_time(action, &blk) t = Time.now res = blk.call logger.debug("#{action} took #{(Time.now - t).round(3)} seconds") res end