class FiniteMachine::MessageQueue
Allows for storage of asynchronous messages such as events and callbacks.
Used internally by {Observer} and {StateMachine}
@api private
Public Class Methods
Initialize an event queue; the background worker thread is not created here but later, by #start
@example
MessageQueue.new
@api public
# File lib/finite_machine/message_queue.rb, line 20
# Build an empty message queue.
#
# The queue starts alive (+@dead+ is false), with no listeners and no
# worker thread.
#
# @example
#   MessageQueue.new
#
# @api public
def initialize
  @listeners = []
  @dead      = false
  @queue     = Queue.new
  @mutex     = Mutex.new
  @not_empty = ConditionVariable.new
  @thread    = nil
end
Public Instance Methods
Add asynchronous event to the event queue to process
@example
event_queue << AsyncCall.build(...)
@param [AsyncCall] event
@return [nil]
@api public
# File lib/finite_machine/message_queue.rb, line 62
# Enqueue an event for asynchronous processing.
#
# While the queue is dead the event is handed to +discard_message+
# instead of being stored. Otherwise it is appended to the queue and one
# thread waiting on the "not empty" condition is signalled.
#
# @example
#   event_queue << AsyncCall.build(...)
#
# @param [AsyncCall] event
#
# @api public
def <<(event)
  @mutex.synchronize do
    next discard_message(event) if @dead

    @queue << event
    @not_empty.signal
  end
end
Check if the event queue is alive
@example
event_queue.alive?
@return [Boolean]
@api public
# File lib/finite_machine/message_queue.rb, line 102
# Tell whether the queue has not been marked dead.
#
# The flag is read while holding the queue mutex.
#
# @example
#   event_queue.alive?
#
# @return [Boolean]
#
# @api public
def alive?
  @mutex.synchronize do
    @dead ? false : true
  end
end
Check if there are any events to handle
@example
event_queue.empty?
@api public
# File lib/finite_machine/message_queue.rb, line 90
# Tell whether no events are currently waiting in the queue.
#
# The queue is inspected while holding the queue mutex.
#
# @example
#   event_queue.empty?
#
# @return [Boolean]
#
# @api public
def empty?
  @mutex.synchronize do
    @queue.empty?
  end
end
# File lib/finite_machine/message_queue.rb, line 159 def inspect @mutex.synchronize do "#<#{self.class}:#{object_id.to_s(16)} @size=#{size}, @dead=#{@dead}>" end end
Join the event queue from current thread
@param [Numeric, nil] timeout maximum number of seconds to wait; nil waits without a limit
@example
event_queue.join
@return [nil, Thread]
@api public
# File lib/finite_machine/message_queue.rb, line 116
# Wait for the worker thread from the calling thread.
#
# Does nothing when no worker thread has been created.
#
# @example
#   event_queue.join
#
# @param [Numeric, nil] timeout
#   passed to Thread#join when given; without it the wait is unbounded
#
# @return [nil, Thread]
#   nil when there is no worker thread, otherwise whatever Thread#join
#   returns
#
# @api public
def join(timeout = nil)
  worker = @thread
  return unless worker

  if timeout.nil?
    worker.join
  else
    worker.join(timeout)
  end
end
# File lib/finite_machine/message_queue.rb, line 48
# Tell whether a worker thread has been created and the queue is alive.
#
# @return [Boolean]
def running?
  return false if @thread.nil?

  alive?
end
Shut down this event queue and clean it up
@example
event_queue.shutdown
@return [Boolean]
@api public
# File lib/finite_machine/message_queue.rb, line 130
# Shut down this event queue and discard everything still waiting in it.
#
# Marks the queue dead, wakes every thread waiting on the "not empty"
# condition, removes all pending messages and passes each of them, in
# queue order, to +discard_message+.
#
# @example
#   event_queue.shutdown
#
# @raise [EventQueueDeadError]
#   when the queue is already dead
#
# @return [Boolean]
#   always true
#
# @api public
def shutdown
  pending = []

  @mutex.synchronize do
    # Checked under the lock so two concurrent shutdowns cannot both pass.
    raise EventQueueDeadError, "event queue already dead" if @dead

    @dead = true
    @not_empty.broadcast
    # Move the messages into a separate array; keeping a reference to
    # @queue and then clearing it would leave nothing to discard.
    pending << @queue.pop(true) until @queue.empty?
  end

  # Discard outside the lock so discard_message never runs under @mutex.
  pending.each { |message| discard_message(message) }
  true
end
Get number of events waiting for processing
@example
event_queue.size
@return [Integer]
@api public
# File lib/finite_machine/message_queue.rb, line 155
# Count the events currently waiting in the queue.
#
# The queue is inspected while holding the queue mutex.
#
# @example
#   event_queue.size
#
# @return [Integer]
#
# @api public
def size
  @mutex.synchronize do
    @queue.size
  end
end
Spawn new background thread
@api private
# File lib/finite_machine/message_queue.rb, line 41
# Create the background worker thread and remember it in +@thread+.
#
# The worker enables +abort_on_exception+ on itself and then runs
# +process_events+.
#
# @return [Thread]
#
# @api private
def spawn_thread
  worker = Thread.new do
    Thread.current.abort_on_exception = true
    process_events
  end
  @thread = worker
end
Start a new thread with a queue of callback events to run
@api private
# File lib/finite_machine/message_queue.rb, line 32
# Start the worker thread unless the queue is already running.
#
# NOTE(review): the +running?+ check happens before the mutex is taken,
# so two threads calling #start at the same moment could both spawn a
# worker — confirm whether callers can race here.
#
# @return [Thread, nil]
#   the value of +spawn_thread+, or nil when already running
#
# @api private
def start
  unless running?
    @mutex.synchronize { spawn_thread }
  end
end
Add listener to the queue to receive messages
@api public
# File lib/finite_machine/message_queue.rb, line 76
# Register a listener that will receive messages from the queue.
#
# A Listener is built from +args+, the given block is installed through
# +on_delivery+, and the listener is appended to the listener list while
# holding the queue mutex.
#
# @param args
#   forwarded unchanged to Listener.new
#
# @return [Array]
#   the listener list after the new listener has been appended
#
# @api public
def subscribe(*args, &block)
  @mutex.synchronize do
    @listeners << Listener.new(*args).tap { |listener| listener.on_delivery(&block) }
  end
end
Private Instance Methods
# File lib/finite_machine/message_queue.rb, line 198
# Drop a message that will not be processed.
#
# Nothing is stored; the message is only written to the debug log, and
# only when Ruby runs with +$DEBUG+ enabled.
#
# @param message
#   the message being dropped
#
# @api private
def discard_message(message)
  return unless $DEBUG

  Logger.debug "Discarded message: #{message}"
end
Notify listeners about the event being processed
@param [AsyncCall] event
@api private
# File lib/finite_machine/message_queue.rb, line 172
# Hand the event to every registered listener, in registration order.
#
# @param [AsyncCall] event
#
# @return [Array]
#   the listener list
#
# @api private
def notify_listeners(event)
  @listeners.each do |subscriber|
    subscriber.handle_delivery(event)
  end
end
Process all the events
@return [Thread]
@api private
# File lib/finite_machine/message_queue.rb, line 181
# Worker loop: take events off the queue one at a time until the queue is
# marked dead.
#
# Each event is first passed to +notify_listeners+ and then dispatched.
# Both steps run while the queue mutex is held. Any exception ends the
# loop and is reported through the logger.
#
# @api private
def process_events
  until @dead
    @mutex.synchronize do
      # Sleep until there is work OR the queue has been shut down. Without
      # the @dead check the shutdown broadcast would wake this thread only
      # for it to wait again forever on the now-empty queue.
      @not_empty.wait(@mutex) while @queue.empty? && !@dead
      # Leaves the synchronize block; the outer loop then sees @dead.
      break if @dead

      event = @queue.pop
      # A nil entry is skipped: break only exits the synchronize block.
      break unless event

      notify_listeners(event)
      event.dispatch
    end
  end
rescue Exception => ex
  # NOTE(review): rescuing Exception also swallows SystemExit and signals
  # raised in this thread; kept as-is to preserve existing behaviour.
  Logger.error "Error while running event: #{Logger.format_error(ex)}"
end