class Celluloid::Actor

Actors are Celluloid's concurrency primitive. They're implemented as normal Ruby objects wrapped in threads which communicate with asynchronous messages.

Attributes

mailbox[R]
name[R]
proxy[R]
subject[R]
tasks[R]
thread[R]

Public Class Methods

all() click to toggle source

Obtain all running actors in the system

# File lib/celluloid/actor.rb, line 78
# Collect the proxies of all running actors in the system.
#
# Iterates over Celluloid.internal_pool and keeps only threads whose role is
# :actor and whose actor is present and responds to #proxy.
#
# Returns an Array of actor proxies.
def all
  proxies = []
  Celluloid.internal_pool.each do |thread|
    if thread.role == :actor && thread.actor && thread.actor.respond_to?(:proxy)
      proxies << thread.actor.proxy
    end
  end
  proxies
end
async(mailbox, meth, *args, &block) click to toggle source

Invoke a method asynchronously on an actor via its mailbox

# File lib/celluloid/actor.rb, line 66
# Invoke a method asynchronously on an actor via its mailbox.
#
# Builds an AsyncProxy around the mailbox (labelled "UnknownClass") and
# forwards the method name, arguments and block through its method_missing.
#
# Returns whatever the proxy's method_missing returns.
def async(mailbox, meth, *args, &block)
  AsyncProxy.new(mailbox, "UnknownClass").method_missing(meth, *args, &block)
end
call(mailbox, meth, *args, &block) click to toggle source

Invoke a method on the given actor via its mailbox

# File lib/celluloid/actor.rb, line 60
# Invoke a method on the given actor via its mailbox.
#
# Builds a SyncProxy around the mailbox (labelled "UnknownClass") and
# forwards the method name, arguments and block through its method_missing.
#
# Returns whatever the proxy's method_missing returns.
def call(mailbox, meth, *args, &block)
  SyncProxy.new(mailbox, "UnknownClass").method_missing(meth, *args, &block)
end
clear_registry() click to toggle source
# File lib/celluloid/actor.rb, line 41
# Clear the root registry.
#
# Returns the result of calling #clear on Registry.root.
def clear_registry
  root_registry = Registry.root
  root_registry.clear
end
current() click to toggle source

Obtain the current actor

# File lib/celluloid/actor.rb, line 46
# Obtain the proxy of the actor running on the current thread.
#
# The actor is looked up in the :celluloid_actor thread-local.
#
# Returns the actor's proxy.
# Raises NotActorError when the current thread has no actor.
def current
  running = Thread.current[:celluloid_actor]
  return running.proxy if running

  raise NotActorError, "not in actor scope"
end
future(mailbox, meth, *args, &block) click to toggle source

Call a method asynchronously and retrieve its value later

# File lib/celluloid/actor.rb, line 72
# Call a method asynchronously and retrieve its value later.
#
# Builds a FutureProxy around the mailbox (labelled "UnknownClass") and
# forwards the method name, arguments and block through its method_missing.
#
# Returns whatever the proxy's method_missing returns.
def future(mailbox, meth, *args, &block)
  FutureProxy.new(mailbox, "UnknownClass").method_missing(meth, *args, &block)
end
join(actor, timeout = nil) click to toggle source

Wait for an actor to terminate

# File lib/celluloid/actor.rb, line 128
# Wait for an actor to terminate.
#
# actor   - object exposing #thread
# timeout - value passed straight through to the thread's #join (nil by default)
#
# Returns the actor that was passed in.
def join(actor, timeout = nil)
  handle = actor.thread
  handle.join(timeout)
  actor
end
kill(actor) click to toggle source

Forcibly kill a given actor

# File lib/celluloid/actor.rb, line 122
# Forcibly kill a given actor.
#
# Kills the actor's thread, then shuts down its mailbox if the mailbox still
# reports itself alive.
#
# Returns the result of the mailbox shutdown, or nil when the mailbox was
# already dead.
def kill(actor)
  actor.thread.kill
  return unless actor.mailbox.alive?

  actor.mailbox.shutdown
end
linked_to?(actor) click to toggle source

Are we bidirectionally linked to the given actor?

# File lib/celluloid/actor.rb, line 117
# Are we bidirectionally linked to the given actor?
#
# True only when we are monitoring the actor (see monitoring?) and the
# current thread's actor also lists it among its own links.
def linked_to?(actor)
  watching = monitoring?(actor)
  return watching unless watching

  Thread.current[:celluloid_actor].links.include?(actor)
end
monitor(actor) click to toggle source

Watch for exit events from another actor

# File lib/celluloid/actor.rb, line 88
# Watch for exit events from another actor.
#
# Issues a :link linking request from the current thread's actor.
#
# Raises NotActorError when called outside an actor (Celluloid.actor? is false).
def monitor(actor)
  unless Celluloid.actor?
    raise NotActorError, "can't link outside actor context"
  end

  Thread.current[:celluloid_actor].linking_request(actor, :link)
end
monitoring?(actor) click to toggle source

Are we monitoring the given actor?

# File lib/celluloid/actor.rb, line 112
# Are we monitoring the given actor?
#
# True when the given actor's links include the current actor.
def monitoring?(actor)
  their_links = actor.links
  their_links.include?(Actor.current)
end
name() click to toggle source

Obtain the name of the current actor

# File lib/celluloid/actor.rb, line 53
# Obtain the name of the actor running on the current thread.
#
# The actor is looked up in the :celluloid_actor thread-local.
#
# Returns the actor's name.
# Raises NotActorError when the current thread has no actor.
def name
  running = Thread.current[:celluloid_actor]
  return running.name if running

  raise NotActorError, "not in actor scope"
end
new(subject, options = {}) click to toggle source

Wrap the given subject with an Actor

# File lib/celluloid/actor.rb, line 135
# Wrap the given subject with an Actor.
#
# subject - the object this actor wraps; it is tagged with this actor through
#           OWNER_IVAR at the end of construction
# options - Hash of optional settings read below:
#           :mailbox_class (defaults to Mailbox), :mailbox_size (defaults to nil),
#           :task_class (falls back to Celluloid.task_class), :exit_handler,
#           :exclusive_methods, :receiver_block_executions,
#           :proxy_class (falls back to ActorProxy)
def initialize(subject, options = {})
  @subject = subject

  # Mailbox class is pluggable; max_size is assigned even when it is nil
  @mailbox          = options.fetch(:mailbox_class, Mailbox).new
  @mailbox.max_size = options.fetch(:mailbox_size, nil)

  @task_class   = options[:task_class] || Celluloid.task_class
  @exit_handler = options[:exit_handler]
  @exclusives   = options[:exclusive_methods]
  @receiver_block_executions = options[:receiver_block_executions]

  # Fresh per-actor bookkeeping; the actor starts out running, non-exclusive and unnamed
  @tasks     = TaskSet.new
  @links     = Links.new
  @signals   = Signals.new
  @receivers = Receivers.new
  @timers    = Timers.new
  @running   = true
  @exclusive = false
  @name      = nil

  # The thread body installs the thread-locals and then enters the message loop.
  # NOTE(review): the handle is created before @proxy is assigned below —
  # confirm nothing in the thread body needs @proxy before this method finishes.
  @thread = ThreadHandle.new(:actor) do
    setup_thread
    run
  end

  @proxy = (options[:proxy_class] || ActorProxy).new(self)
  @subject.instance_variable_set(OWNER_IVAR, self)
end
registered() click to toggle source
# File lib/celluloid/actor.rb, line 37
# List the names held by the root registry.
#
# Returns the result of calling #names on Registry.root.
def registered
  root_registry = Registry.root
  root_registry.names
end
unmonitor(actor) click to toggle source

Stop waiting for exit events from another actor

# File lib/celluloid/actor.rb, line 94
# Stop waiting for exit events from another actor.
#
# Issues an :unlink linking request from the current thread's actor.
#
# Raises NotActorError when called outside an actor (Celluloid.actor? is false).
def unmonitor(actor)
  unless Celluloid.actor?
    raise NotActorError, "can't link outside actor context"
  end

  Thread.current[:celluloid_actor].linking_request(actor, :unlink)
end

Public Instance Methods

after(interval, &block) click to toggle source

Schedule a block to run once after the given interval has elapsed

# File lib/celluloid/actor.rb, line 262
# Schedule a block to run after the given interval.
#
# Registers a timer with @timers; when the timer fires, the block is run
# inside a :timer task.
#
# Returns whatever @timers.after returns.
def after(interval, &block)
  @timers.after(interval) do
    task(:timer, &block)
  end
end
cleanup(exit_event) click to toggle source

Clean up after this actor

# File lib/celluloid/actor.rb, line 396
# Clean up after this actor.
#
# Shuts down this actor's mailbox, delivers the exit event to every linked
# actor whose mailbox is still alive, and terminates all remaining tasks.
# Any StandardError raised along the way is logged via Logger.crash rather
# than propagated.
def cleanup(exit_event)
  @mailbox.shutdown

  @links.each do |linked|
    next unless linked.mailbox.alive?
    linked.mailbox << exit_event
  end

  tasks.to_a.each(&:terminate)
rescue => error
  Logger.crash("#{@subject.class}: CLEANUP CRASHED!", error)
end
every(interval, &block) click to toggle source

Schedule a block to run repeatedly, once every given interval

# File lib/celluloid/actor.rb, line 267
# Schedule a block to run on a recurring interval.
#
# Registers a recurring timer with @timers; each time it fires, the block is
# run inside a :timer task.
#
# Returns whatever @timers.every returns.
def every(interval, &block)
  @timers.every(interval) do
    task(:timer, &block)
  end
end
handle_crash(exception) click to toggle source

Handle any exceptions that occur within a running actor

# File lib/celluloid/actor.rb, line 365
# Handle any exceptions that occur within a running actor.
#
# Logs the crash, then shuts the actor down with an ExitEvent carrying this
# actor's proxy and the exception. A StandardError raised while doing so is
# itself logged instead of propagating.
def handle_crash(exception)
  begin
    Logger.crash("#{@subject.class} crashed!", exception)
    exit_event = ExitEvent.new(@proxy, exception)
    shutdown(exit_event)
  rescue => error
    Logger.crash("#{@subject.class}: ERROR HANDLER CRASHED!", error)
  end
end
handle_exit_event(event) click to toggle source

Handle exit events received by this actor

# File lib/celluloid/actor.rb, line 353
# Handle exit events received by this actor.
#
# The exiting actor is always removed from @links. If an exit handler is
# configured, it is invoked on the subject with the actor and the reason and
# its result is returned. Otherwise the reason, when present, is re-raised;
# a missing reason means the linked actor terminated cleanly.
def handle_exit_event(event)
  @links.delete(event.actor)

  if @exit_handler
    # Delegate to the user-supplied handler
    @subject.send(@exit_handler, event.actor, event.reason)
  elsif event.reason
    # No handler: propagate the linked actor's failure
    raise event.reason
  end
end
handle_message(message) click to toggle source

Handle standard low-priority messages

# File lib/celluloid/actor.rb, line 306
# Handle standard low-priority messages.
#
# Dispatches on the message's class:
#   SystemEvent             - forwarded to handle_system_event
#   Call                    - dispatched to the subject inside a :call task
#   BlockCall               - dispatched inside an :invoke_block task
#   BlockResponse, Response - dispatched directly
#   anything else           - offered to @receivers; logged when unhandled
#                             and $CELLULOID_DEBUG is set
#
# Returns the message that was passed in.
def handle_message(message)
  case message
  when SystemEvent
    handle_system_event(message)
  when Call
    meth = message.method
    # For __send__ the real target method is the first argument
    meth = message.arguments.first if meth == :__send__

    if @receiver_block_executions && meth && @receiver_block_executions.include?(meth.to_sym)
      message.execute_block_on_receiver
    end

    task(:call, :method_name => meth, :dangerous_suspend => meth == :initialize) do
      message.dispatch(@subject)
    end
  when BlockCall
    task(:invoke_block) { message.dispatch }
  when BlockResponse, Response
    message.dispatch
  else
    handled = @receivers.handle_message(message)
    if !handled && $CELLULOID_DEBUG
      Logger.debug "Discarded message (unhandled): #{message}"
    end
  end
  message
end
handle_system_event(event) click to toggle source

Handle high-priority system event messages

# File lib/celluloid/actor.rb, line 337
# Handle high-priority system event messages.
#
# Dispatches on the event's class:
#   ExitEvent              - runs handle_exit_event inside an :exit_handler task
#   LinkingRequest         - lets the event process this actor's links
#   NamingRequest          - stores the requested name in @name
#   TerminationRequest     - terminates this actor
#   SignalConditionRequest - calls the event
# Any other event is ignored and nil is returned.
def handle_system_event(event)
  case event
  when ExitEvent
    task(:exit_handler, :method_name => @exit_handler) do
      handle_exit_event(event)
    end
  when LinkingRequest         then event.process(links)
  when NamingRequest          then @name = event.name
  when TerminationRequest     then terminate
  when SignalConditionRequest then event.call
  end
end
linking_request(receiver, type) click to toggle source

Perform a linking request with another actor

# File lib/celluloid/actor.rb, line 197
# Perform a linking request with another actor.
#
# Sends a LinkingRequest of the given type to the receiver's mailbox, then
# waits (inside Celluloid.exclusive) on this actor's own mailbox until the
# matching LinkingResponse arrives. System events received in the meantime
# are queued and handled only once linking has succeeded.
#
# receiver - actor whose mailbox receives the request
# type     - request type; callers in this file pass :link or :unlink
#
# Returns nil once the matching response has arrived.
# Raises TimeoutError when the mailbox returns nothing before the
# LINKING_TIMEOUT deadline.
# Raises RuntimeError when the mailbox returns any other kind of message.
def linking_request(receiver, type)
  Celluloid.exclusive do
    start_time = Time.now
    receiver.mailbox << LinkingRequest.new(Actor.current, type)
    system_events = []

    loop do
      # Remaining time until the overall deadline, recomputed on every pass
      wait_interval = start_time + LINKING_TIMEOUT - Time.now

      # NOTE(review): this filter only accepts the matching LinkingResponse,
      # yet SystemEvents are handled below — presumably the mailbox lets
      # system events through regardless of the filter; confirm.
      message = @mailbox.receive(wait_interval) do |msg|
        msg.is_a?(LinkingResponse) &&
        msg.actor.mailbox.address == receiver.mailbox.address &&
        msg.type == type
      end

      case message
      when LinkingResponse
        # We're done! Replay the system events that arrived while waiting
        system_events.each { |ev| handle_system_event(ev) }
        return
      when NilClass
        raise TimeoutError, "linking timeout of #{LINKING_TIMEOUT} seconds exceeded"
      when SystemEvent
        # Queue up pending system events to be processed after we've successfully linked
        system_events << message
      else
        # Should be unreachable; say what actually arrived so it can be diagnosed
        raise "unexpected #{message.class} while waiting for linking response: #{message.inspect}"
      end
    end
  end
end
receive(timeout = nil, &block) click to toggle source

Receive an asynchronous message

# File lib/celluloid/actor.rb, line 238
# Receive an asynchronous message.
#
# Keeps asking @receivers for the next message, passing along the timeout
# and filter block. System events are handled immediately and the wait
# continues; the first non-system-event result is returned.
def receive(timeout = nil, &block)
  loop do
    incoming = @receivers.receive(timeout, &block)
    return incoming unless incoming.is_a?(SystemEvent)

    handle_system_event(incoming)
  end
end
run() click to toggle source

Run the actor loop

# File lib/celluloid/actor.rb, line 170
# Run the actor loop.
#
# While @running is true, receives from the mailbox (using timeout_interval
# as the receive timeout) and passes each message to handle_message; when no
# message comes back, due timers are fired instead. When the loop ends, or
# the mailbox raises MailboxShutdown, the actor is shut down. Any exception
# is passed to handle_crash, and re-raised afterwards unless it is a
# StandardError.
def run
  begin
    while @running
      if message = @mailbox.receive(timeout_interval)
        handle_message message
      else
        # No message indicates a timeout
        @timers.fire
        @receivers.fire_timers
      end
    end
  rescue MailboxShutdown
    # If the mailbox detects shutdown, exit the actor
  end

  shutdown
rescue Exception => ex
  handle_crash(ex)
  # Non-StandardError exceptions still propagate after the crash is handled
  raise unless ex.is_a? StandardError
end
run_finalizer() click to toggle source

Run the user-defined finalizer, if one is set

# File lib/celluloid/actor.rb, line 382
# Run the user-defined finalizer, if one is set.
#
# The finalizer name comes from the subject's class. It is invoked (private
# methods included) inside a :finalizer task, and a StandardError it raises
# is logged via Logger.crash instead of propagating.
#
# Returns nil when no finalizer is set or the subject does not respond to it.
def run_finalizer
  hook = @subject.class.finalizer

  if hook && @subject.respond_to?(hook, true)
    task(:finalizer, :method_name => hook, :dangerous_suspend => true) do
      begin
        @subject.__send__(hook)
      rescue => error
        Logger.crash("#{@subject.class}#finalize crashed!", error)
      end
    end
  end
end
setup_thread() click to toggle source
# File lib/celluloid/actor.rb, line 164
# Install this actor and its mailbox as thread-locals on the current thread
# (:celluloid_actor and :celluloid_mailbox respectively).
def setup_thread
  thread = Thread.current
  thread[:celluloid_actor]   = self
  thread[:celluloid_mailbox] = @mailbox
end
shutdown(exit_event = ExitEvent.new(@proxy)) click to toggle source

Handle cleaning up this actor after it exits

# File lib/celluloid/actor.rb, line 373
# Handle cleaning up this actor after it exits.
#
# Runs the finalizer, then performs cleanup with the exit event (by default
# an ExitEvent built from this actor's proxy). The :celluloid_actor and
# :celluloid_mailbox thread-locals are cleared afterwards, even when an
# exception is raised.
#
# Returns the result of cleanup.
def shutdown(exit_event = ExitEvent.new(@proxy))
  begin
    run_finalizer
    cleanup(exit_event)
  ensure
    [:celluloid_actor, :celluloid_mailbox].each do |key|
      Thread.current[key] = nil
    end
  end
end
signal(name, value = nil) click to toggle source

Send a signal with the given name to all waiting methods

# File lib/celluloid/actor.rb, line 228
# Send a signal with the given name to all waiting methods.
#
# name  - the signal name
# value - optional value broadcast with the signal (nil by default)
#
# Returns whatever @signals.broadcast returns.
def signal(name, value = nil)
  @signals.broadcast(name, value)
end
sleep(interval) click to toggle source

Sleep for the given amount of time

# File lib/celluloid/actor.rb, line 300
# Sleep for the given amount of time.
#
# Builds a Sleeper from this actor's timers and the interval, then suspends
# via Celluloid.suspend with the :sleeping status.
#
# Returns whatever Celluloid.suspend returns.
def sleep(interval)
  Celluloid.suspend(:sleeping, Sleeper.new(@timers, interval))
end
task(task_type, meta = nil) { || ... } click to toggle source

Run the given block inside a new task, wrapping it in Celluloid.exclusive when the method is marked exclusive

# File lib/celluloid/actor.rb, line 410
# Run the given block inside a new task of @task_class and resume it.
#
# The block is wrapped in Celluloid.exclusive when @exclusives is :all, or
# when meta carries a :method_name that @exclusives includes. That check is
# made when the task body runs, not when the task is created.
#
# task_type - task type handed to the task class
# meta      - optional Hash of task metadata; :method_name is read from it
#
# Returns the result of resuming the task.
def task(task_type, meta = nil)
  method_name = meta ? meta.fetch(:method_name, nil) : meta

  new_task = @task_class.new(task_type, meta) do
    exclusive = @exclusives &&
                (@exclusives == :all || (method_name && @exclusives.include?(method_name.to_sym)))

    if exclusive
      Celluloid.exclusive { yield }
    else
      yield
    end
  end

  new_task.resume
end
terminate() click to toggle source

Terminate this actor

# File lib/celluloid/actor.rb, line 192
# Terminate this actor by clearing the @running flag, which the run loop
# checks (`while @running`) before each mailbox receive.
def terminate
  @running = false
end
timeout(duration) { || ... } click to toggle source
# File lib/celluloid/actor.rb, line 271
# Run the given block with a time limit.
#
# A timer is registered for the given duration. If it fires, the task that
# was current when timeout was called is resumed with a Task::TimeoutError
# ("execution expired") whose backtrace is that of timeout's caller. The
# timer is cancelled once the block finishes, whether or not it raised.
#
# Returns the block's result.
def timeout(duration)
  origin = caller
  current_task = Task.current

  begin
    timer = @timers.after(duration) do
      expired = Task::TimeoutError.new("execution expired")
      expired.set_backtrace(origin)
      current_task.resume(expired)
    end
    yield
  ensure
    timer.cancel if timer
  end
end
timeout_interval() click to toggle source

How long to wait until the next timer fires

# File lib/celluloid/actor.rb, line 248
# How long to wait until the next timer fires.
#
# Takes the wait intervals reported by @timers and @receivers and returns
# the smaller one. When only one is present it is returned as-is; when
# neither is present the result is nil.
def timeout_interval
  timer_wait    = @timers.wait_interval
  receiver_wait = @receivers.wait_interval

  return receiver_wait unless timer_wait
  return timer_wait unless receiver_wait

  timer_wait < receiver_wait ? timer_wait : receiver_wait
end
wait(name) click to toggle source

Wait for the given signal

# File lib/celluloid/actor.rb, line 233
# Wait for the given signal.
#
# name - the signal name to wait on
#
# Returns whatever @signals.wait returns.
def wait(name)
  @signals.wait(name)
end