module Fluent::BigQueryOutput::InsertImplementation

Public Instance Methods

_write(chunk, table_id, template_suffix)
# File lib/fluent/plugin/out_bigquery.rb, line 439
# Streams every record of +chunk+ into the BigQuery table +table_id+ through
# the tabledata.insertAll call exposed by +client+.
#
# chunk           - buffer chunk; each record yielded by +msgpack_each+ must
#                   respond to +deep_symbolize_keys+.
# table_id        - id of the destination table inside @project / @dataset.
# template_suffix - optional suffix; only added to the request body when truthy.
#
# Raises RuntimeError when the failure should be retried (a retryable
# per-row reason, a Google::Apis::ServerError, or a table that was just
# auto-created). Non-retryable failures are handed to
# flush_secondary(@secondary) when @secondary is set.
def _write(chunk, table_id, template_suffix)
  entries = []
  # TODO: row size limit
  chunk.msgpack_each { |record| entries << record.deep_symbolize_keys }

  payload = {
    rows: entries,
    skip_invalid_rows: @skip_invalid_rows,
    ignore_unknown_values: @ignore_unknown_values,
  }
  payload[:template_suffix] = template_suffix if template_suffix

  request_options = {
    options: {timeout_sec: @request_timeout_sec, open_timeout_sec: @request_open_timeout_sec}
  }
  response = client.insert_all_table_data(@project, @dataset, table_id, payload, request_options)

  # No per-row errors reported: nothing more to do.
  return unless response.insert_errors

  # Log every per-row error and remember its reason for the decisions below.
  failure_reasons = []
  response.insert_errors.each do |row_failure|
    row_failure.errors.each do |detail|
      failure_reasons << detail.reason
      log.error "tabledata.insertAll API", project_id: @project, dataset: @dataset, table: table_id, message: detail.message, reason: detail.reason
    end
  end

  if failure_reasons.find { |why| RETRYABLE_ERROR_REASON.include?(why) }
    raise "failed to insert into bigquery, retry"
  end

  # Rows rejected purely as "invalid" are tolerated when skipping is enabled.
  only_invalid = failure_reasons.all? { |why| why == "invalid" }
  return if only_invalid && @skip_invalid_rows

  flush_secondary(@secondary) if @secondary
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
  # Any API-level error drops the cached client.
  @cached_client = nil

  if @auto_create_table && e.status_code == 404 && /Not Found: Table/i =~ e.message
    # The table does not exist yet: create it and raise so the rows are sent again later.
    create_table(table_id)
    raise "table created. send rows next time."
  end

  # Not every error object exposes #reason; fall back to nil.
  reason = e.reason if e.respond_to?(:reason)
  log.error "tabledata.insertAll API", project_id: @project, dataset: @dataset, table: table_id, code: e.status_code, message: e.message, reason: reason

  should_retry = RETRYABLE_ERROR_REASON.include?(reason) || e.is_a?(Google::Apis::ServerError)
  raise "failed to insert into bigquery, retry" if should_retry # TODO: error class

  flush_secondary(@secondary) if @secondary
end
format(tag, time, record)
# File lib/fluent/plugin/out_bigquery.rb, line 422
# Serializes one event into the msgpack form stored in the buffer.
#
# tag    - event tag (not used by this implementation).
# time   - event time, passed to @add_time_field together with the record.
# record - event record; passed through replace_record_key first when
#          @replace_record_key is set.
#
# Returns a String holding the msgpack of {"json" => row} (plus "insert_id"
# when @get_insert_id is set), or an empty String when the formatted row is
# empty.
def format(tag, time, record)
  fetch_schema if @template_suffix
  record = replace_record_key(record) if @replace_record_key

  packed = String.new
  formatted = @fields.format(@add_time_field.call(record, time))
  # An empty row contributes nothing to the buffer.
  return packed if formatted.empty?

  entry = {"json" => formatted}
  entry["insert_id"] = @get_insert_id.call(record) if @get_insert_id
  packed << entry.to_msgpack
end