class DRbQS::Worker::SimpleForkedProcess
Public Class Methods
new(io_r, io_w)
click to toggle source
# File lib/drbqs/worker/forked_process.rb, line 4 def initialize(io_r, io_w) @io_r = io_r @io_w = io_w @queue = [] @special_task_number = 0 end
Public Instance Methods
calculate(marshal_obj, method_sym, args)
click to toggle source
# File lib/drbqs/worker/forked_process.rb, line 11 def calculate(marshal_obj, method_sym, args) DRbQS::Task.execute_task(marshal_obj, method_sym, args) end
start()
click to toggle source
# File lib/drbqs/worker/forked_process.rb, line 34 def start unpacker = DRbQS::Worker::Serialize::Unpacker.new loop do if @queue.empty? begin chunk = @io_r.readpartial(READ_BYTE_SIZE) unpacker.feed_each(chunk) do |ary| @queue << ary end rescue EOFError break end else obj = @queue.shift case obj when Array handle_task(obj) when :prepare_to_exit send_response([:finish_preparing_to_exit]) when :exit break else send_response([:node_error, "Invalid object from server."]) end end end end
Private Instance Methods
handle_task(obj)
click to toggle source
# File lib/drbqs/worker/forked_process.rb, line 21 def handle_task(obj) task_id, marshal_obj, method_sym, args = obj begin res = calculate(marshal_obj, method_sym, args) if task_id send_response([:result, [task_id, res]]) end rescue => err send_response([:node_error, err]) end end
send_response(obj)
click to toggle source
# File lib/drbqs/worker/forked_process.rb, line 15 def send_response(obj) @io_w.print DRbQS::Worker::Serialize.dump(obj) @io_w.flush end