class ShortBus::Driver

ShortBus::Driver is the message dispatcher.

Attributes

debug[RW]
services[R]

Public Class Methods

new(*options) click to toggle source

Example:

Arguments:

options: hash
# File lib/short_bus/driver.rb, line 17
def initialize(*options)
  @options = {
    debug: false,
    default_message_spec: nil,
    default_publisher_spec: nil,
    default_thread_count: 1,
    max_message_queue_size: 1_000_000
  }
  @options.merge! options[0] if options[0].is_a?(Hash)
  @debug = @options[:debug]

  @messages = SizedQueue.new(@options[:max_message_queue_size])
  @services = {}
  @threads = { message_router: launch_message_router }
end

Public Instance Methods

<<(arg, publisher = nil)
Alias for: publish
publish(arg, publisher = nil) click to toggle source
# File lib/short_bus/driver.rb, line 55
def publish(arg, publisher = nil)
  return unless (message = convert_to_message arg)
  message.publisher = publisher if publisher
  @messages.push message
  message
end
Also aliased as: <<
subscribe(*args, &block) click to toggle source

Subscribes a callback (lambda, block, method) to receive messages

@param [*args] @return [ShortBus::Service] Service object that was created and registered

# File lib/short_bus/driver.rb, line 38
def subscribe(*args, &block)
  service_args = {
    debug: @debug,
    driver: self,
    message_spec: @options[:default_message_spec],
    name: nil,
    publisher_spec: @options[:default_publisher_spec],
    service: nil,
    thread_count: @options[:default_thread_count]
  }.merge args[0].is_a?(Hash) ? args[0] : { service: args[0] }

  service_args[:service] = block.to_proc if block_given?
  debug_message("#subscribe service: #{service_args[:service]}")
  service = Service.new(service_args)
  @services[service.to_s] = service
end
unsubscribe(service) click to toggle source
# File lib/short_bus/driver.rb, line 64
def unsubscribe(service)
  if service.is_a? ShortBus::Service
    unsubscribe service.to_s
  elsif @services.key?(service)
    @services[service].stop
    @services.delete service
  end
end

Private Instance Methods

convert_to_message(arg) click to toggle source
# File lib/short_bus/driver.rb, line 75
def convert_to_message(arg)
  if arg.is_a? ShortBus::Message
    arg
  elsif arg.is_a? String
    Message.new(arg)
  elsif arg.is_a?(Array) && arg[0].is_a?(String)
    Message.new(arg)
  elsif arg.is_a?(Hash) && arg.key?(:message) && arg[:message]
    publisher = arg.key?(:publisher) ? arg[:publisher] : nil
    payload = arg.key?(:payload) ? arg[:payload] : nil
    Message.new(message: arg[:message], payload: payload, publisher: publisher)
  end
end
launch_message_router() click to toggle source
# File lib/short_bus/driver.rb, line 89
def launch_message_router
  Thread.new do
    loop do
      message = @messages.shift
      debug_message "route_message(#{message})"
      @services.values.each { |service| service.check message }
    end
  end
end