class Rbgo::Actor

Attributes

children[RW]
close_mutex[RW]
close_reason[R]
handler[RW]
linked_actors[RW]
mail_box[RW]
once_for_close[RW]
once_for_msg_loop[RW]
parent[RW]
supervisor_actors[RW]

Public Class Methods

new(&handler) click to toggle source
# File lib/rbgo/actor.rb, line 26
# Builds a new, open actor.
#
# @param handler [Proc, nil] optional block stored as the actor's handler;
#   call_handler later invokes it as handler.call(msg, actor)
def initialize(&handler)
  # Message-processing state.
  self.handler = handler
  self.mail_box = Queue.new
  self.once_for_msg_loop = Once.new

  # Shutdown state: nothing has closed the actor yet.
  @close_reason = nil
  self.once_for_close = Once.new
  self.close_mutex = ReentrantMutex.new

  # Relationship registries each start out as their own empty SyncSet.
  %i[children linked_actors supervisor_actors].each do |registry|
    send("#{registry}=", SyncSet.new)
  end
end

Public Instance Methods

close(reason = nil) click to toggle source
# File lib/rbgo/actor.rb, line 52
# Closes the actor and tears down its relationships.
#
# The whole teardown is wrapped in once_for_close.do and runs while holding
# close_mutex. NOTE(review): presumably Once#do runs its block a single
# time, making repeated close calls no-ops — confirm against Once.
#
# The synchronized block evaluates to nil; what the method itself returns
# depends on the return value of Once#do.
#
# @param reason [Object, nil] stored in @close_reason (call_handler passes
#   the exception raised by the handler)
def close(reason = nil)
  once_for_close.do do
    close_mutex.synchronize do
      @close_reason = reason
      # Close the mailbox, then throw away whatever is still queued.
      mail_box.close
      mail_box.clear

      # Detach from the parent (if any): remove self from its children set.
      (parent&.send :children)&.delete(self)
      self.parent   = nil
      # The children set is only dropped here; children are not closed.
      self.children = nil

      # Each linked actor is closed (with no reason) from its own routine
      # instead of inline inside this synchronized section.
      linked_actors.each do |l|
        CoRun::Routine.new(new_thread: false, queue_tag: :default) do
          l.close
        end
      end
      self.linked_actors = nil

      # Tell every supervisor that this actor closed; a failing send_msg
      # (any StandardError) is ignored so the remaining ones are notified.
      supervisor_actors.each do |sup|
        sup.send_msg(ActorClosedMsg.new(self)) rescue nil
      end
      self.supervisor_actors = nil

      nil
    end
  end
end
closed?() click to toggle source
# File lib/rbgo/actor.rb, line 80
# Reports whether this actor has been closed.
#
# The actor has no separate flag: it is closed exactly when its mailbox
# reports being closed.
#
# @return [Boolean] the result of mail_box.closed?
def closed?
  inbox = mail_box
  inbox.closed?
end
demonitor(actor) click to toggle source
# File lib/rbgo/actor.rb, line 158
# Stops supervising +actor+: removes self from that actor's supervisor set.
#
# Calling it with the receiver itself does nothing. Both this actor's
# close_mutex and the target's close_mutex are held (in that order) while
# the set is modified. A target whose supervisor set is nil (as it is after
# close) is left alone.
#
# @param actor [Actor] the actor to stop supervising
# @return [self]
# @raise [RuntimeError] if this actor is already closed
def demonitor(actor)
  return self if equal?(actor)

  close_mutex.synchronize do
    raise "can not demonitor from a closed actor" if closed?

    target_lock = actor.send(:close_mutex)
    target_lock.synchronize do
      watchers = actor.send(:supervisor_actors)
      watchers.delete(self) unless watchers.nil?
    end
    self
  end
end
monitor(actor) click to toggle source
# File lib/rbgo/actor.rb, line 143
# Starts supervising +actor+.
#
# Calling it with the receiver itself does nothing. Otherwise, while holding
# this actor's close_mutex and then the target's close_mutex:
# * if the target is already closed, an ActorClosedMsg for it is sent to
#   this actor straight away;
# * otherwise this actor is added to the target's supervisor set (close
#   sends an ActorClosedMsg to every member of that set).
#
# @param actor [Actor] the actor to supervise
# @return [self]
# @raise [RuntimeError] if this actor is already closed
def monitor(actor)
  return self if equal?(actor)

  close_mutex.synchronize do
    raise "can not monitor from a closed actor" if closed?

    target_lock = actor.send(:close_mutex)
    target_lock.synchronize do
      if actor.closed?
        notice = ActorClosedMsg.new(actor)
        send_msg(notice)
      else
        watchers = actor.send(:supervisor_actors)
        watchers.add(self)
      end
    end
    self
  end
end
send_children(msg) click to toggle source
# File lib/rbgo/actor.rb, line 45
# Broadcasts +msg+ to every child actor on a best-effort basis.
#
# A failure while delivering to one child (any StandardError raised by its
# send_msg) is swallowed so the remaining children still get the message.
#
# close resets the children set to nil; in that state there is nobody left
# to notify, so the call is a no-op rather than a NoMethodError on nil.
#
# @param msg [Object] message handed to each child's send_msg
# @return [self]
def send_children(msg)
  children&.each do |child|
    begin
      child.send_msg(msg)
    rescue StandardError
      nil # best effort: skip this child and keep going
    end
  end
  self
end
send_msg(msg) click to toggle source
# File lib/rbgo/actor.rb, line 39
# Enqueues +msg+ in the mailbox and makes sure the message loop is started.
#
# Any error raised while enqueuing propagates to the caller, and in that
# case start_msg_loop is not reached.
#
# @param msg [Object] the message to deliver
# @return [self]
def send_msg(msg)
  tap do
    mail_box << msg
    start_msg_loop
  end
end
spawn_child(&handler) click to toggle source
# File lib/rbgo/actor.rb, line 84
# Creates a new Actor with the given handler and registers it as a child
# of this actor.
#
# Runs under close_mutex. The new actor's parent is set to self (the setter
# is invoked through send) and the new actor is added to this actor's
# children set.
#
# @param handler [Proc, nil] handler block for the new actor
# @return [Actor] the newly created child
# @raise [RuntimeError] if this actor is already closed
def spawn_child(&handler)
  close_mutex.synchronize do
    raise "can not spawn from a closed actor" if closed?

    Actor.new(&handler).tap do |kid|
      kid.send(:parent=, self)
      children.add(kid)
    end
  end
end
spawn_monitor(&handler) click to toggle source
# File lib/rbgo/actor.rb, line 134
# Spawns a child actor (via spawn_child) and registers this actor as one of
# the child's supervisors, all while holding close_mutex.
#
# @param handler [Proc, nil] handler block for the new child
# @return [Actor] the newly spawned, supervised child
# @raise [RuntimeError] if this actor is already closed
def spawn_monitor(&handler)
  close_mutex.synchronize do
    raise "can not spawn from a closed actor" if closed?

    spawn_child(&handler).tap do |watched|
      watched.send(:supervisor_actors).add(self)
    end
  end
end

Private Instance Methods

call_handler(msg) click to toggle source
# File lib/rbgo/actor.rb, line 190
# Runs the handler for one message; a failing handler closes the actor.
#
# With no handler set, nothing happens. If the handler raises, the actor is
# closed with the exception as the close reason and the error is reported
# through Rbgo.logger (when one is configured). The exception is not
# re-raised.
#
# The log entry is the exception message followed by the backtrace, one
# frame per line.
#
# NOTE(review): this rescues Exception rather than StandardError, so
# SystemExit / Interrupt raised inside a handler are also turned into a
# close reason instead of propagating — confirm this is intended.
#
# @param msg [Object] the message passed to handler.call(msg, self)
def call_handler(msg)
  handler.call(msg, self) if handler
rescue Exception => ex
  close(ex)
  # Join the frames explicitly: interpolating the backtrace Array directly
  # would log its inspect form (["a", "b"]) on a single line.
  Rbgo.logger&.error('Rbgo') { "#{ex.message}\n#{ex.backtrace&.join("\n")}" }
end
start_msg_loop() click to toggle source
# File lib/rbgo/actor.rb, line 171
# Starts the routine that drains the mailbox and feeds each message to
# call_handler.
#
# The routine is created inside once_for_msg_loop.do. NOTE(review):
# presumably Once#do runs its block a single time per Once instance, so at
# most one loop is started until the Once is replaced below — confirm.
def start_msg_loop
  once_for_msg_loop.do do
    CoRun::Routine.new(new_thread: false, queue_tag: :default) do
      loop do
        begin
          # Non-blocking dequeue: raises instead of waiting when nothing
          # can be taken from the mailbox.
          msg = mail_box.deq(true)
        rescue ThreadError
          # Nothing to dequeue. Install a fresh Once so a later
          # start_msg_loop call (e.g. from send_msg) can start a new loop.
          self.once_for_msg_loop = Once.new
          # If a message is already waiting in a still-open mailbox,
          # start the replacement loop right away before leaving this one.
          start_msg_loop unless (mail_box.empty? || mail_box.closed?)
          break
        else
          call_handler(msg)
        end                            
        # Pause after every handled message.
        # NOTE(review): presumably hands control back to the CoRun
        # scheduler so other routines can run — confirm.
        Fiber.yield
      end
    end
  end
end