class Tros::DataFile::Writer
Attributes
block_count[RW]
buffer_encoder[R]
buffer_writer[R]
codec[R]
datum_writer[R]
encoder[R]
meta[R]
sync_marker[R]
writer[R]
Public Class Methods
generate_sync_marker()
click to toggle source
# File lib/tros/data_file.rb, line 88
#
# Produce a fresh sync marker for a new container file.
#
# The marker is 16 bytes taken from OpenSSL::Random. The writer emits it
# after the header and after every block.
def self.generate_sync_marker
  marker_length = 16
  OpenSSL::Random.random_bytes(marker_length)
end
new(writer, datum_writer, writers_schema=nil, codec=nil)
click to toggle source
# File lib/tros/data_file.rb, line 95
#
# Build a container-file writer on top of +writer+.
#
# When +writers_schema+ is truthy, a new file is started:
# - a sync marker is generated and +codec+ is resolved via DataFile.get_codec;
# - the codec name and the schema text are recorded in +meta+;
# - the schema is handed to +datum_writer+ and the header is written at once.
#
# When +writers_schema+ is nil/false, we presume we are appending:
# - the existing header is read back through a Reader on the same +writer+
#   to recover the sync marker, codec and schema;
# - +writer+ is then positioned at end-of-stream.
# In that case the +codec+ argument is not consulted.
def initialize(writer, datum_writer, writers_schema=nil, codec=nil)
  @writer = writer
  @encoder = IO::BinaryEncoder.new(@writer)
  @datum_writer = datum_writer

  # Records are first encoded into this in-memory buffer; write_block later
  # moves the buffered bytes to +writer+ as one block.
  @buffer_writer = StringIO.new('', 'w')
  @buffer_encoder = IO::BinaryEncoder.new(@buffer_writer)
  @block_count = 0

  # Both branches insert 'tros.codec' before 'tros.schema'. write_header passes
  # this hash to datum_writer.write_data, which presumably serializes it in
  # iteration order - keep the insertion order stable.
  @meta = {}

  if writers_schema
    @sync_marker = Writer.generate_sync_marker
    @codec = DataFile.get_codec(codec)
    meta['tros.codec'] = @codec.codec_name.to_s
    meta['tros.schema'] = writers_schema.to_s
    datum_writer.writers_schema = writers_schema
    write_header
  else
    # Read the existing header through a Reader over the same stream.
    # NOTE(review): this presumes +writer+ is readable and positioned at the
    # start of the file - confirm with callers.
    existing = Reader.new(writer, Tros::IO::DatumReader.new)

    # FIXME(jmhodges): collect arbitrary metadata
    # Only the sync marker, codec and schema are carried over.
    @sync_marker = existing.sync_marker
    meta['tros.codec'] = existing.meta['tros.codec']
    @codec = DataFile.get_codec(meta['tros.codec'])

    stored_schema = existing.meta['tros.schema']
    meta['tros.schema'] = stored_schema
    datum_writer.writers_schema = Schema.parse(stored_schema)

    # Jump to end-of-stream (whence 2) so new blocks follow the existing ones.
    writer.seek(0, ::IO::SEEK_END)
  end
end
Public Instance Methods
<<(datum)
click to toggle source
Append a datum to the file
# File lib/tros/data_file.rb, line 134
#
# Append a datum to the file.
#
# The datum is encoded into the in-memory block buffer and the pending
# record count is incremented. Once the buffered bytes reach SYNC_INTERVAL,
# the pending block is written out with write_block.
#
# Returns +self+, following the IO#<< convention, so appends can be chained:
#   writer << a << b
# Previously the return value was nil or 0 depending on whether a block had
# just been flushed, which made chaining fail (or, for Integer data, silently
# compute 0 << datum).
def <<(datum)
  datum_writer.write(datum, buffer_encoder)
  self.block_count += 1

  # If the data to write is larger than the sync interval, write the block.
  write_block if buffer_writer.tell >= SYNC_INTERVAL

  self
end
close()
click to toggle source
# File lib/tros/data_file.rb, line 159
#
# Flush any pending block and close the underlying writer.
#
# The writer is closed even if flushing raises, so a failed final write does
# not leak the stream. The flush error still propagates (unless closing also
# raises, in which case that error wins).
#
# Returns whatever +writer.close+ returns.
def close
  begin
    flush
  ensure
    result = writer.close
  end
  result
end
flush()
click to toggle source
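Flush the current state of the file: write any buffered records out as a block, then flush the underlying writer. (Metadata is written only once, in the header, when a new file is created.)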
# File lib/tros/data_file.rb, line 154
#
# Push everything written so far to the underlying stream.
#
# Pending records are first closed out as a block via write_block; then the
# underlying +writer+ is flushed. Returns the result of +writer.flush+.
def flush
  write_block
  io = writer
  io.flush
end
sync()
click to toggle source
Force the end of the current block: if records are pending, they are written out and followed by a synchronization marker. Then return the writer's current position, a value that may be passed to DataFileReader.seek(long).
# File lib/tros/data_file.rb, line 148
#
# Close out the pending block (if any) and report where the writer now is.
#
# Because write_block runs first, the returned offset lies just past the last
# written block's sync marker, not in the middle of a block.
def sync
  write_block
  io = writer
  io.tell
end
Private Instance Methods
write_block()
click to toggle source
TODO(jmhodges): make a schema for blocks and use datum_writer
TODO(jmhodges): do we really need the number of items in the block?
# File lib/tros/data_file.rb, line 179
#
# Emit the pending block, if there is one.
#
# The output, in order:
# - the record count, then the compressed byte length (both via
#   encoder.write_long);
# - the compressed bytes produced by +codec+;
# - the sync marker.
# Afterwards the in-memory buffer is emptied and the record count reset to 0.
# With no pending records this does nothing and returns nil.
def write_block
  return unless block_count > 0

  # Block prefix: number of items, then size of the compressed payload.
  encoder.write_long(block_count)
  payload = codec.compress(buffer_writer.string)
  encoder.write_long(payload.bytesize)

  # Block body followed by the file's sync marker.
  writer.write(payload)
  writer.write(sync_marker)

  # Start a fresh, empty buffer for the next block.
  buffer_writer.truncate(0)
  buffer_writer.rewind
  self.block_count = 0
end
write_header()
click to toggle source
# File lib/tros/data_file.rb, line 166
#
# Emit the container header. The output, in order:
# - MAGIC, written straight to +writer+;
# - the +meta+ hash, written with META_SCHEMA by datum_writer.write_data
#   through +encoder+;
# - the sync marker.
# Returns the result of the final +writer.write+.
def write_header
  out = writer
  out.write(MAGIC)
  datum_writer.write_data(META_SCHEMA, meta, encoder)
  out.write(sync_marker)
end