class Avro::IO::DatumReader

Attributes

readers_schema[RW]
writers_schema[RW]

Public Class Methods

match_schemas(writers_schema, readers_schema) click to toggle source
    # File lib/avro/io.rb
# Delegates to Avro::SchemaCompatibility.match_schemas with the same two
# arguments and returns its result unchanged. read_data raises
# SchemaMatchException when this result is falsy.
223 def self.match_schemas(writers_schema, readers_schema)
224   Avro::SchemaCompatibility.match_schemas(writers_schema, readers_schema)
225 end
new(writers_schema=nil, readers_schema=nil) click to toggle source
    # File lib/avro/io.rb
# Stores the writer's and the reader's schema on the new reader.
# Both arguments are optional and default to nil; #read substitutes the
# writer's schema when no reader's schema has been set.
229 def initialize(writers_schema=nil, readers_schema=nil)
230   @writers_schema = writers_schema
231   @readers_schema = readers_schema
232 end

Public Instance Methods

read(decoder) click to toggle source
    # File lib/avro/io.rb
# Decodes one datum from +decoder+.
#
# If no reader's schema has been set, the writer's schema is stored as the
# reader's schema first, so the datum is resolved against itself.
# Returns whatever read_data returns.
def read(decoder)
  self.readers_schema ||= writers_schema
  read_data(writers_schema, readers_schema, decoder)
end
read_array(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Reads a block-encoded array and returns its items as an Array.
#
# The encoding is a series of blocks. Each block starts with a long item
# count; a count of zero ends the array. A negative count means its absolute
# value is the item count and a second long (the block's size) follows.
#
# writers_schema - array schema the data was written with (responds to #items)
# readers_schema - array schema the caller expects (responds to #items)
# decoder        - source of encoded data (responds to #read_long)
#
# Every item is decoded through read_data using the two item schemas.
def read_array(writers_schema, readers_schema, decoder)
  read_items = []
  block_count = decoder.read_long
  until block_count.zero?
    if block_count < 0
      block_count = -block_count
      # The block size must be consumed to stay aligned with the stream, but
      # its value is irrelevant here because every item is decoded anyway.
      decoder.read_long
    end
    block_count.times do
      read_items << read_data(writers_schema.items, readers_schema.items, decoder)
    end
    block_count = decoder.read_long
  end

  read_items
end
read_data(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Decodes a single datum written with +writers_schema+ and resolves it
# against +readers_schema+.
#
# Steps, in order:
# 1. Raise SchemaMatchException unless the class-level match_schemas accepts
#    the pair.
# 2. If only the reader's schema is a union, pick the first reader branch that
#    matches the writer's schema and decode against that branch instead;
#    raise SchemaMatchException when no branch matches.
# 3. Dispatch on the writer's type: primitives are read straight from the
#    decoder, complex types go through the matching read_* method.
# 4. Pass the decoded value through the reader schema's type adapter and
#    return the adapter's result.
#
# Raises AvroError for a writer type that is not handled.
def read_data(writers_schema, readers_schema, decoder)
  matcher = self.class
  unless matcher.match_schemas(writers_schema, readers_schema)
    raise SchemaMatchException.new(writers_schema, readers_schema)
  end

  # Schema resolution: the reader expects a union but the writer wrote a
  # plain (non-union) value.
  if writers_schema.type_sym != :union && readers_schema.type_sym == :union
    branch = readers_schema.schemas.find do |candidate|
      matcher.match_schemas(writers_schema, candidate)
    end
    raise SchemaMatchException.new(writers_schema, readers_schema) unless branch

    return read_data(writers_schema, branch, decoder)
  end

  # The writer's type decides how the bytes are decoded.
  datum =
    case writers_schema.type_sym
    when :null    then decoder.read_null
    when :boolean then decoder.read_boolean
    when :string  then decoder.read_string
    when :int     then decoder.read_int
    when :long    then decoder.read_long
    when :float   then decoder.read_float
    when :double  then decoder.read_double
    when :bytes   then decoder.read_bytes
    when :fixed   then read_fixed(writers_schema, readers_schema, decoder)
    when :enum    then read_enum(writers_schema, readers_schema, decoder)
    when :array   then read_array(writers_schema, readers_schema, decoder)
    when :map     then read_map(writers_schema, readers_schema, decoder)
    when :union   then read_union(writers_schema, readers_schema, decoder)
    when :record, :error, :request
      read_record(writers_schema, readers_schema, decoder)
    else
      raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
    end

  readers_schema.type_adapter.decode(datum)
end
read_default_value(field_schema, default_value) click to toggle source
    # File lib/avro/io.rb
# Converts a JSON-style default value into the in-memory value for
# +field_schema+.
#
# field_schema  - schema of the field whose default is being materialised
# default_value - the default as it appears in the schema, or :no_default
#
# Conversion by schema type:
# * null                       -> nil
# * boolean/enum/fixed/string/bytes -> the default itself
# * int/long                   -> Integer(default_value)
# * float/double               -> Float(default_value)
# * array/map                  -> each element/value converted recursively
# * union                      -> converted against the union's first schema
# * record/error               -> a Hash with every field converted recursively;
#                                 a field missing from +default_value+ falls
#                                 back to that field's own default
#
# Raises AvroError when +default_value+ is :no_default or the schema type is
# not handled.
def read_default_value(field_schema, default_value)
  if default_value == :no_default
    raise AvroError, "Missing data for #{field_schema} with no default"
  end

  case field_schema.type_sym
  when :null
    nil
  when :boolean, :enum, :fixed, :string, :bytes
    default_value
  when :int, :long
    Integer(default_value)
  when :float, :double
    Float(default_value)
  when :array
    default_value.map do |json_val|
      read_default_value(field_schema.items, json_val)
    end
  when :map
    default_value.each_with_object({}) do |(key, json_val), converted|
      converted[key] = read_default_value(field_schema.values, json_val)
    end
  when :union
    read_default_value(field_schema.schemas[0], default_value)
  when :record, :error
    field_schema.fields.each_with_object({}) do |field, converted|
      # Fall back to the field's own default only when the key is absent.
      # Testing the value's truthiness would discard explicit false and null
      # defaults.
      json_val = default_value.fetch(field.name) { field.default }
      converted[field.name] = read_default_value(field.type, json_val)
    end
  else
    raise AvroError, "Unknown type: #{field_schema.type}"
  end
end
read_enum(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Reads an enum value: an int index is taken from the decoder and looked up
# in the writer's symbol list. The looked-up symbol is returned as is.
#
# The reader's symbol list is consulted, but a symbol that is missing there
# is currently not treated specially (see TODO below).
def read_enum(writers_schema, readers_schema, decoder)
  position = decoder.read_int
  symbol = writers_schema.symbols[position]

  # TODO(jmhodges): figure out what unset means for resolution
  # Schema resolution placeholder: nothing happens yet when the reader does
  # not know the symbol.
  unless readers_schema.symbols.include?(symbol)
    # 'unset' here
  end

  symbol
end
read_fixed(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Reads a fixed value by asking the decoder to read writers_schema.size
# (presumably a byte count -- confirm against the decoder) and returns what
# the decoder returns. +readers_schema+ is not used here.
279 def read_fixed(writers_schema, readers_schema, decoder)
280   decoder.read(writers_schema.size)
281 end
read_map(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Reads a block-encoded map and returns it as a Hash.
#
# The encoding is a series of blocks. Each block starts with a long entry
# count; a count of zero ends the map. A negative count means its absolute
# value is the entry count and a second long (the block's size) follows.
# Every entry is a string key followed by a value.
#
# writers_schema - map schema the data was written with (responds to #values)
# readers_schema - map schema the caller expects (responds to #values)
# decoder        - source of encoded data (#read_long, #read_string)
#
# Values are decoded through read_data using the two value schemas. A key
# that occurs more than once keeps the value read last.
def read_map(writers_schema, readers_schema, decoder)
  read_items = {}
  block_count = decoder.read_long
  until block_count.zero?
    if block_count < 0
      block_count = -block_count
      # The block size must be consumed to stay aligned with the stream, but
      # its value is irrelevant here because every entry is decoded anyway.
      decoder.read_long
    end
    block_count.times do
      key = decoder.read_string
      read_items[key] = read_data(writers_schema.values, readers_schema.values, decoder)
    end
    block_count = decoder.read_long
  end

  read_items
end
read_record(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Reads a record and returns it as a Hash keyed by field name.
#
# The writer's fields are processed in the writer's order. A field that the
# reader also declares (matched by name) is decoded with read_data; any other
# writer field is skipped with skip_data.
#
# When the reader declares more fields than were decoded, each reader field
# that the writer does not have is filled from its default via
# read_default_value. Raises AvroError if such a field has no default.
def read_record(writers_schema, readers_schema, decoder)
  reader_fields = readers_schema.fields_hash
  record = {}

  writers_schema.fields.each do |writer_field|
    reader_field = reader_fields[writer_field.name]
    if reader_field
      value = read_data(writer_field.type, reader_field.type, decoder)
      record[writer_field.name] = value
    else
      skip_data(writer_field.type, decoder)
    end
  end

  # Nothing left to fill in when every reader field was decoded.
  return record unless reader_fields.size > record.size

  writer_fields = writers_schema.fields_hash
  reader_fields.each do |name, reader_only_field|
    next if writer_fields.has_key?(name)

    unless reader_only_field.default?
      raise AvroError, "Missing data for #{reader_only_field.type} with no default"
    end

    value = read_default_value(reader_only_field.type, reader_only_field.default)
    record[reader_only_field.name] = value
  end

  record
end
read_union(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Reads a union value: a long read from the decoder is used as an index into
# the writer's list of schemas, and the datum is then decoded with read_data
# using that schema together with the unchanged +readers_schema+.
def read_union(writers_schema, readers_schema, decoder)
  branch_index = decoder.read_long
  writer_branch = writers_schema.schemas[branch_index]
  read_data(writer_branch, readers_schema, decoder)
end
skip_array(writers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Skips an encoded array: skip_blocks walks the blocks and, whenever it
# yields, one item of the writer's item schema is skipped with skip_data.
def skip_array(writers_schema, decoder)
  skip_blocks(decoder) do
    skip_data(writers_schema.items, decoder)
  end
end
skip_data(writers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Skips one encoded datum of the writer's type without building a value.
#
# Primitive types are skipped by the decoder's own skip_* method; complex
# types go through the matching skip_* method of this reader.
# Raises AvroError for a type that is not handled.
def skip_data(writers_schema, decoder)
  case writers_schema.type_sym
  when :null    then decoder.skip_null
  when :boolean then decoder.skip_boolean
  when :string  then decoder.skip_string
  when :int     then decoder.skip_int
  when :long    then decoder.skip_long
  when :float   then decoder.skip_float
  when :double  then decoder.skip_double
  when :bytes   then decoder.skip_bytes
  when :fixed   then skip_fixed(writers_schema, decoder)
  when :enum    then skip_enum(writers_schema, decoder)
  when :array   then skip_array(writers_schema, decoder)
  when :map     then skip_map(writers_schema, decoder)
  when :union   then skip_union(writers_schema, decoder)
  when :record, :error, :request
    skip_record(writers_schema, decoder)
  else
    raise AvroError, "Unknown schema type: #{writers_schema.type}"
  end
end
skip_enum(writers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Skips an encoded enum value by skipping one int on the decoder (read_enum
# reads the enum as an int index). +writers_schema+ is not used here.
459 def skip_enum(writers_schema, decoder)
460   decoder.skip_int
461 end
skip_fixed(writers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Skips an encoded fixed value by asking the decoder to skip
# writers_schema.size (presumably a byte count -- confirm against the
# decoder). Returns what decoder.skip returns.
455 def skip_fixed(writers_schema, decoder)
456   decoder.skip(writers_schema.size)
457 end
skip_map(writers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Skips an encoded map: skip_blocks walks the blocks and, whenever it yields,
# one entry is skipped -- first the string key, then a value of the writer's
# value schema.
def skip_map(writers_schema, decoder)
  skip_blocks(decoder) do
    decoder.skip_string
    skip_data(writers_schema.values, decoder)
  end
end
skip_record(writers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Skips an encoded record by skipping each of the writer's fields, in the
# writer's field order, with skip_data.
def skip_record(writers_schema, decoder)
  writers_schema.fields.each do |field|
    skip_data(field.type, decoder)
  end
end
skip_union(writers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Skips an encoded union value: a long read from the decoder is used as an
# index into the writer's list of schemas, and a datum of that schema is then
# skipped with skip_data.
def skip_union(writers_schema, decoder)
  branch_index = decoder.read_long
  writer_branch = writers_schema.schemas[branch_index]
  skip_data(writer_branch, decoder)
end

Private Instance Methods

skip_blocks(decoder, &blk) click to toggle source
    # File lib/avro/io.rb
# Walks the blocks of an encoded array or map without decoding the items.
#
# Each block starts with a long count; a count of zero ends the sequence.
# For a positive count the given block is called that many times, so the
# caller can skip one item per call. For a negative count the following long
# is read and handed to decoder.skip, which jumps over the whole block
# without calling the given block at all.
#
# Returns nil.
def skip_blocks(decoder, &blk)
  until (block_count = decoder.read_long).zero?
    if block_count < 0
      decoder.skip(decoder.read_long)
    else
      block_count.times(&blk)
    end
  end
end