class Spark::Serializer::Batched

Attributes

serializer [W] — write-only attribute (setter only); the wrapped item serializer

Public Class Methods

new(serializer, batch_size=nil) click to toggle source
# File lib/spark/serializer/batched.rb, line 7
# Wraps +serializer+ so that items are serialized/deserialized in groups
# of +batch_size+ items. When +batch_size+ is nil, the library default
# Spark::Serializer::DEFAULT_BATCH_SIZE is used.
#
# Calls +error+ when the resulting batch size is not a positive integer.
def initialize(serializer, batch_size=nil)
  @serializer = serializer
  @batch_size = (batch_size || Spark::Serializer::DEFAULT_BATCH_SIZE).to_i

  error('Batch size must be greater than 0') if @batch_size < 1
end

Public Instance Methods

batched?() click to toggle source

True only when batching is actually in effect (batch size greater than 1)

# File lib/spark/serializer/batched.rb, line 17
# Whether more than one item is grouped per serialized batch.
def batched?
  @batch_size >= 2
end
dump(data) click to toggle source
# File lib/spark/serializer/batched.rb, line 29
# Serialize one item (or one batch of items) by delegating to the
# wrapped serializer.
def dump(data)
  @serializer.dump(data)
end
dump_to_io(data, io) click to toggle source

Dump ==============================================================

# File lib/spark/serializer/batched.rb, line 44
# Serialize +data+ to +io+: each item (or slice of @batch_size items when
# batching is enabled) is dumped and written as a length-prefixed string
# via io.write_string, then the stream is flushed.
def dump_to_io(data, io)
  check_each(data)

  # When batching, group the items into fixed-size slices first.
  chunks = batched? ? data.each_slice(@batch_size) : data

  chunks.each do |chunk|
    io.write_string(dump(chunk))
  end

  io.flush
end
load(data) click to toggle source
# File lib/spark/serializer/batched.rb, line 25
# Deserialize one chunk of raw data by delegating to the wrapped
# serializer.
def load(data)
  @serializer.load(data)
end
load_from_io(io) { |item| ... } click to toggle source

Load ==============================================================

# File lib/spark/serializer/batched.rb, line 62
# Read length-prefixed chunks from +io+ until the DATA_EOF sentinel,
# deserializing each chunk and yielding items one at a time (a batched
# chunk is flattened into individual items). Without a block, returns
# a lazy Enumerator over the same sequence.
def load_from_io(io)
  return to_enum(__callee__, io) unless block_given?

  loop do
    length = io.read_int_or_eof
    break if length == Spark::Constant::DATA_EOF

    deserialized = load(io.read(length))

    if batched?
      deserialized.each { |element| yield element }
    else
      yield deserialized
    end
  end
end
name() click to toggle source
# File lib/spark/serializer/batched.rb, line 33
# Short identifier including the configured batch size.
def name
  size = @batch_size
  "Batched(#{size})"
end
to_s() click to toggle source
# File lib/spark/serializer/batched.rb, line 37
# Human-readable chain description: this serializer's name followed by
# the wrapped serializer it delegates to.
def to_s
  [name, @serializer].join(' -> ')
end
unbatch!() click to toggle source
# File lib/spark/serializer/batched.rb, line 21
# Disable batching by forcing one item per batch (batched? becomes false).
def unbatch!
  @batch_size = 1
end