class SidekiqPublisher::Publisher
Attributes
client[R]
instrumenter[R]
job_class_cache[R]
Public Class Methods
new(instrumenter: Instrumenter.new)
click to toggle source
# File lib/sidekiq_publisher/publisher.rb, line 8 def initialize(instrumenter: Instrumenter.new) @instrumenter = instrumenter @client = SidekiqPublisher::Client.new @job_class_cache = {} end
Public Instance Methods
publish()
click to toggle source
# File lib/sidekiq_publisher/publisher.rb, line 14 def publish Job.unpublished_batches do |batch| instrumenter.instrument("publish_batch.publisher") do items = batch.map do |job| { "jid" => job[:job_id], "class" => lookup_job_class(job[:job_class]), "args" => job[:args], "at" => job[:run_at], "queue" => job[:queue], "wrapped" => job[:wrapped], "created_at" => job[:created_at].to_f, }.tap(&:compact!) end instrumenter.instrument("enqueue_batch.publisher") do |notification| enqueue_batch(batch, items, notification) end end end purge_expired_published_jobs rescue StandardError => ex failure_warning(__method__, ex) end
Private Instance Methods
enqueue_batch(batch, items, notification)
click to toggle source
# File lib/sidekiq_publisher/publisher.rb, line 43 def enqueue_batch(batch, items, notification) pushed_count = client.bulk_push(items) published_count = update_jobs_as_published!(batch) rescue StandardError => ex failure_warning(__method__, ex) ensure published_count = update_jobs_as_published!(batch) if pushed_count.present? && published_count.nil? notification[:published_count] = published_count if published_count.present? end
failure_warning(method, ex)
click to toggle source
# File lib/sidekiq_publisher/publisher.rb, line 71 def failure_warning(method, ex) logger.warn("#{self.class.name}: msg=\"#{method} failed\" error=#{ex.class} error_msg=#{ex.message.inspect}\n") instrumenter.instrument("error.publisher", exception_object: ex, exception: [ex.class.name, ex.message]) end
logger()
click to toggle source
# File lib/sidekiq_publisher/publisher.rb, line 77 def logger SidekiqPublisher.logger end
lookup_job_class(name)
click to toggle source
# File lib/sidekiq_publisher/publisher.rb, line 53 def lookup_job_class(name) job_class_cache.fetch(name) do job_class_cache[name] = name.constantize end end
perform_purge?()
click to toggle source
# File lib/sidekiq_publisher/publisher.rb, line 67 def perform_purge? rand(100).zero? end
purge_expired_published_jobs()
click to toggle source
# File lib/sidekiq_publisher/publisher.rb, line 63 def purge_expired_published_jobs Job.purge_expired_published!(instrumenter: instrumenter) if perform_purge? end
update_jobs_as_published!(jobs)
click to toggle source
# File lib/sidekiq_publisher/publisher.rb, line 59 def update_jobs_as_published!(jobs) Job.published!(jobs.map { |job| job[:id] }) end