class Avro::DataFile::Writer

Attributes

block_count[RW]
buffer_encoder[R]
buffer_writer[R]
codec[R]
datum_writer[R]
encoder[R]
meta[R]
sync_marker[R]
writer[R]

Public Class Methods

generate_sync_marker()

Returns a fresh 16-byte random sync marker.
   # File lib/avro/data_file.rb
# Produces the marker that separates blocks in a new data file: 16 bytes
# drawn from OpenSSL's random generator.
def self.generate_sync_marker
  marker_size = 16
  OpenSSL::Random.random_bytes(marker_size)
end
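new(writer, datum_writer, writers_schema=nil, codec=nil, meta={})

Creates a writer. When writers_schema is given, a new file is started and its header is written. When it is omitted, the writer appends to the existing file in writer, reusing that file's codec, schema and sync marker.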
    # File lib/avro/data_file.rb
 # Opens a data-file writer.
 #
 # writer         - IO-like destination. When appending, it is also read via
 #                  Reader.new and must support #seek.
 # datum_writer   - datum writer; its writers_schema is assigned here.
 # writers_schema - schema for a new file. When nil, the writer appends to the
 #                  existing file in +writer+, reusing its codec, schema and
 #                  sync marker.
 # codec          - codec spec passed to DataFile.get_codec (new files only).
 # meta           - extra header metadata, written only when starting a new
 #                  file. The hash is copied, so the caller's hash is left
 #                  untouched.
 def initialize(writer, datum_writer, writers_schema=nil, codec=nil, meta={})
   # If writers_schema is not present, presume we're appending
   @writer = writer
   @encoder = IO::BinaryEncoder.new(@writer)
   @datum_writer = datum_writer
   # Copy so that adding avro.codec / avro.schema below never mutates the
   # hash the caller passed in.
   @meta = meta.dup
   # String.new is always mutable (a '' literal is frozen under
   # frozen_string_literal), and StringIO needs a writable buffer in 'w' mode.
   @buffer_writer = StringIO.new(String.new, 'w')
   @buffer_writer.set_encoding('BINARY') if @buffer_writer.respond_to?(:set_encoding)
   @buffer_encoder = IO::BinaryEncoder.new(@buffer_writer)
   @block_count = 0

   if writers_schema
     @sync_marker = Writer.generate_sync_marker
     @codec = DataFile.get_codec(codec)
     @meta['avro.codec'] = @codec.codec_name.to_s
     @meta['avro.schema'] = writers_schema.to_s
     datum_writer.writers_schema = writers_schema
     write_header
   else
     # open writer for reading to collect metadata
     dfr = Reader.new(writer, Avro::IO::DatumReader.new)

     # FIXME(jmhodges): collect arbitrary metadata
     # collect metadata
     @sync_marker = dfr.sync_marker
     @meta['avro.codec'] = dfr.meta['avro.codec']
     @codec = DataFile.get_codec(@meta['avro.codec'])

     # get schema used to write existing file
     schema_from_file = dfr.meta['avro.schema']
     @meta['avro.schema'] = schema_from_file
     datum_writer.writers_schema = Schema.parse(schema_from_file)

     # Seek to the end of the file and prepare for writing. ::IO is needed
     # because a bare IO in this scope resolves to Avro::IO.
     writer.seek(0, ::IO::SEEK_END)
   end
 end

Public Instance Methods

<<(datum)

Appends a datum to the current block. The block is written to the file once the buffered data reaches SYNC_INTERVAL bytes.

    # File lib/avro/data_file.rb
# Serializes +datum+ into the in-memory block buffer and counts it. Once the
# buffer holds at least SYNC_INTERVAL bytes, the block is written out.
def <<(datum)
  datum_writer.write(datum, buffer_encoder)
  self.block_count += 1

  write_block if buffer_writer.tell >= SYNC_INTERVAL
end
close()

Writes any pending block, flushes the underlying writer, and closes it.
    # File lib/avro/data_file.rb
# Writes any pending block, flushes, and closes the underlying writer.
#
# The writer is closed even when the final flush raises, so the handle is not
# leaked; the flush error is then re-raised to the caller. On success, returns
# whatever the writer's #close returns.
def close
  begin
    flush
  rescue StandardError
    writer.close
    raise
  end
  writer.close
end
flush()

Writes any pending block to the file and flushes the underlying writer. Metadata is not rewritten here; it is written only once, in the file header.

    # File lib/avro/data_file.rb
# Ends the current block (a no-op when nothing is buffered), then pushes
# everything through the underlying writer. Returns the writer's #flush result.
def flush
  write_block # emit buffered data as a finished block
  writer.flush # hand it on to the destination
end
sync()

Returns the current position as a value that may be passed to DataFileReader.seek(long). Forces the end of the current block, emitting a synchronization marker if any data is buffered.

    # File lib/avro/data_file.rb
# Closes off the current block (if it holds any data) and reports the
# writer's position, intended as a seek target for readers.
def sync
  write_block # ends the block with a sync marker when non-empty
  writer.tell # position after everything written so far
end

Private Instance Methods

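write_block()

Writes the buffered data as one block (item count, byte size, compressed data, sync marker) and resets the buffer. Does nothing if the block is empty.

TODO(jmhodges): make a schema for blocks and use datum_writer.
TODO(jmhodges): do we really need the number of items in the block?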

    # File lib/avro/data_file.rb
# Emits the buffered data as one block: item count, byte length of the
# compressed payload, the payload itself, then the sync marker. Afterwards the
# buffer and counter are reset. Does nothing when no items are buffered.
def write_block
  return unless block_count > 0

  encoder.write_long(block_count)
  payload = codec.compress(buffer_writer.string)
  payload_size = payload.respond_to?(:bytesize) ? payload.bytesize : payload.size
  encoder.write_long(payload_size)

  writer.write(payload)
  writer.write(sync_marker)

  # Start the next block from an empty buffer.
  buffer_writer.truncate(0)
  buffer_writer.rewind
  self.block_count = 0
end
write_header()

Writes the file header: the magic bytes, the metadata map, and the sync marker.
    # File lib/avro/data_file.rb
# Writes the file header in order: the MAGIC bytes, the metadata map encoded
# with META_SCHEMA, and the sync marker that closes the header.
def write_header
  writer.write(MAGIC)                                 # magic
  datum_writer.write_data(META_SCHEMA, meta, encoder) # metadata map
  writer.write(sync_marker)                           # end-of-header marker
end