class Spark::Serializer::Batched
Attributes
serializer [W] — write-only accessor for the wrapped serializer
Public Class Methods
new(serializer, batch_size=nil)
click to toggle source
# Wraps +serializer+ so that items are (de)serialized in groups.
#
# serializer - underlying serializer that handles a single payload.
# batch_size - number of items per batch; when nil, falls back to
#              Spark::Serializer::DEFAULT_BATCH_SIZE.
#
# Calls error() (defined elsewhere in the serializer hierarchy) when
# the coerced batch size is not at least 1.
def initialize(serializer, batch_size=nil)
  @serializer = serializer
  @batch_size = (batch_size || Spark::Serializer::DEFAULT_BATCH_SIZE).to_i

  error('Batch size must be greater than 0') if @batch_size < 1
end
Public Instance Methods
batched?()
click to toggle source
True only when items are really grouped into batches (batch size greater than 1).
# Whether this serializer actually groups items into batches.
# A batch size of 1 (see #unbatch!) means items flow through one by one.
def batched?
  !(@batch_size <= 1)
end
dump(data)
click to toggle source
# Serializes a single item (or one batch of items) by delegating
# straight to the wrapped serializer.
def dump(data)
  @serializer.dump(data)
end
dump_to_io(data, io)
click to toggle source
Dump
# Serializes +data+ into +io+.
#
# Validates the input via check_each (defined elsewhere in the
# serializer hierarchy), then — when batching is enabled — groups the
# items into slices of @batch_size. Each item (or slice) is serialized
# with #dump and written to the stream as a length-prefixed string;
# the stream is flushed at the end.
def dump_to_io(data, io)
  check_each(data)

  # Group items into batches only when batching is enabled.
  data = data.each_slice(@batch_size) if batched?

  data.each do |chunk|
    io.write_string(dump(chunk))
  end

  io.flush
end
load(data)
click to toggle source
# Deserializes a single payload (or one batch) by delegating straight
# to the wrapped serializer.
def load(data)
  @serializer.load(data)
end
load_from_io(io) { |item| ... }
click to toggle source
Load
# Deserializes items from +io+, yielding them one by one.
#
# Returns an Enumerator when no block is given. Reads length-prefixed
# payloads until the DATA_EOF sentinel, decodes each via #load and,
# when batched, yields every element of the decoded batch individually.
def load_from_io(io)
  return to_enum(__callee__, io) unless block_given?

  loop do
    size = io.read_int_or_eof
    break if size == Spark::Constant::DATA_EOF

    payload = load(io.read(size))
    batched? ? payload.each { |element| yield element } : yield(payload)
  end
end
name()
click to toggle source
# Short human-readable name including the configured batch size,
# e.g. "Batched(1024)".
def name
  format('Batched(%s)', @batch_size)
end
to_s()
click to toggle source
# File lib/spark/serializer/batched.rb, line 37 def to_s "#{name} -> #{@serializer}" end
unbatch!()
click to toggle source
# Disables batching by forcing the batch size down to a single item;
# afterwards #batched? reports false.
def unbatch!
  @batch_size = 1
end