class Fluent::BigQueryOutput

TODO: error classes for each API error response

class BigQueryAPIError < StandardError; end

Constants

REGEXP_MAX_NUM
TODO: record field stream inserts don't work well?
 At table creation, table type json + field type record -> field type validation fails
 At streaming inserts, a schema cannot be specified

config_param :field_record, :string, default: nil
config_param :optional_data_field, :string, default: nil

RETRYABLE_ERROR_REASON

developers.google.com/bigquery/browser-tool-quickstart
developers.google.com/bigquery/bigquery-api-quickstart

Public Class Methods

new()
Table types

developers.google.com/bigquery/docs/tables

type - The following data types are supported; see Data Formats for details on each data type:
  STRING
  INTEGER
  FLOAT
  BOOLEAN
  RECORD - A JSON object, used when importing nested records. This type is only available when using JSON source files.

mode - Whether a field can be null. The following values are supported:
  NULLABLE - The cell can be null.
  REQUIRED - The cell cannot be null.
  REPEATED - Zero or more repeated simple or nested subfields. This mode is only supported when using JSON source files.
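
As a concrete illustration (hypothetical field names, not part of the plugin), a nested schema using these types and modes might look like the sketch below; a file referenced by schema_path would hold the equivalent JSON array:

# Hypothetical schema; a schema_path file would contain the equivalent JSON.
schema = [
  { "name" => "time",    "type" => "TIMESTAMP", "mode" => "REQUIRED" },
  { "name" => "status",  "type" => "INTEGER",   "mode" => "NULLABLE" },
  { "name" => "request", "type" => "RECORD",    "mode" => "NULLABLE",
    "fields" => [
      { "name" => "path",    "type" => "STRING", "mode" => "NULLABLE" },
      { "name" => "headers", "type" => "STRING", "mode" => "REPEATED" }
    ] }
]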

Calls superclass method
# File lib/fluent/plugin/out_bigquery.rb, line 165
def initialize
  super
  require 'json'
  require 'google/apis/bigquery_v2'
  require 'googleauth'
  require 'active_support/json'
  require 'active_support/core_ext/hash'
  require 'active_support/core_ext/object/json'

  # MEMO: signet-0.6.1 depends on Faraday.default_connection
  Faraday.default_connection.options.timeout = 60
end

Public Instance Methods

client()
# File lib/fluent/plugin/out_bigquery.rb, line 285
def client
  return @cached_client if @cached_client && @cached_client_expiration > Time.now

  client = Google::Apis::BigqueryV2::BigqueryService.new

  scope = "https://www.googleapis.com/auth/bigquery"

  case @auth_method
  when 'private_key'
    require 'google/api_client/auth/key_utils'
    key = Google::APIClient::KeyUtils.load_from_pkcs12(@private_key_path, @private_key_passphrase)
    auth = Signet::OAuth2::Client.new(
            token_credential_uri: "https://accounts.google.com/o/oauth2/token",
            audience: "https://accounts.google.com/o/oauth2/token",
            scope: scope,
            issuer: @email,
            signing_key: key)

  when 'compute_engine'
    auth = Google::Auth::GCECredentials.new

  when 'json_key'
    if File.exist?(@json_key)
      auth = File.open(@json_key) do |f|
        Google::Auth::ServiceAccountCredentials.make_creds(json_key_io: f, scope: scope)
      end
    else
      key = StringIO.new(@json_key)
      auth = Google::Auth::ServiceAccountCredentials.make_creds(json_key_io: key, scope: scope)
    end

  when 'application_default'
    auth = Google::Auth.get_application_default([scope])

  else
    raise ConfigError, "Unknown auth method: #{@auth_method}"
  end

  client.authorization = auth

  @cached_client_expiration = Time.now + 1800
  @cached_client = client
end
configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_bigquery.rb, line 183
def configure(conf)
  if conf["method"] == "load"
    configure_for_load(conf)
  else
    configure_for_insert(conf)
  end
  super

  if @method == "insert"
    extend(InsertImplementation)
  elsif @method == "load"
    require 'digest/sha1'
    extend(LoadImplementation)
  else
    raise Fluent::ConfigError, "'method' must be 'insert' or 'load'"
  end

  case @auth_method
  when 'private_key'
    unless @email && @private_key_path
      raise Fluent::ConfigError, "'email' and 'private_key_path' must be specified if auth_method == 'private_key'"
    end
  when 'compute_engine'
    # Do nothing
  when 'json_key'
    unless @json_key
      raise Fluent::ConfigError, "'json_key' must be specified if auth_method == 'json_key'"
    end
  when 'application_default'
    # Do nothing
  else
    raise Fluent::ConfigError, "unrecognized 'auth_method': #{@auth_method}"
  end

  unless @table.nil? ^ @tables.nil?
    raise Fluent::ConfigError, "'table' or 'tables' must be specified, and both are invalid"
  end

  @tablelist = @tables ? @tables.split(',') : [@table]

  @fields = RecordSchema.new('record')
  if @schema_path
    @fields.load_schema(JSON.parse(File.read(@schema_path)))
  end

  types = %w(string integer float boolean timestamp)
  types.each do |type|
    raw_fields = instance_variable_get("@field_#{type}")
    next unless raw_fields
    raw_fields.split(',').each do |field|
      @fields.register_field field.strip, type.to_sym
    end
  end

  @regexps = {}
  (1..REGEXP_MAX_NUM).each do |i|
    next unless conf["replace_record_key_regexp#{i}"]
    regexp, replacement = conf["replace_record_key_regexp#{i}"].split(/ /, 2)
    raise ConfigError, "replace_record_key_regexp#{i} does not contain 2 parameters" unless replacement
    raise ConfigError, "replace_record_key_regexp#{i} contains a duplicated key, #{regexp}" if @regexps[regexp]
    @regexps[regexp] = replacement
  end

  @localtime = false if @localtime.nil? && @utc

  @timef = TimeFormatter.new(@time_format, @localtime)

  if @time_field
    keys = @time_field.split('.')
    last_key = keys.pop
    @add_time_field = ->(record, time) {
      keys.inject(record) { |h, k| h[k] ||= {} }[last_key] = @timef.format(time)
      record
    }
  else
    @add_time_field = ->(record, time) { record }
  end

  if @insert_id_field
    insert_id_keys = @insert_id_field.split('.')
    @get_insert_id = ->(record) {
      insert_id_keys.inject(record) {|h, k| h[k] }
    }
  else
    @get_insert_id = nil
  end
end
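
For illustration only, here is a standalone sketch of the two lambdas built above, assuming the hypothetical settings time_field "payload.time" and insert_id_field "payload.uuid":

keys, last_key = ["payload"], "time"
add_time_field = ->(record, formatted_time) {
  keys.inject(record) { |h, k| h[k] ||= {} }[last_key] = formatted_time
  record
}
get_insert_id = ->(record) { ["payload", "uuid"].inject(record) { |h, k| h[k] } }

record = { "payload" => { "uuid" => "abc-123" } }
add_time_field.call(record, "2015-06-01T12:00:00Z")
# record is now { "payload" => { "uuid" => "abc-123", "time" => "2015-06-01T12:00:00Z" } }
get_insert_id.call(record)  # => "abc-123"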
configure_for_insert(conf)

Defaults for the insert method

# File lib/fluent/plugin/out_bigquery.rb, line 24
def configure_for_insert(conf)
  raise ConfigError if conf["method"] == "load"

  conf["buffer_type"]                = "lightening"  unless conf["buffer_type"]
  conf["flush_interval"]             = 0.25          unless conf["flush_interval"]
  conf["try_flush_interval"]         = 0.05          unless conf["try_flush_interval"]
  conf["buffer_chunk_limit"]         = 1 * 1024 ** 2 unless conf["buffer_chunk_limit"] # 1MB
  conf["buffer_queue_limit"]         = 1024          unless conf["buffer_queue_limit"]
  conf["buffer_chunk_records_limit"] = 500           unless conf["buffer_chunk_records_limit"]
end
configure_for_load(conf)

Defaults for the load method

# File lib/fluent/plugin/out_bigquery.rb, line 36
def configure_for_load(conf)
  raise ConfigError unless conf["method"] == "load"

  # buffer_type, flush_interval, try_flush_interval are TimeSlicedOutput defaults
  conf["buffer_chunk_limit"] = 1 * 1024 ** 3 unless conf["buffer_chunk_limit"] # 1GB
  conf["buffer_queue_limit"] = 64            unless conf["buffer_queue_limit"]
end
create_table(table_id)
# File lib/fluent/plugin/out_bigquery.rb, line 342
def create_table(table_id)
  client.insert_table(@project, @dataset, {
    table_reference: {
      table_id: table_id,
    },
    schema: {
      fields: @fields.to_a,
    }
  }, {})
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
  # api_error? -> client cache clear
  @cached_client = nil

  message = e.message
  if e.status_code == 409 && /Already Exists:/ =~ message
    # ignore 'Already Exists' error
    return
  end
  log.error "tables.insert API", :project_id => @project, :dataset => @dataset, :table => table_id, :code => e.status_code, :message => message
  raise "failed to create table in bigquery" # TODO: error class
end
fetch_schema(allow_overwrite = true)
# File lib/fluent/plugin/out_bigquery.rb, line 388
def fetch_schema(allow_overwrite = true)
  table_id = nil
  @fetch_schema_mutex.synchronize do
    if Fluent::Engine.now - @last_fetch_schema_time > @schema_cache_expire
      table_id_format = @tablelist[0]
      table_id = generate_table_id(table_id_format, Time.at(Fluent::Engine.now))
      res = client.get_table(@project, @dataset, table_id)

      schema = res.schema.fields.as_json
      log.debug "Load schema from BigQuery: #{@project}:#{@dataset}.#{table_id} #{schema}"
      if allow_overwrite
        fields = RecordSchema.new("record")
        fields.load_schema(schema, allow_overwrite)
        @fields = fields
      else
        @fields.load_schema(schema, allow_overwrite)
      end
      @last_fetch_schema_time = Fluent::Engine.now
    end
  end
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
  # api_error? -> client cache clear
  @cached_client = nil
  message = e.message
  log.error "tables.get API", project_id: @project, dataset: @dataset, table: table_id, code: e.status_code, message: message
  if @fields.empty?
    raise "failed to fetch schema from bigquery" # TODO: error class
  else
    log.warn "Use previous schema"
    @last_fetch_schema_time = Fluent::Engine.now
  end
end
generate_table_id(table_id_format, current_time, chunk = nil)
# File lib/fluent/plugin/out_bigquery.rb, line 329
def generate_table_id(table_id_format, current_time, chunk = nil)
  table_id = current_time.strftime(table_id_format)
  if chunk
    table_id.gsub(%r(%{time_slice})) { |expr|
      chunk.key
    }
  else
    table_id.gsub(%r(%{time_slice})) { |expr|
      current_time.strftime(@time_slice_format)
    }
  end
end
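
For example (hypothetical table names, assuming time_slice_format is "%Y%m%d"), both formats below resolve to the same daily table id:

now = Time.utc(2015, 6, 1)
generate_table_id("accesslog_%Y%m%d", now)          # => "accesslog_20150601"
generate_table_id("accesslog_%{time_slice}", now)   # => "accesslog_20150601"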
replace_record_key(record)
# File lib/fluent/plugin/out_bigquery.rb, line 364
def replace_record_key(record)
  new_record = {}
  record.each do |key, _|
    new_key = key
    @regexps.each do |regexp, replacement|
      new_key = new_key.gsub(/#{regexp}/, replacement)
    end
    new_key = new_key.gsub(/\W/, '')
    new_record.store(new_key, record[key])
  end
  new_record
end
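
A minimal sketch of the same key rewriting outside the plugin, assuming the hypothetical setting replace_record_key_regexp1 "- _" (regexp "-", replacement "_"):

regexps = { "-" => "_" }
record  = { "user-name" => "bob", "bad key!" => 1 }
new_record = {}
record.each_key do |key|
  new_key = regexps.reduce(key) { |k, (regexp, replacement)| k.gsub(/#{regexp}/, replacement) }
  new_record[new_key.gsub(/\W/, '')] = record[key]
end
new_record  # => { "user_name" => "bob", "badkey" => 1 }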
start()
Calls superclass method
# File lib/fluent/plugin/out_bigquery.rb, line 271
def start
  super

  @cached_client = nil
  @cached_client_expiration = nil

  @tables_queue = @tablelist.dup.shuffle
  @tables_mutex = Mutex.new
  @fetch_schema_mutex = Mutex.new

  @last_fetch_schema_time = 0
  fetch_schema(false) if @fetch_schema
end
write(chunk)
# File lib/fluent/plugin/out_bigquery.rb, line 377
def write(chunk)
  table_id_format = @tables_mutex.synchronize do
    t = @tables_queue.shift
    @tables_queue.push t
    t
  end
  table_id = generate_table_id(table_id_format, Time.at(Fluent::Engine.now), chunk)
  template_suffix = @template_suffix ? generate_table_id(@template_suffix, Time.at(Fluent::Engine.now), chunk) : nil
  _write(chunk, table_id, template_suffix)
end
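
The table selection above is a simple round-robin over @tables_queue; a minimal sketch with hypothetical table names:

queue = %w(accesslog_a accesslog_b accesslog_c)
3.times.map { t = queue.shift; queue.push(t); t }
# => ["accesslog_a", "accesslog_b", "accesslog_c"], leaving the queue in its original order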