class Avro::IO::DatumReader
Attributes
readers_schema[RW]
writers_schema[RW]
Public Class Methods
match_schemas(writers_schema, readers_schema)
click to toggle source
# File lib/avro/io.rb, line 223
# Class-level helper that forwards both schemas to
# Avro::SchemaCompatibility.match_schemas and returns its result unchanged.
def self.match_schemas(writers_schema, readers_schema)
  compatibility = Avro::SchemaCompatibility
  compatibility.match_schemas(writers_schema, readers_schema)
end
new(writers_schema=nil, readers_schema=nil)
click to toggle source
# File lib/avro/io.rb, line 229
# Stores the writer's and reader's schemas on the instance.
# Both arguments are optional and default to nil.
def initialize(writers_schema=nil, readers_schema=nil)
  @writers_schema, @readers_schema = writers_schema, readers_schema
end
Public Instance Methods
read(decoder)
click to toggle source
# File lib/avro/io.rb, line 234
# Reads one datum from +decoder+ using the schemas held by this reader.
# When no reader's schema has been set, the writer's schema is assigned
# to it first, so both sides of the resolution are the same schema.
def read(decoder)
  self.readers_schema ||= writers_schema
  read_data(writers_schema, readers_schema, decoder)
end
read_array(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/io.rb, line 296
# Reads an array as a sequence of blocks and returns the items as an Array.
# Each block starts with a long item count; a count of 0 ends the array.
# A negative count is negated and is followed by one more long, which is
# read and discarded here.
def read_array(writers_schema, readers_schema, decoder)
  items = []

  until (count = decoder.read_long) == 0
    if count < 0
      count = -count
      decoder.read_long # extra long after a negative count; value unused
    end

    count.times do
      items << read_data(writers_schema.items, readers_schema.items, decoder)
    end
  end

  items
end
read_data(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/io.rb, line 239
# Reads a single datum written with +writers_schema+ and resolves it
# against +readers_schema+.
#
# Raises SchemaMatchException when the class-level match_schemas check
# rejects the pair, or when the reader's schema is a union with no branch
# matching a non-union writer's schema. Raises AvroError for a writer's
# type that is not handled below.
#
# The decoded value is passed through readers_schema.type_adapter.decode
# before being returned.
def read_data(writers_schema, readers_schema, decoder)
  # Schema matching.
  compatible = self.class.match_schemas(writers_schema, readers_schema)
  raise SchemaMatchException.new(writers_schema, readers_schema) unless compatible

  # Schema resolution: reader's schema is a union, writer's schema is not.
  # Recurse with the first reader branch that matches the writer's schema.
  writer_type = writers_schema.type_sym
  if writer_type != :union && readers_schema.type_sym == :union
    branch = readers_schema.schemas.find do |candidate|
      self.class.match_schemas(writers_schema, candidate)
    end
    raise SchemaMatchException.new(writers_schema, readers_schema) unless branch
    return read_data(writers_schema, branch, decoder)
  end

  # Dispatch on the writer's schema type.
  datum =
    case writer_type
    when :null    then decoder.read_null
    when :boolean then decoder.read_boolean
    when :string  then decoder.read_string
    when :int     then decoder.read_int
    when :long    then decoder.read_long
    when :float   then decoder.read_float
    when :double  then decoder.read_double
    when :bytes   then decoder.read_bytes
    when :fixed   then read_fixed(writers_schema, readers_schema, decoder)
    when :enum    then read_enum(writers_schema, readers_schema, decoder)
    when :array   then read_array(writers_schema, readers_schema, decoder)
    when :map     then read_map(writers_schema, readers_schema, decoder)
    when :union   then read_union(writers_schema, readers_schema, decoder)
    when :record, :error, :request
      read_record(writers_schema, readers_schema, decoder)
    else
      raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
    end

  readers_schema.type_adapter.decode(datum)
end
read_default_value(field_schema, default_value)
click to toggle source
# File lib/avro/io.rb, line 372
# Converts a field's default value (as decoded from the schema's JSON)
# into the in-memory value for +field_schema+.
#
# field_schema  - schema object responding to type_sym (and, depending on
#                 the type, items / values / schemas / fields).
# default_value - the JSON-decoded default, or :no_default when the field
#                 declares none.
#
# Returns the converted value: nil for :null, Integer()/Float() coercions
# for numeric types, the value unchanged for boolean/enum/fixed/string/
# bytes, and recursively built Arrays/Hashes for array/map/record/error.
# A union default is interpreted using the union's first schema.
#
# Raises AvroError when default_value is :no_default or the schema type is
# not handled.
def read_default_value(field_schema, default_value)
  if default_value == :no_default
    raise AvroError, "Missing data for #{field_schema} with no default"
  end

  case field_schema.type_sym
  when :null
    nil
  when :boolean, :enum, :fixed, :string, :bytes
    default_value
  when :int, :long
    Integer(default_value)
  when :float, :double
    Float(default_value)
  when :array
    items = []
    default_value.each do |json_val|
      items << read_default_value(field_schema.items, json_val)
    end
    items
  when :map
    entries = {}
    default_value.each do |key, json_val|
      entries[key] = read_default_value(field_schema.values, json_val)
    end
    entries
  when :union
    read_default_value(field_schema.schemas[0], default_value)
  when :record, :error
    record = {}
    field_schema.fields.each do |field|
      # Fall back to the field's own default only when the record default
      # omits the key. Testing the value's truthiness instead would throw
      # away an explicit false or null.
      json_val =
        if default_value.key?(field.name)
          default_value[field.name]
        else
          field.default
        end
      record[field.name] = read_default_value(field.type, json_val)
    end
    record
  else
    raise AvroError, "Unknown type: #{field_schema.type}"
  end
end
read_enum(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/io.rb, line 283
# Reads an enum value: an int index is read from +decoder+ and looked up
# in the writer's symbol list. The looked-up symbol is returned as is.
def read_enum(writers_schema, readers_schema, decoder)
  position = decoder.read_int
  symbol = writers_schema.symbols[position]

  # TODO(jmhodges): figure out what unset means for resolution
  # Schema resolution: the reader's symbol list is consulted, but nothing
  # is done yet when the symbol is missing from it ('unset').
  _reader_knows_symbol = readers_schema.symbols.include?(symbol)

  symbol
end
read_fixed(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/io.rb, line 279
# Reads a fixed value by asking +decoder+ for exactly writers_schema.size
# units via decoder.read. The reader's schema is not consulted.
def read_fixed(writers_schema, readers_schema, decoder)
  length = writers_schema.size
  decoder.read(length)
end
read_map(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/io.rb, line 315
# Reads a map as a sequence of blocks and returns the entries as a Hash.
# Each block starts with a long entry count; a count of 0 ends the map.
# A negative count is negated and is followed by one more long, which is
# read and discarded here. Every entry is a string key followed by a value
# read with the schemas' +values+ types.
def read_map(writers_schema, readers_schema, decoder)
  entries = {}

  until (count = decoder.read_long) == 0
    if count < 0
      count = -count
      decoder.read_long # extra long after a negative count; value unused
    end

    count.times do
      name = decoder.read_string
      entries[name] = read_data(writers_schema.values, readers_schema.values, decoder)
    end
  end

  entries
end
read_record(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/io.rb, line 342
# Reads a record and returns it as a Hash keyed by field name.
#
# Writer fields are consumed in the writer's order: a field the reader
# also declares (matched by name) is read and stored; any other field is
# skipped. Reader fields absent from the writer's schema are then filled
# from their defaults.
#
# Raises AvroError when such a reader-only field has no default.
def read_record(writers_schema, readers_schema, decoder)
  reader_fields = readers_schema.fields_hash
  record = {}

  writers_schema.fields.each do |writer_field|
    reader_field = reader_fields[writer_field.name]
    if reader_field
      value = read_data(writer_field.type, reader_field.type, decoder)
      record[writer_field.name] = value
    else
      skip_data(writer_field.type, decoder)
    end
  end

  # Nothing to fill in unless the reader declares more fields than were read.
  return record unless reader_fields.size > record.size

  # Fill in the default values.
  writer_fields = writers_schema.fields_hash
  reader_fields.each do |field_name, reader_field|
    next if writer_fields.key?(field_name)

    unless reader_field.default?
      raise AvroError, "Missing data for #{reader_field.type} with no default"
    end

    value = read_default_value(reader_field.type, reader_field.default)
    record[reader_field.name] = value
  end

  record
end
read_union(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/io.rb, line 335
# Reads a union value: a long read from +decoder+ indexes into the
# writer's list of branch schemas, and the datum is then read with that
# branch against the reader's schema.
def read_union(writers_schema, readers_schema, decoder)
  branch_index = decoder.read_long
  writer_branch = writers_schema.schemas[branch_index]
  read_data(writer_branch, readers_schema, decoder)
end
skip_array(writers_schema, decoder)
click to toggle source
# File lib/avro/io.rb, line 468
# Skips over an encoded array. The block handed to skip_blocks skips a
# single item of the writer's item type.
def skip_array(writers_schema, decoder)
  skip_blocks(decoder) do
    skip_data(writers_schema.items, decoder)
  end
end
skip_data(writers_schema, decoder)
click to toggle source
# File lib/avro/io.rb, line 420
# Skips one datum of the writer's schema type without building a value.
# Primitive types go to the matching decoder.skip_* method; complex types
# go to the matching skip_* method on this reader.
#
# Raises AvroError for a schema type that is not handled below.
def skip_data(writers_schema, decoder)
  case writers_schema.type_sym
  when :null    then decoder.skip_null
  when :boolean then decoder.skip_boolean
  when :string  then decoder.skip_string
  when :int     then decoder.skip_int
  when :long    then decoder.skip_long
  when :float   then decoder.skip_float
  when :double  then decoder.skip_double
  when :bytes   then decoder.skip_bytes
  when :fixed   then skip_fixed(writers_schema, decoder)
  when :enum    then skip_enum(writers_schema, decoder)
  when :array   then skip_array(writers_schema, decoder)
  when :map     then skip_map(writers_schema, decoder)
  when :union   then skip_union(writers_schema, decoder)
  when :record, :error, :request
    skip_record(writers_schema, decoder)
  else
    raise AvroError, "Unknown schema type: #{writers_schema.type}"
  end
end
skip_enum(writers_schema, decoder)
click to toggle source
# File lib/avro/io.rb, line 459
# Skips an encoded enum value by skipping a single int on +decoder+.
# The writer's schema is accepted for signature symmetry with the other
# skip_* methods but is not used.
def skip_enum(writers_schema, decoder)
  decoder.skip_int
end
skip_fixed(writers_schema, decoder)
click to toggle source
# File lib/avro/io.rb, line 455
# Skips an encoded fixed value by passing writers_schema.size to
# decoder.skip.
def skip_fixed(writers_schema, decoder)
  length = writers_schema.size
  decoder.skip(length)
end
skip_map(writers_schema, decoder)
click to toggle source
# File lib/avro/io.rb, line 472
# Skips over an encoded map. The block handed to skip_blocks skips a
# single entry: its string key, then a value of the writer's value type.
def skip_map(writers_schema, decoder)
  skip_blocks(decoder) do
    decoder.skip_string
    skip_data(writers_schema.values, decoder)
  end
end
skip_record(writers_schema, decoder)
click to toggle source
# File lib/avro/io.rb, line 479
# Skips over an encoded record by skipping each of the writer's fields in
# declaration order.
def skip_record(writers_schema, decoder)
  writers_schema.fields.each do |field|
    skip_data(field.type, decoder)
  end
end
skip_union(writers_schema, decoder)
click to toggle source
# File lib/avro/io.rb, line 463
# Skips over an encoded union value: a long read from +decoder+ selects
# the writer's branch schema, and a datum of that branch is then skipped.
def skip_union(writers_schema, decoder)
  branch_index = decoder.read_long
  branch = writers_schema.schemas[branch_index]
  skip_data(branch, decoder)
end
Private Instance Methods
skip_blocks(decoder, &blk)
click to toggle source
# File lib/avro/io.rb, line 484
# Walks the block structure shared by arrays and maps without decoding
# items. Longs are read from +decoder+ until a 0 is seen.
#
# For a positive count the given block is invoked that many times, so the
# caller can skip one item per call. For a negative count one more long is
# read and passed to decoder.skip, and the given block is not invoked.
#
# Returns nil.
def skip_blocks(decoder, &blk)
  until (count = decoder.read_long) == 0
    if count < 0
      decoder.skip(decoder.read_long)
    else
      count.times(&blk)
    end
  end
end