class Avro::DataFile::Reader

Reads Avro data files written by Avro::DataFile::Writer.

Attributes

block_count[RW]
block_decoder[R]

The binary decoder for the contents of a block (after codec decompression)

codec[R]
datum_reader[R]
decoder[R]

The reader and binary decoder for the raw file stream

file_length[R]
meta[R]
reader[R]

The reader and binary decoder for the raw file stream

sync_marker[R]

Public Class Methods

new(reader, datum_reader) click to toggle source
    # File lib/avro/data_file.rb
# Opens a data file for reading.
#
# reader       - the stream to read from; read_header seeks it back to offset 0
# datum_reader - decodes the header metadata and every datum; its
#                writers_schema is set from the 'avro.schema' metadata entry
def initialize(reader, datum_reader)
  @reader = reader
  @datum_reader = datum_reader
  @decoder = IO::BinaryDecoder.new(reader)

  # magic bytes, metadata map and sync marker
  read_header

  # no block is loaded yet; #each reads the first block header on demand
  @block_count = 0
  @codec = DataFile.get_codec(meta['avro.codec'])
  datum_reader.writers_schema = Schema.parse(meta['avro.schema'])
end

Public Instance Methods

close() click to toggle source
    # File lib/avro/data_file.rb
# Closes the underlying stream this reader was constructed with.
def close; reader.close; end
each() { |datum| ... } click to toggle source

Iterates through each datum in this file.

TODO(jmhodges): handle blocks of length zero.

    # File lib/avro/data_file.rb
# Iterates through each datum in this file, yielding each one to the block.
#
# When the current block is exhausted, an optional sync marker is skipped
# and the next block header is read. Blocks whose header declares zero
# objects are skipped instead of being decoded.
#
# Returns an Enumerator when called without a block.
def each
  return enum_for(:each) unless block_given?

  loop do
    if block_count == 0
      break if eof?
      # skip_sync consumes the marker only when it matches; stop if the
      # marker was the last thing in the stream.
      break if skip_sync && eof?
      read_block_header
      # A zero-object block has nothing to decode; go back and read the
      # next block header rather than decoding from an empty block.
      next if block_count == 0
    end

    datum = datum_reader.read(block_decoder)
    self.block_count -= 1
    yield(datum)
  end
end
eof?() click to toggle source
    # File lib/avro/data_file.rb
# True once the underlying reader has no more bytes to read.
def eof?
  reader.eof?
end

Private Instance Methods

read_block_header() click to toggle source
    # File lib/avro/data_file.rb
# Reads the next block header (object count, then byte length), reads that
# many bytes from the stream, decompresses them with the file's codec and
# wraps the result in a fresh binary decoder for datum reads.
def read_block_header
  self.block_count = decoder.read_long
  byte_length = decoder.read_long
  compressed = reader.read(byte_length)
  @block_decoder = IO::BinaryDecoder.new(StringIO.new(codec.decompress(compressed)))
end
read_header() click to toggle source
    # File lib/avro/data_file.rb
# Reads and validates the file header: the magic bytes, the metadata map
# (stored in @meta) and the sync marker (stored in @sync_marker). The stream
# is rewound to offset 0 first.
#
# Raises DataFileError if the stream is shorter than the magic block, the
# magic bytes do not match MAGIC, or fewer than SYNC_SIZE bytes remain for
# the sync marker.
def read_header
  # seek to the beginning of the file to get magic block
  reader.seek(0, 0)

  # check magic number; IO#read returns nil (not "") at EOF, so an empty
  # stream must be reported the same way as a short one
  magic_in_file = reader.read(MAGIC_SIZE)
  if magic_in_file.nil? || magic_in_file.size < MAGIC_SIZE
    msg = 'Not an Avro data file: shorter than the Avro magic block'
    raise DataFileError, msg
  elsif magic_in_file != MAGIC
    msg = "Not an Avro data file: #{magic_in_file.inspect} doesn't match #{MAGIC.inspect}"
    raise DataFileError, msg
  end

  # read metadata
  @meta = datum_reader.read_data(META_SCHEMA,
                                 META_SCHEMA,
                                 decoder)
  # read sync marker; a short read means the header is truncated, and a
  # partial marker would make every later skip_sync comparison fail
  @sync_marker = reader.read(SYNC_SIZE)
  if @sync_marker.nil? || @sync_marker.size < SYNC_SIZE
    raise DataFileError, 'Not an Avro data file: truncated sync marker'
  end
end
skip_sync() click to toggle source

Reads SYNC_SIZE bytes (the length of the sync marker). If they match the sync marker, returns true; otherwise, seeks back to where it started and returns false.

    # File lib/avro/data_file.rb
# Reads SYNC_SIZE bytes and compares them with the file's sync marker.
#
# Returns true, leaving the stream positioned after the marker, if they
# match. Otherwise rewinds the stream to where it started and returns false.
def skip_sync
  proposed_sync_marker = reader.read(SYNC_SIZE)
  return true if proposed_sync_marker == sync_marker

  # Near the end of the stream read returns fewer than SYNC_SIZE bytes (or
  # nil at EOF); rewinding by a fixed SYNC_SIZE would then move before the
  # starting position, so rewind by exactly what was consumed.
  consumed = proposed_sync_marker ? proposed_sync_marker.bytesize : 0
  reader.seek(-consumed, 1) if consumed > 0
  false
end