class Sidekiq::Throttled::Communicator::Callbacks

Callbacks registry and runner. Runs registered callbacks in a dedicated Fiber, which works around an issue with ConnectionPool and a Redis client in subscriber mode.

Once a Redis client enters subscriber mode via the `#subscribe` method, it can only issue pub/sub commands or quit, so it cannot be used for anything else. ConnectionPool binds the reserved client to the Thread, so nested `#with` calls inside the same thread return the same connection. That makes it impossible to issue any normal Redis commands from within the listener Thread.
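The reason a Fiber helps can be sketched with plain Ruby, without Redis or ConnectionPool. This is a minimal illustration under one assumption: that the pool remembers its reserved connection through `Thread.current[...]`, which Ruby stores per Fiber. The key name `:reserved_connection` and the string value are hypothetical stand-ins, not names from the library.

```ruby
# Hypothetical key; stands in for whatever key a pool might use internally.
KEY = :reserved_connection

# The listener thread "reserves" a connection that is stuck in subscriber mode.
Thread.current[KEY] = "subscriber-connection"

# Thread.current[] is fiber-local, so a new Fiber starts with no value here.
# A pool keyed this way would hand out a fresh connection inside the Fiber.
seen_in_fiber = Fiber.new { Thread.current[KEY] }.resume

# Outside the Fiber the original reservation is still visible.
seen_in_thread = Thread.current[KEY]
```

Here `seen_in_fiber` is `nil` while `seen_in_thread` still holds the reserved value, which is the separation the callbacks runner relies on.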

@private

Public Class Methods

new()

Initializes callbacks registry.

# File lib/sidekiq/throttled/communicator/callbacks.rb, line 26
def initialize
  @mutex    = Mutex.new
  @handlers = Hash.new { |h, k| h[k] = [] }
end

Public Instance Methods

on(event, &handler)

Registers handler of given event.

@example

callbacks.on "and out comes wolves" do |who|
  puts "#{who} let the dogs out?!"
end

@param [#to_s] event
@raise [ArgumentError] if no handler block given
@yield [*args] Runs given block upon `event`
@yieldreturn [void]
@return [self]

# File lib/sidekiq/throttled/communicator/callbacks.rb, line 44
def on(event, &handler)
  raise ArgumentError, "No block given" unless handler

  @mutex.synchronize { @handlers[event.to_s] << handler }
  self
end

run(event, payload = nil)

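Runs the handlers registered for the given event, passing `payload` to each. Handlers run inside a Fiber; a StandardError raised by a handler is passed to `handle_exception` and does not stop the remaining handlers.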

@param [#to_s] event
@param [Object] payload
@return [void]

# File lib/sidekiq/throttled/communicator/callbacks.rb, line 56
def run(event, payload = nil)
  @mutex.synchronize do
    Fiber.new do
      @handlers[event.to_s].each do |callback|
        begin
          callback.call(payload)
        rescue => e
          handle_exception(e, {
            :context => "sidekiq:throttled"
          })
        end
      end
    end.resume
  end
end
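
The following sketch shows how `on` and `run` fit together. It uses a stand-in class, `DemoCallbacks` (a hypothetical name), that mirrors the methods listed above so the example runs without Sidekiq; the library's `handle_exception` is replaced here by a plain `warn`, which is an assumption made only for the demo. The event name `"paused"` and the payload `"default"` are likewise illustrative.

```ruby
# Stand-in for illustration only; not the library class.
class DemoCallbacks
  def initialize
    @mutex    = Mutex.new
    @handlers = Hash.new { |h, k| h[k] = [] }
  end

  # Registers a handler; the event name is normalized with #to_s.
  def on(event, &handler)
    raise ArgumentError, "No block given" unless handler

    @mutex.synchronize { @handlers[event.to_s] << handler }
    self
  end

  # Runs every handler for the event inside a dedicated Fiber.
  def run(event, payload = nil)
    @mutex.synchronize do
      Fiber.new do
        @handlers[event.to_s].each do |callback|
          begin
            callback.call(payload)
          rescue => e
            # The real class delegates to handle_exception instead.
            warn "handler failed: #{e.message}"
          end
        end
      end.resume
    end
  end
end

log = []
callbacks = DemoCallbacks.new

# #on returns self, so registrations can be chained.
# Symbol and String event names end up under the same key.
callbacks
  .on(:paused)  { |queue| log << "first saw #{queue}" }
  .on("paused") { raise "boom" }
  .on("paused") { |queue| log << "third saw #{queue}" }

callbacks.run("paused", "default")
```

After `run` returns, `log` contains the entries from the first and third handlers; the failure in the second handler is reported and skipped rather than aborting the loop.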