class Celluloid::Mailbox

Actors communicate with asynchronous messages. Messages are buffered in Mailboxes until Actors can act upon them.

Attributes

address[R]

A unique address at which this mailbox can be found

max_size[RW]

Public Class Methods

new() click to toggle source
# File lib/celluloid/mailbox.rb, line 14
def initialize
  @address   = Celluloid.uuid
  @messages  = []
  @mutex     = Mutex.new
  @dead      = false
  @condition = ConditionVariable.new
  @max_size  = nil
end

Public Instance Methods

<<(message) click to toggle source

Add a message to the Mailbox

# File lib/celluloid/mailbox.rb, line 24
def <<(message)
  @mutex.lock
  begin
    if mailbox_full || @dead
      dead_letter(message)
      return
    end
    if message.is_a?(SystemEvent)
      # SystemEvents are high priority messages so they get added to the
      # head of our message queue instead of the end
      @messages.unshift message
    else
      @messages << message
    end

    @condition.signal
    nil
  ensure
    begin
      @mutex.unlock
    rescue
      nil
    end
  end
end
alive?() click to toggle source

Is the mailbox alive?

# File lib/celluloid/mailbox.rb, line 116
def alive?
  !@dead
end
check(timeout = nil, &block) click to toggle source

Receive a message from the Mailbox. May return nil and may return before the specified timeout.

# File lib/celluloid/mailbox.rb, line 52
def check(timeout = nil, &block)
  message = nil

  @mutex.lock
  begin
    raise MailboxDead, "attempted to receive from a dead mailbox" if @dead

    message = nil
    Timers::Wait.for(timeout) do |remaining|
      message = next_message(&block)

      break if message

      @condition.wait(@mutex, remaining)
    end
  ensure
    begin
      @mutex.unlock
    rescue
      nil
    end
  end

  message
end
each(&block) click to toggle source

Iterate through the mailbox

# File lib/celluloid/mailbox.rb, line 126
def each(&block)
  to_a.each(&block)
end
inspect() click to toggle source

Inspect the contents of the Mailbox

# File lib/celluloid/mailbox.rb, line 131
def inspect
  "#<#{self.class}:#{object_id.to_s(16)} @messages=[#{map(&:inspect).join(', ')}]>"
end
receive(timeout = nil, &block) click to toggle source

Receive a letter from the mailbox. Guaranteed to return a message. If timeout is exceeded, raise a TaskTimeout.

# File lib/celluloid/mailbox.rb, line 80
def receive(timeout = nil, &block)
  message = nil
  Timers::Wait.for(timeout) do |_remaining|
    message = check(timeout, &block)
    break if message
  end
  return message if message
  raise TaskTimeout, "receive timeout exceeded"
end
shutdown() { || ... } click to toggle source

Shut down this mailbox and clean up its contents

# File lib/celluloid/mailbox.rb, line 91
def shutdown
  raise MailboxDead, "mailbox already shutdown" if @dead

  @mutex.lock
  begin
    yield if block_given?
    messages = @messages
    @messages = []
    @dead = true
  ensure
    begin
      @mutex.unlock
    rescue
      nil
    end
  end

  messages.each do |msg|
    dead_letter msg
    msg.cleanup if msg.respond_to? :cleanup
  end
  true
end
size() click to toggle source

Number of messages in the Mailbox

# File lib/celluloid/mailbox.rb, line 136
def size
  @mutex.synchronize { @messages.size }
end
to_a() click to toggle source

Cast to an array

# File lib/celluloid/mailbox.rb, line 121
def to_a
  @mutex.synchronize { @messages.dup }
end

Private Instance Methods

dead_letter(message) click to toggle source
# File lib/celluloid/mailbox.rb, line 159
def dead_letter(message)
  # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
  # rubocop:disable Style/GlobalVars
  Internals::Logger.debug "Discarded message (mailbox is dead): #{message}" if $CELLULOID_DEBUG
  # rubocop:enable Style/GlobalVars
end
mailbox_full() click to toggle source
# File lib/celluloid/mailbox.rb, line 166
def mailbox_full
  @max_size && @messages.size >= @max_size
end
next_message() { |msg| ... } click to toggle source

Retrieve the next message in the mailbox

# File lib/celluloid/mailbox.rb, line 143
def next_message
  message = nil

  if block_given?
    index = @messages.index do |msg|
      yield(msg) || msg.is_a?(SystemEvent)
    end

    message = @messages.slice!(index, 1).first if index
  else
    message = @messages.shift
  end

  message
end