class ShortBus::Service

ShortBus::Service tracks a registered service (subscriber)

Attributes

name[R]
threads[R]

Public Class Methods

new( debug: false, driver: nil, max_run_queue_size: 1_000_000, message_spec: nil, name: nil, recursive: false, publisher_spec: nil, service: nil, suppress_exception: false, thread_count: 1 ) click to toggle source
# File lib/short_bus/service.rb, line 13
def initialize(
  debug: false,
  driver: nil,
  max_run_queue_size: 1_000_000,
  message_spec: nil,
  name: nil,
  recursive: false,
  publisher_spec: nil,
  service: nil,
  suppress_exception: false,
  thread_count: 1
)
  @debug = debug
  @driver = driver
  @message_spec = message_spec ? Spec.new(message_spec) : nil
  @recursive = recursive
  @publisher_spec = publisher_spec ? Spec.new(publisher_spec) : nil
  @service = service
  @suppress_exception = suppress_exception
  @thread_count = thread_count

  @name = name || @service.to_s || OpenSSL::HMAC.new(rand.to_s, 'sha1').to_s
  @run_queue = SizedQueue.new(max_run_queue_size)
  @threads = []
  start
end

Public Instance Methods

check(message, dry_run = false) click to toggle source
# File lib/short_bus/service.rb, line 40
def check(message, dry_run = false)
  debug_message "[#{@name}]#check(#{message})#{' dry_run' if dry_run}#"
  if(
    (!@message_spec || @message_spec.match(message.to_s)) &&
    (!@publisher_spec || @publisher_spec.match(message.publisher)) &&
    (message.publisher != @name || @recursive)
  )
    @run_queue << message unless dry_run
  end
end
service_thread() click to toggle source

TODO: consider some mechanism to pass Exceptions up to the main thread,

perhaps with a whitelist, optional logging, something clean.
# File lib/short_bus/service.rb, line 53
def service_thread
  Thread.new do
    begin
      run_service @run_queue.shift until Thread.current.key?(:stop)
    rescue Exception => exc
      puts "Service [#{@name}] => #{exc.inspect}" unless @suppress_exception
      abort if exc.is_a? SystemExit
      retry unless Thread.current.key?(:stop)
    end
  end
end
start() click to toggle source
# File lib/short_bus/service.rb, line 65
def start
  @threads << service_thread while @threads.length < @thread_count
end
stop(when_to_kill = nil) click to toggle source
# File lib/short_bus/service.rb, line 69
def stop(when_to_kill = nil)
  @threads.each do |thread|
    if when_to_kill.is_a? Numeric
      begin
        Timeout.timeout(when_to_kill) { stop }
      rescue Timeout::Error
        stop :now
      end
    elsif when_to_kill == :now
      thread.kill
    else
      thread[:stop] = true
    end
  end
  @threads.delete_if(&:join)
end
stop!() click to toggle source
# File lib/short_bus/service.rb, line 86
def stop!
  stop :now
end
to_s() click to toggle source
# File lib/short_bus/service.rb, line 90
def to_s
  @name
end

Private Instance Methods

run_service(message) click to toggle source
# File lib/short_bus/service.rb, line 96
def run_service(message)
  debug_message "[#{@name}]#run_service(#{message}) -> #{@service.class.name} ##{@service.arity}"
  if @service.is_a?(Proc) || @service.is_a?(Method)
    if @service.arity == 0
      @driver.publish(@service.call, @name)
    elsif [1, -1, -2].include? @service.arity
      @driver.publish(@service.call(message), @name)
    else
      raise ArgumentError, "Service invalid arg count: #{@service.class.name}"
    end
  else
    raise ArgumentError, "Unknown service type: #{@service.class.name}"
  end
end