class Spark::PipelinedRDD

Pipelined Resilient Distributed Dataset: chained operations are pipelined into a single stage and sent to the worker together.

RDD
`-- map
    `-- map
        `-- map

On the worker, the pipelined commands are executed from top to bottom.
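For example (a hedged sketch, assuming the ruby-spark API in which Spark.start creates the $sc context and commands are passed as lambdas), three chained maps collapse into one PipelinedRDD, so the whole chain crosses the JVM/Ruby boundary as a single stage:

require 'spark'

Spark.start

rdd = $sc.parallelize(1..5)
         .map(lambda { |x| x + 1 })  # first stage: wraps the Java RDD
         .map(lambda { |x| x * 2 })  # pipelined: reuses the same prev_jrdd
         .map(lambda { |x| x - 3 })  # pipelined again

rdd.collect  # => [1, 3, 5, 7, 9]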

Attributes

command[R]
prev_jrdd[R]

Public Class Methods

new(prev, command)
# File lib/spark/rdd.rb, line 1369
def initialize(prev, command)

  if prev.is_a?(PipelinedRDD) && prev.pipelinable?
    # Second and subsequent stages: reuse the Java RDD from the first
    # stage so the whole chain runs as a single pipelined stage
    @prev_jrdd = prev.prev_jrdd
  else
    # First stage: wrap the previous RDD's Java RDD directly
    @prev_jrdd = prev.jrdd
  end

  @cached = false
  @checkpointed = false

  @context = prev.context
  @command = command
end
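As a hedged illustration of the branch above (base_rdd, map_command and filter_command are hypothetical placeholders), two stacked PipelinedRDDs end up sharing the same Java RDD:

first  = Spark::PipelinedRDD.new(base_rdd, map_command)   # first stage: takes base_rdd.jrdd
second = Spark::PipelinedRDD.new(first, filter_command)   # first is pipelinable

second.prev_jrdd.equal?(first.prev_jrdd)  # => true, both commands run in one stage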

Public Instance Methods

jrdd()

Serializes the necessary objects and sends them to RubyRDD (the Scala extension).

# File lib/spark/rdd.rb, line 1391
def jrdd
  @jrdd ||= _jrdd
end
pipelinable?()
# File lib/spark/rdd.rb, line 1386
def pipelinable?
  !(cached? || checkpointed?)
end
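Caching or checkpointing pins an RDD's results, so it can no longer be merged into the previous stage. A minimal sketch, assuming RDD#cache sets the cached flag as the @cached attribute in the initializer suggests:

rdd = $sc.parallelize(0..10).map(lambda { |x| x * 2 })
rdd.pipelinable?  # => true, a following map would join this stage

rdd.cache
rdd.pipelinable?  # => false, a following map starts a new stage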

Private Instance Methods

_jrdd()
# File lib/spark/rdd.rb, line 1397
def _jrdd
  # Build the serialized payload for the whole pipelined command chain
  command = @command.build

  # Keep only the broadcast variables bound into the command and
  # convert them to their Java counterparts
  broadcasts = @command.bound_objects.select{|_, value| value.is_a?(Spark::Broadcast)}.values
  broadcasts = to_java_array_list(broadcasts.map(&:jbroadcast))

  # Hand everything to the Scala-side RubyRDD, which executes the
  # command on Ruby workers, and expose it as a plain Java RDD
  ruby_rdd = RubyRDD.new(@prev_jrdd.rdd, command, broadcasts, @context.jaccumulator)
  ruby_rdd.asJavaRDD
end
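The select/values step keeps only the Spark::Broadcast objects among everything bound into the command, e.g. (with hypothetical bound objects):

bound = { 'lookup' => a_broadcast, 'limit' => 100 }
bound.select { |_, value| value.is_a?(Spark::Broadcast) }.values  # => [a_broadcast]

The resulting RubyRDD receives the serialized command, the Java handles of those broadcasts, and the context's accumulator, and is exposed back to the caller as a plain Java RDD.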