class Schema::Inference::SchemaInferrer
Constants
- INT_MAX
- NumericTypes
Attributes
separator [RW]
Public Class Methods
new(separator: '.', convert_types_to_string: false)
  # File lib/schema/inference/schema_inferrer.rb, line 6
  def initialize(separator: '.', convert_types_to_string: false)
    @separator = separator
    @convert_types_to_string = convert_types_to_string
  end
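A minimal usage sketch (assuming the gem is loaded via require 'schema/inference'; the require path may differ in your setup):

  require 'schema/inference'

  # use '/' instead of the default '.' to join nested keys,
  # and report types as lowercase strings rather than Ruby classes
  inferrer = Schema::Inference::SchemaInferrer.new(separator: '/', convert_types_to_string: true)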
Public Instance Methods
infer_schema(dataset: [], batch_count: 0, extended: false) { |i| ... }
Generate a schema based on this collection's records. We evaluate the schema of each record and then merge all the information together.
@param dataset [Array] of samples on which to perform the schema analysis.
@param batch_count [Integer] number of batches to request from the block when batches are yielded by a block instead of passed as a dataset.
@param extended [Boolean] Set to true to keep each field (including individual array elements) as a basic type.
Set to false to reduce the terminal arrays to a single key (of type Array).
@return [Hash] with one entry per 'column'/'field'. The values contain information about the type and usage.
  # File lib/schema/inference/schema_inferrer.rb, line 20
  def infer_schema(dataset: [], batch_count: 0, extended: false)
    # support detecting schemas of single objects
    dataset = [dataset] if dataset.is_a?(Hash)
    validate_dataset(dataset)

    has_dataset = dataset.count > 0 || (block_given? && batch_count > 0)
    raise ArgumentError, 'a dataset or a block with a batch count must be passed' unless has_dataset

    if dataset.is_a?(Array) && dataset.count > 0
      # divide in batches to process in parallel
      per_process = (dataset.count / Parallel.processor_count.to_f).ceil
      batch_count = (dataset.count / per_process.to_f).ceil
    end

    results = parallel_map(batch_count.times) { |i|
      batch = block_given? ? yield(i) : dataset[i*per_process...(i+1)*per_process]
      {
        partial_schema: data_schema(batch),
        count: batch.count
      }
    }

    partial_schemas = results.map { |r| r[:partial_schema] }
    total_count = results.map { |r| r[:count] }.reduce(:+)

    table_schema = process_schema_results(partial_schemas, total_count, extended)
    table_schema.sort_by { |k, v| -v[:usage] }.to_h
  end
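For illustration, a hedged sketch of a call on a small in-memory dataset. The keys of each result entry (:type, :usage_count, :usage, :types, and the array sizes) follow from the code above; the concrete values in the comment are indicative only:

  inferrer = Schema::Inference::SchemaInferrer.new

  dataset = [
    { 'name' => 'Alice', 'age' => 30, 'tags' => ['a', 'b'] },
    { 'name' => 'Bob',   'age' => '41' }
  ]

  schema = inferrer.infer_schema(dataset: dataset)
  # => roughly:
  # {
  #   'name' => { type: String,  usage_count: 2, usage: 1.0, types: { String  => { count: 2, min: 3, max: 5 } } },
  #   'age'  => { type: Integer, usage_count: 2, usage: 1.0, types: { Integer => { count: 2, min: 30, max: 41 } } },
  #   'tags' => { type: Array,   usage_count: 1, usage: 0.5, min_size: 2, max_size: 2 }
  # }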
Private Instance Methods
add_value_to_record(record:, key:, value:)
  # File lib/schema/inference/schema_inferrer.rb, line 259
  def add_value_to_record(record:, key:, value:)
    tokens = key.split(separator)
    current_ref = record
    previous_token = tokens[0]

    tokens[1..-1].each_with_index { |token|
      if is_integer?(token)
        # an integer token means the previous key refers to an array
        current_ref[previous_token] ||= []
        current_ref = current_ref[previous_token]
        previous_token = token.to_i
      else
        # otherwise the previous key refers to a nested hash
        current_ref[previous_token] ||= {}
        current_ref = current_ref[previous_token]
        previous_token = token
      end
    }

    current_ref[previous_token] = value
  end
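A sketch of the behaviour of this private helper: a flattened key such as 'user.emails.0' is expanded back into nested hashes and arrays on the target record (send is used here only because the method is private):

  inferrer = Schema::Inference::SchemaInferrer.new
  record = {}
  inferrer.send(:add_value_to_record, record: record, key: 'user.emails.0', value: 'a@b.com')
  # record is now { 'user' => { 'emails' => ['a@b.com'] } }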
convert_types_to_string(schema)
  # File lib/schema/inference/schema_inferrer.rb, line 283
  def convert_types_to_string(schema)
    schema.each { |k, v|
      schema[k][:type] = schema[k][:type].to_s.downcase
      types = schema[k][:types]
      schema[k][:types] = types.map { |k1, v1| [k1.to_s.downcase, v1] }.to_h if types.present?
    }
    schema
  end
data_schema(data)
  # File lib/schema/inference/schema_inferrer.rb, line 55
  def data_schema(data)
    table_schema = {}

    data.each do |record|
      # fetch the record schema & update the general schema
      rec_schema = record_schema(record)

      rec_schema.each do |field|
        field_schema = table_schema[field[:field]] ||= {type: field[:type], usage_count: 0}
        field_schema = table_schema[field[:field]]

        if field_schema[:type] != field[:type]
          if field_schema[:type] == NilClass
            # if it was set as nil, we now set it to a concrete type
            field_schema[:type] = field[:type]
          elsif field[:type] != nil
            # if it had a different (non-nil) type, then try to upcast
            field_schema[:type] = lowest_common_type(field[:type], field_schema[:type])
          end
        end

        field_schema[:usage_count] += 1
        field_schema[:types] ||= {}
        field_schema[:types][field[:type]] ||= { count: 0 }
        field_schema[:types][field[:type]][:count] += 1

        if type_has_min_max?(field[:type])
          field_size = value_length(field[:inferred_value], field[:type])
          field_schema[:types][field[:type]][:min] = [field_schema[:types][field[:type]][:min] || INT_MAX, field_size].min
          field_schema[:types][field[:type]][:max] = [field_schema[:types][field[:type]][:max] || 0, field_size].max
        end
      end
    end

    table_schema
  end
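A sketch of the intermediate, per-batch format this method produces, keyed by flattened field name (assuming NumericTypes includes Integer, so min/max track the numeric values themselves):

  inferrer = Schema::Inference::SchemaInferrer.new
  inferrer.send(:data_schema, [{ 'id' => 1 }, { 'id' => 42 }])
  # => roughly { 'id' => { type: Integer, usage_count: 2,
  #                        types: { Integer => { count: 2, min: 1, max: 42 } } } }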
detect_type_of(value)
  # File lib/schema/inference/schema_inferrer.rb, line 208
  def detect_type_of(value)
    return Boolean if value.is_a?(TrueClass) || value.is_a?(FalseClass)

    if value.is_a? Integer
      return Integer if value.abs <= INT_MAX
      return String
    end

    return Numeric if value.is_a? Numeric
    return Time if value.is_a? Time
    return NilClass if value.is_a? NilClass

    if value.is_a? String
      return Integer if value =~ /^[-+]?[0-9]+$/ && value.to_i.abs <= INT_MAX
      return Numeric if value =~ /^[-+]?[0-9]*\.[0-9]+$/
      return Boolean if %w(false true).include?(value.downcase)
      return Time if Timeliness.parse(value) != nil
      return String
    end

    Object
  end
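A few illustrative inputs and the types the branches above map them to (a sketch; detection of time-like strings depends on what Timeliness accepts):

  inferrer = Schema::Inference::SchemaInferrer.new
  inferrer.send(:detect_type_of, true)      # => Boolean
  inferrer.send(:detect_type_of, 42)        # => Integer
  inferrer.send(:detect_type_of, '42')      # => Integer (numeric-looking string)
  inferrer.send(:detect_type_of, '3.14')    # => Numeric
  inferrer.send(:detect_type_of, 'false')   # => Boolean
  inferrer.send(:detect_type_of, nil)       # => NilClass
  inferrer.send(:detect_type_of, [1, 2])    # => Object (not a recognised scalar)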
inferred_value_of(value)
  # File lib/schema/inference/schema_inferrer.rb, line 232
  def inferred_value_of(value)
    return value unless value.is_a?(String)

    return value.to_i if value =~ /^[-+]?[0-9]+$/
    return value.to_f if value =~ /^[-+]?[0-9]*\.?[0-9]+$/
    return true if value.downcase == 'true'
    return false if value.downcase == 'false'

    time_value = Timeliness.parse(value)
    return time_value if time_value

    value
  end
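Similarly, a sketch of the coercion applied to string values:

  inferrer = Schema::Inference::SchemaInferrer.new
  inferrer.send(:inferred_value_of, '42')    # => 42
  inferrer.send(:inferred_value_of, '3.14')  # => 3.14
  inferrer.send(:inferred_value_of, 'TRUE')  # => true
  inferrer.send(:inferred_value_of, 42)      # => 42 (non-strings pass through)
  # time-like strings come back as whatever Timeliness parses; anything else is returned as-is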
is_integer?(value)
  # File lib/schema/inference/schema_inferrer.rb, line 279
  def is_integer?(value)
    (/^[+-]?[0-9]+$/ =~ value).present?
  end
key_access_tokens(key:)
  # File lib/schema/inference/schema_inferrer.rb, line 246
  def key_access_tokens(key:)
    key.split(separator).map { |token|
      # only parse integers for array indexing
      next token unless is_integer?(token)
      token.to_i
    }
  end
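For example (a sketch), a flattened key mixing hash keys and array indices is split into the tokens that dig expects:

  inferrer = Schema::Inference::SchemaInferrer.new
  inferrer.send(:key_access_tokens, key: 'user.emails.0')
  # => ['user', 'emails', 0]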
lowest_common_type(type1, type2)
  # File lib/schema/inference/schema_inferrer.rb, line 183
  def lowest_common_type(type1, type2)
    return type1 if type1 == type2
    return Numeric if NumericTypes.include?(type1) && NumericTypes.include?(type2)
    Object
  end
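In practice this means (a sketch, assuming NumericTypes covers Integer and Numeric):

  inferrer = Schema::Inference::SchemaInferrer.new
  inferrer.send(:lowest_common_type, Integer, Integer)  # => Integer
  inferrer.send(:lowest_common_type, Integer, Numeric)  # => Numeric
  inferrer.send(:lowest_common_type, Integer, String)   # => Object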
parallel_map(itr, &block)
  # File lib/schema/inference/schema_inferrer.rb, line 292
  def parallel_map(itr, &block)
    # set to true to debug code in the iteration
    is_debugging_impl = ENV['DEBUG']

    if is_debugging_impl
      itr.map do |arg|
        block.call(arg)
      end
    else
      Parallel.map(itr) do |arg|
        block.call(arg)
      end
    end
  end
process_schema_results(results, total_count, extended)
  # File lib/schema/inference/schema_inferrer.rb, line 100
  def process_schema_results(results, total_count, extended)
    # aggregate the results
    table_schema = results[0]

    results[1..-1].each { |res|
      table_schema.each { |k, v|
        next if res[k].blank?

        # aggregate types count, set min and max
        res[k][:types].each { |type, info|
          table_schema[k][:types][type] ||= { count: 0 }
          table_schema[k][:types][type][:count] += info[:count]

          if type_has_min_max?(type)
            table_schema[k][:types][type][:min] = [table_schema[k][:types][type][:min] || INT_MAX, info[:min]].min
            table_schema[k][:types][type][:max] = [table_schema[k][:types][type][:max] || 0, info[:max]].max
          end
        }

        # aggregate other information
        table_schema[k][:usage_count] += res[k][:usage_count].to_i

        if (table_schema[k][:type] != res[k][:type])
          if table_schema[k][:type] == NilClass
            table_schema[k][:type] = res[k][:type]
          elsif res[k][:type] != NilClass
            table_schema[k][:type] = lowest_common_type(res[k][:type], table_schema[k][:type])
          end
        end
      }
      # make sure keys that were not in table_schema are now added.
      table_schema.reverse_merge!(res)
    }

    # detect and remove nulls that are part of other schemas
    # e.g. { 'some_data': null } and { 'some_data': { 'hash': 1 } }
    # shouldn't be reported as different keys
    table_schema.each { |k, v|
      next unless v[:type] == NilClass
      # check if there is any key that matches this one plus a hash/array extension
      full_key_exists = table_schema.find { |full_key, _| full_key =~ /^#{k}#{Regexp.quote(separator)}.*/ }.present?
      table_schema.delete(k) if full_key_exists
    }

    # detect and process array information
    unless extended
      terminal_array_keys = {}

      table_schema.keys.each { |key|
        is_terminal_array = /.*#{Regexp.quote(separator)}[0-9]+$/ =~ key
        next unless is_terminal_array

        key_prefix = key.split(separator)[0...-1].join(separator)
        terminal_array_keys[key_prefix] ||= []
        terminal_array_keys[key_prefix] << key
      }

      terminal_array_keys.each do |key_prefix, keys|
        keys_usage_count = keys.map { |x| table_schema[x][:usage_count] }
        usage_count = keys_usage_count.max
        # min size = how many keys have "always" been used
        # As the keys may not have been used at the same time,
        # this may not be valid depending on the array usage.
        min_size = keys_usage_count.count { |x| x == usage_count }
        max_size = keys.map { |x| x.split(separator)[-1].to_i }.max + 1

        # delete keys that are part of the array
        keys.each { |key, _| table_schema.delete(key) }

        table_schema[key_prefix] = {
          type: Array,
          usage_count: usage_count,
          min_size: min_size,
          max_size: max_size
        }
      end
    end

    # add a percentage in terms of usage
    table_schema.each { |k, v|
      table_schema[k][:usage] = table_schema[k][:usage_count] / total_count.to_f
    }

    @convert_types_to_string ? convert_types_to_string(table_schema) : table_schema
  end
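To make the array handling concrete, a sketch of the terminal-array pass when extended is false: sibling keys ending in numeric indices are folded into a single entry of type Array, with sizes derived from the per-index usage counts (values indicative only):

  # before the array pass:
  #   'tags.0' => { type: String, usage_count: 10, ... }
  #   'tags.1' => { type: String, usage_count: 4,  ... }
  # after the pass:
  #   'tags'   => { type: Array, usage_count: 10, min_size: 1, max_size: 2 }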
record_schema(record, name = "")
Recursively explore a record and return its schema
  # File lib/schema/inference/schema_inferrer.rb, line 190
  def record_schema(record, name = "")
    if record.is_a? Hash
      record.flat_map { |k, v|
        field_name = "#{name}#{separator}#{k}" if name.present?
        field_name ||= k
        record_schema(v, field_name)
      }
    elsif record.is_a? Array
      record.each_with_index.flat_map { |x, index|
        field_name = "#{name}#{separator}#{index}" if name.present?
        field_name ||= index
        record_schema(x, field_name)
      }
    else
      {
        field: name,
        type: detect_type_of(record),
        inferred_value: inferred_value_of(record)
      }
    end
  end
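A sketch of the flattening: a nested record yields one entry per leaf value, with keys joined by the separator:

  inferrer = Schema::Inference::SchemaInferrer.new
  inferrer.send(:record_schema, { 'user' => { 'name' => 'Alice', 'tags' => ['a', 'b'] } })
  # => roughly [
  #   { field: 'user.name',   type: String, inferred_value: 'Alice' },
  #   { field: 'user.tags.0', type: String, inferred_value: 'a' },
  #   { field: 'user.tags.1', type: String, inferred_value: 'b' }
  # ]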
record_value(record:, key:)
  # File lib/schema/inference/schema_inferrer.rb, line 254
  def record_value(record:, key:)
    tokens = key_access_tokens(key: key)
    record.dig(*tokens)
  end
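The counterpart read operation, sketched:

  inferrer = Schema::Inference::SchemaInferrer.new
  record = { 'user' => { 'emails' => ['a@b.com'] } }
  inferrer.send(:record_value, record: record, key: 'user.emails.0')
  # => 'a@b.com'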
type_has_min_max?(type)
  # File lib/schema/inference/schema_inferrer.rb, line 91
  def type_has_min_max?(type)
    type == String || NumericTypes.include?(type)
  end
validate_dataset(dataset)
  # File lib/schema/inference/schema_inferrer.rb, line 48
  def validate_dataset(dataset)
    return if dataset.is_a?(Array)
    raise ArgumentError, 'dataset must be an array or a hash'
  end
value_length(value, type)
  # File lib/schema/inference/schema_inferrer.rb, line 95
  def value_length(value, type)
    return value.to_s.length if type == String
    value # leave as-is otherwise
  end