class Tros::IO::DatumReader
Attributes
readers_schema[RW]
writers_schema[RW]
Public Class Methods
match_schemas(writers_schema, readers_schema)
click to toggle source
# File lib/tros/io.rb, line 218
#
# Decides whether data written with +writers_schema+ can be read using
# +readers_schema+, based on each schema's +type_sym+.
#
# Returns true when:
# * either schema is a union (members are not inspected here);
# * both have the same type and that type is listed in
#   Schema::PRIMITIVE_TYPES_SYM;
# * both have the same complex type and the per-type comparison below
#   succeeds (fullname, fixed size, or element/value type);
# * the writer's numeric type is promotable to the reader's type.
# Returns false in every other case.
def self.match_schemas(writers_schema, readers_schema)
  writer_kind = writers_schema.type_sym
  reader_kind = readers_schema.type_sym

  # This conditional is begging for some OO love.
  return true if writer_kind == :union || reader_kind == :union

  if writer_kind == reader_kind
    return true if Schema::PRIMITIVE_TYPES_SYM.include?(reader_kind)

    case reader_kind
    when :record, :error, :enum
      # Named types only need to agree on their full name.
      return writers_schema.fullname == readers_schema.fullname
    when :request
      return true
    when :fixed
      return writers_schema.fullname == readers_schema.fullname &&
             writers_schema.size == readers_schema.size
    when :map
      return writers_schema.values.type == readers_schema.values.type
    when :array
      return writers_schema.items.type == readers_schema.items.type
    end
  end

  # Handle schema promotion: writer type => reader types it may widen to.
  promotions = {
    :int   => [:long, :float, :double],
    :long  => [:float, :double],
    :float => [:double]
  }
  promotions.fetch(writer_kind, []).include?(reader_kind)
end
new(writers_schema=nil, readers_schema=nil)
click to toggle source
# File lib/tros/io.rb, line 263
#
# Builds a reader holding the schema the data was written with and the
# schema it should be read with. Both arguments are optional and default
# to nil; they are stored as given, with no validation.
def initialize(writers_schema=nil, readers_schema=nil)
  # Schema the incoming data was encoded with.
  @writers_schema = writers_schema
  # Schema the caller wants the decoded data to conform to.
  @readers_schema = readers_schema
end
Public Instance Methods
read(decoder)
click to toggle source
# File lib/tros/io.rb, line 268
#
# Reads one datum from +decoder+ using this reader's schemas.
#
# If no reader's schema has been set yet, the writer's schema is assigned
# as the reader's schema (the assignment sticks to this instance), and the
# work is then handed to read_data.
def read(decoder)
  self.readers_schema ||= writers_schema
  read_data(writers_schema, readers_schema, decoder)
end
read_array(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/tros/io.rb, line 328
#
# Reads a block-encoded array from +decoder+ and returns its items as an
# Array.
#
# The data is a sequence of blocks, each introduced by a long item count;
# a count of 0 ends the array. A negative count means the block holds
# that many items (absolute value) and is followed by one extra long,
# which is consumed and discarded here (presumably the block's size in
# bytes -- see the Avro binary encoding spec).
#
# Each item is decoded with read_data using the +items+ schema of the
# writer's and reader's array schemas.
def read_array(writers_schema, readers_schema, decoder)
  read_items = []
  block_count = decoder.read_long
  while block_count != 0
    if block_count < 0
      block_count = -block_count
      # The long after a negative count must be consumed to stay aligned
      # with the stream, but its value is not needed when reading.
      decoder.read_long
    end
    block_count.times do
      read_items << read_data(writers_schema.items,
                              readers_schema.items,
                              decoder)
    end
    block_count = decoder.read_long
  end

  read_items
end
read_data(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/tros/io.rb, line 273
#
# Decodes one value from +decoder+ that was written with +writers_schema+
# and is being read as +readers_schema+.
#
# Steps:
# 1. The schemas are checked with match_schemas; a mismatch raises
#    SchemaMatchException.
# 2. If only the reader's schema is a union, the first member schema that
#    matches the writer's schema is chosen and the read restarts with it;
#    SchemaMatchException is raised when no member matches.
# 3. Otherwise the value is read according to the writer's schema type.
#
# Raises AvroError when the writer's schema type is not one handled below.
def read_data(writers_schema, readers_schema, decoder)
  # schema matching
  matched = self.class.match_schemas(writers_schema, readers_schema)
  raise SchemaMatchException.new(writers_schema, readers_schema) unless matched

  # schema resolution: reader's schema is a union, writer's
  # schema is not
  if writers_schema.type_sym != :union && readers_schema.type_sym == :union
    branch = readers_schema.schemas.find do |candidate|
      self.class.match_schemas(writers_schema, candidate)
    end
    raise SchemaMatchException.new(writers_schema, readers_schema) unless branch
    return read_data(writers_schema, branch, decoder)
  end

  # function dispatch for reading data based on type of writer's
  # schema
  case writers_schema.type_sym
  when :null    then decoder.read_null
  when :boolean then decoder.read_boolean
  when :string  then decoder.read_string
  when :int     then decoder.read_int
  when :long    then decoder.read_long
  when :float   then decoder.read_float
  when :double  then decoder.read_double
  when :bytes   then decoder.read_bytes
  when :fixed   then read_fixed(writers_schema, readers_schema, decoder)
  when :enum    then read_enum(writers_schema, readers_schema, decoder)
  when :array   then read_array(writers_schema, readers_schema, decoder)
  when :map     then read_map(writers_schema, readers_schema, decoder)
  when :union   then read_union(writers_schema, readers_schema, decoder)
  when :record, :error, :request
    read_record(writers_schema, readers_schema, decoder)
  else
    raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
  end
end
read_default_value(field_schema, default_value)
click to toggle source
# File lib/tros/io.rb, line 404
#
# Converts a field's default value (as parsed from the schema's JSON) into
# the Ruby value a read would produce for +field_schema+. Basically a JSON
# decoder driven by the schema type:
#
# * null                          -> nil
# * boolean/enum/fixed/string/bytes -> the value unchanged
# * int/long                      -> Integer(value)
# * float/double                  -> Float(value)
# * array                         -> Array of converted items
# * map                           -> Hash of converted values, same keys
# * union                         -> converted using the union's first schema
# * record/error                  -> Hash keyed by field name; a field
#   missing from +default_value+ falls back to that field's own default
#
# Raises AvroError for any other schema type. Integer()/Float() raise
# their usual errors on unconvertible input.
def read_default_value(field_schema, default_value)
  case field_schema.type_sym
  when :null
    nil
  when :boolean, :enum, :fixed, :string, :bytes
    default_value
  when :int, :long
    Integer(default_value)
  when :float, :double
    Float(default_value)
  when :array
    default_value.map do |json_val|
      read_default_value(field_schema.items, json_val)
    end
  when :map
    map_default = {}
    default_value.each do |key, json_val|
      map_default[key] = read_default_value(field_schema.values, json_val)
    end
    map_default
  when :union
    read_default_value(field_schema.schemas[0], default_value)
  when :record, :error
    record_default = {}
    field_schema.fields.each do |field|
      json_val = default_value[field.name]
      # Fall back to the field's own default only when no value was
      # supplied. A plain truthiness test would also discard an explicit
      # +false+ and wrongly replace it with the field default.
      json_val = field.default if json_val.nil?
      record_default[field.name] = read_default_value(field.type, json_val)
    end
    record_default
  else
    raise AvroError, "Unknown type: #{field_schema.type}"
  end
end
read_enum(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/tros/io.rb, line 315
#
# Reads an enum value: an int index is taken from +decoder+ and looked up
# in the writer's schema +symbols+ list. The symbol found there is
# returned as-is.
#
# The reader's schema is consulted, but a symbol unknown to the reader is
# currently NOT treated specially -- the resolution branch below is an
# unfinished placeholder.
def read_enum(writers_schema, readers_schema, decoder)
  position = decoder.read_int
  symbol = writers_schema.symbols[position]

  # TODO(jmhodges): figure out what unset means for resolution
  # schema resolution
  if !readers_schema.symbols.include?(symbol)
    # 'unset' here
  end

  symbol
end
read_fixed(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/tros/io.rb, line 311
#
# Reads a fixed value: asks +decoder+ for exactly as many units as the
# writer's schema +size+ declares and returns whatever decoder.read gives
# back. +readers_schema+ is accepted for signature symmetry but not used.
def read_fixed(writers_schema, readers_schema, decoder)
  length = writers_schema.size
  decoder.read(length)
end
read_map(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/tros/io.rb, line 347
#
# Reads a block-encoded map from +decoder+ and returns it as a Hash.
#
# The data is a sequence of blocks, each introduced by a long entry count;
# a count of 0 ends the map. A negative count means the block holds that
# many entries (absolute value) and is followed by one extra long, which
# is consumed and discarded here (presumably the block's size in bytes --
# see the Avro binary encoding spec).
#
# Each entry is a string key (decoder.read_string) followed by a value
# decoded with read_data using the +values+ schema of the writer's and
# reader's map schemas. A repeated key overwrites the earlier entry.
def read_map(writers_schema, readers_schema, decoder)
  read_items = {}
  block_count = decoder.read_long
  while block_count != 0
    if block_count < 0
      block_count = -block_count
      # The long after a negative count must be consumed to stay aligned
      # with the stream, but its value is not needed when reading.
      decoder.read_long
    end
    block_count.times do
      key = decoder.read_string
      read_items[key] = read_data(writers_schema.values,
                                  readers_schema.values,
                                  decoder)
    end
    block_count = decoder.read_long
  end

  read_items
end
read_record(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/tros/io.rb, line 374
#
# Reads a record written with +writers_schema+ and returns it as a Hash
# keyed by field name, shaped for +readers_schema+.
#
# The writer's fields are walked in order: a field the reader also
# declares (matched by name) is decoded with read_data; any other field
# is skipped in the stream with skip_data.
#
# Afterwards, if the reader declares more fields than were read, each
# reader field absent from the writer's schema is filled from its default
# via read_default_value -- but only when that default is not nil. Fields
# with a nil default get no entry in the result.
def read_record(writers_schema, readers_schema, decoder)
  readers_fields_hash = readers_schema.fields_hash
  record = {}

  writers_schema.fields.each do |writer_field|
    reader_field = readers_fields_hash[writer_field.name]
    if reader_field
      value = read_data(writer_field.type, reader_field.type, decoder)
      record[writer_field.name] = value
    else
      skip_data(writer_field.type, decoder)
    end
  end

  # Nothing to fill in when every reader field was already populated.
  return record unless readers_fields_hash.size > record.size

  # fill in the default values
  writers_fields_hash = writers_schema.fields_hash
  readers_fields_hash.each do |field_name, reader_field|
    next if writers_fields_hash.has_key?(field_name)
    # FIXME(jmhodges) another 'unset' here: nil defaults are left out.
    next if reader_field.default.nil?

    value = read_default_value(reader_field.type, reader_field.default)
    record[reader_field.name] = value
  end

  record
end
read_union(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/tros/io.rb, line 367
#
# Reads a union value: a long read from +decoder+ is used as an index into
# the writer's union +schemas+, and the value is then decoded with
# read_data using that member schema against the unchanged
# +readers_schema+.
def read_union(writers_schema, readers_schema, decoder)
  branch_index = decoder.read_long
  branch_schema = writers_schema.schemas[branch_index]
  read_data(branch_schema, readers_schema, decoder)
end
skip_array(writers_schema, decoder)
click to toggle source
# File lib/tros/io.rb, line 496
#
# Skips over an encoded array in +decoder+ without building it: walks the
# blocks with skip_blocks and, for each item that has to be stepped over
# individually, skips one value of the writer's +items+ schema.
def skip_array(writers_schema, decoder)
  skip_blocks(decoder) do
    skip_data(writers_schema.items, decoder)
  end
end
skip_data(writers_schema, decoder)
click to toggle source
# File lib/tros/io.rb, line 448
#
# Advances +decoder+ past one value written with +writers_schema+ without
# decoding it. Primitive types are delegated to the decoder's own skip_*
# method; complex types go to the matching skip_* helper of this class.
#
# Raises AvroError when the schema type is not one handled below.
def skip_data(writers_schema, decoder)
  case writers_schema.type_sym
  when :null    then decoder.skip_null
  when :boolean then decoder.skip_boolean
  when :string  then decoder.skip_string
  when :int     then decoder.skip_int
  when :long    then decoder.skip_long
  when :float   then decoder.skip_float
  when :double  then decoder.skip_double
  when :bytes   then decoder.skip_bytes
  when :fixed   then skip_fixed(writers_schema, decoder)
  when :enum    then skip_enum(writers_schema, decoder)
  when :array   then skip_array(writers_schema, decoder)
  when :map     then skip_map(writers_schema, decoder)
  when :union   then skip_union(writers_schema, decoder)
  when :record, :error, :request
    skip_record(writers_schema, decoder)
  else
    raise AvroError, "Unknown schema type: #{writers_schema.type}"
  end
end
skip_enum(writers_schema, decoder)
click to toggle source
# File lib/tros/io.rb, line 487
#
# Skips an encoded enum value by skipping a single int on +decoder+ (the
# same unit read_enum consumes). +writers_schema+ is not consulted.
def skip_enum(writers_schema, decoder)
  decoder.skip_int
end
skip_fixed(writers_schema, decoder)
click to toggle source
# File lib/tros/io.rb, line 483
#
# Skips an encoded fixed value by telling +decoder+ to skip as many units
# as the writer's schema +size+ declares.
def skip_fixed(writers_schema, decoder)
  length = writers_schema.size
  decoder.skip(length)
end
skip_map(writers_schema, decoder)
click to toggle source
# File lib/tros/io.rb, line 500
#
# Skips over an encoded map in +decoder+ without building it: walks the
# blocks with skip_blocks and, for each entry that has to be stepped over
# individually, skips the string key and then one value of the writer's
# +values+ schema.
def skip_map(writers_schema, decoder)
  skip_blocks(decoder) do
    decoder.skip_string
    skip_data(writers_schema.values, decoder)
  end
end
skip_record(writers_schema, decoder)
click to toggle source
# File lib/tros/io.rb, line 507
#
# Skips an encoded record by skipping, in declaration order, one value for
# every field of the writer's schema.
def skip_record(writers_schema, decoder)
  writers_schema.fields.each do |field|
    skip_data(field.type, decoder)
  end
end
skip_union(writers_schema, decoder)
click to toggle source
# File lib/tros/io.rb, line 491
#
# Skips an encoded union value: a long read from +decoder+ is used as an
# index into the writer's union +schemas+, then one value of that member
# schema is skipped.
def skip_union(writers_schema, decoder)
  branch_index = decoder.read_long
  branch_schema = writers_schema.schemas[branch_index]
  skip_data(branch_schema, decoder)
end
Private Instance Methods
skip_blocks(decoder, &blk)
click to toggle source
# File lib/tros/io.rb, line 512
#
# Walks the blocks of a block-encoded collection on +decoder+, skipping
# their contents. Shared by skip_array and skip_map.
#
# Each block starts with a long count; a count of 0 ends the collection.
# * Positive count: the given block is called that many times, and is
#   expected to skip exactly one item per call.
# * Negative count: one more long is read and passed to decoder.skip, so
#   the whole block is jumped over in one step without calling the block
#   (presumably that long is the block's size in bytes -- see the Avro
#   binary encoding spec).
def skip_blocks(decoder, &blk)
  block_count = decoder.read_long
  while block_count != 0
    if block_count < 0
      decoder.skip(decoder.read_long)
    else
      # Parenthesised so `&` is unambiguously the block-pass operator.
      block_count.times(&blk)
    end
    block_count = decoder.read_long
  end
end