class Vissen::Input::Broker
Message broker that consumes a stream of messages and exposes a simple subscription interface.
Usage
This example subscribes to note messages on channel 1 and calls the fictitious method `play` whenever a matching message is published and processed.
  broker = Broker.new
  broker.subscribe Message::Note[1], priority: 1 do |message|
    play message.note
  end

  message = Message::Note.create 42, channel: 1
  broker.publish message
  broker.run_once
The next example sets up two listeners with different priorities. The higher priority listener stops propagation for even-numbered notes, which blocks the lower priority listener for those messages.
  broker = Broker.new
  broker.subscribe Message::Note[1], priority: 1 do |message|
    play message.note
  end

  broker.subscribe Message::Note[1], priority: 2 do |message, ctrl|
    ctrl.stop! if message.note % 2 == 0
  end
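To make the blocking behaviour concrete, the following is a minimal, self-contained sketch of two listeners sharing a control object. `SketchControl` is a hypothetical stand-in for the library's `PropagationControl`, whose implementation is not shown on this page; the sketch assumes that calling `stop!` blocks every listener that has not yet run.

```ruby
# Hypothetical stand-in for PropagationControl; the real class may differ.
class SketchControl
  def initialize
    @stopped = false
  end

  def stop!
    @stopped = true
  end

  def stop?
    @stopped
  end
end

played = []

# Listeners as [priority, handler] pairs, highest priority first.
listeners = [
  [2, ->(note, ctrl) { ctrl.stop! if note.even? }],
  [1, ->(note, _ctrl) { played << note }]
]

[41, 42, 43].each do |note|
  # A fresh control object is used for every message.
  ctrl = SketchControl.new

  listeners.each do |_priority, handler|
    break if ctrl.stop?

    handler.call note, ctrl
  end
end

p played # only the odd notes reach the priority 1 listener
```

Note 42 is even, so the priority 2 listener stops propagation before the priority 1 listener is reached.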
Public Class Methods
  # File lib/vissen/input/broker.rb, line 34
  def initialize
    @subscriptions = []
    @message_queue = Queue.new
  end
Public Instance Methods
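Processes one message. By implementing `#call` the broker satisfies the handler interface expected by `#subscribe`, which allows multiple brokers to be chained.
@param message [Message] the message to match against the subscriptions.
@param ctrl [PropagationControl] the control object used to stop propagation to lower priority subscriptions.
@return [nil]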
  # File lib/vissen/input/broker.rb, line 101
  def call(message, ctrl)
    # TODO: Remap the message if needed.
    @subscriptions.each do |subscription|
      break if ctrl.stop?(subscription.priority)

      subscription.match message do |msg|
        subscription.handle msg, ctrl
        message = msg
      end
    end

    nil
  end
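The chaining mentioned in the description can be sketched as follows. `MiniBroker` is a hypothetical miniature written for illustration, without matchers or priorities; it only shows that an object exposing `call(message, ctrl)` can be registered as a handler on another object of the same kind.

```ruby
# Hypothetical miniature broker, not the library implementation.
class MiniBroker
  def initialize
    @handlers = []
  end

  # Accepts either an object responding to #call or a block.
  def subscribe(handler = nil, &block)
    @handlers << (handler || block)
  end

  # Same shape as Broker#call: takes a message and a control object.
  def call(message, ctrl)
    @handlers.each { |handler| handler.call message, ctrl }
    nil
  end
end

received = []

inner = MiniBroker.new
inner.subscribe { |message, _ctrl| received << message }

outer = MiniBroker.new
outer.subscribe inner # the inner broker acts as a handler

# The control object is unused in this sketch, so nil is passed.
outer.call :note_on, nil

p received
```

Whether a real `Broker` can be passed directly to `#subscribe` depends on how `Subscription#handle` invokes its handler, which is not shown here.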
Inserts one or more messages into the message queue. Each message is handled at a later time by `#run_once`.
@param message [Message, Hash] the message(s) to handle.
  # File lib/vissen/input/broker.rb, line 78
  def publish(*message)
    message.each do |m|
      @message_queue.push m
    end
  end
Takes one message from the message queue and handles it.
@return [true, false] true if the message queue contained a message, otherwise false.
  # File lib/vissen/input/broker.rb, line 88
  def run_once
    return false if @message_queue.empty?

    ctrl = PropagationControl.new
    call @message_queue.shift, ctrl
    true
  end
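Because `run_once` reports whether a message was handled, its return value can drive a loop that drains the queue. The sketch below assumes a hypothetical `QueueSketch` class that mirrors only the queue handling of `publish` and `run_once`; it records messages instead of dispatching them to subscriptions.

```ruby
# Hypothetical miniature that mirrors only the queue handling.
class QueueSketch
  attr_reader :handled

  def initialize
    @message_queue = Queue.new
    @handled = []
  end

  def publish(*messages)
    messages.each { |m| @message_queue.push m }
  end

  def run_once
    return false if @message_queue.empty?

    # The real broker would dispatch to its subscriptions here.
    @handled << @message_queue.shift
    true
  end
end

sketch = QueueSketch.new
sketch.publish :a, :b, :c

# run_once returns false once the queue is empty, which ends the loop.
count = 0
count += 1 while sketch.run_once

p count
p sketch.handled
```

Messages are handled in the order they were published, since the queue is first in, first out.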
Register a callback for the broker to run when a message matched by the given matcher is published.
By specifying a priority, a subscriber added after another can still be handled before it. The handler can be given either as an object responding to `#call` or as a block, but not both.
@param matcher [#match?] the matcher that will be used to filter messages.
@param handler [#call] the handler that will be called when a matching message is published. Mandatory unless a block is given.
@param priority [Integer] the priority determines when the subscription will be matched against a published message in relation to other subscriptions.
@return [Subscription] the new subscription object.
  # File lib/vissen/input/broker.rb, line 54
  def subscribe(matcher, handler = nil, priority: 0, &block)
    if block_given?
      raise ArgumentError if handler

      handler = block
    else
      raise ArgumentError unless handler
    end

    insert_subscription Subscription.new(matcher, handler, priority)
  end
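The handler selection in `subscribe` accepts exactly one of a handler object or a block. The sketch below isolates that logic in a hypothetical helper named `resolve_handler`, which is not part of the library, to show the four possible combinations.

```ruby
# Hypothetical helper reproducing only the handler selection logic.
def resolve_handler(handler = nil, &block)
  if block_given?
    # A block and a handler object are mutually exclusive.
    raise ArgumentError if handler

    block
  else
    # Without a block a handler object is mandatory.
    raise ArgumentError unless handler

    handler
  end
end

from_block  = resolve_handler { |message| message }
from_object = resolve_handler(->(message) { message })

both = begin
  resolve_handler(->(message) { message }) { |message| message }
rescue ArgumentError
  :rejected
end

neither = begin
  resolve_handler
rescue ArgumentError
  :rejected
end

p from_block.call(1)
p from_object.call(2)
p both
p neither
```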
Removes the given subscription.
@param subscription [Subscription] the subscription to cancel.
@return [Subscription, nil] the subscription that was cancelled, or nil if the subscription was not found.
  # File lib/vissen/input/broker.rb, line 70
  def unsubscribe(subscription)
    @subscriptions.delete subscription
  end
Private Instance Methods
Insert or append a new subscription to the list. The subscription will be placed before the first subscription that is found to have a lower priority, or last if no such subscription exists. Subscriptions of equal priority therefore keep their insertion order.
@param subscription [Subscription] the subscription to add.
@return [Subscription] the subscription that was added.
  # File lib/vissen/input/broker.rb, line 122
  def insert_subscription(subscription)
    @subscriptions.each_with_index do |other, index|
      if other.priority < subscription.priority
        @subscriptions.insert index, subscription
        return subscription
      end
    end

    @subscriptions.push subscription
    subscription
  end
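The resulting order can be checked with a small standalone sketch. `Entry` and `insert_by_priority` are hypothetical names used for illustration; the helper uses `Array#index` with a block instead of the `each_with_index` loop above, which finds the same insertion point.

```ruby
# Hypothetical stand-in for Subscription, holding only a name and a priority.
Entry = Struct.new(:name, :priority)

# Inserts entry before the first element with a lower priority, or last.
def insert_by_priority(list, entry)
  index = list.index { |other| other.priority < entry.priority }
  index ? list.insert(index, entry) : list.push(entry)
  entry
end

list = []
insert_by_priority list, Entry.new(:a, 0)
insert_by_priority list, Entry.new(:b, 2)
insert_by_priority list, Entry.new(:c, 0)
insert_by_priority list, Entry.new(:d, 1)

p list.map(&:name) # => [:b, :d, :a, :c]
```

The list stays sorted from highest to lowest priority, and `:a` remains ahead of `:c` because entries with equal priority are never inserted in front of each other.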