class Avro::DataFile::Writer
Attributes
block_count[RW]
buffer_encoder[R]
buffer_writer[R]
codec[R]
datum_writer[R]
encoder[R]
meta[R]
sync_marker[R]
writer[R]
Public Class Methods
generate_sync_marker()
click to toggle source
# File lib/avro/data_file.rb, line 90
# Build a new random sync marker for a freshly created container file.
#
# The marker is written once after the header (write_header) and again
# after every data block (write_block).
#
# @return [String] 16 bytes from OpenSSL::Random
def self.generate_sync_marker
  marker_length = 16
  OpenSSL::Random.random_bytes(marker_length)
end
new(writer, datum_writer, writers_schema=nil, codec=nil, meta={})
click to toggle source
# File lib/avro/data_file.rb, line 97
# Create a container-file writer on top of +writer+.
#
# When +writers_schema+ is given, a new file is started: a fresh sync marker
# is generated, the codec and schema are recorded in +meta+, and the header
# is written immediately. When it is absent, we presume we're appending to an
# existing file: its header is read back (via Reader) to recover the sync
# marker, codec and schema, and the stream is positioned at its end.
#
# @param writer the output stream; when appending it is also read from and seeked
# @param datum_writer datum writer whose writers_schema is set here
# @param writers_schema schema for a new file, or nil to append
# @param codec codec passed to DataFile.get_codec (unused when appending)
# @param meta [Hash] metadata map; 'avro.codec' and 'avro.schema' are stored
#   into this very hash (it is mutated, not copied)
def initialize(writer, datum_writer, writers_schema=nil, codec=nil, meta={})
  @writer = writer
  @encoder = IO::BinaryEncoder.new(writer)
  @datum_writer = datum_writer
  @meta = meta

  # In-memory buffer that accumulates encoded datums until a block is emitted.
  @buffer_writer = StringIO.new(+'', 'w').tap do |buf|
    buf.set_encoding('BINARY') if buf.respond_to?(:set_encoding)
  end
  @buffer_encoder = IO::BinaryEncoder.new(@buffer_writer)
  @block_count = 0

  if writers_schema
    # New file: fresh marker, record codec and schema, then emit the header.
    @sync_marker = Writer.generate_sync_marker
    @codec = DataFile.get_codec(codec)
    @meta['avro.codec'] = @codec.codec_name.to_s
    @meta['avro.schema'] = writers_schema.to_s
    datum_writer.writers_schema = writers_schema
    write_header
  else
    # Appending: open writer for reading to collect metadata.
    # FIXME(jmhodges): collect arbitrary metadata
    existing = Reader.new(writer, Avro::IO::DatumReader.new)
    @sync_marker = existing.sync_marker
    @meta['avro.codec'] = existing.meta['avro.codec']
    @codec = DataFile.get_codec(@meta['avro.codec'])

    # Reuse the schema the existing file was written with.
    stored_schema = existing.meta['avro.schema']
    @meta['avro.schema'] = stored_schema
    datum_writer.writers_schema = Schema.parse(stored_schema)

    # Move to the end of the file so new blocks are appended. The leading ::
    # is required: inside the Avro namespace, bare IO means Avro::IO.
    writer.seek(0, ::IO::SEEK_END)
  end
end
Public Instance Methods
<<(datum)
click to toggle source
Append a datum to the file
# File lib/avro/data_file.rb, line 136
# Append a datum to the file.
#
# The datum is encoded into the in-memory block buffer. Once the buffer holds
# at least SYNC_INTERVAL bytes, the pending block is written out.
#
# If encoding raises part-way through, any bytes already emitted for this
# datum are discarded before the error propagates. Otherwise the buffer would
# hold bytes that block_count does not account for, corrupting the block.
#
# @param datum value to encode with +datum_writer+
# @return [self] so appends can be chained
def <<(datum)
  datum_start = buffer_writer.tell
  begin
    datum_writer.write(datum, buffer_encoder)
  rescue StandardError
    buffer_writer.truncate(datum_start)
    buffer_writer.seek(datum_start)
    raise
  end
  self.block_count += 1

  # if the data to write is larger than the sync interval, write the block
  write_block if buffer_writer.tell >= SYNC_INTERVAL
  self
end
close()
click to toggle source
# File lib/avro/data_file.rb, line 161
# Flush any buffered datums and close the underlying writer.
#
# The writer is closed in an +ensure+ clause, so the IO handle is released
# even when flushing raises; the flush error still propagates to the caller.
#
# @return the result of writer.close
def close
  begin
    flush
  ensure
    result = writer.close
  end
  result
end
flush()
click to toggle source
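Flush the current state of the file: write out any buffered datums as a block, then flush the underlying writer. (Metadata is written only once, in the header, when a new file is created.)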
# File lib/avro/data_file.rb, line 156
# Push all pending data to the underlying stream.
#
# Buffered datums are first written out as a block (with its sync marker),
# then the writer itself is flushed.
#
# @return the result of writer.flush
def flush
  write_block
  writer.flush
end
sync()
click to toggle source
Force the end of the current block (when any datums are buffered), emitting a synchronization marker, and return the writer's current position — a value that may be passed to DataFileReader.seek(long).
# File lib/avro/data_file.rb, line 150
# End the current block and report where the writer now stands.
#
# Any buffered datums are written out as a block followed by the sync
# marker; nothing is written when no datums are pending.
#
# @return the value of writer.tell after the block has been written
def sync
  write_block
  writer.tell
end
Private Instance Methods
write_block()
click to toggle source
TODO(jmhodges): make a schema for blocks and use datum_writer
TODO(jmhodges): do we really need the number of items in the block?
# File lib/avro/data_file.rb, line 181
# Write the buffered datums out as a single block, if any are pending.
#
# Block layout: item count (long), byte length of the compressed payload
# (long), the payload itself, then the file's sync marker. Afterwards the
# buffer and block_count are reset. Does nothing when block_count is zero.
def write_block
  return unless block_count.positive?

  # Compress before touching the output stream. If the codec raises, nothing
  # has been emitted yet, so the file is not left with a dangling item count.
  payload = codec.compress(buffer_writer.string)

  # write number of items in block and block size in bytes
  encoder.write_long(block_count)
  encoder.write_long(payload.bytesize)

  # write block contents, then the sync marker
  writer.write(payload)
  writer.write(sync_marker)

  # reset buffer
  buffer_writer.truncate(0)
  buffer_writer.rewind
  self.block_count = 0
end
write_header()
click to toggle source
# File lib/avro/data_file.rb, line 168
# Emit the container-file header: the MAGIC bytes, then the metadata map
# (encoded with META_SCHEMA through +encoder+), then the sync marker.
#
# The constructor calls this when a writers_schema is supplied.
def write_header
  out = writer
  out.write(MAGIC)
  datum_writer.write_data(META_SCHEMA, meta, encoder)
  out.write(sync_marker)
end