class Avro::DataFile::Reader
Read files written by DataFileWriter
Attributes
block_count[RW]
block_decoder[R]
The binary decoder for the contents of a block (after codec decompression)
codec[R]
datum_reader[R]
decoder[R]
The reader and binary decoder for the raw file stream
file_length[R]
meta[R]
reader[R]
The reader and binary decoder for the raw file stream
sync_marker[R]
Public Class Methods
new(reader, datum_reader)
click to toggle source
# File lib/avro/data_file.rb
# Sets up a reader over an Avro data file stream.
#
# reader       - the raw stream; it is wrapped in an IO::BinaryDecoder and
#                consumed by read_header
# datum_reader - the object used to decode datums; its writers_schema is
#                assigned from the file's 'avro.schema' metadata entry
def initialize(reader, datum_reader)
  @reader       = reader
  @decoder      = IO::BinaryDecoder.new(reader)
  @datum_reader = datum_reader

  # The header (magic, meta, sync) has to be consumed before meta is usable.
  read_header

  codec_name = meta['avro.codec']
  @codec = DataFile.get_codec(codec_name)

  # Nothing is buffered yet: a block count of zero makes each load the
  # first block header before it reads any datum.
  @block_count = 0
  parsed_schema = Schema.parse(meta['avro.schema'])
  datum_reader.writers_schema = parsed_schema
end
Public Instance Methods
close()
click to toggle source
# File lib/avro/data_file.rb
# Closes the underlying reader; the result of that call is returned as-is.
def close
  reader.close
end
each() { |datum| ... }
click to toggle source
Iterates through each datum in this file. TODO(jmhodges): handle block of length zero.
# File lib/avro/data_file.rb
# Iterates through each datum in this file, yielding them one at a time.
#
# Whenever the current block is exhausted (block_count is zero) the next
# block header is loaded first, after skipping a sync marker if one is
# present at the current position. Iteration stops when the reader reports
# end of file.
#
# A block whose header announces zero datums is skipped. Previously such a
# block fell through to datum_reader.read on an empty block decoder and
# pushed block_count to -1, so the "block exhausted" check never fired again.
def each
  loop do
    if block_count == 0
      break if eof?
      # skip_sync consumes the marker when it matches; a marker followed
      # directly by end of file means there are no further blocks.
      break if skip_sync && eof?

      read_block_header

      # An empty block carries no datums: go around again so the sync
      # marker that follows it is skipped and the next header is loaded.
      next if block_count == 0
    end

    datum = datum_reader.read(block_decoder)
    self.block_count -= 1
    yield(datum)
  end
end
eof?()
click to toggle source
# File lib/avro/data_file.rb
# True when the underlying reader reports end of file.
def eof?
  reader.eof?
end
Private Instance Methods
read_block_header()
click to toggle source
# File lib/avro/data_file.rb
# Loads the next block from the stream.
#
# Reads two longs through the file decoder: the number of datums in the
# block (stored in block_count) and the number of bytes to read for the
# block body. The body is read from the raw reader, passed through the
# codec's decompress, and wrapped in a fresh binary decoder that becomes
# block_decoder.
def read_block_header
  self.block_count = decoder.read_long
  byte_count = decoder.read_long
  raw_block = reader.read(byte_count)
  @block_decoder = IO::BinaryDecoder.new(StringIO.new(codec.decompress(raw_block)))
end
read_header()
click to toggle source
# File lib/avro/data_file.rb
# Reads and validates the file header: magic bytes, metadata, sync marker.
#
# Rewinds the reader to the start, checks the first MAGIC_SIZE bytes against
# MAGIC, then stores the decoded metadata in @meta and the following
# SYNC_SIZE bytes in @sync_marker.
#
# Raises DataFileError when the stream is empty, is shorter than the magic
# block, or starts with bytes other than MAGIC.
def read_header
  # seek to the beginning of the file to get magic block
  reader.seek(0, 0)

  # check magic number. IO#read(length) returns nil rather than "" at end of
  # file, so an empty stream has to be treated as "too short" instead of
  # calling size on nil.
  magic_in_file = reader.read(MAGIC_SIZE)
  if magic_in_file.nil? || magic_in_file.size < MAGIC_SIZE
    raise DataFileError, 'Not an Avro data file: shorter than the Avro magic block'
  elsif magic_in_file != MAGIC
    raise DataFileError, "Not an Avro data file: #{magic_in_file.inspect} doesn't match #{MAGIC.inspect}"
  end

  # read metadata (META_SCHEMA is passed for both schema arguments)
  @meta = datum_reader.read_data(META_SCHEMA, META_SCHEMA, decoder)

  # read sync marker
  @sync_marker = reader.read(SYNC_SIZE)
end
skip_sync()
click to toggle source
Reads SYNC_SIZE bytes (the length of a sync marker) from the stream. If they match the file's sync marker, returns true. Otherwise, seeks back to where it started and returns false.
# File lib/avro/data_file.rb
# Reads up to SYNC_SIZE bytes and compares them with the file's sync marker.
#
# Returns true when they match, leaving the stream positioned just after the
# marker. Otherwise rewinds the stream to where it was before the read and
# returns false.
def skip_sync
  proposed_sync_marker = reader.read(SYNC_SIZE)
  return true if proposed_sync_marker == sync_marker

  # The read can come back with fewer than SYNC_SIZE bytes (or nil at end of
  # file). Rewinding a fixed SYNC_SIZE would then land before the starting
  # position, so rewind by the number of bytes actually consumed.
  bytes_read = proposed_sync_marker.to_s.bytesize
  reader.seek(-bytes_read, 1)
  false
end