module Fluent::BigQueryOutput::LoadImplementation
Public Instance Methods
_write(chunk, table_id_format, _)
click to toggle source
# File lib/fluent/plugin/out_bigquery.rb, line 468
#
# Buffer-flush entry point for load-job mode. Resolves the destination
# table id from +table_id_format+ using the current engine time, then
# hands the chunk to #load. The third positional argument is accepted
# for interface compatibility but unused.
def _write(chunk, table_id_format, _)
  flush_time = Time.at(Fluent::Engine.now)
  destination = generate_table_id(table_id_format, flush_time, nil, chunk)
  load(chunk, destination)
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_bigquery.rb, line 449
#
# Serializes one event into a newline-terminated JSON row ready for a
# BigQuery load job. Optionally fetches the table schema first, then
# rewrites record keys and JSON-encodes nested hashes when the
# corresponding options are enabled. A row that formats to an empty
# hash yields an empty string.
def format(tag, time, record)
  fetch_schema if @fetch_schema_table
  record = replace_record_key(record) if @replace_record_key
  record = convert_hash_to_json(record) if @convert_hash_to_json

  row = @fields.format(@add_time_field.call(record, time))
  out = String.new
  out << MultiJson.dump(row) + "\n" unless row.empty?
  out
end
load(chunk, table_id)
click to toggle source
# File lib/fluent/plugin/out_bigquery.rb, line 474
#
# Runs a BigQuery load job for the buffered chunk against +table_id+.
# The chunk is exposed as an IO via #create_upload_source and passed to
# the writer together with the load-job options configured on the
# plugin. Retryable API errors are re-raised so Fluentd retries the
# chunk; non-retryable errors are routed to the secondary output when
# one is configured.
#
# NOTE(review): a non-retryable error with no <secondary> configured is
# silently dropped here — confirm that is intended.
#
# Changes from the original: removed the dead local `res` (it was
# assigned inside the block but never read — the job result already
# propagates as the return value of #create_upload_source), and used
# bare `raise` inside rescue, which re-raises the current exception.
def load(chunk, table_id)
  create_upload_source(chunk) do |upload_source|
    writer.create_load_job(chunk.unique_id, @project, @dataset, table_id, upload_source, @fields, {
      prevent_duplicate_load: @prevent_duplicate_load,
      ignore_unknown_values: @ignore_unknown_values,
      max_bad_records: @max_bad_records,
      timeout_sec: @request_timeout_sec,
      open_timeout_sec: @request_open_timeout_sec,
      auto_create_table: @auto_create_table,
      time_partitioning_type: @time_partitioning_type,
      time_partitioning_expiration: @time_partitioning_expiration
    })
  end
rescue Fluent::BigQuery::Error => e
  raise if e.retryable?
  flush_secondary(@secondary) if @secondary
end
Private Instance Methods
create_upload_source(chunk) { |file| ... }
click to toggle source
# File lib/fluent/plugin/out_bigquery.rb, line 495
#
# Yields an IO positioned at the start of the chunk data. A file-buffer
# chunk is opened directly from its on-disk path; any other buffer type
# is spooled into a binary-mode Tempfile first and rewound before the
# yield. Both block forms close the IO automatically when the block
# returns.
def create_upload_source(chunk)
  if @buffer_type == 'file'
    File.open(chunk.path) { |io| yield io }
  else
    Tempfile.open("chunk-tmp") do |io|
      io.binmode
      chunk.write_to(io)
      io.sync
      io.rewind
      yield io
    end
  end
end