module Fluent::BigQueryOutput::InsertImplementation
Public Instance Methods
_write(chunk, table_id, template_suffix)
click to toggle source
# File lib/fluent/plugin/out_bigquery.rb, line 439 def _write(chunk, table_id, template_suffix) rows = [] chunk.msgpack_each do |row_object| # TODO: row size limit rows << row_object.deep_symbolize_keys end body = { rows: rows, skip_invalid_rows: @skip_invalid_rows, ignore_unknown_values: @ignore_unknown_values, } body.merge!(template_suffix: template_suffix) if template_suffix res = client.insert_all_table_data( @project, @dataset, table_id, body, { options: {timeout_sec: @request_timeout_sec, open_timeout_sec: @request_open_timeout_sec} } ) if res.insert_errors reasons = [] res.insert_errors.each do |i| i.errors.each do |e| reasons << e.reason log.error "tabledata.insertAll API", project_id: @project, dataset: @dataset, table: table_id, message: e.message, reason: e.reason end end raise "failed to insert into bigquery, retry" if reasons.find { |r| RETRYABLE_ERROR_REASON.include?(r) } return if reasons.all? { |r| r == "invalid" } && @skip_invalid_rows flush_secondary(@secondary) if @secondary end rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e # api_error? -> client cache clear @cached_client = nil if @auto_create_table && e.status_code == 404 && /Not Found: Table/i =~ e.message # Table Not Found: Auto Create Table create_table(table_id) raise "table created. send rows next time." end reason = e.respond_to?(:reason) ? e.reason : nil log.error "tabledata.insertAll API", project_id: @project, dataset: @dataset, table: table_id, code: e.status_code, message: e.message, reason: reason if RETRYABLE_ERROR_REASON.include?(reason) || e.is_a?(Google::Apis::ServerError) raise "failed to insert into bigquery, retry" # TODO: error class elsif @secondary flush_secondary(@secondary) end end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_bigquery.rb, line 422 def format(tag, time, record) fetch_schema if @template_suffix if @replace_record_key record = replace_record_key(record) end buf = String.new row = @fields.format(@add_time_field.call(record, time)) unless row.empty? row = {"json" => row} row["insert_id"] = @get_insert_id.call(record) if @get_insert_id buf << row.to_msgpack end buf end