module Fluent::BigQueryOutput::LoadImplementation
Public Instance Methods
_write(chunk, table_id, template_suffix)
# File lib/fluent/plugin/out_bigquery.rb, line 508
def _write(chunk, table_id, template_suffix)
  res = nil
  job_id = nil

  create_upload_source(chunk) do |upload_source|
    configuration, job_id = load_configuration(table_id, template_suffix, upload_source)
    res = client.insert_job(
      @project,
      configuration,
      {
        upload_source: upload_source,
        content_type: "application/octet-stream",
        options: {
          timeout_sec: @request_timeout_sec,
          open_timeout_sec: @request_open_timeout_sec,
        }
      }
    )
  end
  wait_load(res.job_reference.job_id)
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
  # api_error? -> client cache clear
  @cached_client = nil

  reason = e.respond_to?(:reason) ? e.reason : nil
  log.error "job.insert API", project_id: @project, dataset: @dataset, table: table_id, code: e.status_code, message: e.message, reason: reason

  return wait_load(job_id) if job_id && e.status_code == 409 && e.message =~ /Job/ # duplicate load job

  if RETRYABLE_ERROR_REASON.include?(reason) || e.is_a?(Google::Apis::ServerError)
    raise "failed to insert into bigquery, retry" # TODO: error class
  elsif @secondary
    flush_secondary(@secondary)
  end
end
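The 409 branch above is what makes prevent_duplicate_load safe to retry: re-inserting a job with an already-used job_id fails with a conflict, and the plugin falls back to polling the original job. A minimal sketch of just that recovery path, assuming `client` is a Google::Apis::BigqueryV2::BigqueryService and `job_id`/`configuration` come from load_configuration:

begin
  client.insert_job(@project, configuration,
                    upload_source: upload_source,
                    content_type: "application/octet-stream")
rescue Google::Apis::ClientError => e
  # A 409 on job.insert means this job_id was already submitted; the
  # original load is still authoritative, so poll it instead of
  # re-inserting the same data.
  raise unless e.status_code == 409 && e.message =~ /Job/
  wait_load(job_id)
end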
format(tag, time, record)
# File lib/fluent/plugin/out_bigquery.rb, line 493
def format(tag, time, record)
  fetch_schema if @template_suffix

  if @replace_record_key
    record = replace_record_key(record)
  end

  buf = String.new
  row = @fields.format(@add_time_field.call(record, time))
  unless row.empty?
    buf << MultiJson.dump(row) + "\n"
  end
  buf
end
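Each event becomes one newline-delimited JSON row in the buffer, which is why the load job uses source_format NEWLINE_DELIMITED_JSON. A minimal sketch of the output shape, eliding the schema-driven @fields.format validation step and using an illustrative record:

require "multi_json"

record = { "status" => 200, "path" => "/" }
# @fields.format would validate and coerce the record against the table
# schema; dumping it directly is enough to show the buffered format.
buf = MultiJson.dump(record) + "\n"
# buf == "{\"status\":200,\"path\":\"/\"}\n" -- one NDJSON row per event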
Private Instance Methods
create_job_id(upload_source_path, dataset, table, schema, max_bad_records, ignore_unknown_values)
# File lib/fluent/plugin/out_bigquery.rb, line 634
def create_job_id(upload_source_path, dataset, table, schema, max_bad_records, ignore_unknown_values)
  "fluentd_job_" + Digest::SHA1.hexdigest("#{upload_source_path}#{dataset}#{table}#{schema.to_s}#{max_bad_records}#{ignore_unknown_values}")
end
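Because the ID is a SHA1 digest over the chunk path and the load parameters, retrying the same chunk always produces the same job ID, which is what lets BigQuery reject the duplicate insert with a 409. A quick check with illustrative values:

require "digest/sha1"

args = ["/buffer/chunk-1.log", "my_dataset", "access_20231001", "[]", 0, false]
id_a = "fluentd_job_" + Digest::SHA1.hexdigest(args.join)
id_b = "fluentd_job_" + Digest::SHA1.hexdigest(args.join)
id_a == id_b # => true -- same chunk and parameters, same job ID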
create_upload_source(chunk) { |file| ... }
# File lib/fluent/plugin/out_bigquery.rb, line 617
def create_upload_source(chunk)
  chunk_is_file = @buffer_type == 'file'
  if chunk_is_file
    File.open(chunk.path) do |file|
      yield file
    end
  else
    Tempfile.open("chunk-tmp") do |file|
      file.binmode
      chunk.write_to(file)
      file.sync
      file.rewind
      yield file
    end
  end
end
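With a file buffer the chunk is already a rewindable file on disk, so it can be handed to the upload directly; any other buffer type has to be spooled into a Tempfile first. A standalone sketch of that second branch, with a literal row standing in for chunk.write_to:

require "tempfile"

Tempfile.open("chunk-tmp") do |file|
  file.binmode
  file.write(%({"status":200}\n)) # stands in for chunk.write_to(file)
  file.sync                       # ensure bytes hit disk before the upload reads them
  file.rewind                     # insert_job streams from the current offset
  # ... hand `file` to client.insert_job as the upload_source ...
end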
load_configuration(table_id, template_suffix, upload_source)
# File lib/fluent/plugin/out_bigquery.rb, line 547
def load_configuration(table_id, template_suffix, upload_source)
  job_id = nil
  if @prevent_duplicate_load
    job_id = create_job_id(upload_source.path, @dataset, "#{table_id}#{template_suffix}", @fields.to_a, @max_bad_records, @ignore_unknown_values)
  end

  configuration = {
    configuration: {
      load: {
        destination_table: {
          project_id: @project,
          dataset_id: @dataset,
          table_id: "#{table_id}#{template_suffix}",
        },
        schema: {
          fields: @fields.to_a,
        },
        write_disposition: "WRITE_APPEND",
        source_format: "NEWLINE_DELIMITED_JSON",
        ignore_unknown_values: @ignore_unknown_values,
        max_bad_records: @max_bad_records,
      }
    }
  }
  configuration.merge!({job_reference: {project_id: @project, job_id: job_id}}) if job_id

  # If the target table already exists, omit the schema from the load
  # configuration, because that makes later schema changes easier.
  begin
    if template_suffix && client.get_table(@project, @dataset, "#{table_id}#{template_suffix}")
      configuration[:configuration][:load].delete(:schema)
    end
  rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError
    raise "Schema is empty" if @fields.empty?
  end

  return configuration, job_id
end
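For orientation, with illustrative values (project my-project, dataset my_dataset, table access_20231001, a one-field schema) and prevent_duplicate_load enabled, the returned configuration hash looks roughly like:

{
  configuration: {
    load: {
      destination_table: {
        project_id: "my-project",
        dataset_id: "my_dataset",
        table_id: "access_20231001",
      },
      schema: { fields: [{ name: "status", type: "INTEGER" }] }, # removed if the table already exists
      write_disposition: "WRITE_APPEND",
      source_format: "NEWLINE_DELIMITED_JSON",
      ignore_unknown_values: false,
      max_bad_records: 0,
    }
  },
  job_reference: { project_id: "my-project", job_id: "fluentd_job_<sha1>" } # only with @prevent_duplicate_load
}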
wait_load(job_id)
# File lib/fluent/plugin/out_bigquery.rb, line 586
def wait_load(job_id)
  wait_interval = 10
  _response = client.get_job(@project, job_id)

  table_id = _response.configuration.load.destination_table.table_id

  until _response.status.state == "DONE"
    log.debug "wait for load job finish", state: _response.status.state, job_id: _response.job_reference.job_id
    sleep wait_interval
    _response = client.get_job(@project, _response.job_reference.job_id)
  end

  errors = _response.status.errors
  if errors
    errors.each do |e|
      log.error "job.insert API (rows)", project_id: @project, dataset: @dataset, table: table_id, message: e.message, reason: e.reason
    end
  end

  error_result = _response.status.error_result
  if error_result
    log.error "job.insert API (result)", project_id: @project, dataset: @dataset, table: table_id, message: error_result.message, reason: error_result.reason
    if RETRYABLE_ERROR_REASON.include?(error_result.reason)
      raise "failed to load into bigquery"
    elsif @secondary
      flush_secondary(@secondary)
    end
  end

  log.debug "finish load job", state: _response.status.state
end
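Stripped of logging, the polling loop reduces to the sketch below (fixed 10-second interval, assuming `client` is a google-api-client BigQuery service handle). Note that a "DONE" state alone does not mean success; status.errors and status.error_result must still be inspected, as the method does above:

response = client.get_job(project, job_id)
until response.status.state == "DONE"
  sleep 10 # wait_interval; job state moves PENDING -> RUNNING -> DONE
  response = client.get_job(project, response.job_reference.job_id)
end
# Only now can success or failure be decided, from response.status.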