class ShortBus::Service
ShortBus::Service
Tracks a registered service (subscriber).
Attributes
name[R]
threads[R]
Public Class Methods
new( debug: false, driver: nil, max_run_queue_size: 1_000_000, message_spec: nil, name: nil, recursive: false, publisher_spec: nil, service: nil, suppress_exception: false, thread_count: 1 )
click to toggle source
# File lib/short_bus/service.rb, line 13
# Builds a service, stores its configuration and immediately starts its
# worker threads (via #start).
#
# debug::              stored in @debug.
# driver::             object whose #publish receives each service result.
# max_run_queue_size:: capacity of the SizedQueue that buffers accepted messages.
# message_spec::       optional filter; wrapped in a Spec when given.
# name::               explicit service name; derived when omitted (see below).
# recursive::          when true, the service also accepts messages it published itself.
# publisher_spec::     optional filter on the message publisher; wrapped in a Spec when given.
# service::            the callable to run (a Proc or Method).
# suppress_exception:: when true, exceptions raised in worker threads are not printed.
# thread_count::       number of worker threads to keep running.
def initialize(
  debug: false,
  driver: nil,
  max_run_queue_size: 1_000_000,
  message_spec: nil,
  name: nil,
  recursive: false,
  publisher_spec: nil,
  service: nil,
  suppress_exception: false,
  thread_count: 1
)
  @debug = debug
  @driver = driver
  @message_spec = message_spec ? Spec.new(message_spec) : nil
  @recursive = recursive
  @publisher_spec = publisher_spec ? Spec.new(publisher_spec) : nil
  @service = service
  @suppress_exception = suppress_exception
  @thread_count = thread_count

  # `to_s` never returns nil (nil.to_s is ""), so chaining it with `||`
  # could never reach the random fallback. Test for an empty label instead
  # so a service-less instance still gets a distinct name.
  service_label = @service.to_s
  @name = name ||
          (service_label.empty? ? OpenSSL::HMAC.new(rand.to_s, 'sha1').to_s : service_label)

  @run_queue = SizedQueue.new(max_run_queue_size)
  @threads = []
  start
end
Public Instance Methods
check(message, dry_run = false)
click to toggle source
# File lib/short_bus/service.rb, line 40
# Decides whether +message+ is meant for this service and, if so, puts it on
# the run queue.
#
# A message is accepted when it matches the message spec (if one is set),
# its publisher matches the publisher spec (if one is set), and it was not
# published by this service itself unless the service is recursive.
#
# With +dry_run+ set, nothing is enqueued. Returns the run queue when a
# message was enqueued and nil in every other case (including a dry run).
def check(message, dry_run = false)
  debug_message "[#{@name}]#check(#{message})#{' dry_run' if dry_run}#"

  return if @message_spec && !@message_spec.match(message.to_s)
  return if @publisher_spec && !@publisher_spec.match(message.publisher)
  return unless message.publisher != @name || @recursive
  return if dry_run

  @run_queue << message
end
service_thread()
click to toggle source
TODO: consider some mechanism to pass Exceptions up to the main thread,
perhaps with a whitelist, optional logging, something clean.
# File lib/short_bus/service.rb, line 53
# Creates and returns one worker thread. The worker repeatedly takes the next
# message off the run queue and hands it to #run_service until the thread
# carries a :stop key.
#
# Any exception raised while working is printed (unless exceptions are
# suppressed); a SystemExit aborts the process, anything else restarts the
# loop as long as the thread has not been told to stop.
def service_thread
  Thread.new do
    begin
      until Thread.current.key?(:stop)
        run_service(@run_queue.shift)
      end
    rescue Exception => error
      puts "Service [#{@name}] => #{error.inspect}" unless @suppress_exception
      abort if error.is_a?(SystemExit)
      # Re-enter the begin block so one failing message does not end the worker.
      retry unless Thread.current.key?(:stop)
    end
  end
end
start()
click to toggle source
# File lib/short_bus/service.rb, line 65
# Tops the worker pool up to the configured thread count by spawning new
# service threads. Does nothing when enough threads are already registered.
def start
  while @threads.length < @thread_count
    @threads.push(service_thread)
  end
end
stop(when_to_kill = nil)
click to toggle source
# File lib/short_bus/service.rb, line 69
# Stops the worker threads and removes them from the pool.
#
# when_to_kill::
#   * nil (default) - flag every thread with :stop and wait for it to finish.
#   * :now          - kill every thread immediately.
#   * a Numeric     - attempt the graceful stop for that many seconds, then
#     fall back to killing whatever is still running.
#
# Returns the (now empty) thread list.
#
# NOTE(review): the :stop flag is only read by the worker loop between
# messages, so a graceful stop appears to wait until each worker next checks
# it - confirm against #service_thread before relying on stop without a timeout.
def stop(when_to_kill = nil)
  if when_to_kill.is_a? Numeric
    # Handle the timed variant once, outside the per-thread loop: the nested
    # call empties @threads, which must not happen while it is being iterated.
    begin
      Timeout.timeout(when_to_kill) { stop }
    rescue Timeout::Error
      stop :now
    end
    return @threads
  end

  @threads.each do |thread|
    if when_to_kill == :now
      thread.kill
    else
      thread[:stop] = true
    end
  end

  # Thread#join returns the thread (truthy), so every joined thread is dropped.
  @threads.delete_if(&:join)
end
stop!()
click to toggle source
# File lib/short_bus/service.rb, line 86
# Forceful shutdown: delegates to #stop with :now, which kills the worker
# threads instead of waiting for them.
def stop!
  stop(:now)
end
to_s()
click to toggle source
# File lib/short_bus/service.rb, line 90
# Returns the service name as a String. The name is converted explicitly
# because it may have been supplied as a non-String (e.g. a Symbol), and
# #to_s is expected to return a String.
def to_s
  @name.to_s
end
Private Instance Methods
run_service(message)
click to toggle source
# File lib/short_bus/service.rb, line 96
# Invokes the configured service for +message+ and publishes whatever it
# returns through the driver, using this service's name as the publisher.
#
# A service with arity 0 is called without arguments; arities 1, -1 and -2
# receive the message.
#
# Raises ArgumentError when the service is neither a Proc nor a Method, or
# when its arity is not one of the supported values.
def run_service(message)
  # Guard the arity lookup: a nil or non-callable service has no #arity, and
  # building the debug text must not raise before the type check below can
  # report the real problem.
  arity = @service.arity if @service.respond_to?(:arity)
  debug_message "[#{@name}]#run_service(#{message}) -> #{@service.class.name} ##{arity}"

  case @service
  when Proc, Method
    if arity == 0
      @driver.publish(@service.call, @name)
    elsif [1, -1, -2].include?(arity)
      @driver.publish(@service.call(message), @name)
    else
      raise ArgumentError, "Service invalid arg count: #{@service.class.name}"
    end
  else
    raise ArgumentError, "Unknown service type: #{@service.class.name}"
  end
end