class Fluent::BigQuery::RecordSchema

Constants

FIELD_TYPES

Public Class Methods

new(name, mode = :nullable) click to toggle source
Calls superclass method Fluent::BigQuery::FieldSchema::new
# File lib/fluent/plugin/bigquery/schema.rb, line 180
# Builds a record schema with no sub-fields registered yet.
#
# name - field name, forwarded unchanged to the superclass constructor.
# mode - field mode, forwarded unchanged to the superclass constructor
#        (defaults to :nullable).
def initialize(name, mode = :nullable)
  super(name, mode)
  # Sub-field schemas, keyed by field name.
  @fields = {}
end

Public Instance Methods

[](name) click to toggle source
# File lib/fluent/plugin/bigquery/schema.rb, line 189
# Looks up the sub-field schema stored under +name+ in @fields.
# Returns whatever @fields yields for that key (nil when the key is absent
# from the plain Hash created in #initialize).
def [](name)
  @fields[name]
end
empty?() click to toggle source
# File lib/fluent/plugin/bigquery/schema.rb, line 193
# Returns true when no sub-field has been registered on this record.
def empty?
  @fields.empty?
end
format_one(record) click to toggle source
# File lib/fluent/plugin/bigquery/schema.rb, line 250
# Formats a single record against the registered sub-field schemas.
#
# Entries whose value is nil are dropped. A value whose key has a registered
# schema is passed through that schema's #format; a value with no registered
# schema is copied as-is.
#
# record - key/value collection to format (iterated with #each).
#
# Returns a new Hash containing the formatted entries.
def format_one(record)
  formatted = {}
  record.each do |field_name, raw_value|
    next if raw_value.nil?

    field_schema = @fields[field_name]
    if field_schema
      formatted[field_name] = field_schema.format(raw_value)
    else
      # Unknown fields are kept untouched rather than rejected.
      formatted[field_name] = raw_value
    end
  end
  formatted
end
load_schema(schema, allow_overwrite=true) click to toggle source
# File lib/fluent/plugin/bigquery/schema.rb, line 212
# Registers sub-fields from a list of field definitions.
#
# Each definition is a Hash read through the string keys 'name', 'type',
# 'mode' (optional, defaults to 'nullable') and, for record fields, 'fields'.
# Type and mode are matched case-insensitively.
#
# schema          - collection of field definition Hashes (iterated with #each).
# allow_overwrite - when false, a definition whose name is already registered
#                   is skipped (after its type has been validated).
#
# Raises ConfigError when a definition has no 'type' key, names a type that is
# not in FIELD_TYPES, or is a record without a 'fields' key.
# Returns the result of schema.each.
def load_schema(schema, allow_overwrite=true)
  schema.each do |definition|
    unless definition.key?('type')
      raise ConfigError, 'field must have type'
    end

    field_name = definition['name']
    field_mode = (definition['mode'] || 'nullable').downcase.to_sym
    field_type = definition['type'].downcase.to_sym

    schema_class = FIELD_TYPES[field_type]
    unless schema_class
      raise ConfigError, "Invalid field type: #{definition['type']}"
    end

    # Existing registrations win when overwriting is disabled.
    next if !allow_overwrite && @fields.key?(field_name)

    new_field = schema_class.new(field_name, field_mode)
    @fields[field_name] = new_field
    next unless field_type == :record

    # Record fields are registered first, then populated recursively.
    unless definition.key?('fields')
      raise ConfigError, "record field must have fields"
    end
    new_field.load_schema(definition['fields'], allow_overwrite)
  end
end
register_field(name, type) click to toggle source
# File lib/fluent/plugin/bigquery/schema.rb, line 234
# Registers a single field of the given type under +name+.
#
# A dotted name ("record.child") is split at the first dot: the part before
# it is ensured to be a record via #register_record_field, and the remainder
# is registered on that record (so deeper paths recurse).
#
# name - field name, optionally dotted.
# type - key into FIELD_TYPES.
#
# Raises ConfigError when +name+ is already registered with a type other than
# :timestamp, or when +type+ is not a key of FIELD_TYPES.
# Returns the newly created field schema, or for dotted names whatever the
# nested #register_field call returns.
def register_field(name, type)
  # A field already registered as :timestamp may be registered again.
  duplicate = @fields.key?(name) && @fields[name].type != :timestamp
  raise ConfigError, "field #{name} is registered twice" if duplicate

  if name[/\./]
    # The match data from the lookup above splits the name at the first dot.
    first_dot = Regexp.last_match
    record_name = first_dot.pre_match
    child_name = first_dot.post_match
    register_record_field(record_name)
    @fields[record_name].register_field(child_name, type)
  else
    field_class = FIELD_TYPES[type]
    raise ConfigError, "[Bug] Invalid field type #{type}" unless field_class
    @fields[name] = field_class.new(name)
  end
end
to_a() click to toggle source
# File lib/fluent/plugin/bigquery/schema.rb, line 197
# Returns an Array with the #to_h representation of every registered
# sub-field, in registration order.
def to_a
  @fields.values.map(&:to_h)
end
to_h() click to toggle source
# File lib/fluent/plugin/bigquery/schema.rb, line 203
# Returns a Hash describing this record: its name, its type and mode as
# upper-cased strings, and the #to_a list of sub-field descriptions under
# :fields.
def to_h
  {
    name: name,
    type: type.to_s.upcase,
    mode: mode.to_s.upcase,
    fields: to_a,
  }
end
type() click to toggle source
# File lib/fluent/plugin/bigquery/schema.rb, line 185
# Type tag of this schema; always :record.
def type
  :record
end

Private Instance Methods

register_record_field(name) click to toggle source
# File lib/fluent/plugin/bigquery/schema.rb, line 261
# Ensures that +name+ is registered as a record field.
#
# When +name+ is not registered yet, a new empty RecordSchema is stored under
# it. When it is already registered as a RecordSchema, nothing changes.
#
# name - name of the field that must be a record.
#
# Raises ConfigError when +name+ is already registered as something other
# than a RecordSchema.
# Returns the new RecordSchema when one was created, nil otherwise.
def register_record_field(name)
  if !@fields.key?(name)
    @fields[name] = RecordSchema.new(name)
  elsif !@fields[name].kind_of?(RecordSchema)
    # The message must read from @fields; the misspelled @field is nil and
    # turned this ConfigError into a NoMethodError.
    raise ConfigError, "field #{name} is required to be a record but already registered as #{@fields[name]}"
  end
end