class Sidekiq::Throttled::Communicator::Listener

Redis subscription listener thread.

@private

Public Class Methods

new(channel, callbacks)

Starts listener thread.

@param [String] channel Redis pub/sub channel to listen on
@param [Callbacks] callbacks Message callbacks registry

Calls superclass method
# File lib/sidekiq/throttled/communicator/listener.rb, line 18
def initialize(channel, callbacks)
  @channel    = channel
  @callbacks  = callbacks
  @terminated = false
  @subscribed = false

  super { listen until @terminated }
end
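
The constructor pattern above (set state, then hand `super` a block that repeats one unit of work until a flag flips) can be tried without Redis or Sidekiq. The following is a minimal sketch, assuming the class inherits from `Thread` as the `super { ... }` call suggests; `LoopingThread`, `step` and `iterations` are hypothetical names used only for illustration.

```ruby
# Hypothetical stand-in for the listener: a Thread subclass whose body
# repeats one unit of work until a termination flag is set.
class LoopingThread < Thread
  attr_reader :iterations

  def initialize
    # State is assigned before `super`, so the new thread sees it.
    @terminated = false
    @iterations = 0

    # Thread#initialize needs a block; calling super starts the thread.
    super { step until @terminated }
  end

  private

  # One unit of work; the real listener would block in `subscribe` here.
  def step
    @iterations += 1
    @terminated = true if @iterations >= 3
  end
end

worker = LoopingThread.new
worker.join
puts worker.iterations # prints 3
```

Assigning the instance variables before calling `super` matters in this sketch: the thread may start running the block immediately, so the flag has to exist by then.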

Public Instance Methods

listening?()

Whether the main loop is still running.

@return [Boolean]

# File lib/sidekiq/throttled/communicator/listener.rb, line 37
def listening?
  !@terminated
end
ready?()

Whether the underlying Redis client is subscribed to the pub/sub channel.

@return [Boolean]

# File lib/sidekiq/throttled/communicator/listener.rb, line 30
def ready?
  @subscribed
end
stop()

Stops listener.

@return [void]

# File lib/sidekiq/throttled/communicator/listener.rb, line 44
def stop
  # Raising an exception while the client is in subscription mode makes
  # redis close the connection, which causes ConnectionPool to reopen
  # it in normal mode. Otherwise the subscription-mode client would be
  # pushed back to ConnectionPool, causing problems.
  raise Sidekiq::Shutdown
end
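
A point that is easy to miss in `stop`: if the class inherits from `Thread` (an assumption based on the `super { ... }` call in the constructor), then the bare `raise` resolves to `Thread#raise`, so the exception is raised inside the listener thread rather than in the caller. The sketch below illustrates that behaviour with hypothetical names (`StopSignal`, `StoppableThread`, `run_once`, `stopped?`); it is not the library's code.

```ruby
# Hypothetical stand-in for Sidekiq::Shutdown.
class StopSignal < Interrupt; end

class StoppableThread < Thread
  def initialize
    @terminated = false
    super { run_once until @terminated }
  end

  def stopped?
    @terminated
  end

  def stop
    # `self` is the thread, so this calls Thread#raise and the exception
    # surfaces inside the thread's own code, not in the calling thread.
    raise StopSignal
  end

  private

  def run_once
    sleep # stands in for a blocking subscribe call
  rescue StopSignal
    @terminated = true
  end
end

thread = StoppableThread.new
# Wait until the thread is parked inside `run_once` before stopping it.
Thread.pass until thread.status == "sleep"

thread.stop # returns normally in the caller
thread.join
puts thread.stopped? # prints true
```

The wait before `stop` is only there to keep the sketch deterministic: it ensures the exception arrives while the thread is inside the method that rescues it.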

Private Instance Methods

listen()

Wraps {#subscribe} with exception handlers:

  • `Sidekiq::Shutdown` marks the listener as stopped and returns, which terminates the `until` loop of the listener thread.

  • `StandardError` is logged and swallowed, so the loop of the listener thread calls `listen` again after a one-second pause.

  • Any other `Exception` is logged, marks the listener as stopped, and is re-raised.

@return [void]

# File lib/sidekiq/throttled/communicator/listener.rb, line 65
def listen
  subscribe
rescue Sidekiq::Shutdown
  @terminated = true
  @subscribed = false
rescue StandardError => e # rubocop:disable Style/RescueStandardError
  @subscribed = false
  handle_exception(e, { :context => "sidekiq:throttled" })
  sleep 1
rescue Exception => e # rubocop:disable Lint/RescueException
  @terminated = true
  @subscribed = false
  handle_exception(e, { :context => "sidekiq:throttled" })
  raise
end
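
The three rescue tiers described above can be exercised in isolation. This is a minimal sketch, not the library's implementation: `DemoShutdown`, `run_once` and the `log` array are hypothetical, and `DemoShutdown` is declared as an `Interrupt` subclass only so that it is not a `StandardError`.

```ruby
# Hypothetical shutdown signal; deliberately not a StandardError.
class DemoShutdown < Interrupt; end

# Runs the block once and reports how the surrounding loop should react.
def run_once(log)
  yield
  :finished
rescue DemoShutdown
  :terminated                       # clean stop, loop should end
rescue StandardError => e
  log << "recovered: #{e.class}"    # logged and swallowed, loop restarts
  :restart
rescue Exception => e               # everything else is logged and re-raised
  log << "fatal: #{e.class}"
  raise
end

log = []
results = []
results << run_once(log) { raise DemoShutdown }
results << run_once(log) { raise ArgumentError, "boom" }

begin
  # NotImplementedError descends from ScriptError, not StandardError.
  run_once(log) { raise NotImplementedError }
rescue NotImplementedError
  log << "re-raised to caller"
end

p results # prints [:terminated, :restart]
p log
```

Ruby checks `rescue` clauses from top to bottom, so the most specific class comes first and `Exception` acts as the final catch-all.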
subscribe()

Subscribes to the channel and triggers all registered handlers for received messages.

@note Puts the thread's Redis connection into subscription mode and blocks the thread.

@see redis.io/topics/pubsub
@see redis.io/commands/subscribe
@see Callbacks#run

@return [void]

# File lib/sidekiq/throttled/communicator/listener.rb, line 91
def subscribe
  Sidekiq.redis do |conn|
    conn.subscribe @channel do |on|
      on.subscribe do
        @subscribed = true
        @callbacks.run("ready")
      end

      on.message do |_channel, data|
        message, payload = Marshal.load(data)
        @callbacks.run("message:#{message}", payload)
      end
    end
  end
end
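
The `on.message` handler above expects each payload to be a marshalled two-element array and dispatches it under a `"message:<name>"` event. The sketch below walks through that decoding step without Redis. `TinyCallbacks` and its `on` method are hypothetical stand-ins for the real callbacks registry (only a `run(event, payload)` call is visible in the source), and the publisher-side `Marshal.dump` call and the `"pause"` message are assumptions about what a sender might produce.

```ruby
# Hypothetical, minimal callbacks registry used only for this sketch.
class TinyCallbacks
  def initialize
    @handlers = {}
  end

  # Registers a handler block for the given event name.
  def on(event, &handler)
    (@handlers[event] ||= []) << handler
  end

  # Invokes every handler registered for the event; unknown events are ignored.
  def run(event, payload = nil)
    @handlers.fetch(event, []).each { |handler| handler.call(payload) }
  end
end

callbacks = TinyCallbacks.new
received = []
callbacks.on("message:pause") { |payload| received << payload }

# Assumed wire format: a marshalled [message, payload] pair.
data = Marshal.dump(["pause", { "queue" => "default" }])

# Same decoding and dispatch steps as the on.message block.
message, payload = Marshal.load(data)
callbacks.run("message:#{message}", payload)

p received # prints [{"queue"=>"default"}]
```

`Marshal.load` can instantiate arbitrary Ruby objects, so this format is only appropriate when every publisher on the channel is trusted.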