class Vissen::Input::Broker

Message broker that consumes a stream of messages and exposes a simple subscription interface.

Usage

This example subscribes to note messages on channel 1 and calls the fictitious method `play` once a matching message has been published and processed.

broker = Broker.new
broker.subscribe Message::Note[1], priority: 1 do |message|
  play message.note
end

message = Message::Note.create 42, channel: 1
broker.publish message
broker.run_once

The next example sets up two listeners with different priorities. The listener with priority 2 is handled first and stops propagation for even note numbers, so the priority 1 listener only plays odd notes.

broker = Broker.new
broker.subscribe Message::Note[1], priority: 1 do |message|
  play message.note
end

broker.subscribe Message::Note[1], priority: 2 do |message, ctrl|
  ctrl.stop! if message.note % 2 == 0
end
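
The blocking behaviour in the second example can be tried out without the library. The following is only a sketch under stated assumptions: `MiniControl`, `Sub` and `dispatch` are hypothetical stand-ins, not part of Vissen, and the stand-in control object ignores the priority argument, which the real PropagationControl may well take into account.

```ruby
# Hypothetical stand-in for PropagationControl. Assumption: once
# stopped, it blocks every remaining subscription.
class MiniControl
  def initialize
    @stopped = false
  end

  def stop!
    @stopped = true
  end

  def stop?(_priority)
    @stopped
  end
end

# Hypothetical subscription record holding a priority and a handler.
Sub = Struct.new(:priority, :handler)

# Walks the subscriptions, which are expected to be sorted by
# descending priority, until a handler asks to stop.
def dispatch(subs, note)
  ctrl = MiniControl.new
  subs.each do |sub|
    break if ctrl.stop?(sub.priority)
    sub.handler.call note, ctrl
  end
  nil
end

played = []
subs = [
  Sub.new(2, ->(note, ctrl) { ctrl.stop! if note.even? }),
  Sub.new(1, ->(note, _ctrl) { played << note })
]

[41, 42, 43].each { |note| dispatch subs, note }
p played # => [41, 43]
```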

Public Class Methods

new()
# File lib/vissen/input/broker.rb, line 34
def initialize
  @subscriptions = []
  @message_queue = Queue.new
end

Public Instance Methods

call(message, ctrl)

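Processes one message by matching it against each subscription in priority order. Because the broker itself responds to `#call`, it can be registered as the handler of a subscription on another broker, which allows brokers to be chained.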

@param message [Message] the message to match against the subscriptions.

@return [nil]

# File lib/vissen/input/broker.rb, line 101
def call(message, ctrl)
  # TODO: Remap the message if needed.
  @subscriptions.each do |subscription|
    break if ctrl.stop?(subscription.priority)

    subscription.match message do |msg|
      subscription.handle msg, ctrl
      message = msg
    end
  end
  nil
end
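
Chaining relies only on the handler contract, that is, any object responding to `call(message, ctrl)`. The sketch below illustrates the idea with a hypothetical `MiniBroker` that skips matching and priorities entirely; it is not the Vissen implementation.

```ruby
# Hypothetical minimal broker: stores handlers and forwards every
# message to each of them through #call.
class MiniBroker
  def initialize
    @handlers = []
  end

  # Accepts either an object responding to #call or a block.
  def subscribe(handler = nil, &block)
    @handlers << (handler || block)
  end

  # Same signature as a handler, so a MiniBroker is itself a handler.
  def call(message, ctrl)
    @handlers.each { |handler| handler.call message, ctrl }
    nil
  end
end

outer = MiniBroker.new
inner = MiniBroker.new
received = []

inner.subscribe { |message, _ctrl| received << message }
outer.subscribe inner # the inner broker is registered as a handler

outer.call :note_on, nil
p received # => [:note_on]
```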

publish(*message)

Inserts one or more messages into the message queue. Each message is handled later, when `#run_once` is called.

@param message [Message, Hash] the message(s) to handle.

# File lib/vissen/input/broker.rb, line 78
def publish(*message)
  message.each do |m|
    @message_queue.push m
  end
end
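
Since `publish` takes a splat, an array of messages needs to be splatted by the caller, otherwise the array itself is queued as a single item. The sketch below shows this with a hypothetical `QueueSketch` class that copies only the queueing logic from the source above; the `size` helper is an addition for the sake of the example.

```ruby
# Hypothetical stand-in that mirrors the broker's queue handling.
class QueueSketch
  def initialize
    @message_queue = Queue.new
  end

  def publish(*message)
    message.each { |m| @message_queue.push m }
  end

  # Not part of the documented interface, added for inspection.
  def size
    @message_queue.size
  end
end

sketch = QueueSketch.new
sketch.publish :a, :b  # two messages
batch = [:c, :d]
sketch.publish(*batch) # splatted: two more messages
sketch.publish batch   # not splatted: the array is queued as one item
p sketch.size # => 5
```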

run_once()

Takes one message from the message queue and handles it.

@return [true, false] true if the message_queue contained a message, otherwise false.

# File lib/vissen/input/broker.rb, line 88
def run_once
  return false if @message_queue.empty?
  ctrl = PropagationControl.new
  call @message_queue.shift, ctrl
  true
end
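
The boolean return value makes it straightforward to process everything that is currently queued. The following sketch assumes only that the broker responds to `run_once` as documented; `drain` and `CountdownBroker` are hypothetical names used for illustration.

```ruby
# Hypothetical helper: calls #run_once until the queue is empty and
# returns the number of messages that were handled.
def drain(broker)
  handled = 0
  handled += 1 while broker.run_once
  handled
end

# Hypothetical stand-in that pretends to hold a fixed number of
# queued messages.
class CountdownBroker
  def initialize(pending)
    @pending = pending
  end

  def run_once
    return false if @pending.zero?
    @pending -= 1
    true
  end
end

p drain(CountdownBroker.new(3)) # => 3
```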

subscribe(matcher, handler = nil, priority: 0, &block)

Register a callback for the broker to run when a message matched by the given matcher is published.

By specifying a priority, a subscriber added after another can still be handled before it. The handler can be given either as an object responding to `#call` or as a block, but not both.

@param matcher [#match?] the matcher that will be used to filter messages.

@param handler [#call] the handler that will be called when a matching message is published. Mandatory unless a block is given.

@param priority [Integer] the priority determines when the subscription will be matched against a published message in relation to other subscriptions.

@return [Subscription] the new subscription object.

# File lib/vissen/input/broker.rb, line 54
def subscribe(matcher, handler = nil, priority: 0, &block)
  if block_given?
    raise ArgumentError if handler
    handler = block
  else
    raise ArgumentError unless handler
  end

  insert_subscription Subscription.new(matcher, handler, priority)
end
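
The handler selection above can be exercised in isolation. The sketch below copies just that logic into a hypothetical `pick_handler` method and pairs it with a hypothetical `NoteRecorder` handler object; the error messages are additions, since the original raises a bare ArgumentError.

```ruby
# Hypothetical helper mirroring the handler selection in #subscribe.
def pick_handler(handler = nil, &block)
  if block_given?
    raise ArgumentError, 'give a handler or a block, not both' if handler
    block
  else
    raise ArgumentError, 'a handler or a block is required' unless handler
    handler
  end
end

# Hypothetical handler object: anything responding to #call works.
class NoteRecorder
  attr_reader :notes

  def initialize
    @notes = []
  end

  def call(message, _ctrl)
    @notes << message
  end
end

recorder = NoteRecorder.new
object_handler = pick_handler(recorder)
block_handler  = pick_handler { |note, _ctrl| recorder.notes << note }

object_handler.call 60, nil
block_handler.call 62, nil
p recorder.notes # => [60, 62]
```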

unsubscribe(subscription)

Removes the given subscription.

@param subscription [Subscription] the subscription to cancel.

@return [Subscription, nil] the subscription that was cancelled, or nil if the subscription was not found.

# File lib/vissen/input/broker.rb, line 70
def unsubscribe(subscription)
  @subscriptions.delete subscription
end
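
The return value follows from `Array#delete`, which compares elements with `==` and returns nil when nothing matches, so cancelling the same subscription twice is harmless. A minimal sketch, using a hypothetical `MiniRegistry` and plain objects in place of real subscriptions:

```ruby
# Hypothetical stand-in that keeps subscriptions in a plain array.
class MiniRegistry
  def initialize
    @subscriptions = []
  end

  def add(subscription)
    @subscriptions.push subscription
    subscription
  end

  def unsubscribe(subscription)
    @subscriptions.delete subscription
  end
end

registry = MiniRegistry.new
sub = registry.add Object.new

p registry.unsubscribe(sub).equal?(sub) # => true
p registry.unsubscribe(sub)             # => nil
```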

Private Instance Methods

insert_subscription(subscription)

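Inserts a new subscription into the list, keeping the list sorted by descending priority. The subscription is placed before the first existing subscription with a strictly lower priority, or appended if there is none, so subscriptions of equal priority keep their insertion order.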

@param subscription [Subscription] the subscription to add.

@return [Subscription] the subscription that was added.

# File lib/vissen/input/broker.rb, line 122
def insert_subscription(subscription)
  @subscriptions.each_with_index do |other, index|
    if other.priority < subscription.priority
      @subscriptions.insert index, subscription
      return subscription
    end
  end

  @subscriptions.push subscription
  subscription
end
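
The resulting order can be checked with a small standalone sketch. `Entry` and `insert_by_priority` are hypothetical names; the method uses `find_index` instead of the explicit loop above but is intended to place items the same way.

```ruby
# Hypothetical subscription record with a name for readability.
Entry = Struct.new(:name, :priority)

# Inserts item before the first entry with a strictly lower priority,
# or appends it when no such entry exists.
def insert_by_priority(list, item)
  index = list.find_index { |other| other.priority < item.priority }
  list.insert(index || list.size, item)
  item
end

list = []
insert_by_priority list, Entry.new(:a, 0)
insert_by_priority list, Entry.new(:b, 2)
insert_by_priority list, Entry.new(:c, 0)
insert_by_priority list, Entry.new(:d, 1)

p list.map(&:name) # => [:b, :d, :a, :c]
```

Note that `:a` stays ahead of `:c`: both have priority 0, and the one subscribed first is matched first.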