class Fluent::BigQuery::Writer

Public Class Methods

new(log, auth_method, auth_options = {})
# File lib/fluent/plugin/bigquery/writer.rb, line 4
def initialize(log, auth_method, auth_options = {})
  @auth_method = auth_method
  @scope = "https://www.googleapis.com/auth/bigquery"
  @auth_options = auth_options
  @log = log
  @num_errors_per_chunk = {}

  @cached_client_expiration = Time.now + 1800
end
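
A minimal construction sketch. The logger and key path are assumptions; any Fluentd-style logger (e.g. $log inside a plugin) works:

writer = Fluent::BigQuery::Writer.new(
  log,        # a Fluentd logger; assumed to be in scope
  :json_key,
  json_key: "/path/to/service_account.json" # hypothetical path
)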

Public Instance Methods

client()
# File lib/fluent/plugin/bigquery/writer.rb, line 14
def client
  return @client if @client && @cached_client_expiration > Time.now

  client = Google::Apis::BigqueryV2::BigqueryService.new.tap do |cl|
    cl.authorization = get_auth
  end

  @cached_client_expiration = Time.now + 1800
  @client = client
end
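
The service object is memoized for 1800 seconds (30 minutes). The error paths below reset @client to nil, which forces the next call to build and re-authorize a fresh client. For example:

svc_a = writer.client
svc_b = writer.client   # same object until the 30-minute expiration passes
svc_a.equal?(svc_b)     # => true
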
create_load_job(chunk_id, project, dataset, table_id, upload_source, fields, prevent_duplicate_load: false, ignore_unknown_values: false, max_bad_records: 0, timeout_sec: nil, open_timeout_sec: 60, auto_create_table: nil, time_partitioning_type: nil, time_partitioning_expiration: nil)
# File lib/fluent/plugin/bigquery/writer.rb, line 121
def create_load_job(chunk_id, project, dataset, table_id, upload_source, fields, prevent_duplicate_load: false, ignore_unknown_values: false, max_bad_records: 0, timeout_sec: nil, open_timeout_sec: 60, auto_create_table: nil, time_partitioning_type: nil, time_partitioning_expiration: nil)
  configuration = {
    configuration: {
      load: {
        destination_table: {
          project_id: project,
          dataset_id: dataset,
          table_id: table_id,
        },
        schema: {
          fields: fields.to_a,
        },
        write_disposition: "WRITE_APPEND",
        source_format: "NEWLINE_DELIMITED_JSON",
        ignore_unknown_values: ignore_unknown_values,
        max_bad_records: max_bad_records,
      }
    }
  }

  job_id = create_job_id(chunk_id, dataset, table_id, fields.to_a, max_bad_records, ignore_unknown_values) if prevent_duplicate_load
  configuration[:configuration][:load].merge!(create_disposition: "CREATE_NEVER") if time_partitioning_type
  configuration.merge!({job_reference: {project_id: project, job_id: job_id}}) if job_id

  # If the target table already exists, omit the schema from the load
  # configuration: relying on the table's own schema makes later schema
  # changes easier.
  begin
    if client.get_table(project, dataset, table_id)
      configuration[:configuration][:load].delete(:schema)
    end
  rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError
    raise Fluent::BigQuery::UnRetryableError.new("Schema is empty") if fields.empty?
  end

  res = client.insert_job(
    project,
    configuration,
    {
      upload_source: upload_source,
      content_type: "application/octet-stream",
      options: {
        timeout_sec: timeout_sec,
        open_timeout_sec: open_timeout_sec,
      }
    }
  )
  wait_load_job(chunk_id, project, dataset, res.job_reference.job_id, table_id)
  @num_errors_per_chunk.delete(chunk_id)
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
  @client = nil

  reason = e.respond_to?(:reason) ? e.reason : nil
  log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason

  if auto_create_table && e.status_code == 404 && /Not Found: Table/i =~ e.message
    # Table Not Found: Auto Create Table
    create_table(project, dataset, table_id, fields, time_partitioning_type: time_partitioning_type, time_partitioning_expiration: time_partitioning_expiration)
    raise "table created. send rows next time."
  end

  if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job
    wait_load_job(chunk_id, project, dataset, job_id, table_id)
    @num_errors_per_chunk.delete(chunk_id)
    return
  end

  raise Fluent::BigQuery::Error.wrap(e)
end
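
A sketch of loading an in-memory NDJSON chunk. All identifiers are placeholders, and fields stands for a schema object responding to #to_a and #empty? (such as the plugin's RecordSchema):

require "stringio"

upload_source = StringIO.new(%({"id":1,"name":"alice"}\n{"id":2,"name":"bob"}\n))
writer.create_load_job(
  "chunk-0001", "my-project", "my_dataset", "access_log",
  upload_source, fields,
  prevent_duplicate_load: true, # derive a deterministic job id from the chunk
  auto_create_table: true       # create the table on 404 and retry the chunk
)
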
create_table(project, dataset, table_id, record_schema, time_partitioning_type: nil, time_partitioning_expiration: nil)
# File lib/fluent/plugin/bigquery/writer.rb, line 25
def create_table(project, dataset, table_id, record_schema, time_partitioning_type: nil, time_partitioning_expiration: nil)
  create_table_retry_limit = 3
  create_table_retry_wait = 1
  create_table_retry_count = 0
  table_id = safe_table_id(table_id)

  begin
    definition = {
      table_reference: {
        table_id: table_id,
      },
      schema: {
        fields: record_schema.to_a,
      }
    }

    if time_partitioning_type
      definition[:time_partitioning] = {
        type: time_partitioning_type.to_s.upcase,
        expiration_ms: time_partitioning_expiration ? time_partitioning_expiration * 1000 : nil
      }.compact
    end
    client.insert_table(project, dataset, definition, {})
    log.debug "create table", project_id: project, dataset: dataset, table: table_id
    @client = nil
  rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
    @client = nil

    message = e.message
    if e.status_code == 409 && /Already Exists:/ =~ message
      log.debug "already created table", project_id: project, dataset: dataset, table: table_id
      # ignore 'Already Exists' error
      return
    end

    reason = e.respond_to?(:reason) ? e.reason : nil
    log.error "tables.insert API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message, reason: reason

    if Fluent::BigQuery::Error.retryable_error_reason?(reason) && create_table_retry_count < create_table_retry_limit
      sleep create_table_retry_wait
      create_table_retry_wait *= 2
      create_table_retry_count += 1
      retry
    else
      raise Fluent::BigQuery::UnRetryableError.new("failed to create table in bigquery", e)
    end
  end
end
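
For example, creating a day-partitioned table whose partitions expire after 30 days. Note that time_partitioning_expiration is given in seconds and converted to milliseconds internally; the identifiers and the fields schema object are placeholders:

writer.create_table(
  "my-project", "my_dataset", "access_log", fields,
  time_partitioning_type: :day,
  time_partitioning_expiration: 30 * 24 * 60 * 60
)
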
fetch_schema(project, dataset, table_id)
# File lib/fluent/plugin/bigquery/writer.rb, line 74
def fetch_schema(project, dataset, table_id)
  res = client.get_table(project, dataset, table_id)
  schema = res.schema.fields.as_json
  log.debug "Load schema from BigQuery: #{project}:#{dataset}.#{table_id} #{schema}"

  schema
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
  @client = nil
  message = e.message
  log.error "tables.get API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: message
  nil
end
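
The return value is the table's field list serialized to plain hashes, or nil when the tables.get call fails. Illustrative output with placeholder identifiers:

schema = writer.fetch_schema("my-project", "my_dataset", "access_log")
# => [{"name"=>"id", "type"=>"INTEGER", "mode"=>"NULLABLE"},
#     {"name"=>"name", "type"=>"STRING", "mode"=>"NULLABLE"}]
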
insert_rows(project, dataset, table_id, rows, skip_invalid_rows: false, ignore_unknown_values: false, template_suffix: nil, timeout_sec: nil, open_timeout_sec: 60, allow_retry_insert_errors: false)
# File lib/fluent/plugin/bigquery/writer.rb, line 87
def insert_rows(project, dataset, table_id, rows, skip_invalid_rows: false, ignore_unknown_values: false, template_suffix: nil, timeout_sec: nil, open_timeout_sec: 60, allow_retry_insert_errors: false)
  body = {
    rows: rows,
    skip_invalid_rows: skip_invalid_rows,
    ignore_unknown_values: ignore_unknown_values,
  }
  body.merge!(template_suffix: template_suffix) if template_suffix
  res = client.insert_all_table_data(project, dataset, table_id, body, {
    options: {timeout_sec: timeout_sec, open_timeout_sec: open_timeout_sec}
  })
  log.debug "insert rows", project_id: project, dataset: dataset, table: table_id, count: rows.size

  if res.insert_errors && !res.insert_errors.empty?
    log.warn "insert errors", project_id: project, dataset: dataset, table: table_id, insert_errors: res.insert_errors.to_s
    if allow_retry_insert_errors
      is_included_any_retryable_insert_error = res.insert_errors.any? do |insert_error|
        insert_error.errors.any? { |error| Fluent::BigQuery::Error.retryable_insert_errors_reason?(error.reason) }
      end
      if is_included_any_retryable_insert_error
        raise Fluent::BigQuery::RetryableError.new("failed to insert into bigquery(insert errors), retry")
      else
        raise Fluent::BigQuery::UnRetryableError.new("failed to insert into bigquery(insert errors), and cannot retry")
      end
    end
  end
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
  @client = nil

  reason = e.respond_to?(:reason) ? e.reason : nil
  log.error "tabledata.insertAll API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message, reason: reason

  raise Fluent::BigQuery::Error.wrap(e)
end
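
A streaming-insert sketch. Each row follows the insertAll shape used by google-api-client, with the payload under a json key; all values are placeholders:

rows = [
  { insert_id: "uuid-0001", json: { id: 1, name: "alice" } },
  { insert_id: "uuid-0002", json: { id: 2, name: "bob" } },
]
writer.insert_rows(
  "my-project", "my_dataset", "access_log", rows,
  skip_invalid_rows: true,
  allow_retry_insert_errors: true # raise RetryableError for retryable reasons
)
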
wait_load_job(chunk_id, project, dataset, job_id, table_id)
# File lib/fluent/plugin/bigquery/writer.rb, line 190
def wait_load_job(chunk_id, project, dataset, job_id, table_id)
  wait_interval = 10
  _response = client.get_job(project, job_id)

  until _response.status.state == "DONE"
    log.debug "wait for load job finish", state: _response.status.state, job_id: _response.job_reference.job_id
    sleep wait_interval
    _response = client.get_job(project, _response.job_reference.job_id)
  end

  errors = _response.status.errors
  if errors
    errors.each do |e|
      log.error "job.insert API (rows)", job_id: job_id, project_id: project, dataset: dataset, table: table_id, message: e.message, reason: e.reason
    end
  end

  error_result = _response.status.error_result
  if error_result
    log.error "job.insert API (result)", job_id: job_id, project_id: project, dataset: dataset, table: table_id, message: error_result.message, reason: error_result.reason
    if Fluent::BigQuery::Error.retryable_error_reason?(error_result.reason)
      @num_errors_per_chunk[chunk_id] = @num_errors_per_chunk[chunk_id].to_i + 1
      raise Fluent::BigQuery::RetryableError.new("failed to load into bigquery, retry")
    else
      @num_errors_per_chunk.delete(chunk_id)
      raise Fluent::BigQuery::UnRetryableError.new("failed to load into bigquery, and cannot retry")
    end
  end

  log.debug "finish load job", state: _response.status.state
end
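
The call blocks, polling jobs.get every 10 seconds until the job state reaches DONE, then raises RetryableError or UnRetryableError depending on error_result.reason. Typical use, with placeholder identifiers:

writer.wait_load_job("chunk-0001", "my-project", "my_dataset",
                     "fluentd_job_0123abcd", "access_log")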

Private Instance Methods

create_job_id(chunk_id, dataset, table, schema, max_bad_records, ignore_unknown_values)
# File lib/fluent/plugin/bigquery/writer.rb, line 286
def create_job_id(chunk_id, dataset, table, schema, max_bad_records, ignore_unknown_values)
  job_id_key = "#{chunk_id}#{dataset}#{table}#{schema}#{max_bad_records}#{ignore_unknown_values}#{@num_errors_per_chunk[chunk_id]}"
  @log.debug "job_id_key: #{job_id_key}"
  "fluentd_job_" + Digest::SHA1.hexdigest(job_id_key)
end
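
Because the id is a SHA1 digest over the chunk id, destination, schema, load options, and the per-chunk error count, retrying the same chunk reuses the same job id, so BigQuery rejects the duplicate load with a 409 and create_load_job resolves it by waiting on the original job. Illustrative call (the method is private, and the digest shown is made up):

writer.send(:create_job_id, "chunk-0001", "my_dataset", "access_log",
            fields.to_a, 0, false)
# => "fluentd_job_2fd4e1c67a2d28fced849ee1bb76e7391b93eb12"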
get_auth()
# File lib/fluent/plugin/bigquery/writer.rb, line 228
def get_auth
  case @auth_method
  when :private_key
    get_auth_from_private_key
  when :compute_engine
    get_auth_from_compute_engine
  when :json_key
    get_auth_from_json_key
  when :application_default
    get_auth_from_application_default
  else
    raise ConfigError, "Unknown auth method: #{@auth_method}"
  end
end
get_auth_from_application_default()
# File lib/fluent/plugin/bigquery/writer.rb, line 278
def get_auth_from_application_default
  Google::Auth.get_application_default([@scope])
end
get_auth_from_compute_engine()
# File lib/fluent/plugin/bigquery/writer.rb, line 259
def get_auth_from_compute_engine
  Google::Auth::GCECredentials.new
end
get_auth_from_json_key()
# File lib/fluent/plugin/bigquery/writer.rb, line 263
def get_auth_from_json_key
  json_key = @auth_options[:json_key]

  begin
    JSON.parse(json_key)
    key = StringIO.new(json_key)
    Google::Auth::ServiceAccountCredentials.make_creds(json_key_io: key, scope: @scope)
  rescue JSON::ParserError
    # Not valid JSON, so treat json_key as a path to the key file.
    File.open(json_key) do |f|
      Google::Auth::ServiceAccountCredentials.make_creds(json_key_io: f, scope: @scope)
    end
  end
end
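
The json_key option accepts either the key JSON itself or a path to the key file; the JSON.parse probe above distinguishes the two. Both forms below are equivalent (paths are placeholders):

Fluent::BigQuery::Writer.new(log, :json_key, json_key: File.read("/path/key.json")) # inline JSON
Fluent::BigQuery::Writer.new(log, :json_key, json_key: "/path/key.json")            # path to file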
get_auth_from_private_key()
# File lib/fluent/plugin/bigquery/writer.rb, line 243
def get_auth_from_private_key
  require 'google/api_client/auth/key_utils'
  private_key_path = @auth_options[:private_key_path]
  private_key_passphrase = @auth_options[:private_key_passphrase]
  email = @auth_options[:email]

  key = Google::APIClient::KeyUtils.load_from_pkcs12(private_key_path, private_key_passphrase)
  Signet::OAuth2::Client.new(
    token_credential_uri: "https://accounts.google.com/o/oauth2/token",
    audience: "https://accounts.google.com/o/oauth2/token",
    scope: @scope,
    issuer: email,
    signing_key: key
  )
end
log()
# File lib/fluent/plugin/bigquery/writer.rb, line 224
def log
  @log
end
safe_table_id(table_id)
# File lib/fluent/plugin/bigquery/writer.rb, line 282
def safe_table_id(table_id)
  table_id.gsub(/\$\d+$/, "")
end