class Spark::Context

Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.

Attributes

jaccumulator[R]
jcontext[R]
temp_dir[R]

Public Class Methods

new()

Constructor for the Ruby context. Configuration is taken from Spark automatically; if the user starts the context before setting one, the configuration falls back to the defaults.

# File lib/spark/context.rb, line 21
def initialize
  Spark.config.valid!
  @jcontext = JavaSparkContext.new(Spark.config.spark_conf)
  @jcontext.addJar(Spark.ruby_spark_jar)

  # Does not work on 1.2
  # ui.attachTab(RubyTab.new(ui, to_java_hash(RbConfig::CONFIG)))

  spark_local_dir = JUtils.getLocalDir(sc.conf)
  @temp_dir = JUtils.createTempDir(spark_local_dir, 'ruby').getAbsolutePath

  accum_server = Spark::Accumulator::Server
  accum_server.start
  @jaccumulator = @jcontext.accumulator(ArrayList.new, RubyAccumulatorParam.new(accum_server.host, accum_server.port))

  log_info("Ruby accumulator server is running on port #{accum_server.port}")

  set_call_site('Ruby') # description of stage
end
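
Typically the context is not constructed with new directly. A minimal sketch, assuming the gem's top-level Spark.start helper and the $sc convention used in the examples below (both defined outside this file):

require 'ruby-spark'    # require name assumed; adjust to your setup

Spark.start             # validates the config and builds the Spark::Context
$sc = Spark.sc          # assumed accessor for the running context

$sc.parallelize(0..5).map(lambda{|x| x * 2}).collect
# => [0, 2, 4, 6, 8, 10]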

Public Instance Methods

accumulator(value, accum_param=:+, zero_value=0)

Create an Accumulator with the given initial value. The optional accum_param defines how values of the data type are added together (default: :+).

Example:

accum = $sc.accumulator(7)

rdd = $sc.parallelize(0..5, 4)
rdd = rdd.bind(accum: accum)
rdd = rdd.map_partitions(lambda{|_| accum.add(1) })
rdd = rdd.collect

accum.value
# => 11
# File lib/spark/context.rb, line 188
def accumulator(value, accum_param=:+, zero_value=0)
  Spark::Accumulator.new(value, accum_param, zero_value)
end
addFile(*files)
Alias for: add_file
add_file(*files)

Add a file to be downloaded with this Spark job on every node. The path of file passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.

To access the file in Spark jobs, use `SparkFiles.get(file_name)` with the filename to find its download location.

Example:

`echo 10 > test.txt`

$sc.add_file('test.txt')
$sc.parallelize(0..5).map(lambda{|x| x * SparkFiles.get_content('test.txt').to_i}).collect
# => [0, 10, 20, 30, 40, 50]
# File lib/spark/context.rb, line 149
def add_file(*files)
  files.each do |file|
    sc.addFile(file)
  end
end
Also aliased as: addFile
broadcast(value)

Broadcast a read-only variable to the cluster, returning a Spark::Broadcast object for reading it in distributed functions. The variable will be sent to each node only once.

Example:

broadcast1 = $sc.broadcast('a')
broadcast2 = $sc.broadcast('b')

rdd = $sc.parallelize(0..5, 4)
rdd = rdd.bind(broadcast1: broadcast1, broadcast2: broadcast2)
rdd = rdd.map_partitions_with_index(lambda{|part, index| [broadcast1.value * index, broadcast2.value * index] })
rdd.collect
# => ["", "", "a", "b", "aa", "bb", "aaa", "bbb"]
# File lib/spark/context.rb, line 169
def broadcast(value)
  Spark::Broadcast.new(self, value)
end
clearCallSite()
Alias for: clear_call_site
clear_call_site()
# File lib/spark/context.rb, line 120
def clear_call_site
  jcontext.clearCallSite
end
Also aliased as: clearCallSite
config(key=nil)

Return this SparkContext's configuration, or the value for a single key when one is given. The configuration cannot be changed at runtime.

# File lib/spark/context.rb, line 127
def config(key=nil)
  if key
    Spark.config.get(key)
  else
    Spark.config
  end
end
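
Example (the key is one referenced elsewhere in this class; the returned value depends on your configuration):

$sc.config('spark.ruby.serializer')
# => name of the configured serializer, e.g. "marshal"

$sc.config
# => the whole Spark::Config object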
defaultParallelism()
Alias for: default_parallelism
default_batch_size()
# File lib/spark/context.rb, line 92
def default_batch_size
  size = config('spark.ruby.serializer.batch_size').to_i
  if size >= 1
    size
  else
    'auto'
  end
end
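
Example (illustrative values; the size is read from spark.ruby.serializer.batch_size):

# spark.ruby.serializer.batch_size = "2048"
$sc.default_batch_size
# => 2048

# unset, zero or a non-numeric value
$sc.default_batch_size
# => 'auto'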
default_parallelism()

Default level of parallelism to use when not given by the user (e.g. for parallelize and makeRDD).

# File lib/spark/context.rb, line 63
def default_parallelism
  sc.defaultParallelism
end
Also aliased as: defaultParallelism
default_serializer()

Default serializer

Batch -> Compress -> Basic

# File lib/spark/context.rb, line 71
def default_serializer
  # Basic
  serializer = Spark::Serializer.find!(config('spark.ruby.serializer')).new

  # Compress
  if config('spark.ruby.serializer.compress')
    serializer = Spark::Serializer.compressed(serializer)
  end

  # Batching
  batch_size = default_batch_size
  if batch_size == 'auto'
    serializer = Spark::Serializer.auto_batched(serializer)
  else
    serializer = Spark::Serializer.batched(serializer, batch_size)
  end

  # The outermost serializer wraps ("contains") the inner ones
  serializer
end
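
For illustration, the same chain can be assembled by hand; the 'marshal' name and the batch size below are assumed values, not defaults taken from this file:

serializer = Spark::Serializer.find!('marshal').new       # basic serializer
serializer = Spark::Serializer.compressed(serializer)     # optional compression
serializer = Spark::Serializer.batched(serializer, 1024)  # fixed-size batching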
getLocalProperty(key)
Alias for: get_local_property
get_local_property(key)

Get a local property set in this thread, or nil if it is missing.

# File lib/spark/context.rb, line 110
def get_local_property(key)
  jcontext.getLocalProperty(key)
end
Also aliased as: getLocalProperty
inspect()
# File lib/spark/context.rb, line 41
def inspect
  result  = %{#<#{self.class.name}:0x#{object_id}\n}
  result << %{Tempdir: "#{temp_dir}">}
  result
end
parallelize(data, num_slices=nil, serializer=nil)

Distribute a local Ruby collection to form an RDD. The direct method can be slow, so be careful; this method updates the data in place.

Parameters:

data

Range or Array

num_slices

number of slices

serializer

custom serializer (default: serializer based on configuration)

Examples:

$sc.parallelize(["1", "2", "3"]).map(lambda{|x| x.to_i}).collect
#=> [1, 2, 3]

$sc.parallelize(1..3).map(:to_s).collect
#=> ["1", "2", "3"]
# File lib/spark/context.rb, line 207
def parallelize(data, num_slices=nil, serializer=nil)
  num_slices ||= default_parallelism
  serializer ||= default_serializer

  serializer.check_each(data)

  # Through file
  file = Tempfile.new('to_parallelize', temp_dir)
  serializer.dump_to_io(data, file)
  file.close # not unlink
  jrdd = RubyRDD.readRDDFromFile(jcontext, file.path, num_slices)

  Spark::RDD.new(jrdd, self, serializer)
ensure
  file && file.unlink
end
runJob(rdd, f, partitions=nil, allow_local=false)
Alias for: run_job
runJobWithCommand(rdd, partitions, allow_local, command, *args)
Alias for: run_job_with_command
run_job(rdd, f, partitions=nil, allow_local=false)

Executes the given partition function f on the specified set of partitions, returning the result as an array of elements.

If partitions is not specified, this will run over all partitions.

Example:

rdd = $sc.parallelize(0..10, 5)
$sc.run_job(rdd, lambda{|x| x.to_s}, [0,2])
# => ["[0, 1]", "[4, 5]"]
# File lib/spark/context.rb, line 278
def run_job(rdd, f, partitions=nil, allow_local=false)
  run_job_with_command(rdd, partitions, allow_local, Spark::Command::MapPartitions, f)
end
Also aliased as: runJob
run_job_with_command(rdd, partitions, allow_local, command, *args)

Execute the given command on a specific set of partitions.

# File lib/spark/context.rb, line 284
def run_job_with_command(rdd, partitions, allow_local, command, *args)
  if !partitions.nil? && !partitions.is_a?(Array)
    raise Spark::ContextError, 'Partitions must be nil or Array'
  end

  partitions_size = rdd.partitions_size

  # Execute all parts
  if partitions.nil?
    partitions = (0...partitions_size).to_a
  end

  # Can happen when you use coalesce
  partitions.delete_if {|part| part >= partitions_size}

  # Rjb represents Fixnum as Integer but JRuby as Long
  partitions = to_java_array_list(convert_to_java_int(partitions))

  # File for result
  file = Tempfile.new('collect', temp_dir)

  mapped = rdd.new_rdd_from_command(command, *args)
  RubyRDD.runJob(rdd.context.sc, mapped.jrdd, partitions, allow_local, file.path)

  mapped.collect_from_file(file)
end
Also aliased as: runJobWithCommand
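
Example (mirrors how run_job delegates to this method; the output matches the run_job example above):

rdd = $sc.parallelize(0..10, 5)
$sc.run_job_with_command(rdd, [0, 2], false, Spark::Command::MapPartitions, lambda{|x| x.to_s})
# => ["[0, 1]", "[4, 5]"]
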
sc()
# File lib/spark/context.rb, line 53
def sc
  @jcontext.sc
end
setCallSite(site)
Alias for: set_call_site
setLocalProperty(key, value)
Alias for: set_local_property
set_call_site(site)

Support function for API backtraces.

# File lib/spark/context.rb, line 116
def set_call_site(site)
  jcontext.setCallSite(site)
end
Also aliased as: setCallSite
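
Example (the site string is illustrative; it is used as the stage description, like the constructor's set_call_site('Ruby') call):

$sc.set_call_site('my ruby job')
$sc.parallelize(0..5).collect
$sc.clear_call_site
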
set_local_property(key, value)

Set a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.

# File lib/spark/context.rb, line 104
def set_local_property(key, value)
  jcontext.setLocalProperty(key, value)
end
Also aliased as: setLocalProperty
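
Example (spark.scheduler.pool is the fair scheduler pool property mentioned above; the pool name is illustrative):

$sc.set_local_property('spark.scheduler.pool', 'production')
$sc.get_local_property('spark.scheduler.pool')
# => "production"
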
stop()
# File lib/spark/context.rb, line 47
def stop
  Spark::Accumulator::Server.stop
  log_info('Ruby accumulator server was stopped')
  @jcontext.stop
end
textFile(path, min_partitions=nil, encoding=Encoding::UTF_8, serializer=nil)
Alias for: text_file
text_file(path, min_partitions=nil, encoding=Encoding::UTF_8, serializer=nil)

Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.

Example:

f = Tempfile.new("test")
f.puts("1")
f.puts("2")
f.close

$sc.text_file(f.path).map(lambda{|x| x.to_i}).collect
# => [1, 2]
# File lib/spark/context.rb, line 236
def text_file(path, min_partitions=nil, encoding=Encoding::UTF_8, serializer=nil)
  min_partitions ||= default_parallelism
  serializer     ||= default_serializer
  deserializer     = Spark::Serializer.build { __text__(encoding) }

  Spark::RDD.new(@jcontext.textFile(path, min_partitions), self, serializer, deserializer)
end
Also aliased as: textFile
ui()
# File lib/spark/context.rb, line 57
def ui
  sc.ui
end
wholeTextFiles(path, min_partitions=nil, serializer=nil)
Alias for: whole_text_files
whole_text_files(path, min_partitions=nil, serializer=nil)

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned as a key-value pair, where the key is the path of the file and the value is its content.

Example:

dir = Dir.mktmpdir
f1 = Tempfile.new("test1", dir)
f2 = Tempfile.new("test2", dir)
f1.puts("1"); f1.puts("2");
f2.puts("3"); f2.puts("4");
f1.close
f2.close

$sc.whole_text_files(dir).flat_map(lambda{|key, value| value.split}).collect
# => ["1", "2", "3", "4"]
# File lib/spark/context.rb, line 260
def whole_text_files(path, min_partitions=nil, serializer=nil)
  min_partitions ||= default_parallelism
  serializer     ||= default_serializer
  deserializer     = Spark::Serializer.build{ __pair__(__text__, __text__) }

  Spark::RDD.new(@jcontext.wholeTextFiles(path, min_partitions), self, serializer, deserializer)
end
Also aliased as: wholeTextFiles