class Spark::Broadcast

Broadcast a read-only variable to the cluster, returning a Spark::Broadcast object for reading it in distributed functions. The variable will be sent to each executor only once.

Example:

# Three broadcast variables: two strings and one array.
broadcast1 = $sc.broadcast('a')
broadcast2 = $sc.broadcast('b')
broadcast3 = $sc.broadcast([1,2,3])

# The function receives a partition and its index; only the index is used.
# It returns three items: each string repeated `index` times and the sum
# of the broadcast array.
func = Proc.new do |part, index|
  [
    broadcast1.value * index,
    broadcast2.value * index,
    broadcast3.value.reduce(:+)
  ]
end

# Four partitions, so the indexes are 0..3.
rdd = $sc.parallelize(0..5, 4)
# NOTE(review): bind presumably makes the broadcasts reachable from func
# on the workers under the given names - confirm against RDD#bind.
rdd = rdd.bind(broadcast1: broadcast1, broadcast2: broadcast2, broadcast3: broadcast3)
rdd = rdd.map_partitions_with_index(func)
rdd.collect
# => ["", "", 6, "a", "b", 6, "aa", "bb", 6, "aaa", "bbb", 6]

Constants

LOADED
NOT_LOADED
WITHOUT_PATH

Attributes

id[R]
jbroadcast[R]
path[R]
state[R]

Public Class Methods

new(sc, value) click to toggle source

Create a new Broadcast and dump its value to disk.

b = $sc.broadcast('a')

b.value # => 'a'
b.path
b.jbroadcast
# File lib/spark/broadcast.rb, line 48
# Create a new broadcast: keep +value+ in memory, dump it to a temporary
# file and pass that file to RubyRDD.readBroadcastFromFile.
#
# sc::    context object; only +temp_dir+ and +jcontext+ are used here
# value:: object to broadcast; it is serialized with Marshal.dump
def initialize(sc, value)
  @id = object_id
  @value = value
  @state = LOADED

  file = Tempfile.create('broadcast', sc.temp_dir)
  begin
    file.binmode
    file.write(Marshal.dump(value))
  ensure
    # Close the handle even when serialization or the write fails.
    file.close
  end

  @path = file.path
  @jbroadcast = RubyRDD.readBroadcastFromFile(sc.jcontext, @path, Spark.jb.to_long(@id))

  # The finalizer must not be a block created inside this method: such a
  # block captures +self+, which keeps the object reachable, so it would
  # never be collected and the dump would not be removed before exit.
  ObjectSpace.define_finalizer(self, self.class.finalizer(@path))
end

# Build a finalizer that removes the dump file at +path+.
# It is created at class level so that it holds the path only, not the
# Broadcast instance being finalized.
def self.finalizer(path)
  proc { File.unlink(path) if File.exist?(path) }
end
register(id, path) click to toggle source
# File lib/spark/broadcast.rb, line 71
# Record +path+ as the dump file for the broadcast identified by +id+.
# The entry is stored in the class-level @@registered table, which #value
# consults when a broadcast is in the WITHOUT_PATH state.
def self.register(id, path)
  @@registered[id] = path
end

Public Instance Methods

inspect() click to toggle source
# File lib/spark/broadcast.rb, line 64
# Short human-readable description: the class name, the object id in
# hexadecimal (so that it matches the "0x" prefix), the broadcast ID and at
# most the first 10 characters of the value's string form.
def inspect
  [
    "#<#{self.class.name}:0x#{object_id.to_s(16)}",
    "   ID: #{@id}",
    "Value: #{@value.to_s[0, 10]}>"
  ].join("\n")
end
marshal_dump() click to toggle source
# File lib/spark/broadcast.rb, line 95
# Marshal hook: only the id is serialized. The value, the path and the
# state are not part of the dump.
def marshal_dump
  @id
end
marshal_load(id) click to toggle source
# File lib/spark/broadcast.rb, line 99
# Marshal hook: restore the id produced by #marshal_dump. The path is not
# known at this point, so the state becomes WITHOUT_PATH and #value has to
# look the path up by id.
def marshal_load(id)
  @id = id
  @state = WITHOUT_PATH
end
value() click to toggle source
# File lib/spark/broadcast.rb, line 75
# Return the broadcast value, loading it lazily when needed.
#
# LOADED::       the value is already in memory and is returned as is.
# NOT_LOADED::   the path is known; the dump is read, cached and returned.
# WITHOUT_PATH:: the path is looked up in @@registered by id, then the
#                value is loaded as in the NOT_LOADED case.
#
# Raises Spark::BroadcastError when no path is registered for the id.
# Returns nil when the state is none of the three above.
def value
  case state
  when LOADED
    @value
  when NOT_LOADED
    # Marshal output is binary, so the file is read without any newline or
    # encoding conversion.
    # NOTE(review): Marshal.load must only see trusted files - confirm that
    # registered paths cannot be influenced from outside.
    @value = Marshal.load(File.binread(@path))
    @state = LOADED
    @value
  when WITHOUT_PATH
    @path = @@registered[id]
    raise Spark::BroadcastError, "Broadcast #{@id} do not have registered path." unless @path

    @state = NOT_LOADED
    value
  end
end