class Celluloid::Mailbox
Actors communicate with asynchronous messages. Messages are buffered in Mailboxes until Actors can act upon them.
Attributes
address[R]
A unique address at which this mailbox can be found
max_size[RW]
Public Class Methods
new()
click to toggle source
# File lib/celluloid/mailbox.rb, line 14
# Set up an empty, live mailbox: an address taken from Celluloid.uuid, an empty
# message list, the mutex and condition variable used by the other methods,
# and no size limit.
def initialize
  # Identity and contents
  @address  = Celluloid.uuid
  @messages = []
  @max_size = nil # nil makes mailbox_full skip the capacity comparison
  @dead     = false

  # Synchronisation primitives
  @mutex     = Mutex.new
  @condition = ConditionVariable.new
end
Public Instance Methods
<<(message)
click to toggle source
Add a message to the Mailbox
# File lib/celluloid/mailbox.rb, line 24
# Add a message to the Mailbox.
#
# The message is handed to dead_letter instead of being queued when
# mailbox_full is truthy or the mailbox is dead. Always returns nil.
def <<(message)
  @mutex.lock
  begin
    if mailbox_full || @dead
      dead_letter(message)
    else
      # SystemEvents are high priority messages, so they go to the head of
      # the queue instead of the end.
      if message.is_a?(SystemEvent)
        @messages.unshift(message)
      else
        @messages.push(message)
      end
      @condition.signal
    end
    nil
  ensure
    # Best-effort unlock: a StandardError raised by unlock is swallowed.
    begin
      @mutex.unlock
    rescue StandardError
      nil
    end
  end
end
alive?()
click to toggle source
Is the mailbox alive?
# File lib/celluloid/mailbox.rb, line 116
# Is the mailbox alive? Returns true as long as @dead is not set.
def alive?
  @dead ? false : true
end
check(timeout = nil, &block)
click to toggle source
Receive a message from the Mailbox. May return nil and may return before the specified timeout.
# File lib/celluloid/mailbox.rb, line 52
# Receive a message from the Mailbox. May return nil and may return before the
# specified timeout.
#
# Raises MailboxDead if the mailbox has already been marked dead. A given
# block is forwarded to next_message to choose which message to take.
def check(timeout = nil, &block)
  found = nil
  @mutex.lock
  begin
    raise MailboxDead, "attempted to receive from a dead mailbox" if @dead

    Timers::Wait.for(timeout) do |remaining|
      found = next_message(&block)
      break if found

      # Nothing suitable is queued yet: wait on the condition variable (which
      # << signals) for at most the remaining time, then look again.
      @condition.wait(@mutex, remaining)
    end
  ensure
    # Best-effort unlock: a StandardError raised by unlock is swallowed.
    begin
      @mutex.unlock
    rescue StandardError
      nil
    end
  end
  found
end
each(&block)
click to toggle source
Iterate through the mailbox
# File lib/celluloid/mailbox.rb, line 126
# Iterate through the mailbox by walking the array returned by to_a.
def each(&block)
  snapshot = to_a
  snapshot.each(&block)
end
inspect()
click to toggle source
Inspect the contents of the Mailbox
# File lib/celluloid/mailbox.rb, line 131
# Inspect the contents of the Mailbox: class name, hexadecimal object id and
# the inspect output of every message.
def inspect
  hex_id   = object_id.to_s(16)
  contents = map(&:inspect).join(', ')
  "#<#{self.class}:#{hex_id} @messages=[#{contents}]>"
end
receive(timeout = nil, &block)
click to toggle source
Receive a letter from the mailbox. Guaranteed to return a message. If the timeout is exceeded, raises TaskTimeout.
# File lib/celluloid/mailbox.rb, line 80
# Receive a letter from the mailbox. Guaranteed to return a message.
#
# timeout - overall time budget given to Timers::Wait.for
#           (nil presumably means "no limit" - confirm against Timers::Wait)
# block   - optional filter forwarded to check
#
# Returns the received message.
# Raises TaskTimeout if the wait finishes without a message.
def receive(timeout = nil, &block)
  message = nil
  Timers::Wait.for(timeout) do |remaining|
    # Give check only the time that is left. check may return nil before its
    # timeout, and handing it the full timeout again on every retry would let
    # the total wait exceed the caller's budget.
    message = check(remaining, &block)
    break if message
  end
  return message if message

  raise TaskTimeout, "receive timeout exceeded"
end
shutdown() { || ... }
click to toggle source
Shut down this mailbox and clean up its contents
# File lib/celluloid/mailbox.rb, line 91
# Shut down this mailbox and clean up its contents.
#
# Raises MailboxDead if the mailbox is already dead. While holding the mutex
# it runs the optional block, swaps in an empty queue and marks the mailbox
# dead. Afterwards every drained message is passed to dead_letter and, if it
# responds to cleanup, cleaned up. Returns true.
def shutdown
  raise MailboxDead, "mailbox already shutdown" if @dead

  drained = nil
  @mutex.lock
  begin
    yield if block_given?
    drained = @messages
    @messages = []
    @dead = true
  ensure
    # Best-effort unlock: a StandardError raised by unlock is swallowed.
    begin
      @mutex.unlock
    rescue StandardError
      nil
    end
  end

  # Dispose of the drained messages outside the lock.
  drained.each do |msg|
    dead_letter(msg)
    msg.cleanup if msg.respond_to?(:cleanup)
  end
  true
end
size()
click to toggle source
Number of messages in the Mailbox
# File lib/celluloid/mailbox.rb, line 136
# Number of messages in the Mailbox, read while holding the mutex.
def size
  @mutex.synchronize do
    @messages.length
  end
end
to_a()
click to toggle source
Cast to an array
# File lib/celluloid/mailbox.rb, line 121
# Cast to an array: returns a shallow copy of the queued messages, taken while
# holding the mutex, so the caller never gets the internal array itself.
def to_a
  @mutex.synchronize do
    @messages.dup
  end
end
Private Instance Methods
dead_letter(message)
click to toggle source
# File lib/celluloid/mailbox.rb, line 159
# Report a discarded message: when $CELLULOID_DEBUG is set, log it at debug
# level through Internals::Logger; otherwise do nothing and return nil.
def dead_letter(message)
  # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
  # rubocop:disable Style/GlobalVars
  return unless $CELLULOID_DEBUG
  # rubocop:enable Style/GlobalVars

  Internals::Logger.debug "Discarded message (mailbox is dead): #{message}"
end
mailbox_full()
click to toggle source
# File lib/celluloid/mailbox.rb, line 166
# Has the mailbox reached its size limit? Returns the falsy @max_size itself
# when no limit is set, otherwise whether the queue length has reached it.
def mailbox_full
  limit = @max_size
  return limit unless limit

  @messages.size >= limit
end
next_message() { |msg| ... }
click to toggle source
Retrieve the next message in the mailbox
# File lib/celluloid/mailbox.rb, line 143
# Retrieve the next message in the mailbox.
#
# Without a block, removes and returns the message at the head of the queue.
# With a block, removes and returns the first message for which the block is
# truthy or which is a SystemEvent; messages ahead of it stay queued.
# Returns nil when nothing qualifies. check calls this while holding @mutex.
def next_message
  return @messages.shift unless block_given?

  index = @messages.index { |msg| yield(msg) || msg.is_a?(SystemEvent) }
  # delete_at removes exactly that one element and returns it.
  @messages.delete_at(index) if index
end