class FiniteMachine::MessageQueue

Allows for storage of asynchronous messages such as events and callbacks.

Used internally by {Observer} and {StateMachine}

@api private

Public Class Methods

new() click to toggle source

Initialize an event queue in separate thread

@example

MessageQueue.new

@api public

# File lib/finite_machine/message_queue.rb, line 20
def initialize
  @not_empty = ConditionVariable.new
  @mutex     = Mutex.new
  @queue     = Queue.new
  @dead      = false
  @listeners = []
  @thread    = nil
end

Public Instance Methods

<<(event) click to toggle source

Add asynchronous event to the event queue to process

@example

event_queue << AsyncCall.build(...)

@param [AsyncCall] event

@return [nil]

@api public

# File lib/finite_machine/message_queue.rb, line 62
def <<(event)
  @mutex.synchronize do
    if @dead
      discard_message(event)
    else
      @queue << event
      @not_empty.signal
    end
  end
end
alive?() click to toggle source

Check if the event queue is alive

@example

event_queue.alive?

@return [Boolean]

@api public

# File lib/finite_machine/message_queue.rb, line 102
def alive?
  @mutex.synchronize { !@dead }
end
empty?() click to toggle source

Check if there are any events to handle

@example

event_queue.empty?

@api public

# File lib/finite_machine/message_queue.rb, line 90
def empty?
  @mutex.synchronize { @queue.empty? }
end
inspect() click to toggle source
# File lib/finite_machine/message_queue.rb, line 159
def inspect
  @mutex.synchronize do
    "#<#{self.class}:#{object_id.to_s(16)} @size=#{size}, @dead=#{@dead}>"
  end
end
join(timeout = nil) click to toggle source

Join the event queue from current thread

@param [Fixnum] timeout

@example

event_queue.join

@return [nil, Thread]

@api public

# File lib/finite_machine/message_queue.rb, line 116
def join(timeout = nil)
  return unless @thread

  timeout.nil? ? @thread.join : @thread.join(timeout)
end
running?() click to toggle source
# File lib/finite_machine/message_queue.rb, line 48
def running?
  !@thread.nil? && alive?
end
shutdown() click to toggle source

Shut down this event queue and clean it up

@example

event_queue.shutdown

@return [Boolean]

@api public

# File lib/finite_machine/message_queue.rb, line 130
def shutdown
  raise EventQueueDeadError, "event queue already dead" if @dead

  queue = []
  @mutex.synchronize do
    @dead = true
    @not_empty.broadcast

    queue = @queue
    @queue.clear
  end
  while !queue.empty?
    discard_message(queue.pop)
  end
  true
end
size() click to toggle source

Get number of events waiting for processing

@example

event_queue.size

@return [Integer]

@api public

# File lib/finite_machine/message_queue.rb, line 155
def size
  @mutex.synchronize { @queue.size }
end
spawn_thread() click to toggle source

Spawn new background thread

@api private

# File lib/finite_machine/message_queue.rb, line 41
def spawn_thread
  @thread = Thread.new do
    Thread.current.abort_on_exception = true
    process_events
  end
end
start() click to toggle source

Start a new thread with a queue of callback events to run

@api private

# File lib/finite_machine/message_queue.rb, line 32
def start
  return if running?

  @mutex.synchronize { spawn_thread }
end
subscribe(*args, &block) click to toggle source

Add listener to the queue to receive messages

@api public

# File lib/finite_machine/message_queue.rb, line 76
def subscribe(*args, &block)
  @mutex.synchronize do
    listener = Listener.new(*args)
    listener.on_delivery(&block)
    @listeners << listener
  end
end

Private Instance Methods

discard_message(message) click to toggle source
# File lib/finite_machine/message_queue.rb, line 198
def discard_message(message)
  Logger.debug "Discarded message: #{message}" if $DEBUG
end
notify_listeners(event) click to toggle source

Notify consumers about process event

@param [AsyncCall] event

@api private

# File lib/finite_machine/message_queue.rb, line 172
def notify_listeners(event)
  @listeners.each { |listener| listener.handle_delivery(event) }
end
process_events() click to toggle source

Process all the events

@return [Thread]

@api private

# File lib/finite_machine/message_queue.rb, line 181
def process_events
  until @dead
    @mutex.synchronize do
      while @queue.empty?
        @not_empty.wait(@mutex)
      end
      event = @queue.pop
      break unless event

      notify_listeners(event)
      event.dispatch
    end
  end
rescue Exception => ex
  Logger.error "Error while running event: #{Logger.format_error(ex)}"
end