module Fluent::BigQueryOutput::LoadImplementation

Public Instance Methods

_write(chunk, table_id, template_suffix)
# File lib/fluent/plugin/out_bigquery.rb, line 508
def _write(chunk, table_id, template_suffix)
  res = nil
  job_id = nil

  create_upload_source(chunk) do |upload_source|
    configuration, job_id = load_configuration(table_id, template_suffix, upload_source)
    res = client.insert_job(
      @project,
      configuration,
      {
        upload_source: upload_source,
        content_type: "application/octet-stream",
        options: {
          timeout_sec: @request_timeout_sec,
          open_timeout_sec: @request_open_timeout_sec,
        }
      }
    )
  end

  wait_load(res.job_reference.job_id)
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
  # On an API error, drop the cached client so it is rebuilt on the next call
  @cached_client = nil

  reason = e.respond_to?(:reason) ? e.reason : nil
  log.error "job.insert API", project_id: @project, dataset: @dataset, table: table_id, code: e.status_code, message: e.message, reason: reason

  return wait_load(job_id) if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job

  if RETRYABLE_ERROR_REASON.include?(reason) || e.is_a?(Google::Apis::ServerError)
    raise "failed to insert into bigquery, retry" # TODO: error class
  elsif @secondary
    flush_secondary(@secondary)
  end
end
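
On failure, _write distinguishes three cases: an HTTP 409 against a deterministic job ID means a retried chunk already submitted this load, so it simply waits on the existing job; retryable reasons and server errors raise so that Fluentd retries the chunk; anything else falls through to the secondary output. A minimal sketch of that decision, outside the plugin (the reason list here is an assumption of common retryable BigQuery reasons, not necessarily the plugin's RETRYABLE_ERROR_REASON):

require "google/apis/errors"

# Assumed list for illustration; the plugin defines its own RETRYABLE_ERROR_REASON.
RETRYABLE_REASONS = %w(backendError internalError rateLimitExceeded).freeze

def insert_failure_action(error, reason, job_id)
  if job_id && error.status_code == 409 && error.message =~ /Job/
    :wait_for_existing_job   # duplicate load job from a retried chunk
  elsif RETRYABLE_REASONS.include?(reason) || error.is_a?(Google::Apis::ServerError)
    :raise_and_retry         # let Fluentd retry the whole chunk
  else
    :flush_to_secondary      # hand the chunk to <secondary> if configured
  end
end
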
format(tag, time, record)
# File lib/fluent/plugin/out_bigquery.rb, line 493
def format(tag, time, record)
  fetch_schema if @template_suffix

  if @replace_record_key
    record = replace_record_key(record)
  end

  buf = String.new
  row = @fields.format(@add_time_field.call(record, time))
  unless row.empty?
    buf << MultiJson.dump(row) + "\n"
  end
  buf
end
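
format emits each event as one JSON object per line, which is exactly what the load job later declares as NEWLINE_DELIMITED_JSON. A standalone sketch with hypothetical records (the plugin already uses MultiJson, as above):

require "multi_json"

# Hypothetical events as Fluentd might hand them to #format.
records = [
  { "level" => "info", "message" => "request served" },
  { "level" => "warn", "message" => "slow query" },
]

buf = String.new
records.each { |row| buf << MultiJson.dump(row) + "\n" }
print buf
# {"level":"info","message":"request served"}
# {"level":"warn","message":"slow query"}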

Private Instance Methods

create_job_id(upload_source_path, dataset, table, schema, max_bad_records, ignore_unknown_values)
# File lib/fluent/plugin/out_bigquery.rb, line 634
def create_job_id(upload_source_path, dataset, table, schema, max_bad_records, ignore_unknown_values)
  "fluentd_job_" + Digest::SHA1.hexdigest("#{upload_source_path}#{dataset}#{table}#{schema.to_s}#{max_bad_records}#{ignore_unknown_values}")
end
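
Because the ID is a SHA1 over the chunk path and the load settings, retrying the same chunk reproduces the same job ID, and BigQuery rejects the second insert with HTTP 409, which _write treats as "wait on the original job". A quick demonstration with hypothetical values:

require "digest/sha1"

def create_job_id(upload_source_path, dataset, table, schema, max_bad_records, ignore_unknown_values)
  "fluentd_job_" + Digest::SHA1.hexdigest("#{upload_source_path}#{dataset}#{table}#{schema.to_s}#{max_bad_records}#{ignore_unknown_values}")
end

schema = [{ name: "message", type: "STRING" }]
a = create_job_id("/var/log/fluent/buffer.q1.log", "logs", "access_20240101", schema, 0, false)
b = create_job_id("/var/log/fluent/buffer.q1.log", "logs", "access_20240101", schema, 0, false)
a == b # => true: a retried chunk yields the identical job ID
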
create_upload_source(chunk) { |file| ... }
# File lib/fluent/plugin/out_bigquery.rb, line 617
def create_upload_source(chunk)
  chunk_is_file = @buffer_type == 'file'
  if chunk_is_file
    # A file-buffer chunk is already on disk; upload it in place.
    File.open(chunk.path) do |file|
      yield file
    end
  else
    # A memory-buffer chunk is spooled to a tempfile before upload.
    Tempfile.open("chunk-tmp") do |file|
      file.binmode
      chunk.write_to(file)
      file.sync
      file.rewind
      yield file
    end
  end
end
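
With a file buffer the chunk is already on disk and can be uploaded in place; with a memory buffer it is first spooled to a tempfile. A self-contained sketch of the tempfile path, using a hypothetical stand-in for a Fluentd chunk:

require "tempfile"

# Stand-in for a Fluentd memory-buffer chunk (hypothetical; real chunks
# come from Fluentd's buffer and expose #write_to).
FakeChunk = Struct.new(:data) do
  def write_to(io)
    io.write(data)
  end
end

chunk = FakeChunk.new(%({"message":"hello"}\n))

Tempfile.open("chunk-tmp") do |file|
  file.binmode         # upload raw bytes, no newline translation
  chunk.write_to(file)
  file.flush           # make buffered writes visible before reading back
  file.rewind          # the upload must read from the beginning
  puts file.read       # in the plugin, this is where `yield file` runs
end
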
load_configuration(table_id, template_suffix, upload_source)
# File lib/fluent/plugin/out_bigquery.rb, line 547
def load_configuration(table_id, template_suffix, upload_source)
  job_id = nil
  if @prevent_duplicate_load
    job_id = create_job_id(upload_source.path, @dataset, "#{table_id}#{template_suffix}", @fields.to_a, @max_bad_records, @ignore_unknown_values)
  end

  configuration = {
    configuration: {
      load: {
        destination_table: {
          project_id: @project,
          dataset_id: @dataset,
          table_id: "#{table_id}#{template_suffix}",
        },
        schema: {
          fields: @fields.to_a,
        },
        write_disposition: "WRITE_APPEND",
        source_format: "NEWLINE_DELIMITED_JSON",
        ignore_unknown_values: @ignore_unknown_values,
        max_bad_records: @max_bad_records,
      }
    }
  }
  configuration.merge!({job_reference: {project_id: @project, job_id: job_id}}) if job_id

  # If the target table already exists, omit the schema configuration
  # so the table's schema can be changed without updating the plugin config.
  begin
    if template_suffix && client.get_table(@project, @dataset, "#{table_id}#{template_suffix}")
      configuration[:configuration][:load].delete(:schema)
    end
  rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError
    raise "Schema is empty" if @fields.empty?
  end

  return configuration, job_id
end
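
For concreteness, here is the shape of the hash this method returns for a hypothetical daily-sharded table with @prevent_duplicate_load enabled (all identifiers are placeholders):

require "json"

configuration = {
  configuration: {
    load: {
      destination_table: {
        project_id: "my-project",
        dataset_id: "logs",
        table_id: "access_20240101", # "#{table_id}#{template_suffix}"
      },
      schema: { fields: [{ name: "message", type: "STRING" }] },
      write_disposition: "WRITE_APPEND",
      source_format: "NEWLINE_DELIMITED_JSON",
      ignore_unknown_values: false,
      max_bad_records: 0,
    }
  },
  # Present only when @prevent_duplicate_load is set; job_id comes from create_job_id.
  job_reference: { project_id: "my-project", job_id: "fluentd_job_<sha1-digest>" },
}
puts JSON.pretty_generate(configuration)
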
wait_load(job_id)
# File lib/fluent/plugin/out_bigquery.rb, line 586
def wait_load(job_id)
  wait_interval = 10 # seconds between get_job polls
  _response = client.get_job(@project, job_id)
  table_id = _response.configuration.load.destination_table.table_id

  until _response.status.state == "DONE"
    log.debug "wait for load job finish", state: _response.status.state, job_id: _response.job_reference.job_id
    sleep wait_interval
    _response = client.get_job(@project, _response.job_reference.job_id)
  end

  errors = _response.status.errors
  if errors
    errors.each do |e|
      log.error "job.insert API (rows)", project_id: @project, dataset: @dataset, table: table_id, message: e.message, reason: e.reason
    end
  end

  error_result = _response.status.error_result
  if error_result
    log.error "job.insert API (result)", project_id: @project, dataset: @dataset, table: table_id, message: error_result.message, reason: error_result.reason
    if RETRYABLE_ERROR_REASON.include?(error_result.reason)
      raise "failed to load into bigquery"
    elsif @secondary
      flush_secondary(@secondary)
    end
  end

  log.debug "finish load job", state: _response.status.state
end
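
wait_load polls get_job every 10 seconds until the job reaches the DONE state, logs any per-row errors, and uses the job-level error_result to decide between raising (retry) and flushing to the secondary output. A minimal standalone polling sketch, assuming client is an authenticated Google::Apis::BigqueryV2::BigqueryService:

require "google/apis/bigquery_v2"

# Mirrors wait_load's fixed polling interval; `client` is assumed to be
# an authenticated Google::Apis::BigqueryV2::BigqueryService.
def poll_load_job(client, project, job_id, interval: 10)
  job = client.get_job(project, job_id)
  until job.status.state == "DONE"
    sleep interval
    job = client.get_job(project, job_id)
  end
  job # caller inspects job.status.errors and job.status.error_result
end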