class Spark::Accumulator
A shared variable that can be accumulated, i.e., that has a commutative and associative "add" operation. Worker tasks on a Spark cluster can add values to an Accumulator with the `+=` operator, but only the driver program may read its value, via the `value` method. Updates from the workers are propagated automatically to the driver program.
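Because worker updates can be merged in any order and any grouping, the "add" operation must be commutative and associative. A plain-Ruby sketch (no Spark required) of why this matters:

```ruby
# Updates from workers may arrive in any order; a commutative and
# associative operation such as :+ yields the same final value regardless.
updates = [1, 2, 3, 4]

in_order  = updates.reduce(10, :+)          # driver value 10, updates in order
reordered = updates.reverse.reduce(10, :+)  # same updates, different order

in_order   # => 20
reordered  # => 20
```

A non-associative operation (e.g. subtraction) would make the result depend on scheduling, which is why only well-behaved merge operations are supported.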
Arguments:
- value — Initial value for the accumulator. This value is stored only on the driver process.
- accum_param — How to merge two values on a worker or on the driver process. A Symbol or Proc (or a String containing a Proc).
- zero_value — Initial value for each worker process.
Examples:
  accum1 = $sc.accumulator(1)
  accum2 = $sc.accumulator(2, :*, 1)
  accum3 = $sc.accumulator(3, lambda{|max, val| val > max ? val : max})

  accum1 += 1

  accum2.add(2)
  accum2.add(2)
  accum2.add(2)

  accum3.add(9)
  accum3.add(6)
  accum3.add(7)

  accum1.value # => 2
  accum2.value # => 16
  accum3.value # => 9

  func = Proc.new do |_, index|
    accum1.add(1)
    accum2.add(2)
    accum3.add(index * 10)
  end

  rdd = $sc.parallelize(0..4, 4)
  rdd = rdd.bind(accum1: accum1, accum2: accum2, accum3: accum3)
  rdd = rdd.map_partitions_with_index(func)
  rdd.collect

  accum1.value # => 6
  accum2.value # => 256
  accum3.value # => 30
Constants
- SUPPORTED_SYMBOLS
Attributes
- accum_param [R]
- id [R]
- value [R]
- zero_value [R]
Public Class Methods
changed()
  # File lib/spark/accumulator.rb, line 88
  def self.changed
    @@changed
  end
instances()
  # File lib/spark/accumulator.rb, line 92
  def self.instances
    @@instances
  end
new(value, accum_param=:+, zero_value=0)
Creates a new Spark::Accumulator and registers it in the class-level instance registry so that worker updates can be matched back to it.
  # File lib/spark/accumulator.rb, line 68
  def initialize(value, accum_param=:+, zero_value=0)
    @id = object_id
    @value = value
    @accum_param = accum_param
    @zero_value = zero_value
    @driver = true

    valid_accum_param

    @@instances[@id] = self
  end
Public Instance Methods
+(term)
  # File lib/spark/accumulator.rb, line 147
  def +(term)
    add(term)
    self
  end
add(term)
add_by_symbol(term)
  # File lib/spark/accumulator.rb, line 152
  def add_by_symbol(term)
    case @accum_param
    when :+
      @value += term
    when :-
      @value -= term
    when :*
      @value *= term
    when :/
      @value /= term
    when :**
      @value **= term
    end
  end
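The symbol dispatch above can be exercised in plain Ruby without Spark. The helper name `merge_by_symbol` below is illustrative, not part of the gem; inside the gem this logic runs in `Accumulator#add_by_symbol` against `@value`:

```ruby
# Plain-Ruby mimic of the symbol-based merge: applies one of the
# supported operators to the current value and an incoming term.
def merge_by_symbol(symbol, value, term)
  case symbol
  when :+  then value + term
  when :-  then value - term
  when :*  then value * term
  when :/  then value / term
  when :** then value ** term
  end
end

merge_by_symbol(:+, 10, 5)  # => 15
merge_by_symbol(:*, 2, 8)   # => 16
merge_by_symbol(:**, 2, 3)  # => 8
```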
driver?()
Returns true when called on the driver process, false on a worker.
  # File lib/spark/accumulator.rb, line 127
  def driver?
    @driver
  end
inspect()
  # File lib/spark/accumulator.rb, line 80
  def inspect
    result = %{#<#{self.class.name}:0x#{object_id}\n}
    result << %{ ID: #{@id}\n}
    result << %{ Zero: #{@zero_value.to_s[0, 10]}\n}
    result << %{Value: #{@value.to_s[0, 10]}>}
    result
  end
load_accum_param()
  # File lib/spark/accumulator.rb, line 183
  def load_accum_param
    if @serialized_accum_param.is_a?(String)
      @accum_param = eval(@serialized_accum_param)
    else
      @accum_param = @serialized_accum_param
    end
  end
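The String branch above can be demonstrated in plain Ruby: a String accum_param travels to the worker as-is and is eval'd back into a Proc there. A minimal sketch (variable names are illustrative, not part of the gem):

```ruby
# A merge function serialized as a String survives marshalling trivially
# and is reconstituted on the worker with eval.
serialized = 'lambda { |memo, term| memo + term }'

merge = eval(serialized)
merge.is_a?(Proc)  # => true
merge.call(2, 3)   # => 5
```

This is also why a String param must eval to a Proc: `valid_accum_param` raises `Spark::SerializeError` otherwise.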
marshal_dump()
marshal_load(array)
  # File lib/spark/accumulator.rb, line 175
  def marshal_load(array)
    @id, @zero_value, @serialized_accum_param = array

    @value = @zero_value
    @driver = false

    load_accum_param
  end
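The key contract here is that a deserialized copy on a worker starts from `zero_value`, not from the driver's current `value`, and reports `driver?` as false. A self-contained plain-Ruby sketch of that round-trip (the `TinyAccumulator` class is a mimic for illustration, not the gem's class):

```ruby
# Mimic of the marshalling behaviour: the worker copy resets its value
# to zero_value and flips the driver flag off.
class TinyAccumulator
  attr_reader :id, :value, :zero_value

  def initialize(value, zero_value = 0)
    @id = object_id
    @value = value
    @zero_value = zero_value
    @driver = true
  end

  def driver?
    @driver
  end

  def marshal_dump
    [@id, @zero_value]
  end

  def marshal_load(array)
    @id, @zero_value = array
    @value = @zero_value   # worker copies accumulate from zero_value
    @driver = false
  end
end

acc  = TinyAccumulator.new(100, 0)
copy = Marshal.load(Marshal.dump(acc))

copy.value    # => 0 (reset to zero_value, not 100)
copy.driver?  # => false
```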
valid_accum_param()
  # File lib/spark/accumulator.rb, line 96
  def valid_accum_param
    if @accum_param.is_a?(Symbol)
      raise Spark::AccumulatorError, "Unsupported symbol #{@accum_param}" unless SUPPORTED_SYMBOLS.include?(@accum_param)
      @serialized_accum_param = @accum_param
      return
    end

    if @accum_param.is_a?(Proc)
      begin
        @serialized_accum_param = @accum_param.to_source
        return
      rescue
        raise Spark::SerializeError, 'Proc can not be serialized. Use String instead.'
      end
    end

    if @accum_param.is_a?(String)
      @serialized_accum_param = @accum_param
      @accum_param = eval(@accum_param)

      unless @accum_param.is_a?(Proc)
        raise Spark::SerializeError, 'Yours param is not a Proc.'
      end

      return
    end

    raise Spark::AccumulatorError, 'Unsupported param. Use Symbol, Proc or String.'
  end