class Sidekiq::Throttled::Communicator
Inter-process communication for Sidekiq. It starts a listener thread on the Sidekiq server and listens for incoming messages.
@example
  # Add incoming message handler for server
  Communicator.instance.receive "knock" do |who|
    puts "#{who}'s knocking on the door"
  end

  # Emit message from console
  Sidekiq.redis do |conn|
    Communicator.instance.transmit(conn, "knock", "ixti")
  end
Constants
- CHANNEL_NAME
Redis PUB/SUB channel name
Public Class Methods
Initializes singleton instance.
  # File lib/sidekiq/throttled/communicator.rb, line 36
  def initialize
    @callbacks = Callbacks.new
    @listener  = nil
    @mutex     = Mutex.new
  end
Public Instance Methods
Communicator readiness hook.
@yield Runs given block every time listener thread subscribes
to Redis pub/sub channel.
@return [void]
  # File lib/sidekiq/throttled/communicator.rb, line 110
  def ready(&handler)
    @callbacks.on("ready", &handler)
    yield if @listener&.ready?
  end
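The behaviour of `ready` (register the hook, and also run it right away when the listener is already subscribed) can be illustrated with a minimal sketch. `SketchReadiness` and `mark_ready!` are hypothetical names invented for this illustration; they are not part of the library, and the real hook is driven by the listener thread rather than by an explicit call.

```ruby
# Hypothetical stand-in for the readiness logic; not the library's code.
class SketchReadiness
  def initialize
    @ready = false
    @hooks = []
  end

  # Registers the hook and runs it immediately if already ready,
  # mirroring `yield if @listener&.ready?` in the source above.
  def ready(&handler)
    @hooks << handler
    handler.call if @ready
  end

  # Assumed trigger: stands in for the listener (re)subscribing.
  def mark_ready!
    @ready = true
    @hooks.each(&:call)
  end
end

log = []
state = SketchReadiness.new
state.ready { log << :early } # registered before subscription, runs later
state.mark_ready!             # fires every registered hook
state.ready { log << :late }  # registered after subscription, runs at once
```

Under these assumptions a hook registered late is not missed, which appears to be the reason `ready` yields immediately when the listener is already subscribed.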
Add incoming message handler.
@example
  Communicator.instance.receive "knock" do |payload|
    # do something upon `knock` message
  end
@param [#to_s] message
@yield [payload] Runs given block every time `message` is received.
@yieldparam [Object, nil] payload Payload that was transmitted
@yieldreturn [void]
@return [void]
  # File lib/sidekiq/throttled/communicator.rb, line 101
  def receive(message, &handler)
    @callbacks.on("message:#{message}", &handler)
  end
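The source shows only that `receive` registers the handler under the key `"message:#{message}"`; the `Callbacks` class itself is not documented here. As a rough sketch of how such a keyed registry could dispatch payloads, the following uses a hypothetical `SketchCallbacks` class with an assumed `run` method. Neither name is taken from the library.

```ruby
# Hypothetical registry; the real Callbacks implementation may differ.
class SketchCallbacks
  def initialize
    @mutex    = Mutex.new
    @handlers = Hash.new { |hash, key| hash[key] = [] }
  end

  # Registers a handler under the given event name.
  def on(event, &handler)
    raise ArgumentError, "No block given" unless handler

    @mutex.synchronize { @handlers[event.to_s] << handler }
    self
  end

  # Assumed dispatch method: calls every handler registered for the event.
  def run(event, payload = nil)
    handlers = @mutex.synchronize { @handlers[event.to_s].dup }
    handlers.each { |handler| handler.call(payload) }
    nil
  end
end

callbacks = SketchCallbacks.new
heard = []

# Same key format that `receive` builds: "message:" plus the message name.
callbacks.on("message:knock") { |who| heard << "#{who}'s knocking on the door" }

callbacks.run("message:knock", "ixti")   # matching handler runs
callbacks.run("message:other", "nobody") # no handler registered, nothing runs
```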
Configures Sidekiq server to start/stop listener thread.
@private
@return [void]
  # File lib/sidekiq/throttled/communicator.rb, line 65
  def setup!
    Sidekiq.configure_server do |config|
      config.on(:startup) { start_listener }
      config.on(:quiet) { stop_listener }
    end
  end
Starts listener thread.
@return [void]
  # File lib/sidekiq/throttled/communicator.rb, line 45
  def start_listener
    @mutex.synchronize do
      @listener ||= Listener.new(CHANNEL_NAME, @callbacks)
    end
  end
Stops listener thread.
@return [void]
  # File lib/sidekiq/throttled/communicator.rb, line 54
  def stop_listener
    @mutex.synchronize do
      @listener&.stop
      @listener = nil
    end
  end
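Both `start_listener` and `stop_listener` guard `@listener` with the same mutex, so repeated or concurrent calls should create at most one listener and stopping twice should be harmless. The sketch below reproduces that pattern in isolation. `SketchListener` and `SketchOwner` are hypothetical stand-ins; the real `Listener` takes a channel name and callbacks and runs a thread, which is omitted here.

```ruby
# Hypothetical stand-in for Listener; it only records that it was stopped.
class SketchListener
  attr_reader :stopped

  def initialize
    @stopped = false
  end

  def stop
    @stopped = true
  end
end

# Hypothetical owner using the same mutex + `||=` pattern as the source.
class SketchOwner
  attr_reader :listener

  def initialize
    @listener = nil
    @mutex    = Mutex.new
  end

  def start_listener
    @mutex.synchronize { @listener ||= SketchListener.new }
  end

  def stop_listener
    @mutex.synchronize do
      @listener&.stop # safe navigation makes a second stop a no-op
      @listener = nil
    end
  end
end

owner     = SketchOwner.new
threads   = Array.new(8) { Thread.new { owner.start_listener } }
listeners = threads.map(&:value)
first     = listeners.first
all_same  = listeners.all? { |listener| listener.equal?(first) }

owner.stop_listener
owner.stop_listener # nothing left to stop; does not raise
```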
Transmit message to listeners.
@example
  Sidekiq.redis do |conn|
    Communicator.instance.transmit(conn, "knock")
  end
@param [Redis] redis Redis client
@param [#to_s] message
@param [Object] payload
@return [void]
  # File lib/sidekiq/throttled/communicator.rb, line 84
  def transmit(redis, message, payload = nil)
    redis.publish(CHANNEL_NAME, Marshal.dump([message.to_s, payload]))
  end
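`transmit` publishes a `Marshal`-serialized two-element array, `[message.to_s, payload]`. The following sketch shows that envelope round-tripping without Redis. The helper names `encode` and `decode` are hypothetical, and the decoding side is an assumption about what a subscriber would do, since the listener code is not shown in this section.

```ruby
# Builds the same envelope that `transmit` publishes.
def encode(message, payload = nil)
  Marshal.dump([message.to_s, payload])
end

# Assumed receiving side: unpack the envelope back into name and payload.
# Marshal.load should only ever be used on data from a trusted source.
def decode(data)
  message, payload = Marshal.load(data)
  [message, payload]
end

wire = encode(:knock, "ixti")
message, payload = decode(wire)

# A message sent without a payload carries nil, matching the
# `@yieldparam [Object, nil] payload` documented for `receive`.
bare_message, bare_payload = decode(encode("knock"))
```

Because the payload goes through `Marshal`, it has to be a marshalable object; procs and IO handles, for example, cannot be dumped.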