class Fluent::Plugin::BigQueryInsertOutput

Public Instance Methods

configure(conf)
# File lib/fluent/plugin/out_bigquery_insert.rb, line 47
def configure(conf)
  super
  @is_load = false

  if @insert_id_field
    if @insert_id_field !~ /^\$[\[\.]/ && @insert_id_field =~ /\./
      warn "[BREAKING CHANGE] insert_id_field format is changed. Use fluentd record_accessor helper. (https://docs.fluentd.org/v1.0/articles/api-plugin-helper-record_accessor)"
    end
    @get_insert_id = record_accessor_create(@insert_id_field)
  end

  formatter_config = conf.elements("format")[0]
  if formatter_config && formatter_config['@type'] != "json"
    raise ConfigError, "`bigquery_insert` supports only json formatter."
  end
  @formatter = formatter_create(usage: 'out_bigquery_for_insert', type: 'json', conf: formatter_config)

  placeholder_params = "project=#{@project}/dataset=#{@dataset}/table=#{@tablelist.join(",")}/fetch_schema_table=#{@fetch_schema_table}/template_suffix=#{@template_suffix}"
  placeholder_validate!(:bigquery_insert, placeholder_params)
end
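
Note that only the legacy dot notation triggers the [BREAKING CHANGE] warning; record_accessor paths that start with "$." or "$[" pass the check silently. A runnable sketch of that condition, using illustrative field names that are not from the plugin itself:

# Sketch of the insert_id_field format check in #configure; the sample
# field names below are illustrative, not from the plugin.
["uuid", "data.uuid", "$.data.uuid", "$['data']['uuid']"].each do |field|
  legacy = field !~ /^\$[\[\.]/ && field =~ /\./
  puts "#{field.inspect} => #{legacy ? 'warns (legacy dot notation)' : 'ok'}"
end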
format(tag, time, record)

Defined so that Fluent::Plugin::Output#implement? detects that this plugin implements format.
# File lib/fluent/plugin/out_bigquery_insert.rb, line 69
def format(tag, time, record)
  super
end
insert(project, dataset, table_id, rows, schema, template_suffix)
# File lib/fluent/plugin/out_bigquery_insert.rb, line 104
def insert(project, dataset, table_id, rows, schema, template_suffix)
  writer.insert_rows(project, dataset, table_id, rows, schema, template_suffix: template_suffix)
rescue Fluent::BigQuery::Error => e
  raise if e.retryable?

  if @secondary
    # TODO: find better way
    # Rebuild the retry state so the next attempt goes straight to the
    # <secondary> output: a secondary_threshold of Float::EPSILON makes the
    # switch-over happen essentially immediately.
    @retry = retry_state_create(
      :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
      forever: false, max_steps: @buffer_config.retry_max_times, backoff_base: @buffer_config.retry_exponential_backoff_base,
      max_interval: @buffer_config.retry_max_interval,
      secondary: true, secondary_threshold: Float::EPSILON,
      randomize: @buffer_config.retry_randomize
    )
  else
    # No secondary output: max_steps of 0 gives the chunk up at once instead
    # of retrying a permanent failure.
    @retry = retry_state_create(
      :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
      forever: false, max_steps: 0, backoff_base: @buffer_config.retry_exponential_backoff_base,
      max_interval: @buffer_config.retry_max_interval,
      randomize: @buffer_config.retry_randomize
    )
  end

  raise
end
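
In other words: a retryable error is re-raised untouched and handled by the normal buffer retry machinery, while a non-retryable one resets the retry state before re-raising, so the chunk is either handed to the secondary output on the next attempt or discarded immediately. A condensed, runnable sketch of that branch; the error class and retry-state struct are illustrative stand-ins for the plugin's internals:

# Illustrative stand-ins, not the plugin's own classes.
RetryState = Struct.new(:max_steps, :secondary, :secondary_threshold, keyword_init: true)

class FakeInsertError < StandardError
  def initialize(message, retryable)
    super(message)
    @retryable = retryable
  end

  def retryable?
    @retryable
  end
end

def next_retry_state(error, secondary_configured)
  raise error if error.retryable? # transient: keep the normal backoff going

  if secondary_configured
    # Switch to <secondary> essentially immediately.
    RetryState.new(secondary: true, secondary_threshold: Float::EPSILON)
  else
    # No fallback: zero retry steps, the chunk is given up right away.
    RetryState.new(max_steps: 0)
  end
end

p next_retry_state(FakeInsertError.new("invalid schema", false), true)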
write(chunk)
# File lib/fluent/plugin/out_bigquery_insert.rb, line 73
def write(chunk)
  # Round-robin through the configured tables: take the head of the queue
  # and push it back, so successive chunks rotate across tables.
  table_format = @tables_mutex.synchronize do
    t = @tables_queue.shift
    @tables_queue.push t
    t
  end

  now = Time.now.utc.strftime("%Y-%m-%d %H:%M:%S.%6N") if @add_insert_timestamp

  rows = chunk.open do |io|
    io.map do |line|
      record = MultiJson.load(line)
      record[@add_insert_timestamp] = now if @add_insert_timestamp
      row = {"json" => record}
      row["insert_id"] = @get_insert_id.call(record) if @get_insert_id
      Fluent::BigQuery::Helper.deep_symbolize_keys(row)
    end
  end

  metadata = chunk.metadata
  project = extract_placeholders(@project, metadata)
  dataset = extract_placeholders(@dataset, metadata)
  table_id = extract_placeholders(table_format, metadata)
  template_suffix = @template_suffix ? extract_placeholders(@template_suffix, metadata) : nil
  schema = get_schema(project, dataset, metadata)

  insert(project, dataset, table_id, rows, schema, template_suffix)
rescue MultiJson::ParseError => e
  raise Fluent::UnrecoverableError.new(e)
end
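
Each buffered line thus becomes one insertAll-style row of the shape {json: record, insert_id: value}, with all keys symbolized before being passed to insert. A small sketch of that transformation, assuming (hypothetically) insert_id_field "$.uuid" and add_insert_timestamp "fluentd_time", and that the multi_json gem (a dependency of this plugin) is available:

require "multi_json"

# One line from the buffer chunk; the payload is illustrative.
line   = '{"message":"hello","uuid":"3f2a77c0-0000-4000-8000-000000000000"}'
record = MultiJson.load(line)
record["fluentd_time"] = Time.now.utc.strftime("%Y-%m-%d %H:%M:%S.%6N")

# With insert_id_field "$.uuid", the record_accessor returns record["uuid"].
row = { "json" => record, "insert_id" => record["uuid"] }
p row
# deep_symbolize_keys then converts every key to a symbol before the row
# is handed to insert_rows.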