class FiniteMachine::MessageQueue

Responsible for storage of asynchronous messages such as events and callbacks.

Used internally by {Observer}

@api private

Public Class Methods

new() click to toggle source

Initialize a MessageQueue

@example

message_queue = FiniteMachine::MessageQueue.new

@api public

# File lib/finite_machine/message_queue.rb, line 20
def initialize
  @not_empty = ConditionVariable.new
  @mutex     = Mutex.new
  @queue     = Queue.new
  @dead      = false
  @listeners = []
  @thread    = nil
end

Public Instance Methods

<<(event) click to toggle source

Add an asynchronous event to the message queue to process

@example

message_queue << AsyncCall.build(...)

@param [FiniteMachine::AsyncCall] event

the event to add

@return [void]

@api public

# File lib/finite_machine/message_queue.rb, line 78
def <<(event)
  @mutex.synchronize do
    if @dead
      discard_message(event)
    else
      @queue << event
      @not_empty.signal
    end
  end
end
alive?() click to toggle source

Check whether or not the message queue is alive

@example

message_queue.alive?

@return [Boolean]

@api public

# File lib/finite_machine/message_queue.rb, line 123
def alive?
  @mutex.synchronize { !@dead }
end
empty?() click to toggle source

Check whether or not there are any messages to handle

@example

message_queue.empty?

@api public

# File lib/finite_machine/message_queue.rb, line 111
def empty?
  @mutex.synchronize { @queue.empty? }
end
inspect() click to toggle source

Inspect this message queue

@example

message_queue.inspect

@return [String]

@api public

# File lib/finite_machine/message_queue.rb, line 191
def inspect
  @mutex.synchronize do
    "#<#{self.class}:#{object_id.to_s(16)} @size=#{size}, @dead=#{@dead}>"
  end
end
join(timeout = nil) click to toggle source

Join the message queue from the current thread

@param [Fixnum] timeout

the time limit

@example

message_queue.join

@return [Thread, nil]

@api public

# File lib/finite_machine/message_queue.rb, line 138
def join(timeout = nil)
  return unless @thread

  timeout.nil? ? @thread.join : @thread.join(timeout)
end
running?() click to toggle source

Check whether or not the message queue is running

@example

message_queue.running?

@return [Boolean]

@api public

# File lib/finite_machine/message_queue.rb, line 63
def running?
  !@thread.nil? && alive?
end
shutdown() click to toggle source

Shut down this message queue and clean it up

@example

message_queue.shutdown

@raise [FiniteMachine::MessageQueueDeadError]

@return [Boolean]

@api public

# File lib/finite_machine/message_queue.rb, line 154
def shutdown
  raise MessageQueueDeadError, "message queue already dead" if @dead

  queue = []
  @mutex.synchronize do
    @dead = true
    @not_empty.broadcast

    queue = @queue
    @queue.clear
  end
  while !queue.empty?
    discard_message(queue.pop)
  end
  true
end
size() click to toggle source

The number of messages waiting for processing

@example

message_queue.size

@return [Integer]

@api public

# File lib/finite_machine/message_queue.rb, line 179
def size
  @mutex.synchronize { @queue.size }
end
spawn_thread() click to toggle source

Spawn a new background thread

@return [Thread]

@api private

# File lib/finite_machine/message_queue.rb, line 48
def spawn_thread
  @thread = Thread.new do
    Thread.current.abort_on_exception = true
    process_events
  end
end
start() click to toggle source

Start a new thread with a queue of callback events to run

@example

message_queue.start

@return [Thread, nil]

@api private

# File lib/finite_machine/message_queue.rb, line 37
def start
  return if running?

  @mutex.synchronize { spawn_thread }
end
subscribe(*args, &block) click to toggle source

Add a listener for the message queue to receive notifications

@example

message_queue.subscribe { |event| ... }

@return [void]

@api public

# File lib/finite_machine/message_queue.rb, line 97
def subscribe(*args, &block)
  @mutex.synchronize do
    listener = Listener.new(*args)
    listener.on_delivery(&block)
    @listeners << listener
  end
end

Private Instance Methods

discard_message(message) click to toggle source

Log discarded message

@param [FiniteMachine::AsyncCall] message

the message to discard

@return [void]

@api private

# File lib/finite_machine/message_queue.rb, line 241
def discard_message(message)
  Logger.debug "Discarded message: #{message}" if $DEBUG
end
notify_listeners(event) click to toggle source

Notify listeners about the event

@param [FiniteMachine::AsyncCall] event

the event to notify listeners about

@return [void]

@api private

# File lib/finite_machine/message_queue.rb, line 207
def notify_listeners(event)
  @listeners.each { |listener| listener.handle_delivery(event) }
end
process_events() click to toggle source

Process all the events

@return [Thread]

@api private

# File lib/finite_machine/message_queue.rb, line 216
def process_events
  until @dead
    @mutex.synchronize do
      while @queue.empty?
        @not_empty.wait(@mutex)
      end
      event = @queue.pop
      break unless event

      notify_listeners(event)
      event.dispatch
    end
  end
rescue Exception => ex
  Logger.error "Error while running event: #{Logger.format_error(ex)}"
end