class Sidekiq::Throttled::Communicator

Inter-process communication for Sidekiq. It starts a listener thread on the Sidekiq server and listens for incoming messages.

@example

# Add incoming message handler for server
Communicator.instance.receive "knock" do |who|
  puts "#{who}'s knocking on the door"
end

# Emit message from console
Sidekiq.redis do |conn|
  Communicator.instance.transmit(conn, "knock", "ixti")
end

Constants

CHANNEL_NAME

Redis PUB/SUB channel name

@see redis.io/topics/pubsub

Public Class Methods

new() click to toggle source

Initializes singleton instance.

# File lib/sidekiq/throttled/communicator.rb, line 36
# Prepares a fresh communicator: a lock guarding listener changes, no
# listener yet, and an empty callback registry.
def initialize
  @mutex = Mutex.new
  @listener = nil
  @callbacks = Callbacks.new
end

Public Instance Methods

ready() { || ... } click to toggle source

Communicator readiness hook.

@yield Runs the given block every time the listener thread subscribes to the Redis pub/sub channel.

@return [void]

# File lib/sidekiq/throttled/communicator.rb, line 110
# Registers +handler+ for the "ready" event and, when a listener already
# exists and reports itself ready, runs the block straight away.
#
# @yield block to register (and possibly run immediately)
# @return [Object, nil] the block's value when it ran immediately, otherwise nil
def ready(&handler)
  @callbacks.on("ready", &handler)

  # Nothing more to do while there is no listener, or it is not ready yet.
  return unless @listener&.ready?

  yield
end
receive(message, &handler) click to toggle source

Add incoming message handler.

@example

Communicator.instance.receive "knock" do |payload|
  # do something upon `knock` message
end

@param [#to_s] message @yield [payload] Runs given block every time `message` is received. @yieldparam [Object, nil] payload Payload that was transmitted @yieldreturn [void] @return [void]

# File lib/sidekiq/throttled/communicator.rb, line 101
# Registers +handler+ under the event key "message:<message>".
#
# @param message [#to_s] message name, interpolated into the event key
# @yield block stored as the handler for that event key
# @return whatever the callbacks registry's +on+ returns
def receive(message, &handler)
  event = "message:#{message}"
  @callbacks.on(event, &handler)
end
setup!() click to toggle source

Configures Sidekiq server to start/stop listener thread.

@private @return [void]

# File lib/sidekiq/throttled/communicator.rb, line 65
# Hooks the listener into the Sidekiq server configuration: the :startup
# event starts the listener and the :quiet event stops it.
def setup!
  Sidekiq.configure_server do |server|
    server.on(:startup) { start_listener }
    server.on(:quiet)   { stop_listener }
  end
end
start_listener() click to toggle source

Starts listener thread.

@return [void]

# File lib/sidekiq/throttled/communicator.rb, line 45
# Creates the listener for CHANNEL_NAME unless one is already set. The
# check-and-assign runs while holding @mutex.
#
# @return the current listener (the existing one, or the one just created)
def start_listener
  @mutex.synchronize { @listener ||= Listener.new(CHANNEL_NAME, @callbacks) }
end
stop_listener() click to toggle source

Stops listener thread.

@return [void]

# File lib/sidekiq/throttled/communicator.rb, line 54
# Stops the current listener, if there is one, and clears the reference.
# Both steps run while holding @mutex.
#
# @return [nil]
def stop_listener
  @mutex.synchronize do
    @listener.stop unless @listener.nil?
    @listener = nil
  end
end
transmit(redis, message, payload = nil) click to toggle source

Transmit message to listeners.

@example

Sidekiq.redis do |conn|
  Communicator.instance.transmit(conn, "knock")
end

@param [Redis] redis Redis client @param [#to_s] message @param [Object] payload @return [void]

# File lib/sidekiq/throttled/communicator.rb, line 84
# Publishes a message on CHANNEL_NAME through the given Redis connection.
# The wire format is a Marshal dump of the pair [message.to_s, payload].
#
# @param redis [#publish] connection used to publish
# @param message [#to_s] message name
# @param payload [Object] optional data sent along with the message
# @return whatever +redis.publish+ returns
def transmit(redis, message, payload = nil)
  frame = Marshal.dump([message.to_s, payload])
  redis.publish(CHANNEL_NAME, frame)
end