class Tros::DataFile::Writer

Attributes

block_count[RW]
buffer_encoder[R]
buffer_writer[R]
codec[R]
datum_writer[R]
encoder[R]
meta[R]
sync_marker[R]
writer[R]

Public Class Methods

generate_sync_marker()
   # File lib/tros/data_file.rb
# Produces a fresh sync marker for a new file: 16 random bytes obtained
# from OpenSSL::Random. The same marker is written after the header and
# after every block of that file.
def self.generate_sync_marker
  marker_length = 16
  OpenSSL::Random.random_bytes(marker_length)
end
new(writer, datum_writer, writers_schema=nil, codec=nil)
    # File lib/tros/data_file.rb
 # Creates a data file writer.
 #
 # writer         - IO-like object the file is written to. When appending
 #                  (writers_schema is nil) it is first handed to Reader to
 #                  recover the existing header, then seeked to its end.
 # datum_writer   - serializer for individual datums; its writers_schema is
 #                  assigned here.
 # writers_schema - schema for a new file. nil means "append to the existing
 #                  file in writer", reusing its schema, codec and sync marker.
 # codec          - codec name for a new file, resolved via
 #                  DataFile.get_codec. Ignored when appending.
 def initialize(writer, datum_writer, writers_schema=nil, codec=nil)
   @writer = writer
   @encoder = IO::BinaryEncoder.new(@writer)
   @datum_writer = datum_writer
   # Encoded records accumulate here until a block is written. Back the
   # buffer with a mutable binary (ASCII-8BIT) string so the encoded bytes
   # are stored verbatim instead of in a string tagged with the source
   # encoding -- and so it cannot be a frozen literal under
   # frozen_string_literal.
   @buffer_writer = StringIO.new(String.new, 'w')
   @buffer_encoder = IO::BinaryEncoder.new(@buffer_writer)
   @block_count = 0

   @meta = {}

   if writers_schema
     # New file: pick a fresh sync marker and write the header right away.
     @sync_marker = Writer.generate_sync_marker
     @codec = DataFile.get_codec(codec)
     meta['tros.codec'] = @codec.codec_name.to_s
     meta['tros.schema'] = writers_schema.to_s
     datum_writer.writers_schema = writers_schema
     write_header
   else
     # Appending: open the existing file for reading to collect metadata.
     dfr = Reader.new(writer, Tros::IO::DatumReader.new)

     # FIXME(jmhodges): collect arbitrary metadata
     # Reuse the existing sync marker and codec so appended blocks match the
     # blocks already in the file.
     @sync_marker = dfr.sync_marker
     meta['tros.codec'] = dfr.meta['tros.codec']
     @codec = DataFile.get_codec(meta['tros.codec'])

     # Reuse the schema the existing file was written with.
     schema_from_file = dfr.meta['tros.schema']
     meta['tros.schema'] = schema_from_file
     datum_writer.writers_schema = Schema.parse(schema_from_file)

     # Seek to the end of the file (whence 2 == IO::SEEK_END) and prepare
     # for writing.
     writer.seek(0,2)
   end
 end

Public Instance Methods

<<(datum)

Append a datum to the file

    # File lib/tros/data_file.rb
# Appends a datum to the file.
#
# The datum is serialized into the in-memory block buffer and counted.
# Once the buffer holds at least SYNC_INTERVAL bytes, the pending block is
# written out.
#
# datum - value to serialize with datum_writer.
#
# Returns self, so appends can be chained (writer << a << b), following
# the convention of IO#<< and Array#<<.
def <<(datum)
  datum_writer.write(datum, buffer_encoder)
  self.block_count += 1

  # If the buffered data has reached the sync interval, write the block.
  write_block if buffer_writer.tell >= SYNC_INTERVAL
  self
end
close()
    # File lib/tros/data_file.rb
# Flushes any buffered records and closes the underlying writer.
#
# The writer is closed in an ensure clause, so the IO is released even
# when flushing raises. The flush error still propagates to the caller,
# unless closing raises as well.
#
# Returns nil, like IO#close.
def close
  flush
  nil
ensure
  writer.close
end
flush()

Flush the current state of the file: write any buffered records out as a block, then flush the underlying writer.

    # File lib/tros/data_file.rb
# Pushes out everything written so far. Any buffered records are emitted
# as a block first, and then the underlying writer itself is flushed.
# Returns whatever writer.flush returns.
def flush
  write_block
  io = writer
  io.flush
end
sync()

Force the end of the current block, emitting a synchronization marker, and return the resulting position as a value that may be passed to DataFileReader.seek(long).

    # File lib/tros/data_file.rb
# Ends the current block, which writes its sync marker, and reports where
# the writer now stands.
#
# Returns the position given by writer.tell after the block was written.
def sync
  write_block
  position = writer.tell
  position
end

Private Instance Methods

write_block()

TODO(jmhodges): make a schema for blocks and use datum_writer.
TODO(jmhodges): do we really need the number of items in the block?

    # File lib/tros/data_file.rb
# Emits the buffered records as one block and resets the buffer.
#
# Layout written to the file: the record count, the byte length of the
# codec-compressed payload, the payload, and finally the sync marker.
# Does nothing (returns nil) when no records are buffered. Otherwise it
# returns 0, the freshly reset block_count.
def write_block
  return unless block_count > 0

  # Block header: number of items, then compressed size in bytes.
  encoder.write_long(block_count)
  payload = codec.compress(buffer_writer.string)
  encoder.write_long(payload.bytesize)

  # Block body, then the marker that terminates it.
  writer.write(payload)
  writer.write(sync_marker)

  # Start an empty buffer for the next block.
  buffer_writer.truncate(0)
  buffer_writer.rewind
  self.block_count = 0
end
write_header()
    # File lib/tros/data_file.rb
# Writes the file header in three parts. First comes the MAGIC bytes, then
# the meta hash serialized with META_SCHEMA through the encoder, and last
# the sync marker. Returns the value of the final writer.write call.
def write_header
  out = writer
  out.write(MAGIC)
  datum_writer.write_data(META_SCHEMA, meta, encoder)
  out.write(sync_marker)
end