class AvroUtils

Public Class Methods

all_names_valid?(string_schema) click to toggle source
# File lib/utils/avro_utils.rb, line 76
# Checks that every "name" value in a JSON-encoded Avro schema contains only
# ASCII letters, digits and underscores.
#
# string_schema - the schema as a JSON String.
#
# Returns true when every name is valid.
# Raises InvalidDataException listing each offending name together with the
# invalid characters found in it.
def all_names_valid?(string_schema)
  # Capture every "name" value regardless of its length or first character,
  # and tolerate whitespace around the colon (pretty-printed JSON).
  names = string_schema.scan(/"name"\s*:\s*"([^"]*)"/).flatten
  invalid_pt = /[^A-Za-z0-9_]/

  result = names.each_with_object({}) do |name, invalid|
    invalid_chars = name.scan(invalid_pt)
    invalid[name] = invalid_chars if invalid_chars.any?
  end
  return true if result.empty?

  details = result.map { |key, value| "#{key}: #{value}\n" }.join
  raise InvalidDataException.new("The following names have invalid characters:\n#{details}")
end
avro_schema(hash_data, record_type_name) click to toggle source
# File lib/utils/avro_utils.rb, line 26
# Builds a parsed Avro schema object for +hash_data+.
#
# The schema description comes from avro_schema_hash; it is serialised to
# JSON and handed to Avro::Schema.parse.
#
# hash_data        - sample record whose keys/values drive the schema.
# record_type_name - name given to the top-level record type.
#
# Returns whatever Avro::Schema.parse returns for that JSON.
def avro_schema(hash_data, record_type_name)
  schema_json = avro_schema_hash(hash_data, record_type_name).to_json
  Avro::Schema.parse(schema_json)
end
avro_schema_hash(hash_data, record_type_name) click to toggle source
# File lib/utils/avro_utils.rb, line 4
# Infers an Avro record schema (as a Ruby Hash) from a sample data hash.
#
# Each key becomes a field:
# * Hash values become nested records named after the key
#   (key.to_s.singularize.camelize; presumably ActiveSupport inflections).
# * Array values become Avro arrays whose item type is inferred from the
#   first element. When that element is a Hash, a nested record is built
#   (named the same way), so arrays of objects yield a usable schema.
# * Any other value is looked up in RUBY_AVRO_TYPE_MAPPING by the symbol of
#   its Ruby class name (e.g. :String). An empty array therefore looks up
#   :NilClass, as before.
#
# hash_data        - sample record whose keys/values drive the schema.
# record_type_name - name of the resulting record type.
#
# Returns a Hash of the form { type: 'record', name: ..., fields: [...] }.
# Errors are converted by process_data.
def avro_schema_hash(hash_data, record_type_name)
  process_data do
    fields = hash_data.map do |key, value|
      field_type =
        case value
        when Hash
          avro_schema_hash(value, key.to_s.singularize.camelize)
        when Array
          first = value.first
          items =
            if first.is_a?(Hash)
              avro_schema_hash(first, key.to_s.singularize.camelize)
            else
              RUBY_AVRO_TYPE_MAPPING[first.class.name.to_sym]
            end
          { type: 'array', items: items }
        else
          RUBY_AVRO_TYPE_MAPPING[value.class.name.to_sym]
        end

      { name: key, type: field_type }
    end

    { type: 'record', name: record_type_name, fields: fields }
  end
end
bulk_json_to_avro(collection, record_type_name, filename = nil) click to toggle source
# File lib/utils/avro_utils.rb, line 47
# Serialises every element of +collection+ into a single Avro data file.
#
# The schema is inferred from the FIRST element only. Later elements must
# fit that schema, or the Avro writer will raise.
#
# collection       - enumerable of Hashes or JSON Strings.
# record_type_name - name of the top-level Avro record type.
# filename         - optional path. When given, output goes to that file
#                    (opened 'wb'); otherwise it goes to an in-memory StringIO.
#
# Returns the rewound StringIO, or the (already closed) File when +filename+
# was given. Errors are converted by process_data.
def bulk_json_to_avro(collection, record_type_name, filename = nil)
  process_data do
    buffer = filename.nil? ? StringIO.new : File.new(filename, 'wb')

    begin
      schema = avro_schema(hash_with_string_keys(collection.first), record_type_name)
      file_writer = Avro::DataFile::Writer.new(buffer, Avro::IO::DatumWriter.new(schema), schema)

      collection.each do |json_data|
        file_writer << hash_with_string_keys(json_data)
      end

      file_writer.flush
      buffer.rewind
    ensure
      # Close the file handle even when schema inference or writing fails,
      # so it is not leaked. A StringIO stays open for the caller.
      buffer.close if buffer.is_a?(File)
    end

    buffer
  end
end
hash_with_string_keys(json_data) click to toggle source

Avro assumes that all hashes use strings for keys and does not accept an indifferent-access hash (e.g. ActiveSupport's HashWithIndifferentAccess). The conversion below therefore ensures that the hash data uses string keys.

# File lib/utils/avro_utils.rb, line 102
# Normalises +json_data+ into a plain Hash with String keys by round-tripping
# it through JSON. Values are round-tripped as well (e.g. Symbol values become
# Strings).
#
# json_data - a Hash (any key type, including Hash subclasses) or a JSON String.
#
# Returns the parsed Hash. Errors (e.g. malformed JSON) are converted by
# process_data.
def hash_with_string_keys(json_data)
  process_data do
    json_text = json_data.is_a?(Hash) ? json_data.to_json : json_data
    JSON.parse(json_text)
  end
end
is_valid_schema?(schema) click to toggle source
# File lib/utils/avro_utils.rb, line 67
# Validates an Avro schema.
#
# Two checks run in order:
# 1. all_names_valid? — every "name" uses only [A-Za-z0-9_].
# 2. Avro::Schema.parse — the schema itself parses.
#
# schema - a Hash or Array (any subclass included, e.g. an indifferent-access
#          hash), which is serialised to JSON first; or a JSON String.
#
# Returns true when both checks pass. Failures are raised, not returned, and
# are converted by process_data.
def is_valid_schema?(schema)
  process_data do
    # case/when uses ===, so Hash/Array subclasses are serialised too. The old
    # exact-class check sent them on un-serialised and crashed in String#scan.
    working_schema =
      case schema
      when Hash, Array then schema.to_json
      else schema.dup
      end

    all_names_valid?(working_schema)
    Avro::Schema.parse(working_schema)
    true
  end
end
json_to_avro(json_data, record_type_name, filename = nil) click to toggle source
# File lib/utils/avro_utils.rb, line 30
# Serialises a single record into an Avro data file, inferring the schema from
# the record itself.
#
# json_data        - a Hash or a JSON String.
# record_type_name - name of the top-level Avro record type.
# filename         - optional path. When given, output goes to that file
#                    (opened 'wb'); otherwise it goes to an in-memory StringIO.
#
# Returns the rewound StringIO, or the (already closed) File when +filename+
# was given. Errors are converted by process_data.
def json_to_avro(json_data, record_type_name, filename = nil)
  process_data do
    buffer = filename.nil? ? StringIO.new : File.new(filename, 'wb')

    begin
      hash_data = hash_with_string_keys(json_data)
      schema = avro_schema(hash_data, record_type_name)

      file_writer = Avro::DataFile::Writer.new(buffer, Avro::IO::DatumWriter.new(schema), schema)
      file_writer << hash_data
      file_writer.flush

      buffer.rewind
    ensure
      # Close the file handle even when conversion or writing fails, so it is
      # not leaked. A StringIO stays open for the caller.
      buffer.close if buffer.is_a?(File)
    end

    buffer
  end
end

Private Class Methods

process_data() { || ... } click to toggle source
# File lib/utils/avro_utils.rb, line 111
# Runs the given block and returns its value.
#
# Any StandardError raised by the block is printed (message plus backtrace)
# and re-raised as an InvalidDataException with the same message.
#
# An InvalidDataException raised inside the block is re-raised untouched. As a
# result, nested process_data calls neither print the same failure repeatedly
# nor re-wrap it, which would throw away the original backtrace.
def process_data
  yield
rescue InvalidDataException
  raise
rescue StandardError => error
  # Array() guards against an exception whose backtrace is nil.
  puts("Error: #{error.message}\n#{Array(error.backtrace).join("\n")}")
  # Raising inside rescue keeps the original error reachable via #cause.
  raise InvalidDataException.new(error.message)
end