class Sidekiq::Throttled::Communicator::Listener
Redis subscription listener thread.
@private
Public Class Methods
Starts listener thread.
@param [String] channel Redis pub/sub channel to listen on
@param [Callbacks] callbacks Message callbacks registry
# File lib/sidekiq/throttled/communicator/listener.rb, line 18
def initialize(channel, callbacks)
  @channel    = channel
  @callbacks  = callbacks
  @terminated = false
  @subscribed = false
  super { listen until @terminated }
end
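The constructor above hands its main loop to `Thread#initialize` through `super`, so the loop starts as soon as the object is created. A minimal sketch of the same pattern, with no Redis involved, might look like this. `TickerThread`, `tick`, `ticks` and `terminate_loop` are hypothetical names used only for illustration and are not part of the library.

```ruby
# Hypothetical stand-in for the listener: a Thread subclass whose
# constructor passes the main loop to Thread#initialize via `super`.
class TickerThread < Thread
  attr_reader :ticks

  def initialize
    # Set instance variables before `super`: the block may start
    # running as soon as the thread has been created.
    @terminated = false
    @ticks      = 0
    super { tick until @terminated }
  end

  # Mirrors #listening?: true while the main loop has not been stopped.
  def listening?
    !@terminated
  end

  # Hypothetical cooperative stop; the real listener raises an exception instead.
  def terminate_loop
    @terminated = true
  end

  private

  # One iteration of the main loop.
  def tick
    @ticks += 1
    sleep 0.01
  end
end

ticker = TickerThread.new
sleep 0.01 until ticker.ticks > 0 # wait until the loop has run at least once
ticker.terminate_loop
ticker.join
```

A flag checked between iterations is enough here because `tick` returns quickly. The real listener blocks inside a Redis subscription, which is presumably why it is stopped with an exception rather than a flag.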
Public Instance Methods
Whether the main loop is still running.
@return [Boolean]
# File lib/sidekiq/throttled/communicator/listener.rb, line 37
def listening?
  !@terminated
end
Whether the underlying Redis client is subscribed to the pub/sub channel.
@return [Boolean]
# File lib/sidekiq/throttled/communicator/listener.rb, line 30
def ready?
  @subscribed
end
Stops listener.
@return [void]
# File lib/sidekiq/throttled/communicator/listener.rb, line 44
def stop
  # Raising exception while client is in subscription mode makes
  # redis close connection and thus causing ConnectionPool reopen
  # it (normal mode). Otherwise subscription mode client will be
  # pushed back to ConnectionPool causing problems.
  raise Sidekiq::Shutdown
end
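Because the listener is a `Thread` subclass, the bare `raise` inside `stop` resolves to `Thread#raise`, which raises the exception inside the listener thread rather than in the caller. The sketch below illustrates that mechanism on its own. `StopSignal` and `Worker` are hypothetical names, and a plain `sleep` stands in for the blocking Redis subscription.

```ruby
# Hypothetical shutdown signal standing in for Sidekiq::Shutdown.
class StopSignal < Exception; end

class Worker < Thread
  def initialize
    @terminated = false
    super do
      begin
        sleep # block indefinitely, like a blocking subscription
      rescue StopSignal
        @terminated = true
      end
    end
  end

  def stopped?
    @terminated
  end

  def stop
    # Inside a Thread subclass a bare `raise` is Thread#raise, so the
    # exception is delivered to this thread, not to the calling thread.
    raise StopSignal
  end
end

worker = Worker.new
sleep 0.01 until worker.status == "sleep" # wait until the thread is blocked
worker.stop
worker.join
```

After `join` returns, `worker.stopped?` is true and the calling thread has seen no exception.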
Private Instance Methods
Wraps {#subscribe} with exception handlers:
- `Sidekiq::Shutdown` marks the listener as stopped and returns, which makes the `until` loop of the listener thread terminate.
- `StandardError` is recorded to the log and swallowed; after a one-second pause the `until` loop of the listener thread calls {#subscribe} again.
- Any other `Exception` marks the listener as stopped, is recorded to the log, and is re-raised.
@return [void]
# File lib/sidekiq/throttled/communicator/listener.rb, line 65
def listen
  subscribe
rescue Sidekiq::Shutdown
  @terminated = true
  @subscribed = false
rescue StandardError => e # rubocop:disable Style/RescueStandardError
  @subscribed = false
  handle_exception(e, { :context => "sidekiq:throttled" })
  sleep 1
rescue Exception => e # rubocop:disable Lint/RescueException
  @terminated = true
  @subscribed = false
  handle_exception(e, { :context => "sidekiq:throttled" })
  raise
end
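The order of the `rescue` clauses decides whether the surrounding loop restarts or ends. The following sketch reproduces that ladder outside of Sidekiq so the control flow can be observed. `FakeShutdown` and `RetryLoop` are hypothetical names, the array of lambdas replaces the real subscription, an in-memory `log` replaces `handle_exception`, and the one-second pause is left out.

```ruby
# Hypothetical stand-in for Sidekiq::Shutdown. It does not inherit from
# StandardError, so only the first rescue clause can catch it.
class FakeShutdown < Exception; end

class RetryLoop
  attr_reader :log

  def initialize(steps)
    @steps      = steps # callables, one consumed per iteration
    @terminated = false
    @log        = []
  end

  def run
    step until @terminated
  end

  private

  def step
    @steps.shift.call
  rescue FakeShutdown
    @terminated = true # clean stop: the loop ends
  rescue StandardError => e
    @log << "recovered: #{e.message}" # swallowed: the loop restarts
  rescue Exception => e
    @terminated = true
    @log << "fatal: #{e.message}"
    raise # anything else propagates to the caller
  end
end

retry_loop = RetryLoop.new([
  -> { raise "connection lost" }, # RuntimeError, a StandardError
  -> { raise FakeShutdown }
])
retry_loop.run
retry_loop.log # => ["recovered: connection lost"]
```

The first step fails with a recoverable error, so the loop runs a second iteration, and the shutdown signal then ends it without adding a log entry.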
Subscribes to the channel and triggers all registered handlers for received messages.
@note Puts the thread's Redis connection into subscription mode and blocks the thread.
@see redis.io/topics/pubsub
@see redis.io/commands/subscribe
@see Callbacks#run
@return [void]
# File lib/sidekiq/throttled/communicator/listener.rb, line 91
def subscribe
  Sidekiq.redis do |conn|
    conn.subscribe @channel do |on|
      on.subscribe do
        @subscribed = true
        @callbacks.run("ready")
      end

      on.message do |_channel, data|
        message, payload = Marshal.load(data)
        @callbacks.run("message:#{message}", payload)
      end
    end
  end
end
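The `on.message` handler above expects each published string to be a marshalled two-element array of message name and payload, and it dispatches on the `"message:<name>"` key. The round trip can be sketched without Redis as follows. `TinyCallbacks` is a hypothetical registry, its `on` and `run` signatures are assumptions rather than the actual `Callbacks` API, and the `"pause"` message with its payload is invented for the example.

```ruby
# Hypothetical registry standing in for Callbacks; `on` and `run`
# are assumed signatures, not taken from the library.
class TinyCallbacks
  def initialize
    @handlers = Hash.new { |hash, key| hash[key] = [] }
  end

  # Registers a handler block for the given event name.
  def on(event, &handler)
    @handlers[event] << handler
  end

  # Calls every handler registered for the event with the payload.
  def run(event, payload = nil)
    @handlers[event].each { |handler| handler.call(payload) }
  end
end

callbacks = TinyCallbacks.new
received  = []
callbacks.on("message:pause") { |payload| received << payload }

# Publisher side (assumed): pack name and payload into one binary string.
data = Marshal.dump(["pause", { "queue" => "default" }])

# Listener side: the same two steps as the on.message block.
message, payload = Marshal.load(data)
callbacks.run("message:#{message}", payload)

received # => [{ "queue" => "default" }]
```

Destructuring the unmarshalled array keeps the wire format to a single string per message while still letting handlers be keyed by message name.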