class Spark::Command

Container that holds all commands and everything else a worker needs. Every RDD has its own copy of Command.

Attributes

bound_objects[RW]
commands[RW]
deserializer[RW]
libraries[RW]
serializer[RW]

Public Class Methods

new() click to toggle source
# File lib/spark/command.rb, line 10
def initialize
  # Nothing is configured for (de)serialization until someone assigns it.
  @serializer = nil
  @deserializer = nil

  @commands = []      # executed in order by #execute
  @libraries = []     # each one is required by #execute before commands run
  @bound_objects = {} # assigned to every command's __objects__ by #execute
end

Public Instance Methods

execute(iterator, split_index) click to toggle source
# File lib/spark/command.rb, line 18
# Runs the whole command pipeline over one partition.
#
# iterator    - data handed to the first command
# split_index - partition index, forwarded unchanged to every command
#
# Returns whatever the last command returned, or +iterator+ itself when
# there are no commands.
def execute(iterator, split_index)
  # Load every library the pipeline depends on before touching any command.
  libraries.each { |lib_name| require lib_name }

  # NOTE(review): the reader is called once per command (and not at all when
  # there are no commands) rather than hoisted; kept that way in case the
  # reader does more than return an ivar.
  @commands.each { |cmd| cmd.__objects__ = bound_objects }

  @commands.each(&:prepare)

  # Feed each command's result into the next one. Some commands change the
  # data in place, but others (reduce, for example) return a single new
  # value, so the threaded result is what gets returned.
  @commands.reduce(iterator) { |data, cmd| cmd.execute(data, split_index) }
end
last() click to toggle source
# File lib/spark/command.rb, line 41
# The final command in the pipeline (the one #execute runs last), or nil
# when no commands have been added.
def last
  @commands.last
end
marshal_dump() click to toggle source

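Bound objects can depend on a library that is only required inside `execute`. Unmarshalling such objects on the worker before that library is loaded would raise “undefined class/module”, so they are dumped as a separately marshaled string (see `serialized_bound_objects`).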

# File lib/spark/command.rb, line 61
# Custom Marshal hook.
#
# Bound objects are emitted as an already-marshaled String instead of live
# objects: they may depend on a library that is only required during
# #execute, and unmarshalling them eagerly on the worker would raise
# "undefined class/module".
#
# The field order must stay in sync with #marshal_load.
def marshal_dump
  fields = [@serializer, @deserializer, @commands, @libraries]
  fields << serialized_bound_objects
  fields
end
marshal_load(array) click to toggle source
# File lib/spark/command.rb, line 65
# Custom Marshal hook, the inverse of #marshal_dump.
#
# Consumes the first five entries of +array+ (the argument is mutated).
# @bound_objects is not restored here; only its marshaled String is kept in
# @serialized_bound_objects. Returns that String.
def marshal_load(array)
  @serializer, @deserializer, @commands, @libraries, @serialized_bound_objects = array.shift(5)
  @serialized_bound_objects
end

Private Instance Methods

serialized_bound_objects() click to toggle source
# File lib/spark/command.rb, line 75
# Marshaled (String) form of the bound objects, used by #marshal_dump.
#
# When live bound objects are present they are marshaled afresh on every
# call. A plain `||=` memo would keep shipping the first snapshot, even
# after objects were added in place or the hash was replaced through
# `bound_objects=`.
#
# When there are no live objects (e.g. after #marshal_load, which restores
# only the serialized bytes), the cached String is returned as-is. It is
# therefore never unmarshaled just to be marshaled again, which could fail
# if the libraries it needs are not loaded yet.
def serialized_bound_objects
  unless @bound_objects.nil?
    return @serialized_bound_objects = Marshal.dump(@bound_objects)
  end

  @serialized_bound_objects ||= Marshal.dump(@bound_objects)
end