class Avro::DataFile::Reader

Reads Avro data files written by Avro::DataFile::Writer.

Attributes

block_count[RW]
block_decoder[R]

The binary decoder for the contents of a block (after codec decompression)

codec[R]
datum_reader[R]
decoder[R]

The binary decoder for the raw file stream (headers and block framing).

file_length[R]
meta[R]
reader[R]

The underlying IO-like reader for the raw file stream.

sync_marker[R]

Public Class Methods

new(reader, datum_reader) click to toggle source
    # File lib/avro/data_file.rb
# Opens an Avro data file for reading.
#
# reader       - IO-like source of the file bytes; read_header seeks it back
#                to offset 0 before parsing
# datum_reader - decodes individual records; its writers_schema is set from
#                the 'avro.schema' entry of the file metadata
def initialize(reader, datum_reader)
  @reader = reader
  @datum_reader = datum_reader
  @decoder = IO::BinaryDecoder.new(reader)

  # Header = magic bytes, metadata map, sync marker. This populates meta and
  # sync_marker, so it must run before the codec/schema lookups below.
  read_header

  @codec = DataFile.get_codec(meta['avro.codec'])

  # No block has been loaded yet; each() reads the first block header on demand.
  @block_count = 0

  writers_schema_json = meta['avro.schema']
  datum_reader.writers_schema = Schema.parse(writers_schema_json)
end

Public Instance Methods

close() click to toggle source
    # File lib/avro/data_file.rb
# Closes the underlying reader stream.
def close; reader.close; end
each() { |datum| ... } click to toggle source

Iterates through each datum in this file.

TODO(jmhodges): handle blocks of length zero.

    # File lib/avro/data_file.rb
# Iterates through each datum in this file, yielding them in file order.
#
# Returns an Enumerator when called without a block. Blocks whose header
# declares zero objects are skipped instead of being decoded, so the
# running block_count never goes negative.
def each
  return enum_for(:each) unless block_given?

  loop do
    # Keep loading block headers until one actually contains objects.
    while block_count == 0
      return if eof?
      # The first block follows the header's sync marker directly, so
      # skip_sync may find no marker here; it then rewinds and returns false.
      skip_sync
      return if eof?
      read_block_header
    end

    datum = datum_reader.read(block_decoder)
    self.block_count -= 1
    yield(datum)
  end
end
eof?() click to toggle source
    # File lib/avro/data_file.rb
# True when the underlying reader has no more bytes to read.
def eof?
  reader.eof?
end

Private Instance Methods

read_block_header() click to toggle source
    # File lib/avro/data_file.rb
# Reads the next block header from the raw stream: the object count, then the
# byte length of the block. It then reads that many bytes, decompresses them
# with the file's codec, and points block_decoder at the decompressed data.
def read_block_header
  object_count = decoder.read_long
  compressed_size = decoder.read_long
  self.block_count = object_count

  raw_block = reader.read(compressed_size)
  @block_decoder = IO::BinaryDecoder.new(StringIO.new(codec.decompress(raw_block)))
end
read_header() click to toggle source
    # File lib/avro/data_file.rb
# Reads and validates the file header: the magic bytes, the metadata map
# (decoded with META_SCHEMA via datum_reader) and the sync marker.
# Sets @meta and @sync_marker, and returns the sync marker.
#
# Raises DataFileError if the stream is shorter than the magic block, does
# not start with MAGIC, or ends before a complete sync marker.
def read_header
  # seek to the beginning of the file to get magic block; `::IO` because a
  # bare `IO` in this scope names the library's IO namespace
  reader.seek(0, ::IO::SEEK_SET)

  # check magic number; read returns nil (not "") on an empty stream
  magic_in_file = reader.read(MAGIC_SIZE)
  if magic_in_file.nil? || magic_in_file.bytesize < MAGIC_SIZE
    msg = 'Not an Avro data file: shorter than the Avro magic block'
    raise DataFileError, msg
  elsif magic_in_file != MAGIC
    msg = "Not an Avro data file: #{magic_in_file.inspect} doesn't match #{MAGIC.inspect}"
    raise DataFileError, msg
  end

  # read metadata
  @meta = datum_reader.read_data(META_SCHEMA,
                                 META_SCHEMA,
                                 decoder)

  # read sync marker; a short marker would make every later sync check fail
  sync = reader.read(SYNC_SIZE)
  if sync.nil? || sync.bytesize < SYNC_SIZE
    raise DataFileError, 'Not an Avro data file: header ends before the sync marker'
  end
  @sync_marker = sync
end
skip_sync() click to toggle source

Reads SYNC_SIZE bytes (the length of the sync marker). If they match the sync marker, returns true; otherwise, seeks back to where it started and returns false.

    # File lib/avro/data_file.rb
# Reads up to SYNC_SIZE bytes and compares them with this file's sync marker.
#
# Returns true, leaving the bytes consumed, when they match. Otherwise seeks
# back to exactly where it started and returns false.
def skip_sync
  proposed_sync_marker = reader.read(SYNC_SIZE)
  # Already at EOF: nothing was consumed, so there is nothing to rewind.
  return false if proposed_sync_marker.nil?

  return true if proposed_sync_marker == sync_marker

  # Rewind by the bytes actually read. Near EOF the read can be shorter than
  # SYNC_SIZE, and seeking back a full SYNC_SIZE would overshoot the start.
  # `::IO` because a bare `IO` in this scope names the library's IO namespace.
  reader.seek(-proposed_sync_marker.bytesize, ::IO::SEEK_CUR)
  false
end