class Worker::Base
Attributes
socket[RW]
Public Class Methods
new(port)
click to toggle source
# File lib/spark/worker/worker.rb, line 27
#
# Connects to the Spark side over TCP and announces this worker.
#
# @param port [Integer] port on localhost to connect to
def initialize(port)
  # Connection used for every later read and write of this worker.
  @socket = TCPSocket.open('localhost', port)

  # Report this worker's +id+ (defined outside this method) over the socket.
  socket.write_long(id)
end
Public Instance Methods
run()
click to toggle source
# File lib/spark/worker/worker.rb, line 35
#
# Entry point of the worker: runs +compute+, then reports the outcome.
#
# * If +compute+ raises a StandardError, it is passed to +send_error+.
# * If +compute+ finishes without raising, +successful_finish+ is called.
#   Errors raised by +successful_finish+ itself are not rescued here.
def run
  compute
rescue => e
  send_error(e)
else
  successful_finish
end
Private Instance Methods
before_end()
click to toggle source
# File lib/spark/worker/worker.rb, line 51
#
# Hook called as the last step of +successful_finish+.
# Does nothing here and returns nil.
def before_end
  # Intentionally empty: sub-classes are expected to provide the behaviour.
end
before_start()
click to toggle source
# File lib/spark/worker/worker.rb, line 47
#
# Hook called as the first step of +compute+.
# Does nothing here and returns nil.
def before_start
  # Intentionally empty: sub-classes are expected to provide the behaviour.
end
compute()
click to toggle source
These steps must stay in a single method because the iterator is lazy, which means that an exception can be raised at `serializer` or `compute`.
# File lib/spark/worker/worker.rb, line 57
#
# Reads one task from the socket, executes it and writes the result back.
#
# Everything is kept in one method on purpose: the iterator is lazy, so an
# exception may only surface while the result is being executed or
# serialized.
#
# The reads below must happen in exactly this order, because each one
# consumes the next item from the socket.
def compute
  before_start

  # Split index, later handed to the command together with the iterator.
  @split_index = socket.read_int

  # Root directory for SparkFiles.
  SparkFiles.root_directory = socket.read_string

  # Broadcasts: a count followed by that many (long, string) pairs.
  socket.read_int.times do
    Spark::Broadcast.register(socket.read_long, socket.read_string)
  end

  # The command to run.
  @command = socket.read_data

  # Input is read lazily from the socket through the command's deserializer.
  @iterator = @command.deserializer.load_from_io(socket).lazy

  # Run the command over the input.
  @iterator = @command.execute(@iterator, @split_index)

  # Wrap a result that does not respond to +each+ so it can be dumped.
  unless @iterator.respond_to?(:each)
    @iterator = [@iterator]
  end

  # Write the result back through the command's serializer.
  @command.serializer.dump_to_io(@iterator, socket)
end
log(message=nil)
click to toggle source
# File lib/spark/worker/worker.rb, line 131
#
# Prints a timestamped debug line to $stdout and flushes it.
# Does nothing (and returns nil) unless Ruby's $DEBUG flag is set.
#
# Output format: "==> HH:MM:SS [<id>] <message>"
#
# @param message [Object, nil] text to print; interpolated into the line
def log(message=nil)
  return unless $DEBUG

  timestamp = Time.now.strftime('%H:%M:%S')
  $stdout.puts "==> #{timestamp} [#{id}] #{message}"
  $stdout.flush
end
send_error(e)
click to toggle source
# File lib/spark/worker/worker.rb, line 88
#
# Reports an exception over the socket, waits until the socket is closed,
# then calls +kill_worker+.
#
# Data written, in order: the WORKER_ERROR flag, the exception message, the
# number of backtrace lines, then each backtrace line.
#
# @param e [Exception] the error to report
def send_error(e)
  # Exception#backtrace is nil for an exception that was never raised (or
  # whose backtrace was cleared). Normalise to an array so a count of zero
  # is sent instead of failing half-way through the error frame.
  backtrace = Array(e.backtrace)

  # Flag
  socket.write_int(WORKER_ERROR)

  # Message
  socket.write_string(e.message)

  # Backtrace
  socket.write_int(backtrace.size)
  backtrace.each { |line| socket.write_string(line) }

  socket.flush

  # Wait for Spark.
  # The socket is closed before an exception is thrown on the other side;
  # that is the signal that the Ruby exception was fully received.
  sleep(0.1) until socket.closed?

  # What happens here depends on the type of worker.
  kill_worker
end
successful_finish()
click to toggle source
# File lib/spark/worker/worker.rb, line 114
#
# Reports a successful run over the socket.
#
# Data written, in order: the WORKER_DONE flag, the number of changed
# accumulators, then one [id, value] pair per changed accumulator. The
# socket is flushed and +before_end+ is called last; its result is returned.
def successful_finish
  # Completion flag
  socket.write_int(WORKER_DONE)

  # Accumulators reported by Spark::Accumulator.changed
  updated = Spark::Accumulator.changed
  socket.write_int(updated.size)
  updated.each { |acc| socket.write_data([acc.id, acc.value]) }

  # Push everything out before running the end hook
  socket.flush

  before_end
end