class Tros::IO::DatumReader

Attributes

readers_schema[RW]
writers_schema[RW]

Public Class Methods

match_schemas(writers_schema, readers_schema) click to toggle source
    # File lib/tros/io.rb
# Decides whether data written with +writers_schema+ may be read using
# +readers_schema+.
#
# Rules applied, in order:
# * if either side is a union the answer is true (the individual branch is
#   checked later, when the data is actually read);
# * identical primitive types match;
# * records, errors and enums match when their full names are equal;
# * fixed types additionally need the same size;
# * maps and arrays match when their value/item schemas match, checked
#   recursively so that promotions and named-type checks also apply to the
#   elements;
# * otherwise only the numeric promotions int -> long/float/double,
#   long -> float/double and float -> double are accepted.
#
# Returns true or false.
def self.match_schemas(writers_schema, readers_schema)
  w_type = writers_schema.type_sym
  r_type = readers_schema.type_sym

  # Unions are resolved branch by branch at read time.
  return true if w_type == :union || r_type == :union

  if w_type == r_type
    return true if Schema::PRIMITIVE_TYPES_SYM.include?(r_type)

    case r_type
    when :record, :error, :enum
      return writers_schema.fullname == readers_schema.fullname
    when :request
      return true
    when :fixed
      return writers_schema.fullname == readers_schema.fullname &&
             writers_schema.size == readers_schema.size
    when :map
      # Compare the full value schemas, not just their type names.
      return match_schemas(writers_schema.values, readers_schema.values)
    when :array
      # Compare the full item schemas, not just their type names.
      return match_schemas(writers_schema.items, readers_schema.items)
    end
  end

  # Handle schema promotion
  case w_type
  when :int   then [:long, :float, :double].include?(r_type)
  when :long  then [:float, :double].include?(r_type)
  when :float then r_type == :double
  else false
  end
end
new(writers_schema=nil, readers_schema=nil) click to toggle source
    # File lib/tros/io.rb
# Stores the schema the data was written with and the schema the caller
# wants to read it as. Both are optional and default to nil.
def initialize(writers_schema=nil, readers_schema=nil)
  @writers_schema, @readers_schema = writers_schema, readers_schema
end

Public Instance Methods

read(decoder) click to toggle source
    # File lib/tros/io.rb
# Reads one datum from +decoder+.
#
# When no reader's schema has been set, the writer's schema is stored as
# the reader's schema before reading. Returns whatever read_data returns.
def read(decoder)
  self.readers_schema ||= writers_schema
  read_data(writers_schema, readers_schema, decoder)
end
read_array(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/tros/io.rb
# Reads a block-encoded array from +decoder+ and returns its items as an
# Array.
#
# The data is a sequence of blocks, each introduced by a long item count,
# and ends with a count of zero. A negative count means the block is
# followed by one extra long before the items; it is consumed here and not
# used. Each item is read with read_data using the item schemas of the
# writer and the reader.
def read_array(writers_schema, readers_schema, decoder)
  read_items = []
  block_count = decoder.read_long
  until block_count == 0
    if block_count < 0
      block_count = -block_count
      # The long after a negative count has to be consumed to keep the
      # stream aligned, but its value is not needed when reading.
      decoder.read_long
    end
    block_count.times do
      read_items << read_data(writers_schema.items, readers_schema.items, decoder)
    end
    block_count = decoder.read_long
  end

  read_items
end
read_data(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/tros/io.rb
# Reads one value written with +writers_schema+ from +decoder+, resolving
# it against +readers_schema+.
#
# Raises SchemaMatchException when the class-level match_schemas check
# rejects the pair, or when the reader is a union (and the writer is not)
# and none of the reader's branches matches the writer's schema.
# Raises AvroError when the writer's type is not one of the known types.
def read_data(writers_schema, readers_schema, decoder)
  # schema matching
  if !self.class.match_schemas(writers_schema, readers_schema)
    raise SchemaMatchException.new(writers_schema, readers_schema)
  end

  # schema resolution: only the reader is a union, so read against the
  # first reader branch that matches the writer's schema
  if writers_schema.type_sym != :union && readers_schema.type_sym == :union
    matching_branch = readers_schema.schemas.find do |candidate|
      self.class.match_schemas(writers_schema, candidate)
    end
    unless matching_branch
      raise SchemaMatchException.new(writers_schema, readers_schema)
    end
    return read_data(writers_schema, matching_branch, decoder)
  end

  # dispatch on the writer's type
  case writers_schema.type_sym
  when :null    then decoder.read_null
  when :boolean then decoder.read_boolean
  when :string  then decoder.read_string
  when :int     then decoder.read_int
  when :long    then decoder.read_long
  when :float   then decoder.read_float
  when :double  then decoder.read_double
  when :bytes   then decoder.read_bytes
  when :fixed   then read_fixed(writers_schema, readers_schema, decoder)
  when :enum    then read_enum(writers_schema, readers_schema, decoder)
  when :array   then read_array(writers_schema, readers_schema, decoder)
  when :map     then read_map(writers_schema, readers_schema, decoder)
  when :union   then read_union(writers_schema, readers_schema, decoder)
  when :record, :error, :request
    read_record(writers_schema, readers_schema, decoder)
  else
    raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
  end
end
read_default_value(field_schema, default_value) click to toggle source
    # File lib/tros/io.rb
# Converts a field default (as parsed from the JSON schema) into the value
# the reader hands back for +field_schema+.
#
# * null yields nil; boolean, enum, fixed, string and bytes are returned
#   as given;
# * int/long go through Integer(), float/double through Float(), so a
#   malformed default raises from those conversions;
# * arrays and maps are converted element by element;
# * a union default is interpreted with the union's first schema;
# * for records and errors each field is taken from +default_value+ by
#   name, falling back to the field's own default when absent.
#
# Raises AvroError for any other type.
def read_default_value(field_schema, default_value)
  case field_schema.type_sym
  when :null
    nil
  when :boolean, :enum, :fixed, :string, :bytes
    default_value
  when :int, :long
    Integer(default_value)
  when :float, :double
    Float(default_value)
  when :array
    default_value.map do |json_val|
      read_default_value(field_schema.items, json_val)
    end
  when :map
    default_value.each_with_object({}) do |(key, json_val), result|
      result[key] = read_default_value(field_schema.values, json_val)
    end
  when :union
    read_default_value(field_schema.schemas[0], default_value)
  when :record, :error
    field_schema.fields.each_with_object({}) do |field, result|
      json_val = default_value[field.name]
      # Only fall back when the value is missing: an explicit `false` is a
      # legitimate default and must not be replaced by the field default.
      json_val = field.default if json_val.nil?
      result[field.name] = read_default_value(field.type, json_val)
    end
  else
    raise AvroError, "Unknown type: #{field_schema.type}"
  end
end
read_enum(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/tros/io.rb
# Reads an enum value: an int index is read from +decoder+ and looked up in
# the writer's symbol list. The looked-up symbol is returned as is.
#
# The reader's symbol list is consulted, but nothing is done yet when the
# symbol is missing from it (see the TODO below).
def read_enum(writers_schema, readers_schema, decoder)
  symbol_index = decoder.read_int
  symbol = writers_schema.symbols[symbol_index]

  # TODO(jmhodges): figure out what unset means for resolution
  # schema resolution
  if !readers_schema.symbols.include?(symbol)
    # 'unset' here
  end

  symbol
end
read_fixed(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/tros/io.rb
# Reads a fixed value by asking +decoder+ for as many units as the writer's
# schema reports in +size+. The reader's schema is not consulted.
def read_fixed(writers_schema, readers_schema, decoder)
  byte_count = writers_schema.size
  decoder.read(byte_count)
end
read_map(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/tros/io.rb
# Reads a block-encoded map from +decoder+ and returns it as a Hash.
#
# The data is a sequence of blocks, each introduced by a long entry count,
# and ends with a count of zero. A negative count means the block is
# followed by one extra long before the entries; it is consumed here and
# not used. Every entry is a string key followed by a value, which is read
# with read_data using the value schemas of the writer and the reader.
def read_map(writers_schema, readers_schema, decoder)
  read_items = {}
  block_count = decoder.read_long
  until block_count == 0
    if block_count < 0
      block_count = -block_count
      # The long after a negative count has to be consumed to keep the
      # stream aligned, but its value is not needed when reading.
      decoder.read_long
    end
    block_count.times do
      key = decoder.read_string
      read_items[key] = read_data(writers_schema.values, readers_schema.values, decoder)
    end
    block_count = decoder.read_long
  end

  read_items
end
read_record(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/tros/io.rb
# Reads a record and returns it as a Hash keyed by field name.
#
# The writer's fields are walked in order: a field the reader also declares
# (matched by name) is read with read_data, any other field is skipped with
# skip_data. Reader fields the writer does not declare are then filled from
# their default when that default is not nil; otherwise they are left out
# of the result.
def read_record(writers_schema, readers_schema, decoder)
  readers_fields_hash = readers_schema.fields_hash
  record = {}

  writers_schema.fields.each do |writers_field|
    readers_field = readers_fields_hash[writers_field.name]
    if readers_field
      value = read_data(writers_field.type, readers_field.type, decoder)
      record[writers_field.name] = value
    else
      skip_data(writers_field.type, decoder)
    end
  end

  # Nothing to fill in when every reader field already has a value.
  return record unless readers_fields_hash.size > record.size

  # fill in the default values
  writers_fields_hash = writers_schema.fields_hash
  readers_fields_hash.each do |field_name, readers_field|
    next if writers_fields_hash.has_key?(field_name)
    # FIXME(jmhodges) another 'unset' here
    next if readers_field.default.nil?

    value = read_default_value(readers_field.type, readers_field.default)
    record[readers_field.name] = value
  end

  record
end
read_union(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/tros/io.rb
# Reads a union value: a long index read from +decoder+ selects one of the
# writer's branch schemas, and the value is then read with read_data using
# that branch as the writer's schema and the unchanged reader's schema.
def read_union(writers_schema, readers_schema, decoder)
  branch_index = decoder.read_long
  read_data(writers_schema.schemas[branch_index], readers_schema, decoder)
end
skip_array(writers_schema, decoder) click to toggle source
    # File lib/tros/io.rb
# Skips over an array: skip_blocks is given a block that skips a single
# item of the writer's item schema.
def skip_array(writers_schema, decoder)
  skip_blocks(decoder) do
    skip_data(writers_schema.items, decoder)
  end
end
skip_data(writers_schema, decoder) click to toggle source
    # File lib/tros/io.rb
# Skips one value of the writer's schema without materialising it.
#
# Primitive types are delegated to the matching skip_* method of +decoder+;
# the remaining types go to the skip_* helpers of this reader.
# Raises AvroError when the type is not recognised.
def skip_data(writers_schema, decoder)
  case writers_schema.type_sym
  when :null    then decoder.skip_null
  when :boolean then decoder.skip_boolean
  when :string  then decoder.skip_string
  when :int     then decoder.skip_int
  when :long    then decoder.skip_long
  when :float   then decoder.skip_float
  when :double  then decoder.skip_double
  when :bytes   then decoder.skip_bytes
  when :fixed   then skip_fixed(writers_schema, decoder)
  when :enum    then skip_enum(writers_schema, decoder)
  when :array   then skip_array(writers_schema, decoder)
  when :map     then skip_map(writers_schema, decoder)
  when :union   then skip_union(writers_schema, decoder)
  when :record, :error, :request
    skip_record(writers_schema, decoder)
  else
    raise AvroError, "Unknown schema type: #{writers_schema.type}"
  end
end
skip_enum(writers_schema, decoder) click to toggle source
    # File lib/tros/io.rb
# Skips an enum value by skipping a single int on +decoder+.
# The schema argument is accepted but not used.
def skip_enum(writers_schema, decoder)
  decoder.skip_int
end
skip_fixed(writers_schema, decoder) click to toggle source
    # File lib/tros/io.rb
# Skips a fixed value by asking +decoder+ to skip the amount given by the
# writer schema's +size+.
def skip_fixed(writers_schema, decoder)
  byte_count = writers_schema.size
  decoder.skip(byte_count)
end
skip_map(writers_schema, decoder) click to toggle source
    # File lib/tros/io.rb
# Skips over a map: skip_blocks is given a block that skips one entry,
# i.e. a string key followed by a value of the writer's value schema.
def skip_map(writers_schema, decoder)
  skip_blocks(decoder) do
    decoder.skip_string
    skip_data(writers_schema.values, decoder)
  end
end
skip_record(writers_schema, decoder) click to toggle source
    # File lib/tros/io.rb
# Skips a record by skipping each of the writer's fields in declaration
# order, using the field's type as the schema to skip.
def skip_record(writers_schema, decoder)
  writers_schema.fields.each do |field|
    skip_data(field.type, decoder)
  end
end
skip_union(writers_schema, decoder) click to toggle source
    # File lib/tros/io.rb
# Skips a union value: the long index read from +decoder+ selects the
# writer's branch schema, and a value of that branch is then skipped.
def skip_union(writers_schema, decoder)
  branch_index = decoder.read_long
  branch_schema = writers_schema.schemas[branch_index]
  skip_data(branch_schema, decoder)
end

Private Instance Methods

skip_blocks(decoder, &blk) click to toggle source
    # File lib/tros/io.rb
# Walks the blocks of an array or map without reading the contents.
#
# Long counts are read from +decoder+ until a zero is seen. For a negative
# count the next long is read and passed to decoder.skip, so the block is
# jumped over in one go; for a positive count the given block is invoked
# once per item so the caller can skip a single item each time.
def skip_blocks(decoder, &blk)
  remaining = decoder.read_long
  until remaining == 0
    if remaining < 0
      byte_size = decoder.read_long
      decoder.skip(byte_size)
    else
      remaining.times(&blk)
    end
    remaining = decoder.read_long
  end
end