module Fluent::BigQueryOutput::LoadImplementation

Public Instance Methods

_write(chunk, table_id_format, _) click to toggle source
# File lib/fluent/plugin/out_bigquery.rb, line 468
def _write(chunk, table_id_format, _)
  now = Time.at(Fluent::Engine.now)
  table_id = generate_table_id(table_id_format, now, nil, chunk)
  load(chunk, table_id)
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_bigquery.rb, line 449
def format(tag, time, record)
  fetch_schema if @fetch_schema_table

  if @replace_record_key
    record = replace_record_key(record)
  end

  if @convert_hash_to_json
    record = convert_hash_to_json(record)
  end

  buf = String.new
  row = @fields.format(@add_time_field.call(record, time))
  unless row.empty?
    buf << MultiJson.dump(row) + "\n"
  end
  buf
end
load(chunk, table_id) click to toggle source
# File lib/fluent/plugin/out_bigquery.rb, line 474
def load(chunk, table_id)
  res = nil

  create_upload_source(chunk) do |upload_source|
    res = writer.create_load_job(chunk.unique_id, @project, @dataset, table_id, upload_source, @fields, {
      prevent_duplicate_load: @prevent_duplicate_load,
      ignore_unknown_values: @ignore_unknown_values, max_bad_records: @max_bad_records,
      timeout_sec: @request_timeout_sec,  open_timeout_sec: @request_open_timeout_sec, auto_create_table: @auto_create_table,
      time_partitioning_type: @time_partitioning_type, time_partitioning_expiration: @time_partitioning_expiration
    })
  end
rescue Fluent::BigQuery::Error => e
  if e.retryable?
    raise e
  elsif @secondary
    flush_secondary(@secondary)
  end
end

Private Instance Methods

create_upload_source(chunk) { |file| ... } click to toggle source
# File lib/fluent/plugin/out_bigquery.rb, line 495
def create_upload_source(chunk)
  chunk_is_file = @buffer_type == 'file'
  if chunk_is_file
    File.open(chunk.path) do |file|
      yield file
    end
  else
    Tempfile.open("chunk-tmp") do |file|
      file.binmode
      chunk.write_to(file)
      file.sync
      file.rewind
      yield file
    end
  end
end