class Worker::Base

Attributes

socket[RW]

Public Class Methods

new(port) click to toggle source
# File lib/spark/worker/worker.rb, line 27
# Connect to the Spark process listening on the given port of localhost
# and announce this worker by sending its ID.
#
# @param port [Integer] TCP port the Spark process listens on
def initialize(port)
  # Connection back to Spark
  @socket = TCPSocket.new('localhost', port)

  # Identify this worker (write_long is presumably a Spark-specific
  # socket extension; `id` comes from the enclosing worker — confirm)
  @socket.write_long(id)
end

Public Instance Methods

run() click to toggle source
# File lib/spark/worker/worker.rb, line 35
# Execute one compute cycle. Any StandardError raised during the
# computation is reported to Spark via #send_error; when the cycle
# completes without error, #successful_finish sends the results back.
def run
  compute
rescue => error
  send_error(error)
else
  successful_finish
end

Private Instance Methods

before_end() click to toggle source
# File lib/spark/worker/worker.rb, line 51
# Hook invoked at the end of a successful run (see #successful_finish).
# No-op here; sub-classes may override to add teardown behavior.
def before_end
  # Should be implemented in sub-classes
end
before_start() click to toggle source
# File lib/spark/worker/worker.rb, line 47
# Hook invoked at the start of #compute, before any data is read from
# the socket. No-op here; sub-classes may override to add setup behavior.
def before_start
  # Should be implemented in sub-classes
end
compute() click to toggle source

These steps must be kept in one method because the iterator is lazy, which means that an exception can be raised at `serializer` or `compute`

# File lib/spark/worker/worker.rb, line 57
# Run one full task: read all task inputs from the Spark socket in
# protocol order, execute the command over the (lazy) input iterator,
# and stream the result back.
#
# NOTE: deserialization, execution and serialization deliberately live
# in a single method — the iterator is lazy, so an exception may only
# surface while serializing the output, and it must reach the single
# rescue in #run.
#
# NOTE(review): the read_*/write_*/read_data socket calls look like
# Spark-specific IO extensions — confirm against the socket patch.
def compute
  before_start

  # Index of the partition (split) this worker processes
  @split_index = socket.read_int

  # Root directory for files distributed via SparkFiles
  SparkFiles.root_directory = socket.read_string

  # Broadcast variables: count first, then (id, value) pairs
  count = socket.read_int
  count.times do
    Spark::Broadcast.register(socket.read_long, socket.read_string)
  end

  # Serialized command to execute
  @command = socket.read_data

  # Input data as a lazy iterator
  @iterator = @command.deserializer.load_from_io(socket).lazy

  # Run the command over the input for this split
  @iterator = @command.execute(@iterator, @split_index)

  # Wrap a scalar result so serialization can iterate it
  @iterator = [@iterator] unless @iterator.respond_to?(:each)

  # Stream the result back to Spark
  @command.serializer.dump_to_io(@iterator, socket)
end
log(message=nil) click to toggle source
# File lib/spark/worker/worker.rb, line 131
# Write a timestamped debug line, tagged with the worker id, to stdout.
# Silent unless Ruby runs in debug mode ($DEBUG, e.g. `ruby -d`).
#
# @param message [String, nil] text to log
def log(message=nil)
  return unless $DEBUG

  $stdout.puts "==> #{Time.now.strftime('%H:%M:%S')} [#{id}] #{message}"
  $stdout.flush
end
send_error(e) click to toggle source
# File lib/spark/worker/worker.rb, line 88
# Report a Ruby exception back to Spark over the socket, then terminate
# this worker.
#
# Protocol: WORKER_ERROR flag, the exception message, the backtrace
# size, then one string per backtrace frame.
#
# @param e [Exception] the exception to report
def send_error(e)
  # Error flag
  socket.write_int(WORKER_ERROR)

  # Exception message
  socket.write_string(e.message)

  # Backtrace: size first, then each frame
  backtrace = e.backtrace
  socket.write_int(backtrace.size)
  backtrace.each { |frame| socket.write_string(frame) }

  socket.flush

  # Wait for Spark: it closes the socket once the exception has been
  # fully received, so block here until delivery is confirmed —
  # otherwise the worker could die before the error arrives.
  sleep(0.1) until socket.closed?

  # Termination strategy depends on the worker type
  kill_worker
end
successful_finish() click to toggle source
# File lib/spark/worker/worker.rb, line 114
# Tell Spark the computation succeeded and ship back every accumulator
# that changed during the run, then invoke the #before_end hook.
#
# Protocol: WORKER_DONE flag, the number of changed accumulators, then
# one (id, value) pair per accumulator.
def successful_finish
  # Success flag
  socket.write_int(WORKER_DONE)

  # Changed accumulators: count first, then (id, value) pairs
  changed = Spark::Accumulator.changed
  socket.write_int(changed.size)
  changed.each { |acc| socket.write_data([acc.id, acc.value]) }

  # Push everything to Spark
  socket.flush

  before_end
end