class Avro::DataFile::Reader
Read files written by DataFileWriter
Attributes
block_count[RW]
block_decoder[R]
The binary decoder for the contents of a block (after codec decompression)
codec[R]
datum_reader[R]
decoder[R]
The reader and binary decoder for the raw file stream
file_length[R]
meta[R]
reader[R]
The reader and binary decoder for the raw file stream
sync_marker[R]
Public Class Methods
new(reader, datum_reader)
click to toggle source
# File lib/avro/data_file.rb, line 214
# Builds a reader over +reader+ (the raw file stream) that decodes datums
# with +datum_reader+. The file header is consumed immediately, so the
# metadata and sync marker are available as soon as construction returns.
def initialize(reader, datum_reader)
  @reader       = reader
  @decoder      = IO::BinaryDecoder.new(reader)
  @datum_reader = datum_reader

  # Header layout: magic, meta, sync.
  read_header

  # The codec is chosen from the 'avro.codec' metadata entry.
  @codec = DataFile.get_codec(meta['avro.codec'])

  # No block has been loaded yet; the first call to #each will load one.
  @block_count = 0

  # Datums are decoded with the schema stored in the file's metadata.
  datum_reader.writers_schema = Schema.parse(meta['avro.schema'])
end
Public Instance Methods
close()
click to toggle source
# File lib/avro/data_file.rb, line 252
# Closes the underlying raw file stream.
def close
  reader.close
end
each() { |datum| ... }
click to toggle source
Iterates through each datum in this file. TODO(jmhodges): handle block of length zero.
# File lib/avro/data_file.rb, line 231
# Iterates through each datum in this file, yielding them one at a time.
#
# Whenever the current block is exhausted (block_count is zero) the next
# block header is read, after skipping the sync marker that may precede it.
# Iteration stops when the underlying stream reaches end of file.
#
# A block whose header declares zero objects is skipped rather than decoded:
# decoding it would read from an empty block decoder and drive block_count
# negative.
def each
  loop do
    if block_count == 0
      break if eof?
      # A matching sync marker may be the last thing in the file.
      break if skip_sync && eof?

      read_block_header

      # Empty block: nothing to decode, move on to the next sync/block.
      next if block_count == 0
    end

    datum = datum_reader.read(block_decoder)
    self.block_count -= 1
    yield(datum)
  end
end
eof?()
click to toggle source
# File lib/avro/data_file.rb, line 250
# True when the underlying raw file stream reports end of file.
def eof?
  reader.eof?
end
Private Instance Methods
read_block_header()
click to toggle source
# File lib/avro/data_file.rb, line 279
# Loads the next block from the raw stream.
#
# Two longs are read through the file-level decoder: the first becomes
# block_count, the second is the number of bytes to read from the stream.
# Those bytes are passed through the codec and wrapped in a fresh binary
# decoder, which is stored as the block decoder.
def read_block_header
  self.block_count = decoder.read_long
  payload_size = decoder.read_long
  payload = reader.read(payload_size)
  decompressed = codec.decompress(payload)
  @block_decoder = IO::BinaryDecoder.new(StringIO.new(decompressed))
end
read_header()
click to toggle source
# File lib/avro/data_file.rb, line 257
# Reads the file header: the magic block, the metadata and the sync marker.
#
# Sets @meta and @sync_marker.
#
# Raises DataFileError when the stream is empty or shorter than the magic
# block, or when the magic block does not match MAGIC.
def read_header
  # seek to the beginning of the file to get magic block
  reader.seek(0, 0)

  # check magic number
  magic_in_file = reader.read(MAGIC_SIZE)
  # A length-limited read returns nil (not "") on an empty stream, so the
  # nil case must be treated as "too short" instead of calling #size on it.
  if magic_in_file.nil? || magic_in_file.size < MAGIC_SIZE
    msg = 'Not an Avro data file: shorter than the Avro magic block'
    raise DataFileError, msg
  elsif magic_in_file != MAGIC
    msg = "Not an Avro data file: #{magic_in_file.inspect} doesn't match #{MAGIC.inspect}"
    raise DataFileError, msg
  end

  # read metadata
  @meta = datum_reader.read_data(META_SCHEMA,
                                 META_SCHEMA,
                                 decoder)
  # read sync marker
  @sync_marker = reader.read(SYNC_SIZE)
end
skip_sync()
click to toggle source
Read SYNC_SIZE bytes from the stream; if they match the sync marker, return true. Otherwise, seek back to where we started and return false.
# File lib/avro/data_file.rb, line 289
# Reads up to SYNC_SIZE bytes from the stream and compares them with the
# file's sync marker.
#
# Returns true when they match, leaving the stream positioned just after
# the marker. Otherwise the stream is moved back to where it started and
# false is returned.
def skip_sync
  proposed_sync_marker = reader.read(SYNC_SIZE)
  return true if proposed_sync_marker == sync_marker

  # Rewind only by what was actually consumed: near the end of the stream
  # the read may return fewer than SYNC_SIZE bytes, or nil, and seeking back
  # a full SYNC_SIZE would land before the starting position.
  consumed = proposed_sync_marker ? proposed_sync_marker.bytesize : 0
  reader.seek(-consumed, 1) unless consumed.zero?
  false
end