class DRbQS::Worker::SimpleForkedProcess

Public Class Methods

new(io_r, io_w)
# File lib/drbqs/worker/forked_process.rb, line 4
# Sets up the process-side state around a pair of IO objects.
#
# @param io_r [#readpartial] stream that #start reads incoming data from
# @param io_w [#print, #flush] stream that #send_response writes to
def initialize(io_r, io_w)
  @io_r, @io_w = io_r, io_w
  # Decoded messages wait here until #start dispatches them.
  @queue = []
  # Starts at zero; none of the methods visible in this file section read it.
  @special_task_number = 0
end

Public Instance Methods

calculate(marshal_obj, method_sym, args)
# File lib/drbqs/worker/forked_process.rb, line 11
# Executes one task by handing its three components straight to
# DRbQS::Task.execute_task.
#
# @param marshal_obj [Object] first task component (presumably the
#   marshalled receiver -- confirm against DRbQS::Task.execute_task)
# @param method_sym [Object] second task component (presumably the method
#   name to invoke -- confirm against DRbQS::Task.execute_task)
# @param args [Object] third task component (presumably the argument list)
# @return [Object] whatever DRbQS::Task.execute_task returns
def calculate(marshal_obj, method_sym, args)
  task_runner = DRbQS::Task
  task_runner.execute_task(marshal_obj, method_sym, args)
end
start()
# File lib/drbqs/worker/forked_process.rb, line 34
# Runs the receive/dispatch loop.
#
# While messages are queued, one is taken per iteration and dispatched:
# * +:exit+            -- leaves the loop
# * +:prepare_to_exit+ -- answers with +[:finish_preparing_to_exit]+
# * any Array          -- passed to #handle_task
# * anything else      -- answers with a +:node_error+ response
#
# When the queue is empty, up to READ_BYTE_SIZE bytes are read from @io_r
# and fed to the unpacker; every object it yields is appended to the queue.
# An EOFError raised while reading or unpacking also leaves the loop.
#
# @return [nil]
def start
  unpacker = DRbQS::Worker::Serialize::Unpacker.new
  loop do
    unless @queue.empty?
      message = @queue.shift
      case message
      when :exit
        break
      when :prepare_to_exit
        send_response([:finish_preparing_to_exit])
      when Array
        handle_task(message)
      else
        send_response([:node_error, "Invalid object from server."])
      end
      next
    end

    # Nothing pending: pull more data from the input stream.
    begin
      data = @io_r.readpartial(READ_BYTE_SIZE)
      unpacker.feed_each(data) { |decoded| @queue << decoded }
    rescue EOFError
      break
    end
  end
end

Private Instance Methods

handle_task(obj)
# File lib/drbqs/worker/forked_process.rb, line 21
# Runs one task message and reports the outcome through #send_response.
#
# The message is unpacked positionally as
# +[task_id, marshal_obj, method_sym, args]+. A +[:result, [task_id, value]]+
# response is sent only when +task_id+ is truthy. Any StandardError raised
# while calculating or while sending the result is reported as
# +[:node_error, error]+.
#
# @param obj [Array] task message taken from the queue
# @return [Object, nil] the value of the last #send_response call, or nil
#   when the task had no id and nothing was raised
def handle_task(obj)
  task_id, marshal_obj, method_sym, args = obj
  begin
    outcome = calculate(marshal_obj, method_sym, args)
    send_response([:result, [task_id, outcome]]) if task_id
  rescue => failure
    send_response([:node_error, failure])
  end
end
send_response(obj)
# File lib/drbqs/worker/forked_process.rb, line 15
# Serializes +obj+ with DRbQS::Worker::Serialize.dump, writes it to @io_w
# and flushes the stream right away.
#
# @param obj [Object] response to send
# @return [Object] whatever +@io_w.flush+ returns
def send_response(obj)
  serialized = DRbQS::Worker::Serialize.dump(obj)
  @io_w.print(serialized)
  @io_w.flush
end