class Avro::DataFile::Writer
Attributes
block_count[RW]
buffer_encoder[R]
buffer_writer[R]
codec[R]
datum_writer[R]
encoder[R]
meta[R]
sync_marker[R]
writer[R]
Public Class Methods
generate_sync_marker()
click to toggle source
# File lib/avro/data_file.rb, line 89
# Builds a fresh sync marker: a String of 16 random bytes taken from
# OpenSSL's random number generator.
def self.generate_sync_marker
  marker_length = 16
  OpenSSL::Random.random_bytes(marker_length)
end
new(writer, datum_writer, writers_schema=nil, codec=nil, meta={})
click to toggle source
# File lib/avro/data_file.rb, line 96
# Prepares a data-file writer on top of +writer+.
#
# writer::         IO-like object that receives the file contents; it is
#                  wrapped in an IO::BinaryEncoder. In append mode it is also
#                  handed to Reader.new and must respond to +seek+.
# datum_writer::   object used to serialize data; its +writers_schema+ is
#                  assigned here.
# writers_schema:: when given, a new file is started: a sync marker is
#                  generated and the header is written immediately. When
#                  nil, the writer presumes it is appending to an existing
#                  file and takes sync marker, codec and schema from it.
# codec::          passed to DataFile.get_codec; only consulted when
#                  +writers_schema+ is given.
# meta::           metadata Hash. It is stored as-is (not copied), and its
#                  'avro.codec' and 'avro.schema' entries are overwritten.
def initialize(writer, datum_writer, writers_schema=nil, codec=nil, meta={})
  @writer = writer
  @encoder = IO::BinaryEncoder.new(@writer)
  @datum_writer = datum_writer
  @meta = meta

  # String.new instead of a '' literal: a literal is frozen when frozen
  # string literals are enabled, and StringIO refuses to open a frozen
  # string for writing.
  @buffer_writer = StringIO.new(String.new, 'w')
  @buffer_writer.set_encoding('BINARY') if @buffer_writer.respond_to?(:set_encoding)
  @buffer_encoder = IO::BinaryEncoder.new(@buffer_writer)
  @block_count = 0

  if writers_schema
    # Starting a new file.
    @sync_marker = Writer.generate_sync_marker
    @codec = DataFile.get_codec(codec)
    @meta['avro.codec'] = @codec.codec_name.to_s
    @meta['avro.schema'] = writers_schema.to_s
    datum_writer.writers_schema = writers_schema
    write_header
  else
    # Appending: open the writer for reading to collect the existing
    # file's metadata.
    dfr = Reader.new(writer, Avro::IO::DatumReader.new)

    # FIXME(jmhodges): collect arbitrary metadata
    @sync_marker = dfr.sync_marker
    @meta['avro.codec'] = dfr.meta['avro.codec']
    @codec = DataFile.get_codec(@meta['avro.codec'])

    # Reuse the schema the existing file was written with.
    schema_from_file = dfr.meta['avro.schema']
    @meta['avro.schema'] = schema_from_file
    datum_writer.writers_schema = Schema.parse(schema_from_file)

    # Seek to the end of the file (whence 2) and prepare for writing.
    writer.seek(0, 2)
  end
end
Public Instance Methods
<<(datum)
click to toggle source
Append a datum to the file
# File lib/avro/data_file.rb, line 135
# Appends +datum+ to the file.
#
# The datum is serialized into the in-memory block buffer and the block's
# item count is incremented. Once the buffer position reaches
# SYNC_INTERVAL the buffered block is written out.
#
# Returns +self+, so appends can be chained: <tt>writer << a << b</tt>.
def <<(datum)
  datum_writer.write(datum, buffer_encoder)
  self.block_count += 1

  # Emit the block as soon as the buffered data reaches the sync interval.
  write_block if buffer_writer.tell >= SYNC_INTERVAL

  self
end
close()
click to toggle source
# File lib/avro/data_file.rb, line 160
# Flushes any buffered data and closes the underlying writer.
#
# The writer is closed even when flushing raises, so a failed flush does
# not leak the underlying resource; the flush error still propagates to
# the caller. Returns nil.
def close
  begin
    flush
  ensure
    writer.close
  end
  nil
end
flush()
click to toggle source
Flush the current state of the file, including metadata
# File lib/avro/data_file.rb, line 155
# Writes any buffered data out as a block, then flushes the underlying
# writer. Returns whatever the writer's +flush+ returns.
def flush
  write_block
  sink = writer
  sink.flush
end
sync()
click to toggle source
Return the current position as a value that may be passed to DataFileReader.seek(long). Forces the end of the current block, emitting a synchronization marker.
# File lib/avro/data_file.rb, line 149
# Ends the current block (writing it out, followed by the sync marker)
# and reports the underlying writer's position afterwards.
def sync
  write_block
  position = writer.tell
  position
end
Private Instance Methods
write_block()
click to toggle source
TODO(jmhodges): make a schema for blocks and use datum_writer
TODO(jmhodges): do we really need the number of items in the block?
# File lib/avro/data_file.rb, line 180
# Writes the buffered data as one block: item count, size in bytes of the
# codec-processed payload, the payload itself, then the sync marker.
# Afterwards the buffer and the item count are reset. Does nothing when
# no items are buffered.
def write_block
  return unless block_count > 0

  # Block prefix: number of items, then payload size in bytes.
  encoder.write_long(block_count)
  payload = codec.compress(buffer_writer.string)
  payload_size = if payload.respond_to?(:bytesize)
                   payload.bytesize
                 else
                   payload.size
                 end
  encoder.write_long(payload_size)

  # Block contents followed by the sync marker.
  writer.write(payload)
  writer.write(sync_marker)

  # Start the next block from an empty buffer.
  buffer_writer.truncate(0)
  buffer_writer.rewind
  self.block_count = 0
end
write_header()
click to toggle source
# File lib/avro/data_file.rb, line 167
# Writes the file header: the MAGIC bytes, the metadata encoded with
# META_SCHEMA, and finally the sync marker.
def write_header
  out = writer

  out.write(MAGIC)
  datum_writer.write_data(META_SCHEMA, meta, encoder)
  out.write(sync_marker)
end