class Spark::Command
Container which holds all commands and the other data needed by a worker.
Every RDD has its own copy of Command.
Attributes
bound_objects[RW]
commands[RW]
deserializer[RW]
libraries[RW]
serializer[RW]
Public Class Methods
new()
click to toggle source
# File lib/spark/command.rb, line 10
# Creates an empty command container: no serializer or deserializer,
# no commands, no libraries and no bound objects.
def initialize
  @serializer = @deserializer = nil
  @commands, @libraries = [], []
  @bound_objects = {}
end
Public Instance Methods
execute(iterator, split_index)
click to toggle source
# File lib/spark/command.rb, line 18
# Runs every stored command against +iterator+.
#
# Steps, in order:
# 1. require each entry of +libraries+
# 2. assign +bound_objects+ to every command (via +__objects__=+)
# 3. call +prepare+ on every command
# 4. feed the iterator through each command's +execute+, passing the
#    result of one command as the input of the next
#
# @param iterator the data handed to the first command
# @param split_index passed unchanged to every command's +execute+
# @return the value returned by the last command (or +iterator+ itself
#   when there are no commands)
def execute(iterator, split_index)
  # Load the libraries the commands depend on.
  libraries.each { |library| require library }

  # Hand the bound objects to every command before any of them is prepared.
  @commands.each { |command| command.__objects__ = bound_objects }
  @commands.each(&:prepare)

  # The result must be returned explicitly: some commands change the
  # iterator in place, but others produce a brand new value (for example
  # a reduce yields a single value).
  @commands.reduce(iterator) do |current, command|
    command.execute(current, split_index)
  end
end
last()
click to toggle source
# File lib/spark/command.rb, line 41
# Returns the most recently added command.
#
# @return the final element of +@commands+, or +nil+ when it is empty
def last
  @commands.last
end
marshal_dump()
click to toggle source
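Bound objects can depend on a library which is only loaded during #execute. In that case the worker raises “undefined class/module” while loading the command, so the bound objects are dumped separately (see serialized_bound_objects) and stored in the dump as an already-serialized string.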
# File lib/spark/command.rb, line 61
# Custom Marshal hook: describes this object as a five-element array.
#
# The bound objects are not placed in the array directly; the string
# returned by +serialized_bound_objects+ is stored instead.
#
# @return [Array] serializer, deserializer, commands, libraries and the
#   serialized bound objects, in that order
def marshal_dump
  state = [@serializer, @deserializer, @commands, @libraries]
  state << serialized_bound_objects
end
marshal_load(array)
click to toggle source
# File lib/spark/command.rb, line 65
# Custom Marshal hook: restores the state written by +marshal_dump+.
#
# The bound objects are kept in their serialized form in
# +@serialized_bound_objects+; this method does not deserialize them.
#
# The values are taken by destructuring, so +array+ is left untouched
# (repeated +shift+ calls would empty the caller's array).
#
# @param array [Array] serializer, deserializer, commands, libraries and
#   serialized bound objects, in the order produced by +marshal_dump+
def marshal_load(array)
  @serializer, @deserializer, @commands, @libraries, @serialized_bound_objects = array
end
Private Instance Methods
serialized_bound_objects()
click to toggle source
# File lib/spark/command.rb, line 75
# Returns the bound objects as a Marshal string.
#
# The string is computed once and cached in +@serialized_bound_objects+;
# later calls return the cached value even if +@bound_objects+ has
# changed since.
#
# @return [String] result of +Marshal.dump(@bound_objects)+
def serialized_bound_objects
  return @serialized_bound_objects if @serialized_bound_objects

  @serialized_bound_objects = Marshal.dump(@bound_objects)
end