class Avro::IO::DatumReader
Attributes
readers_schema[RW]
writers_schema[RW]
Public Class Methods
match_schemas(writers_schema, readers_schema)
click to toggle source
# File lib/avro/io.rb
# Class-level convenience wrapper: forwards both schemas, in the same order,
# to Avro::SchemaCompatibility.match_schemas and returns whatever it returns.
def self.match_schemas(writers_schema, readers_schema)
  compatibility = Avro::SchemaCompatibility
  compatibility.match_schemas(writers_schema, readers_schema)
end
new(writers_schema=nil, readers_schema=nil)
click to toggle source
# File lib/avro/io.rb
# Stores the writer's and reader's schemas on the new reader. Both are
# optional and default to nil.
def initialize(writers_schema=nil, readers_schema=nil)
  @writers_schema, @readers_schema = writers_schema, readers_schema
end
Public Instance Methods
read(decoder)
click to toggle source
# File lib/avro/io.rb
# Entry point for decoding one datum from +decoder+.
#
# When no reader's schema has been set, the writer's schema is assigned as
# the reader's schema (and stays assigned afterwards); the actual decoding
# is done by read_data.
def read(decoder)
  self.readers_schema ||= writers_schema
  read_data(writers_schema, readers_schema, decoder)
end
read_array(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/io.rb
# Decodes an array as a sequence of blocks and returns its items in order.
#
# Each block starts with a long item count; a count of zero ends the array.
# A negative count means the block holds +-count+ items and is followed by
# one extra long (the block size), which is read and discarded here.
def read_array(writers_schema, readers_schema, decoder)
  items = []

  until (count = decoder.read_long).zero?
    if count < 0
      decoder.read_long # block size; not needed when every item is decoded
      count = -count
    end

    count.times do
      items << read_data(writers_schema.items, readers_schema.items, decoder)
    end
  end

  items
end
read_data(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/io.rb
# Decodes one datum written with +writers_schema+ and resolves it against
# +readers_schema+.
#
# Raises SchemaMatchException when the two schemas do not match (or, for a
# union reader, when no branch matches the writer's schema), and AvroError
# when the writer's schema type is not one of the handled types. The decoded
# value is passed through the reader schema's type adapter before returning.
def read_data(writers_schema, readers_schema, decoder)
  # Schema matching.
  unless self.class.match_schemas(writers_schema, readers_schema)
    raise SchemaMatchException.new(writers_schema, readers_schema)
  end

  writers_type = writers_schema.type_sym

  # Schema resolution: the reader expects a union but the writer wrote a
  # plain type, so decode against the first reader branch that matches.
  if readers_schema.type_sym == :union && writers_type != :union
    branch = readers_schema.schemas.find do |candidate|
      self.class.match_schemas(writers_schema, candidate)
    end
    raise SchemaMatchException.new(writers_schema, readers_schema) unless branch

    return read_data(writers_schema, branch, decoder)
  end

  # Dispatch on the writer's schema type.
  datum =
    case writers_type
    when :null    then decoder.read_null
    when :boolean then decoder.read_boolean
    when :string  then decoder.read_string
    when :int     then decoder.read_int
    when :long    then decoder.read_long
    when :float   then decoder.read_float
    when :double  then decoder.read_double
    when :bytes   then decoder.read_bytes
    when :fixed   then read_fixed(writers_schema, readers_schema, decoder)
    when :enum    then read_enum(writers_schema, readers_schema, decoder)
    when :array   then read_array(writers_schema, readers_schema, decoder)
    when :map     then read_map(writers_schema, readers_schema, decoder)
    when :union   then read_union(writers_schema, readers_schema, decoder)
    when :record, :error, :request
      read_record(writers_schema, readers_schema, decoder)
    else
      raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
    end

  readers_schema.type_adapter.decode(datum)
end
read_default_value(field_schema, default_value)
click to toggle source
# File lib/avro/io.rb
# Converts a JSON-style default value into the datum for +field_schema+.
#
# * +:null+ yields nil; +:int+/+:long+ go through Integer(), +:float+/
#   +:double+ through Float(); boolean, enum, fixed, string and bytes
#   defaults are used as given.
# * Arrays and maps are converted element by element against the schema's
#   +items+ / +values+.
# * A union default is interpreted against the union's first schema.
# * For records, each field takes its value from +default_value+ when the
#   field's name is present as a key, and otherwise from the field's own
#   default.
#
# The result is passed through the schema's type adapter. Raises AvroError
# for a schema type that is not handled here.
def read_default_value(field_schema, default_value)
  datum =
    case field_schema.type_sym
    when :null
      nil
    when :int, :long
      Integer(default_value)
    when :float, :double
      Float(default_value)
    when :boolean, :enum, :fixed, :string, :bytes
      default_value
    when :array
      default_value.map do |json_val|
        read_default_value(field_schema.items, json_val)
      end
    when :map
      default_value.each_with_object({}) do |(key, json_val), decoded|
        decoded[key] = read_default_value(field_schema.values, json_val)
      end
    when :union
      read_default_value(field_schema.schemas[0], default_value)
    when :record, :error
      field_schema.fields.each_with_object({}) do |field, decoded|
        # Test for key presence, not truthiness: an explicit `false` (or an
        # explicit null) in the record default is a real value and must not
        # be replaced by the field's own default.
        json_val =
          if default_value.key?(field.name)
            default_value[field.name]
          else
            field.default
          end
        decoded[field.name] = read_default_value(field.type, json_val)
      end
    else
      raise AvroError, "Unknown type: #{field_schema.type}"
    end

  field_schema.type_adapter.decode(datum)
end
read_enum(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/io.rb
# Decodes an enum value: reads the symbol's index with +decoder.read_int+
# and looks it up in the writer's symbol list.
#
# If the reader's schema does not list that symbol and declares a default,
# the reader's default is returned instead. Raises AvroError when the
# decoded index does not address one of the writer's symbols.
def read_enum(writers_schema, readers_schema, decoder)
  index_of_symbol = decoder.read_int
  writers_symbols = writers_schema.symbols

  # Array#[] counts negative indices from the end and returns nil past the
  # end, so an invalid index would otherwise yield a wrong symbol or nil.
  unless (0...writers_symbols.size).cover?(index_of_symbol)
    raise AvroError, "Enum symbol index #{index_of_symbol} is out of range for writer's schema"
  end

  read_symbol = writers_symbols[index_of_symbol]

  if !readers_schema.symbols.include?(read_symbol) && readers_schema.default
    read_symbol = readers_schema.default
  end

  # This implementation deviates from the spec by always returning
  # a symbol.
  read_symbol
end
read_fixed(writers_schema, _readers_schema, decoder)
click to toggle source
# File lib/avro/io.rb
# Decodes a fixed value by reading exactly +writers_schema.size+ units from
# the decoder via +decoder.read+. The reader's schema is not consulted.
def read_fixed(writers_schema, _readers_schema, decoder)
  length = writers_schema.size
  decoder.read(length)
end
read_map(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/io.rb
# Decodes a map as a sequence of blocks and returns it as a Hash.
#
# Each block starts with a long entry count; a count of zero ends the map.
# A negative count means the block holds +-count+ entries and is followed by
# one extra long (the block size), which is read and discarded here. Every
# entry is a string key followed by a value decoded with the schemas'
# +values+.
def read_map(writers_schema, readers_schema, decoder)
  entries = {}

  until (count = decoder.read_long).zero?
    if count < 0
      decoder.read_long # block size; not needed when every entry is decoded
      count = -count
    end

    count.times do
      key = decoder.read_string
      entries[key] = read_data(writers_schema.values, readers_schema.values, decoder)
    end
  end

  entries
end
read_record(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/io.rb
# Decodes a record, walking the writer's fields in order.
#
# A writer field is decoded when the reader has a field of the same name, or
# a field whose alias matches (stored under the reader field's name);
# otherwise its data is skipped. Reader fields that received no value are
# then filled from their defaults. Raises AvroError when such a field has no
# default.
def read_record(writers_schema, readers_schema, decoder)
  readers_by_name = readers_schema.fields_hash
  result = {}

  writers_schema.fields.each do |writers_field|
    name = writers_field.name

    if (match = readers_by_name[name])
      result[name] = read_data(writers_field.type, match.type, decoder)
    elsif readers_schema.fields_by_alias.key?(name)
      match = readers_schema.fields_by_alias[name]
      result[match.name] = read_data(writers_field.type, match.type, decoder)
    else
      skip_data(writers_field.type, decoder)
    end
  end

  # Fill in the default values.
  readers_by_name.each do |field_name, readers_field|
    next if result.key?(field_name)

    unless readers_field.default?
      raise AvroError, "Missing data for #{readers_field.type} with no default"
    end

    result[readers_field.name] = read_default_value(readers_field.type, readers_field.default)
  end

  result
end
read_union(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/io.rb
# Decodes a union value: reads the index of the branch the writer used with
# +decoder.read_long+, then decodes the datum with that branch of the
# writer's schema against +readers_schema+.
#
# Raises AvroError when the decoded index does not address one of the
# writer's union branches.
def read_union(writers_schema, readers_schema, decoder)
  index_of_schema = decoder.read_long
  branches = writers_schema.schemas

  # Array#[] counts negative indices from the end and returns nil past the
  # end, so an invalid index would otherwise pick the wrong branch or hand
  # nil to read_data.
  unless (0...branches.size).cover?(index_of_schema)
    raise AvroError, "Union branch index #{index_of_schema} is out of range for writer's schema"
  end

  read_data(branches[index_of_schema], readers_schema, decoder)
end
skip_array(writers_schema, decoder)
click to toggle source
# File lib/avro/io.rb
# Skips over an encoded array: walks its blocks with skip_blocks and skips
# one item of the writer's +items+ schema for every counted element.
def skip_array(writers_schema, decoder)
  skip_blocks(decoder) do
    skip_data(writers_schema.items, decoder)
  end
end
skip_data(writers_schema, decoder)
click to toggle source
# File lib/avro/io.rb
# Skips one encoded datum of the given writer's schema without decoding it.
#
# Primitive types are delegated to the decoder's matching skip_* method;
# the remaining types go to this reader's own skip_* helpers. Raises
# AvroError for a schema type that is not handled here.
def skip_data(writers_schema, decoder)
  case writers_schema.type_sym
  when :null    then decoder.skip_null
  when :boolean then decoder.skip_boolean
  when :string  then decoder.skip_string
  when :int     then decoder.skip_int
  when :long    then decoder.skip_long
  when :float   then decoder.skip_float
  when :double  then decoder.skip_double
  when :bytes   then decoder.skip_bytes
  when :fixed   then skip_fixed(writers_schema, decoder)
  when :enum    then skip_enum(writers_schema, decoder)
  when :array   then skip_array(writers_schema, decoder)
  when :map     then skip_map(writers_schema, decoder)
  when :union   then skip_union(writers_schema, decoder)
  when :record, :error, :request
    skip_record(writers_schema, decoder)
  else
    raise AvroError, "Unknown schema type: #{writers_schema.type}"
  end
end
skip_enum(_writers_schema, decoder)
click to toggle source
# File lib/avro/io.rb
# Skips an encoded enum value by skipping a single int on the decoder.
# The writer's schema is not needed for this.
def skip_enum(_writers_schema, decoder)
  decoder.skip_int
end
skip_fixed(writers_schema, decoder)
click to toggle source
# File lib/avro/io.rb
# Skips an encoded fixed value by asking the decoder to skip
# +writers_schema.size+ units.
def skip_fixed(writers_schema, decoder)
  length = writers_schema.size
  decoder.skip(length)
end
skip_map(writers_schema, decoder)
click to toggle source
# File lib/avro/io.rb
# Skips over an encoded map: walks its blocks with skip_blocks and, for
# every counted entry, skips the string key and then one value of the
# writer's +values+ schema.
def skip_map(writers_schema, decoder)
  skip_blocks(decoder) do
    decoder.skip_string
    skip_data(writers_schema.values, decoder)
  end
end
skip_record(writers_schema, decoder)
click to toggle source
# File lib/avro/io.rb
# Skips over an encoded record by skipping each of the writer's fields in
# declaration order.
def skip_record(writers_schema, decoder)
  writers_schema.fields.each do |field|
    skip_data(field.type, decoder)
  end
end
skip_union(writers_schema, decoder)
click to toggle source
# File lib/avro/io.rb
# Skips over an encoded union value: reads the index of the branch the
# writer used with +decoder.read_long+, then skips one datum of that branch.
#
# Raises AvroError when the decoded index does not address one of the
# writer's union branches.
def skip_union(writers_schema, decoder)
  index = decoder.read_long
  branches = writers_schema.schemas

  # Array#[] counts negative indices from the end and returns nil past the
  # end, so an invalid index would otherwise skip the wrong amount of data
  # or hand nil to skip_data.
  unless (0...branches.size).cover?(index)
    raise AvroError, "Union branch index #{index} is out of range for writer's schema"
  end

  skip_data(branches[index], decoder)
end
Private Instance Methods
skip_blocks(decoder, &blk)
click to toggle source
# File lib/avro/io.rb
# Walks the blocks of an encoded array or map without decoding them.
#
# Each block starts with a long count; a count of zero ends the sequence.
# For a positive count the given block is called that many times (it is
# expected to skip one element per call). For a negative count the next
# long is read and passed to +decoder.skip+, so the block is not called.
def skip_blocks(decoder, &blk)
  until (count = decoder.read_long).zero?
    if count > 0
      count.times(&blk)
    else
      decoder.skip(decoder.read_long)
    end
  end
end