class SidekiqPublisher::Runner

Constants

CHANNEL_NAME
LISTENER_TIMEOUT_SECONDS

Attributes

instrumenter[R]
publisher[R]

Public Class Methods

new(instrumenter = Instrumenter.new) click to toggle source
# File lib/sidekiq_publisher/runner.rb, line 14
def initialize(instrumenter = Instrumenter.new)
  @instrumenter = instrumenter
  @publisher = Publisher.new(instrumenter: @instrumenter)
end
run(instrumenter = Instrumenter.new) click to toggle source
# File lib/sidekiq_publisher/runner.rb, line 10
def self.run(instrumenter = Instrumenter.new)
  new(instrumenter).run
end

Public Instance Methods

run() click to toggle source
# File lib/sidekiq_publisher/runner.rb, line 19
def run
  ActiveRecord::PostgresPubSub::Listener.listen(
    CHANNEL_NAME,
    listen_timeout: LISTENER_TIMEOUT_SECONDS
  ) do |listener|
    listener.on_start { call_publish("start") }
    listener.on_notify { call_publish("notify") }
    listener.on_timeout { listener_timeout }
  end
end

Private Instance Methods

call_publish(event) click to toggle source
# File lib/sidekiq_publisher/runner.rb, line 34
def call_publish(event)
  instrumenter.instrument("#{event}.publisher") do
    publisher.publish
  end
end
listener_timeout() click to toggle source
# File lib/sidekiq_publisher/runner.rb, line 40
def listener_timeout
  instrumenter.instrument("timeout.listener") do
    if Job.unpublished.exists?
      SidekiqPublisher.logger&.warn(
        "#{self.class.name}: msg='publishing pending jobs at timeout'"
      )
      call_publish("timeout")
    else
      Job.purge_expired_published!(instrumenter: instrumenter)
    end
  end
end