class Fluent::BigQueryOutput

Constants

REGEXP_MAX_NUM
TODO: record field stream inserts don't work well?
 At table creation, table type json + field type record -> field type validation fails
 At streaming inserts, a schema cannot be specified

config_param :field_record, :string, default: nil
config_param :optional_data_field, :string, default: nil

Public Class Methods

new()
Table types

developers.google.com/bigquery/docs/tables

type - The following data types are supported; see Data Formats for details on each data type:
  STRING
  INTEGER
  FLOAT
  BOOLEAN
  RECORD - A JSON object, used when importing nested records. This type is only available when using JSON source files.

mode - Whether a field can be null. The following values are supported:
  NULLABLE - The cell can be null.
  REQUIRED - The cell cannot be null.
  REPEATED - Zero or more repeated simple or nested subfields. This mode is only supported when using JSON source files.
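For example, a schema file combining these types and modes might look like the following (a minimal sketch; the field names are illustrative, and the JSON shape matches what load_schema consumes via schema_path below):

  [
    { "name": "time",    "type": "TIMESTAMP", "mode": "REQUIRED" },
    { "name": "status",  "type": "INTEGER",   "mode": "NULLABLE" },
    { "name": "tags",    "type": "STRING",    "mode": "REPEATED" },
    { "name": "request", "type": "RECORD",    "mode": "NULLABLE",
      "fields": [
        { "name": "method", "type": "STRING" },
        { "name": "path",   "type": "STRING" }
      ] }
  ]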

Calls superclass method
# File lib/fluent/plugin/out_bigquery.rb, line 166
def initialize
  super
  require 'multi_json'
  require 'google/apis/bigquery_v2'
  require 'googleauth'
  require 'active_support/json'
  require 'active_support/core_ext/hash'
  require 'active_support/core_ext/object/json'

  # MEMO: signet-0.6.1 depends on Faraday.default_connection
  Faraday.default_connection.options.timeout = 60
end

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_bigquery.rb, line 179
def configure(conf)
  if conf["method"] == "load"
    configure_for_load(conf)
  else
    configure_for_insert(conf)
  end
  super

  case @method
  when :insert
    extend(InsertImplementation)
  when :load
    raise Fluent::ConfigError, "'template_suffix' is only for `insert` mode; use 'fetch_schema_table' and a formatted table name instead" if @template_suffix
    extend(LoadImplementation)
  else
    raise Fluent::ConfigError, "'method' must be 'insert' or 'load'"
  end

  case @auth_method
  when :private_key
    unless @email && @private_key_path
      raise Fluent::ConfigError, "'email' and 'private_key_path' must be specified if auth_method == 'private_key'"
    end
  when :compute_engine
    # Do nothing
  when :json_key
    unless @json_key
      raise Fluent::ConfigError, "'json_key' must be specified if auth_method == 'json_key'"
    end
  when :application_default
    # Do nothing
  else
    raise Fluent::ConfigError, "unrecognized 'auth_method': #{@auth_method}"
  end

  unless @table.nil? ^ @tables.nil?
    raise Fluent::ConfigError, "exactly one of 'table' or 'tables' must be specified; setting both is invalid"
  end

  @tablelist = @tables ? @tables.split(',') : [@table]

  legacy_schema_config_deprecation
  @fields = Fluent::BigQuery::RecordSchema.new('record')
  if @schema
    @fields.load_schema(@schema)
  end
  if @schema_path
    @fields.load_schema(MultiJson.load(File.read(@schema_path)))
  end

  types = %w(string integer float boolean timestamp)
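  # e.g. a hypothetical `field_integer status,bytes` registers "status" and
  # "bytes" as integer columns (deprecated field_* style, warned about below)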
  types.each do |type|
    raw_fields = instance_variable_get("@field_#{type}")
    next unless raw_fields
    raw_fields.split(',').each do |field|
      @fields.register_field field.strip, type.to_sym
    end
  end

  @regexps = {}
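  # e.g. a hypothetical `replace_record_key_regexp1 - _` splits on the first
  # space into pattern "-" and replacement "_"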
  (1..REGEXP_MAX_NUM).each do |i|
    next unless conf["replace_record_key_regexp#{i}"]
    regexp, replacement = conf["replace_record_key_regexp#{i}"].split(/ /, 2)
    raise ConfigError, "replace_record_key_regexp#{i} does not contain 2 parameters" unless replacement
    raise ConfigError, "replace_record_key_regexp#{i} contains a duplicated key, #{regexp}" if @regexps[regexp]
    @regexps[regexp] = replacement
  end

  @localtime = false if @localtime.nil? && @utc

  @timef = TimeFormatter.new(@time_format, @localtime)

  if @time_field
    keys = @time_field.split('.')
    last_key = keys.pop
    @add_time_field = ->(record, time) {
      keys.inject(record) { |h, k| h[k] ||= {} }[last_key] = @timef.format(time)
      record
    }
  else
    @add_time_field = ->(record, time) { record }
  end
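  # e.g. time_field "metadata.time" writes the formatted time at
  # record["metadata"]["time"], creating intermediate hashes as needed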

  if @insert_id_field
    insert_id_keys = @insert_id_field.split('.')
    @get_insert_id = ->(record) {
      insert_id_keys.inject(record) {|h, k| h[k] }
    }
  else
    @get_insert_id = nil
  end
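  # e.g. insert_id_field "event.id" digs record["event"]["id"] out of each
  # record for use as the streaming-insert deduplication key (insertId)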

  warn "[DEPRECATION] `convert_hash_to_json` param is deprecated. If a Hash value is inserted into a string field, the plugin converts it to JSON automatically." if @convert_hash_to_json
end
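
Taken together, a minimal streaming-insert configuration might look like the sketch below. The plugin type name and all values here are assumptions for illustration; the parameter names follow the instance variables used above.

  <match bigquery.**>
    type bigquery
    method insert
    auth_method json_key
    json_key /path/to/keyfile.json
    project yourproject_id
    dataset yourdataset_id
    table accesslog_%Y%m%d
    schema_path /path/to/schema.json
  </match>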
configure_for_insert(conf)

defaults for insert mode

# File lib/fluent/plugin/out_bigquery.rb, line 22
def configure_for_insert(conf)
  raise ConfigError if conf["method"] == "load"

  conf["buffer_type"]                = "lightening"  unless conf["buffer_type"]
  conf["flush_interval"]             = 0.25          unless conf["flush_interval"]
  conf["try_flush_interval"]         = 0.05          unless conf["try_flush_interval"]
  conf["buffer_chunk_limit"]         = 1 * 1024 ** 2 unless conf["buffer_chunk_limit"] # 1MB
  conf["buffer_queue_limit"]         = 1024          unless conf["buffer_queue_limit"]
  conf["buffer_chunk_records_limit"] = 500           unless conf["buffer_chunk_records_limit"]
end
configure_for_load(conf)

defaults for load mode

# File lib/fluent/plugin/out_bigquery.rb, line 34
def configure_for_load(conf)
  raise ConfigError unless conf["method"] == "load"

  # buffer_type, flush_interval, and try_flush_interval are TimeSlicedOutput defaults
  conf["buffer_chunk_limit"] = 1 * 1024 ** 3 unless conf["buffer_chunk_limit"] # 1GB
  conf["buffer_queue_limit"] = 32            unless conf["buffer_queue_limit"]
end
convert_hash_to_json(record)
# File lib/fluent/plugin/out_bigquery.rb, line 333
def convert_hash_to_json(record)
  record.each do |key, value|
    if value.class == Hash
      record[key] = MultiJson.dump(value)
    end
  end
  record
end
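
A hypothetical call, using MultiJson's default compact output:

  convert_hash_to_json({ "a" => { "b" => 1 }, "c" => 2 })
  #=> { "a" => "{\"b\":1}", "c" => 2 }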
fetch_schema(allow_overwrite = true)
# File lib/fluent/plugin/out_bigquery.rb, line 358
def fetch_schema(allow_overwrite = true)
  table_id = nil
  @fetch_schema_mutex.synchronize do
    if Fluent::Engine.now - @last_fetch_schema_time > @schema_cache_expire
      table_id_format = @fetch_schema_table || @tablelist[0]
      table_id = generate_table_id(table_id_format, Time.at(Fluent::Engine.now))
      schema = writer.fetch_schema(@project, @dataset, table_id)

      if schema
        if allow_overwrite
          fields = Fluent::BigQuery::RecordSchema.new("record")
          fields.load_schema(schema, allow_overwrite)
          @fields = fields
        else
          @fields.load_schema(schema, allow_overwrite)
        end
      else
        if @fields.empty?
          raise "failed to fetch schema from bigquery"
        else
          log.warn "#{table_id} uses previous schema"
        end
      end

      @last_fetch_schema_time = Fluent::Engine.now
    end
  end
end
generate_table_id(table_id_format, current_time, row = nil, chunk = nil)
# File lib/fluent/plugin/out_bigquery.rb, line 293
def generate_table_id(table_id_format, current_time, row = nil, chunk = nil)
  format, col = table_id_format.split(/@/)
  time = if col && row
           keys = col.split('.')
           t = keys.inject(row[:json]) {|obj, attr| obj[attr.to_sym] }
           Time.at(t)
         else
           current_time
         end
  if row && format =~ /\$\{/
    format.gsub!(/\$\{\s*(\w+)\s*\}/) do |m|
      row[:json][$1.to_sym].to_s.gsub(/[^\w]/, '')
    end
  end
  table_id = time.strftime(format)

  if chunk
    table_id.gsub(%r(%{time_slice})) { |expr|
      chunk.key
    }
  else
    table_id.gsub(%r(%{time_slice})) { |expr|
      current_time.strftime(@time_slice_format)
    }
  end
end
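
A few hypothetical examples (row and chunk shapes are inferred from the code above; strftime results assume a UTC environment):

  generate_table_id("accesslog_%Y%m", Time.utc(2016, 3, 14))
  #=> "accesslog_201603"

  # with an "@column" suffix, the time is taken from the record itself
  generate_table_id("accesslog_%Y%m%d@created_at", now, { json: { created_at: 1457913600 } })
  #=> "accesslog_20160314"

  # ${ } placeholders are filled from the record, with non-word chars stripped
  generate_table_id("log_${vhost}_%Y%m", Time.utc(2016, 3, 14), { json: { vhost: "www.example.com" } })
  #=> "log_wwwexamplecom_201603"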
legacy_schema_config_deprecation()
# File lib/fluent/plugin/out_bigquery.rb, line 342
def legacy_schema_config_deprecation
  if [@field_string, @field_integer, @field_float, @field_boolean, @field_timestamp].any?
    warn "[DEPRECATION] `field_*` style schema config is deprecated. Use the `schema` config param (an array of JSON-style field definitions) instead."
  end
end
replace_record_key(record)
# File lib/fluent/plugin/out_bigquery.rb, line 320
def replace_record_key(record)
  new_record = {}
  record.each do |key, _|
    new_key = key
    @regexps.each do |regexp, replacement|
      new_key = new_key.gsub(/#{regexp}/, replacement)
    end
    new_key = new_key.gsub(/\W/, '')
    new_record.store(new_key, record[key])
  end
  new_record
end
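
For example, with a hypothetical `replace_record_key_regexp1 - _` configured:

  replace_record_key({ "user-id" => 1, "path info" => "/" })
  #=> { "user_id" => 1, "pathinfo" => "/" }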
start()
Calls superclass method
# File lib/fluent/plugin/out_bigquery.rb, line 274
def start
  super

  @tables_queue = @tablelist.dup.shuffle
  @tables_mutex = Mutex.new
  @fetch_schema_mutex = Mutex.new

  @last_fetch_schema_time = 0
  fetch_schema(false) if @fetch_schema
end
write(chunk)
# File lib/fluent/plugin/out_bigquery.rb, line 348
def write(chunk)
  table_id_format = @tables_mutex.synchronize do
    t = @tables_queue.shift
    @tables_queue.push t
    t
  end
  template_suffix_format = @template_suffix
  _write(chunk, table_id_format, template_suffix_format)
end
writer()
# File lib/fluent/plugin/out_bigquery.rb, line 285
def writer
  @writer ||= Fluent::BigQuery::Writer.new(@log, @auth_method, {
    private_key_path: @private_key_path, private_key_passphrase: @private_key_passphrase,
    email: @email,
    json_key: @json_key,
  })
end