class Slave

the Slave class encapsulates the work of setting up a drb server in another process running on localhost via unix domain sockets. the slave process is attached to it's parent via a LifeLine which is designed such that the slave cannot out-live it's parent and become a zombie, even if the parent dies and early death, such as by 'kill -9'. the concept and purpose of the Slave class is to be able to setup any server object in another process so easily that using a multi-process, drb/ipc, based design is as easy, or easier, than a multi-threaded one. eg

class Server
  def add_two n
    n + 2
  end
end

slave = Slave.new 'object' => Server.new
server = slave.object

p server.add_two(40) #=> 42

two other methods of providing server objects exist:

a) server = Server.new “this is called the parent” }

Slave.new(:object=>server){|s| puts "#{ s.inspect } passed to block in child process"}

b) Slave.new{ Server.new “this is called only in the child” }

of the two 'b' is preferred.

the Slave class encapsulates the work of setting up a drb server in another process running on localhost via unix domain sockets. the slave process is attached to it's parent via a LifeLine which is designed such that the slave cannot out-live it's parent and become a zombie, even if the parent dies and early death, such as by 'kill -9'. the concept and purpose of the Slave class is to be able to setup any server object in another process so easily that using a multi-process, drb/ipc, based design is as easy, or easier, than a multi-threaded one. eg

class Server
  def add_two n
    n + 2
  end
end

slave = Slave.new 'object' => Server.new
server = slave.object

p server.add_two(40) #=> 42

two other methods of providing server objects exist:

a) server = Server.new “this is called the parent” }

Slave.new(:object=>server){|s| puts "#{ s.inspect } passed to block in child process"}

b) Slave.new{ Server.new “this is called only in the child” }

of the two 'b' is preferred.

Constants

DEFAULT_DEBUG
DEFAULT_SOCKET_CREATION_ATTEMPTS

env config

DEFAULT_THREADSAFE
VERSION

Attributes

debug[RW]

if this is true and you are running from a terminal information is printed on STDERR

socket_creation_attempts[RW]

defineds how many attempts will be made to create a temporary unix domain socket

threadsafe[RW]

if this is true all slave objects will be wrapped such that any call to the object is threadsafe. if you do not use this you must ensure that your objects are threadsafe __yourself__ as this is required of any object acting as a drb server

at_exit[R]
debug[R]
dumped[R]
obj[R]

attrs

object[R]
pid[R]
ppid[R]
psname[R]
socket[R]
socket_creation_attempts[R]
status[R]
uri[R]

Public Class Methods

default(key) click to toggle source

get a default value

# File lib/slave-1.2.1.rb, line 77
      def default key
#--{{{
        send key
#--}}}
      end
description() click to toggle source
# File lib/slave.rb, line 45
def Slave.description
  'easily start a drb server in another process'
end
fork(&b) click to toggle source

just fork with out silly warnings

# File lib/slave-1.2.1.rb, line 98
      def fork &b
#--{{{
        v = $VERBOSE
        begin
          $VERBOSE = nil
          Process::fork(&b)
        ensure
        $VERBOSE = v
        end
#--}}}
      end
getopts(opts) click to toggle source
# File lib/slave-1.2.1.rb, line 83
      def getopts opts
#--{{{
        raise ArgumentError, opts.class unless
          opts.respond_to?('has_key?') and opts.respond_to?('[]')

        lambda do |key, *defval|
          defval = defval.shift
          keys = [key, key.to_s, key.to_s.intern]
          key = keys.detect{|k| opts.has_key? k } and break opts[key]
          defval
        end
#--}}}
      end
new(opts = {}) click to toggle source

sets up a child process serving any object as a DRb server running locally on unix domain sockets. the child process has a LifeLine established between it and the parent, making it impossible for the child to outlive the parent (become a zombie). the object to serve is specfied either directly using the 'object'/:object keyword

Slave.new :object => MyServer.new

or, preferably, using the block form

Slave.new{ MyServer.new }

when the block form is used the object is contructed in the child process itself. this is quite advantageous if the child object consumes resources or opens file handles (db connections, etc). by contructing the object in the child any resources are consumed from the child's address space and things like open file handles will not be carried into subsequent child processes (via standard unix fork semantics). in the event that a block is specified but the object cannot be constructed and, instead, throws and Exception, that exception will be propogated to the parent process.

opts may contain the following keys, as either strings or symbols

object : specify the slave object.  otherwise block value is used.
socket_creation_attempts : specify how many attempts to create a unix domain socket will be made 
debug : turn on some logging to STDERR
psname : specify the name that will appear in 'top' ($0)
at_exit : specify a lambda to be called in the *parent* when the child dies
dumped : specify that the slave object should *not* be DRbUndumped (default is DRbUndumped) 
threadsafe : wrap the slave object with ThreadSafe to implement gross thread safety
# File lib/slave-1.2.1.rb, line 318
    def initialize opts = {}, &block
#--{{{
      getopt = getopts opts

      @obj = getopt['object']
      @socket_creation_attempts = getopt['socket_creation_attempts'] || default('socket_creation_attempts')
      @debug = getopt['debug'] || default('debug')
      @psname = getopt['psname']
      @at_exit = getopt['at_exit']
      @dumped = getopt['dumped']
      @threadsafe = getopt['threadsafe'] || default('threadsafe')

      raise ArgumentError, 'no slave object or slave object block provided!' if 
        @obj.nil? and block.nil?

      @shutdown = false
      @waiter = @status = nil
      @lifeline = LifeLine.new

      # weird syntax because dot/rdoc chokes on this!?!?
      init_failure = lambda do |e|
        trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
        o = Object.new
        class << o
          attr_accessor '__slave_object_failure__'
        end
        o.__slave_object_failure__ = Marshal.dump [e.class, e.message, e.backtrace]
        @object = o
      end

    #
    # child
    #
      unless((@pid = Slave::fork))
        e = nil
        begin
          Kernel.at_exit{ Kernel.exit! }
          @lifeline.catch

          if @obj
            @object = @obj
          else
            begin
              @object = block.call 
            rescue Exception => e
              init_failure[e]
            end
          end

          if block and @obj
            begin
              block[@obj]
            rescue Exception => e
              init_failure[e]
            end
          end

          $0 = (@psname ||= gen_psname(@object))

          unless @dumped or @object.respond_to?('__slave_object_failure__')
            @object.extend DRbUndumped
          end

          if @threadsafe
            @object = ThreadSafe.new @object
          end

          @ppid, @pid = Process::ppid, Process::pid
          @socket = nil
          @uri = nil

          tmpdir, basename = Dir::tmpdir, File::basename(@psname)

          @socket_creation_attempts.times do |attempt|
            se = nil
            begin
              s = File::join(tmpdir, "#{ basename }_#{ attempt }_#{ rand }")
              u = "drbunix://#{ s }"
              DRb::start_service u, @object 
              @socket = s
              @uri = u
              trace{ "child - socket <#{ @socket }>" }
              trace{ "child - uri <#{ @uri }>" }
              break
            rescue Errno::EADDRINUSE => se
              nil
            end
          end

          if @socket and @uri
            trap('SIGUSR2') do
              DBb::thread.kill rescue nil
              FileUtils::rm_f @socket rescue nil
              exit
            end

            @lifeline.puts @socket 
            @lifeline.cling
          else
            @lifeline.release
            warn "slave(#{ $$ }) could not create socket!"
            exit
          end
        rescue Exception => e
          trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
        ensure
          status = e.respond_to?('status') ? e.status : 1
          exit(status)
        end
    #
    # parent
    #
      else
        detach
        @lifeline.throw

        buf = @lifeline.gets
        raise "failed to find slave socket" if buf.nil? or buf.strip.empty?
        @socket = buf.strip
        trace{ "parent - socket <#{ @socket }>" }

        if @at_exit
          @at_exit_thread = @lifeline.on_cut{ 
            @at_exit.respond_to?('call') ? @at_exit.call(self) : send(@at_exit.to_s, self)
          }
        end

        if @socket and File::exist? @socket
          Kernel.at_exit{ FileUtils::rm_f @socket }
          @uri = "drbunix://#{ socket }"
          trace{ "parent - uri <#{ @uri }>" }
        #
        # starting drb on localhost avoids dns lookups!
        #
          DRb::start_service('druby://localhost:0', nil) unless DRb::thread
          @object = DRbObject::new nil, @uri
          if @object.respond_to? '__slave_object_failure__'
            c, m, bt = Marshal.load @object.__slave_object_failure__
            (e = c.new(m)).set_backtrace bt
            trace{ %Q[#{ e.message } (#{ e.class })\n#{ e.backtrace.join "\n" }] }
            raise e 
          end
          @psname ||= gen_psname(@object)
        else
          raise "failed to find slave socket <#{ @socket }>"
        end
      end
#--}}}
    end
object(opts = {}) click to toggle source

a simple convenience method which returns an object from another process. the object returned is the result of the supplied block. eg

object = Slave.object{ processor_intensive_object_built_in_child_process() }

eg.

the call can be made asynchronous via the 'async'/:async keyword

thread = Slave.object(:async=>true){ long_processor_intensive_object_built_in_child_process() }

# go on about your coding business then, later

object = thread.value
# File lib/slave-1.2.1.rb, line 586
    def self.object opts = {}, &b
#--{{{
      async = opts.delete('async') || opts.delete(:async) 

      opts['object'] = opts[:object] = lambda(&b)
      opts['dumped'] = opts[:dumped] = true 

      slave = Slave.new opts

      value = lambda do |slave|
        begin
          slave.object.call
        ensure
          slave.shutdown
        end
      end

      async ? Thread.new{ value[slave] } : value[slave] 
#--}}}
    end
version() click to toggle source
# File lib/slave-1.2.1.rb, line 44
def self.version() VERSION end

Public Instance Methods

default(key) click to toggle source

see docs for Slave.default

# File lib/slave-1.2.1.rb, line 545
    def default key
#--{{{
      self.class.default key
#--}}}
    end
detach() click to toggle source

starts a thread to collect the child status and sets up at_exit handler to prevent zombies. the at_exit handler is canceled if the thread is able to collect the status

# File lib/slave-1.2.1.rb, line 472
    def detach
#--{{{
      reap = lambda do |cid|
        begin
          @status = Process::waitpid2(cid).last
        rescue Exception => e 
          m, c, b = e.message, e.class, e.backtrace.join("\n")
          warn "#{ m } (#{ c })\n#{ b }"  unless e.is_a? Errno::ECHILD
        end
      end

      Kernel.at_exit do
        shutdown rescue nil
        reap[@pid] rescue nil
      end

      @waiter = 
        Thread.new do
          begin
            @status = Process::waitpid2(@pid).last
          ensure
            reap = lambda{|cid| 'no-op' }
          end
        end
#--}}}
    end
gen_psname(obj) click to toggle source

generate a default name to appear in ps/top

# File lib/slave-1.2.1.rb, line 537
    def gen_psname obj
#--{{{
      "slave_#{ obj.class }_#{ obj.object_id }_#{ Process::ppid }_#{ Process::pid }".downcase.gsub(%r/\s+/,'_')
#--}}}
    end
getopts(opts) click to toggle source

see docs for Slave.getopts

# File lib/slave-1.2.1.rb, line 553
    def getopts opts 
#--{{{
      self.class.getopts opts 
#--}}}
    end
shutdown(opts = {}) click to toggle source

cuts the lifeline and kills the child process - give the key 'quiet' to ignore errors shutting down, including having already shutdown

# File lib/slave-1.2.1.rb, line 516
    def shutdown opts = {}
#--{{{
      quiet = getopts(opts)['quiet']
      raise "already shutdown" if @shutdown unless quiet
      begin; Process::kill 'SIGUSR2', @pid; rescue Exception => e; end
      begin; @lifeline.cut; rescue Exception; end
      raise e if e unless quiet
      @shutdown = true
#--}}}
    end
shutdown?() click to toggle source

true

# File lib/slave-1.2.1.rb, line 529
    def shutdown?
#--{{{
      @shutdown
#--}}}
    end
trace() { || ... } click to toggle source

debugging output - ENV=1 to enable

# File lib/slave-1.2.1.rb, line 561
    def trace
#--{{{
      if @debug 
        STDERR.puts yield
        STDERR.flush
      end
#--}}}
    end
wait(opts = {}) click to toggle source

wait for slave to finish. if the keyword 'non_block'=>true is given a thread is returned to do the waiting in an async fashion. eg

thread = slave.wait(:non_block=>true){|value| "background <#{ value }>"}
# File lib/slave-1.2.1.rb, line 504
    def wait opts = {}, &b
#--{{{
      b ||= lambda{|exit_status|}
      non_block = getopts(opts)['non_block']
      non_block ? Thread.new{ b[ @waiter.value ] } : b[ @waiter.value ]
#--}}}
    end
Also aliased as: wait2, wait2
wait2(opts = {})
Alias for: wait