class Celluloid::Actor

Actors are Celluloid's concurrency primitive. They're implemented as normal Ruby objects wrapped in threads which communicate with asynchronous messages.

Attributes

behavior[R]
exit_handler[W]
mailbox[R]
name[R]
proxy[R]
tasks[R]
thread[R]
timers[R]

Public Class Methods

all() click to toggle source

Obtain all running actors in the system

# File lib/celluloid/actor.rb, line 49
# Obtain all running actors in the system.
#
# @return whatever the current Celluloid actor system reports as running
def all
  system = Celluloid.actor_system
  system.running
end
async(mailbox, meth, *args, &block) click to toggle source

Invoke a method asynchronously on an actor via its mailbox

# File lib/celluloid/actor.rb, line 37
# Invoke a method asynchronously on an actor via its mailbox.
#
# @param mailbox the mailbox of the target actor
# @param meth the name of the method to invoke
# @param args arguments forwarded to the method
# @param block optional block forwarded to the method
# @return whatever the async proxy's method_missing returns
def async(mailbox, meth, *args, &block)
  Proxy::Async.new(mailbox, "UnknownClass").method_missing(meth, *args, &block)
end
call(mailbox, meth, *args, &block) click to toggle source

Invoke a method on the given actor via its mailbox

# File lib/celluloid/actor.rb, line 31
# Invoke a method on the given actor via its mailbox, through a sync proxy.
#
# @param mailbox the mailbox of the target actor
# @param meth the name of the method to invoke
# @param args arguments forwarded to the method
# @param block optional block forwarded to the method
# @return whatever the sync proxy's method_missing returns
def call(mailbox, meth, *args, &block)
  Proxy::Sync.new(mailbox, "UnknownClass").method_missing(meth, *args, &block)
end
current() click to toggle source

Obtain the current actor

# File lib/celluloid/actor.rb, line 17
# Obtain the current actor.
#
# Reads the actor stored in the current thread's :celluloid_actor slot and
# returns its behavior proxy.
#
# @raise [NotActorError] when the current thread has no actor registered
def current
  running_actor = Thread.current[:celluloid_actor]
  return running_actor.behavior_proxy if running_actor

  raise NotActorError, "not in actor scope"
end
future(mailbox, meth, *args, &block) click to toggle source

Call a method asynchronously and retrieve its value later

# File lib/celluloid/actor.rb, line 43
# Call a method asynchronously and retrieve its value later.
#
# @param mailbox the mailbox of the target actor
# @param meth the name of the method to invoke
# @param args arguments forwarded to the method
# @param block optional block forwarded to the method
# @return whatever the future proxy's method_missing returns
def future(mailbox, meth, *args, &block)
  Proxy::Future.new(mailbox, "UnknownClass").method_missing(meth, *args, &block)
end
join(actor, timeout = nil) click to toggle source

Wait for an actor to terminate

# File lib/celluloid/actor.rb, line 96
# Wait for an actor to terminate by joining its thread.
#
# @param actor the actor to wait for; must respond to #thread
# @param timeout passed straight to the thread's #join (nil by default)
# @return the actor that was passed in
def join(actor, timeout = nil)
  handle = actor.thread
  handle.join(timeout)
  actor
end
kill(actor) click to toggle source

Forcibly kill a given actor

# File lib/celluloid/actor.rb, line 89
# Forcibly kill a given actor.
#
# Kills the actor's thread, then shuts its mailbox down if the mailbox still
# reports itself alive.
#
# @param actor the actor to kill; must respond to #thread and #mailbox
# @return the mailbox's shutdown result, or nil when the mailbox was not alive
def kill(actor)
  actor.thread.kill
  return unless actor.mailbox.alive?

  actor.mailbox.shutdown
end
linked_to?(actor) click to toggle source

Are we bidirectionally linked to the given actor?

# File lib/celluloid/actor.rb, line 83
# Are we bidirectionally linked to the given actor?
#
# True only when we are monitoring the actor AND the current thread's actor
# lists it among its own links.
#
# @param actor the actor to check
def linked_to?(actor)
  watching = monitoring?(actor)
  return watching unless watching

  Thread.current[:celluloid_actor].links.include?(actor)
end
monitor(actor) click to toggle source

Watch for exit events from another actor

# File lib/celluloid/actor.rb, line 54
# Watch for exit events from another actor.
#
# Sends a :link linking request from the current thread's actor.
#
# @param actor the actor to watch
# @raise [NotActorError] when Celluloid.actor? is false
def monitor(actor)
  if Celluloid.actor?
    Thread.current[:celluloid_actor].linking_request(actor, :link)
  else
    raise NotActorError, "can't link outside actor context"
  end
end
monitoring?(actor) click to toggle source

Are we monitoring the given actor?

# File lib/celluloid/actor.rb, line 78
# Are we monitoring the given actor?
#
# Checks whether the given actor's links include the current actor.
#
# @param actor the actor to check; must respond to #links
def monitoring?(actor)
  their_links = actor.links
  their_links.include?(Actor.current)
end
new(behavior, options) click to toggle source
# File lib/celluloid/actor.rb, line 102
# Build a new, not-yet-running actor around the given behavior.
#
# @param behavior the object stored as this actor's behavior
# @param options hash-like construction options: :actor_system is required;
#   :mailbox_class, :mailbox_size, :task_class and :exclusive are optional
# @raise [KeyError] when a Hash is given without an :actor_system key
def initialize(behavior, options)
  @behavior     = behavior
  @actor_system = options.fetch(:actor_system)

  mailbox_class     = options.fetch(:mailbox_class, Mailbox)
  @mailbox          = mailbox_class.new
  @mailbox.max_size = options.fetch(:mailbox_size, nil)

  @task_class   = options[:task_class] || Celluloid.task_class
  @exit_handler = method(:default_exit_handler)
  @exclusive    = options.fetch(:exclusive, false)

  # Internal bookkeeping; the receivers share the actor's timer group.
  @timers    = Timers::Group.new
  @tasks     = Internals::TaskSet.new
  @links     = Internals::Links.new
  @handlers  = Internals::Handlers.new
  @receivers = Internals::Receivers.new(@timers)
  @signals   = Internals::Signals.new

  @running = false
  @name    = nil

  # System events are routed to the dedicated system event handler.
  handle(SystemEvent) { |event| handle_system_event(event) }
end
registered_name() click to toggle source

Obtain the name of the current actor

# File lib/celluloid/actor.rb, line 24
# Obtain the name of the current actor.
#
# Reads the actor stored in the current thread's :celluloid_actor slot and
# returns its name.
#
# @raise [NotActorError] when the current thread has no actor registered
def registered_name
  running_actor = Thread.current[:celluloid_actor]
  return running_actor.name if running_actor

  raise NotActorError, "not in actor scope"
end
unmonitor(actor) click to toggle source

Stop waiting for exit events from another actor

# File lib/celluloid/actor.rb, line 60
# Stop waiting for exit events from another actor.
#
# Sends an :unlink linking request from the current thread's actor.
#
# @param actor the actor to stop watching
# @raise [NotActorError] when Celluloid.actor? is false
def unmonitor(actor)
  if Celluloid.actor?
    Thread.current[:celluloid_actor].linking_request(actor, :unlink)
  else
    raise NotActorError, "can't link outside actor context"
  end
end

Public Instance Methods

after(interval, &block) click to toggle source

Schedule a block to run once after the given interval

# File lib/celluloid/actor.rb, line 244
# Register the block with the actor's timers under the given interval.
# When the timer fires, the block is run inside a :timer task.
#
# @param interval passed to the timer group's #after
# @return whatever the timer group's #after returns
def after(interval, &block)
  @timers.after(interval) do
    task(:timer, &block)
  end
end
behavior_proxy() click to toggle source
# File lib/celluloid/actor.rb, line 142
# Return the proxy exposed by this actor's behavior.
def behavior_proxy
  behavior = @behavior
  behavior.proxy
end
cleanup(exit_event) click to toggle source

Clean up after this actor

# File lib/celluloid/actor.rb, line 320
# Clean up after this actor.
#
# Shuts the mailbox down, delivers the exit event to every linked actor whose
# mailbox is still alive, and terminates all remaining tasks. Any
# StandardError raised along the way is logged rather than propagated.
#
# @param exit_event the event delivered to linked actors
def cleanup(exit_event)
  # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
  # rubocop:disable Style/GlobalVars
  Celluloid::Probe.actor_died(self) if $CELLULOID_MONITORING
  # rubocop:enable Style/GlobalVars

  @mailbox.shutdown

  @links.each do |linked|
    next unless linked.mailbox.alive?

    linked.mailbox << exit_event
  end

  # Iterate over a snapshot (to_a) of the task set.
  tasks.to_a.each do |running|
    begin
      running.terminate
    rescue DeadTaskError
      # TODO: not tested (failed on Travis)
    end
  end
rescue => failure
  # TODO: metadata
  Internals::Logger.crash("CLEANUP CRASHED!", failure)
end
default_exit_handler(event) click to toggle source
# File lib/celluloid/actor.rb, line 297
# Default reaction to an exit event: re-raise the event's reason, if any.
#
# @param event an exit event responding to #reason
# @return [nil] when the event carries no reason
def default_exit_handler(event)
  return unless event.reason

  raise event.reason
end
every(interval, &block) click to toggle source

Schedule a block to run repeatedly at the given interval

# File lib/celluloid/actor.rb, line 249
# Register the block with the actor's timers via the timer group's #every.
# Each time the timer fires, the block is run inside a :timer task.
#
# @param interval passed to the timer group's #every
# @return whatever the timer group's #every returns
def every(interval, &block)
  @timers.every(interval) do
    task(:timer, &block)
  end
end
handle(*patterns, &block) click to toggle source

Register a new handler for a given pattern

# File lib/celluloid/actor.rb, line 229
# Register a new handler for the given pattern(s).
#
# @param patterns the patterns forwarded to the handler registry
# @param block the handler forwarded to the handler registry
# @return whatever the handler registry's #handle returns
def handle(*patterns, &block)
  registry = @handlers
  registry.handle(*patterns, &block)
end
handle_crash(exception) click to toggle source

Handle any exceptions that occur within a running actor

# File lib/celluloid/actor.rb, line 302
# Handle any exceptions that occur within a running actor.
#
# Logs the crash, then shuts the actor down with an exit event carrying the
# exception. A StandardError raised while doing so is itself logged instead
# of propagating.
#
# @param exception the exception that crashed the actor
def handle_crash(exception)
  # TODO: add meta info
  Internals::Logger.crash("Actor crashed!", exception)
  exit_event = ExitEvent.new(behavior_proxy, exception)
  shutdown(exit_event)
rescue => failure
  Internals::Logger.crash("Actor#handle_crash CRASHED!", failure)
end
handle_message(message) click to toggle source

Handle standard low-priority messages

# File lib/celluloid/actor.rb, line 288
# Handle standard low-priority messages.
#
# Offers the message to the registered handlers first and, if none takes it,
# to the receivers. A message nobody handles is reported through the debug
# log when $CELLULOID_DEBUG is set.
#
# @param message the incoming message
# @return the message that was passed in
def handle_message(message)
  handled = @handlers.handle_message(message) || @receivers.handle_message(message)

  # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
  # rubocop:disable Style/GlobalVars
  Internals::Logger.debug "Discarded message (unhandled): #{message}" if !handled && $CELLULOID_DEBUG
  # rubocop:enable Style/GlobalVars

  message
end
handle_system_event(event) click to toggle source

Handle high-priority system event messages

# File lib/celluloid/system_events.rb, line 4
# Handle high-priority system event messages.
#
# Looks up the handler method registered for the event's class via
# SystemEvent.handle and invokes it with the event. Events without a
# registered handler are dropped, with a debug log entry when
# $CELLULOID_DEBUG is set.
#
# @param event the system event to dispatch
# @return the handler's return value when a handler is registered
def handle_system_event(event)
  handler = SystemEvent.handle(event.class)
  return send(handler, event) if handler

  # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
  # rubocop:disable Style/GlobalVars
  # Interpolate the `event` parameter; there is no `message` in this scope.
  Internals::Logger.debug "Discarded message (unhandled): #{event}" if $CELLULOID_DEBUG
  # rubocop:enable Style/GlobalVars
end
linking_request(receiver, type) click to toggle source

Perform a linking request with another actor

# File lib/celluloid/actor.rb, line 184
# Perform a linking request with another actor and wait for its answer.
#
# Posts a LinkingRequest to the receiver's mailbox, then reads from this
# actor's own mailbox until a matching LinkingResponse arrives or
# Timers::Wait.for(LINKING_TIMEOUT) finishes. The whole exchange runs inside
# Celluloid.exclusive.
#
# @param receiver the actor to send the request to; must respond to #mailbox
# @param type the kind of request, echoed back in the expected response
# @return [nil] once a matching LinkingResponse has been received
# @raise [TaskTimeout] when the wait finishes without a matching response
# @raise [RuntimeError] when a message that is neither a LinkingResponse nor
#   a SystemEvent is received
def linking_request(receiver, type)
  Celluloid.exclusive do
    receiver.mailbox << LinkingRequest.new(Actor.current, type)
    # System events seen while waiting; re-queued once the link succeeds.
    system_events = []

    Timers::Wait.for(LINKING_TIMEOUT) do |remaining|
      begin
        # Only accept a response from the same mailbox address and of the
        # same type as the request that was just sent.
        message = @mailbox.receive(remaining) do |msg|
          msg.is_a?(LinkingResponse) &&
            msg.actor.mailbox.address == receiver.mailbox.address &&
            msg.type == type
        end
      rescue TaskTimeout
        next # IO reactor did something, no message in queue yet.
      end

      if message.instance_of? LinkingResponse
        # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
        # rubocop:disable Style/GlobalVars
        Celluloid::Probe.actors_linked(self, receiver) if $CELLULOID_MONITORING
        # rubocop:enable Style/GlobalVars
        # Put the deferred system events back before leaving the method.
        system_events.each { |ev| @mailbox << ev }
        return
      elsif message.is_a? SystemEvent
        # Queue up pending system events to be processed after we've successfully linked
        system_events << message
      # NOTE(review): the error text lists NilClass as expected, yet a nil
      # message would reach this branch and raise — confirm whether
      # @mailbox.receive can return nil here.
      else raise "Unexpected message type: #{message.class}. Expected LinkingResponse, NilClass, SystemEvent."
      end
    end

    # Reached only when the wait block never hit the `return` above.
    raise TaskTimeout, "linking timeout of #{LINKING_TIMEOUT} seconds exceeded with receiver: #{receiver}"
  end
end
receive(timeout = nil, &block) click to toggle source

Receive an asynchronous message

# File lib/celluloid/actor.rb, line 234
# Receive an asynchronous message.
#
# System events that arrive while waiting are dispatched to
# #handle_system_event and the wait continues; the first message that is not
# a SystemEvent is returned.
#
# @param timeout forwarded to the receivers on every attempt (nil by default)
# @param block forwarded to the receivers on every attempt
# @return the first received message that is not a SystemEvent
def receive(timeout = nil, &block)
  loop do
    incoming = @receivers.receive(timeout, &block)

    if incoming.is_a?(SystemEvent)
      handle_system_event(incoming)
    else
      return incoming
    end
  end
end
run() click to toggle source

Run the actor loop

# File lib/celluloid/actor.rb, line 152
# Run the actor loop.
#
# While @running is set, waits on the timers and checks the mailbox, handing
# each message to #handle_message. A MailboxShutdown or MailboxDead error
# ends the loop. After the loop the actor is shut down.
#
# Any exception escaping the loop or the shutdown is passed to #handle_crash;
# it is then re-raised unless it is a StandardError or a
# Celluloid::Interruption.
def run
  while @running
    begin
      @timers.wait do |interval|
        # Never pass a negative wait interval to the mailbox.
        interval = 0 if interval && interval < 0

        if message = @mailbox.check(interval)
          handle_message(message)

          # Leave the timer wait at once if handling the message stopped us.
          break unless @running
        end
      end
    rescue MailboxShutdown
      @running = false
    rescue MailboxDead
      # TODO: not tested (but fails occasionally in tests)
      @running = false
    end
  end

  shutdown
rescue ::Exception => ex
  handle_crash(ex)
  # StandardError and Interruption are absorbed after crash handling;
  # everything else propagates out of the actor loop.
  raise unless ex.is_a?(StandardError) || ex.is_a?(Celluloid::Interruption)
end
setup_thread() click to toggle source
# File lib/celluloid/actor.rb, line 146
# Register this actor and its mailbox in the current thread's
# :celluloid_actor and :celluloid_mailbox slots.
def setup_thread
  thread = Thread.current
  thread[:celluloid_actor]   = self
  thread[:celluloid_mailbox] = @mailbox
end
shutdown(exit_event = ExitEvent.new(behavior_proxy)) click to toggle source

Handle cleaning up this actor after it exits

# File lib/celluloid/actor.rb, line 311
# Handle cleaning up this actor after it exits.
#
# Shuts the behavior down, runs #cleanup with the exit event, and always
# clears the current thread's actor and mailbox slots, even on failure.
#
# @param exit_event the event handed to #cleanup; defaults to a new ExitEvent
#   built from the behavior proxy
# @return whatever #cleanup returns
def shutdown(exit_event = ExitEvent.new(behavior_proxy))
  @behavior.shutdown
  cleanup(exit_event)
ensure
  [:celluloid_actor, :celluloid_mailbox].each do |slot|
    Thread.current[slot] = nil
  end
end
signal(name, value = nil) click to toggle source

Send a signal with the given name to all waiting methods

# File lib/celluloid/actor.rb, line 219
# Send a signal with the given name to all waiting methods.
#
# @param name the signal name
# @param value optional value broadcast with the signal (nil by default)
# @return whatever the signal set's #broadcast returns
def signal(name, value = nil)
  @signals.broadcast(name, value)
end
sleep(interval) click to toggle source

Sleep for the given amount of time

# File lib/celluloid/actor.rb, line 282
# Sleep for the given amount of time.
#
# Builds a Sleeper from the actor's timers and the interval, then suspends
# via Celluloid with the :sleeping status.
#
# @param interval passed to the Sleeper
# @return whatever Celluloid.suspend returns
def sleep(interval)
  Celluloid.suspend(:sleeping, Sleeper.new(@timers, interval))
end
start() click to toggle source
# File lib/celluloid/actor.rb, line 127
# Start the actor: mark it running, create its thread handle (whose body
# sets up the thread and runs the actor loop), and build the actor proxy.
#
# @return the probe's result when $CELLULOID_MONITORING is set, otherwise nil
def start
  @running = true

  actor_loop = proc do
    setup_thread
    run
  end
  @thread = Internals::ThreadHandle.new(@actor_system, :actor, &actor_loop)
  @proxy  = Proxy::Actor.new(@mailbox, @thread)

  # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
  # rubocop:disable Style/GlobalVars
  return unless $CELLULOID_MONITORING

  Celluloid::Probe.actor_created(self)
  # rubocop:enable Style/GlobalVars
end
task(task_type, meta = nil) { || ... } click to toggle source

Run a method inside a task unless it's exclusive

# File lib/celluloid/actor.rb, line 344
# Run the given block inside a new task and resume it immediately.
# When the actor is exclusive, the block runs within Celluloid.exclusive.
#
# @param task_type the type passed to the task class
# @param meta optional metadata passed to the task class (nil by default)
# @yield the work to perform inside the task
# @return whatever the new task's #resume returns
def task(task_type, meta = nil)
  fresh_task = @task_class.new(task_type, meta) do
    next yield unless @exclusive

    Celluloid.exclusive { yield }
  end
  fresh_task.resume
end
terminate() click to toggle source

Terminate this actor

# File lib/celluloid/actor.rb, line 179
def terminate
  @running = false
end
timeout(duration) { || ... } click to toggle source
# File lib/celluloid/actor.rb, line 253
# Run the given block with a timer armed for +duration+.
#
# If the timer fires, the task that was current when #timeout was called is
# resumed with a TaskTimeout carrying the caller's backtrace. The timer is
# cancelled when the block finishes, whether normally or by raising.
#
# @param duration passed to the timer group's #after
# @yield the work to guard
# @return the block's return value
def timeout(duration)
  origin = caller
  waiting_task = Task.current
  timer = @timers.after(duration) do
    expired = TaskTimeout.new("execution expired")
    expired.set_backtrace(origin)
    waiting_task.resume(expired)
  end
  yield
ensure
  # timer stays nil if arming it failed before the assignment completed.
  timer && timer.cancel
end
wait(name) click to toggle source

Wait for the given signal

# File lib/celluloid/actor.rb, line 224
# Wait for the given signal.
#
# @param name the signal name to wait on
# @return whatever the signal set's #wait returns
def wait(name)
  @signals.wait(name)
end