class AvroUtils
Public Class Methods
all_names_valid?(string_schema)
click to toggle source
# File lib/utils/avro_utils.rb, line 76
# Checks that every "name" value in a JSON-encoded Avro schema contains only
# ASCII letters, digits and underscores.
#
# Names are extracted textually with a regex (not by walking a parsed schema),
# so this can run before Avro::Schema.parse and report every offending name at
# once.
#
# @param string_schema [String] the schema as JSON text
# @return [true] when every name is valid
# @raise [InvalidDataException] listing each offending name followed by the
#   array of invalid characters found in it
def all_names_valid?(string_schema)
  # Allow optional whitespace around the colon (pretty-printed or hand-written
  # JSON) and capture names whatever their first character is. The previous
  # pattern, /"name":"[\w][^"]+/, silently skipped both cases, so such names
  # were never validated.
  names = string_schema.scan(/"name"\s*:\s*"([^"]*)"/).flatten

  invalid = names.each_with_object({}) do |name, acc|
    bad_chars = name.scan(/[^A-Za-z0-9_]/)
    acc[name] = bad_chars if bad_chars.any?
  end
  return true if invalid.empty?

  error = "The following names have invalid characters:\n" +
          invalid.map { |name, chars| "#{name}: #{chars}\n" }.join
  raise InvalidDataException, error
end
avro_schema(hash_data, record_type_name)
click to toggle source
# File lib/utils/avro_utils.rb, line 26
# Infers a schema hash from +hash_data+ (see avro_schema_hash) and turns it
# into an Avro::Schema by round-tripping it through JSON.
#
# @param hash_data [Hash] sample record whose values drive the inferred types
# @param record_type_name [String] name of the top-level record type
# @return [Avro::Schema] the parsed schema
def avro_schema(hash_data, record_type_name)
  schema_json = avro_schema_hash(hash_data, record_type_name).to_json
  Avro::Schema.parse(schema_json)
end
avro_schema_hash(hash_data, record_type_name)
click to toggle source
# File lib/utils/avro_utils.rb, line 4
# Infers an Avro record schema, as a Ruby Hash, from a sample record.
#
# Each key of +hash_data+ becomes a field whose type depends on its value:
# * Hash  -> a nested record (built recursively), named after the key
#            singularized and camelized (e.g. "owners" -> "Owner");
# * Array -> an Avro array whose item type comes from the FIRST element:
#            a nested record when that element is a Hash, otherwise the
#            RUBY_AVRO_TYPE_MAPPING entry for its class (an empty array
#            therefore uses the NilClass entry);
# * other -> the RUBY_AVRO_TYPE_MAPPING entry for the value's class name.
#
# NOTE(review): RUBY_AVRO_TYPE_MAPPING is defined elsewhere; presumably a
# Ruby class name (Symbol) => Avro type table — confirm.
#
# @param hash_data [Hash] sample record
# @param record_type_name [String] name of the generated record type
# @return [Hash] schema hash with :type, :name and :fields keys
# @raise [InvalidDataException] (via process_data) on any failure
def avro_schema_hash(hash_data, record_type_name)
  process_data do
    fields = hash_data.map do |key, value|
      type =
        case value
        when Hash
          avro_schema_hash(value, key.to_s.singularize.camelize)
        when Array
          sample = value.first
          # Arrays of objects get a nested record as their item type. A
          # scalar entry such as RUBY_AVRO_TYPE_MAPPING[:Hash] cannot
          # describe the element's fields.
          items =
            if sample.is_a?(Hash)
              avro_schema_hash(sample, key.to_s.singularize.camelize)
            else
              RUBY_AVRO_TYPE_MAPPING[sample.class.name.to_sym]
            end
          { type: 'array', items: items }
        else
          RUBY_AVRO_TYPE_MAPPING[value.class.name.to_sym]
        end
      { name: key, type: type }
    end

    { type: 'record', name: record_type_name, fields: fields }
  end
end
bulk_json_to_avro(collection, record_type_name, filename = nil)
click to toggle source
# File lib/utils/avro_utils.rb, line 47
# Serializes a collection of JSON records into a single Avro object container.
#
# The schema is inferred from the FIRST record only (via avro_schema), so the
# other records are expected to have the same shape.
#
# @param collection [Enumerable<Hash, String>] non-empty records, each a Hash
#   or JSON text
# @param record_type_name [String] name of the top-level Avro record type
# @param filename [String, nil] when given, write to this file; otherwise
#   build the container in an in-memory StringIO
# @return [StringIO, File] the rewound StringIO, or the already-closed File
# @raise [InvalidDataException] (via process_data) on any failure, including
#   an empty collection
def bulk_json_to_avro(collection, record_type_name, filename = nil)
  process_data do
    first_record = collection.first
    # Fail with a clear message instead of JSON.parse's TypeError on nil.
    raise ArgumentError, 'collection must not be empty' if first_record.nil?

    schema = avro_schema(hash_with_string_keys(first_record), record_type_name)

    # Open the output only once the schema is known, and always close a File,
    # even when a record fails to serialize, so no descriptor is leaked.
    buffer = filename.nil? ? StringIO.new : File.new(filename, 'wb')
    begin
      writer = Avro::DataFile::Writer.new(buffer, Avro::IO::DatumWriter.new(schema), schema)
      collection.each { |json_data| writer << hash_with_string_keys(json_data) }
      # NOTE(review): flush rather than close presumably keeps a StringIO
      # buffer open for the caller — confirm against Avro::DataFile::Writer#close.
      writer.flush
      buffer.rewind
    ensure
      buffer.close if buffer.is_a?(File)
    end
    buffer
  end
end
hash_with_string_keys(json_data)
click to toggle source
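Avro assumes that every hash uses strings for keys; it does not accept symbol keys or an indifferent-access hash (e.g. ActiveSupport's HashWithIndifferentAccess). The conversion below therefore round-trips the data through JSON, so that all keys in the resulting hash are strings.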
# File lib/utils/avro_utils.rb, line 102
# Normalizes record data so that every Hash key is a String, as Avro requires.
#
# A Hash argument is serialized to JSON and parsed back. This round-trip turns
# symbol keys (and symbol values) into strings and yields a plain Hash even for
# Hash subclasses. Any other argument is passed straight to JSON.parse, so it
# is expected to be JSON text.
#
# @param json_data [Hash, String] the record
# @return [Hash] the parsed data (for JSON object input)
# @raise [InvalidDataException] (via process_data) if parsing fails
def hash_with_string_keys(json_data)
  process_data do
    json_text = json_data.is_a?(Hash) ? json_data.to_json : json_data
    JSON.parse(json_text)
  end
end
is_valid_schema?(schema)
click to toggle source
# File lib/utils/avro_utils.rb, line 67
# Validates an Avro schema given as a Hash, an Array, or JSON text.
#
# Hash/Array schemas, including subclasses such as an indifferent-access hash,
# are serialized to JSON first. The JSON text is then checked with
# all_names_valid? and parsed with Avro::Schema.parse.
#
# @param schema [Hash, Array, String] the schema to validate
# @return [true] when the schema is valid
# @raise [InvalidDataException] (via process_data) if a name is invalid or
#   Avro rejects the schema
def is_valid_schema?(schema)
  process_data do
    # `when` matches with is_a? semantics. The old exact-class check
    # ([Hash, Array].include?(schema.class)) sent Hash subclasses down the
    # String path, where `scan` failed.
    working_schema =
      case schema
      when Hash, Array then schema.to_json
      else schema.dup
      end
    all_names_valid?(working_schema)
    Avro::Schema.parse(working_schema)
    true
  end
end
json_to_avro(json_data, record_type_name, filename = nil)
click to toggle source
# File lib/utils/avro_utils.rb, line 30
# Serializes a single JSON record into an Avro object container whose schema
# is inferred from the record itself (via avro_schema).
#
# @param json_data [Hash, String] the record, as a Hash or JSON text
# @param record_type_name [String] name of the top-level Avro record type
# @param filename [String, nil] when given, write to this file; otherwise
#   build the container in an in-memory StringIO
# @return [StringIO, File] the rewound StringIO, or the already-closed File
# @raise [InvalidDataException] (via process_data) on any failure
def json_to_avro(json_data, record_type_name, filename = nil)
  process_data do
    hash_data = hash_with_string_keys(json_data)
    schema = avro_schema(hash_data, record_type_name)

    # Open the output only once the schema is known, and always close a File,
    # even when writing fails, so no descriptor is leaked.
    buffer = filename.nil? ? StringIO.new : File.new(filename, 'wb')
    begin
      writer = Avro::DataFile::Writer.new(buffer, Avro::IO::DatumWriter.new(schema), schema)
      writer << hash_data
      # NOTE(review): flush rather than close presumably keeps a StringIO
      # buffer open for the caller — confirm against Avro::DataFile::Writer#close.
      writer.flush
      buffer.rewind
    ensure
      buffer.close if buffer.is_a?(File)
    end
    buffer
  end
end
Private Class Methods
process_data() { || ... }
click to toggle source
# File lib/utils/avro_utils.rb, line 111
# Runs the given block and normalizes its failures to InvalidDataException.
#
# Any other StandardError is logged to stdout with its backtrace. It is then
# replaced by an InvalidDataException with the same message; Ruby records the
# original error as the new exception's +cause+.
#
# An InvalidDataException raised inside the block is re-raised untouched. As a
# result, nested calls (e.g. the recursion in avro_schema_hash, or a public
# method calling hash_with_string_keys) log a failure once rather than once per
# nesting level.
#
# @yield the work to protect
# @return whatever the block returns
# @raise [InvalidDataException] if the block raises a StandardError
def process_data
  yield
rescue InvalidDataException
  raise
rescue StandardError => error
  puts("Error: #{error.message}\n#{error.backtrace.join("\n")}")
  raise InvalidDataException, error.message
end