class Fluent::Plugin::BigQueryInsertOutput
Public Instance Methods
configure(conf)
Calls superclass method Fluent::Plugin::BigQueryBaseOutput#configure
# File lib/fluent/plugin/out_bigquery_insert.rb, line 47
def configure(conf)
  super

  # This output streams rows via the insertAll API; it never runs load jobs.
  @is_load = false

  if @insert_id_field
    if @insert_id_field !~ /^\$[\[\.]/ && @insert_id_field =~ /\./
      warn "[BREAKING CHANGE] insert_id_field format is changed. Use fluentd record_accessor helper. (https://docs.fluentd.org/v1.0/articles/api-plugin-helper-record_accessor)"
    end
    @get_insert_id = record_accessor_create(@insert_id_field)
  end

  # Streaming inserts send each record as JSON, so only the json formatter is allowed.
  formatter_config = conf.elements("format")[0]
  if formatter_config && formatter_config['@type'] != "json"
    raise ConfigError, "`bigquery_insert` supports only json formatter."
  end
  @formatter = formatter_create(usage: 'out_bigquery_for_insert', type: 'json', conf: formatter_config)

  placeholder_params = "project=#{@project}/dataset=#{@dataset}/table=#{@tablelist.join(",")}/fetch_schema_table=#{@fetch_schema_table}/template_suffix=#{@template_suffix}"
  placeholder_validate!(:bigquery_insert, placeholder_params)
end
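The deprecation warning above fires only for dotted paths written without record_accessor syntax. A quick sketch of how the two regexes classify some hypothetical insert_id_field values:

  # Hypothetical insert_id_field values and how the check above treats them:
  "uuid"              # no dot               -> accepted, no warning
  "log.uuid"          # legacy dotted path   -> [BREAKING CHANGE] warning
  "$.log.uuid"        # record_accessor form -> accepted, no warning
  "$['log']['uuid']"  # record_accessor form -> accepted, no warning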
format(tag, time, record)
Defined so that Fluent::Plugin::Output#implement? detects a custom format implementation.
Calls superclass method Fluent::Plugin::BigQueryBaseOutput#format
# File lib/fluent/plugin/out_bigquery_insert.rb, line 69
def format(tag, time, record)
  super
end
insert(project, dataset, table_id, rows, schema, template_suffix)
# File lib/fluent/plugin/out_bigquery_insert.rb, line 104
def insert(project, dataset, table_id, rows, schema, template_suffix)
  writer.insert_rows(project, dataset, table_id, rows, schema, template_suffix: template_suffix)
rescue Fluent::BigQuery::Error => e
  # Retryable errors go back to fluentd's normal buffer retry loop.
  raise if e.retryable?

  if @secondary
    # TODO: find better way
    # Non-retryable error with a <secondary> configured: rebuild the retry state
    # with secondary_threshold: Float::EPSILON so the next flush of this chunk
    # goes straight to the secondary output.
    @retry = retry_state_create(
      :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
      forever: false, max_steps: @buffer_config.retry_max_times, backoff_base: @buffer_config.retry_exponential_backoff_base,
      max_interval: @buffer_config.retry_max_interval,
      secondary: true, secondary_threshold: Float::EPSILON,
      randomize: @buffer_config.retry_randomize
    )
  else
    # No secondary: max_steps: 0 stops retrying, so the chunk fails fast.
    @retry = retry_state_create(
      :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
      forever: false, max_steps: 0, backoff_base: @buffer_config.retry_exponential_backoff_base,
      max_interval: @buffer_config.retry_max_interval,
      randomize: @buffer_config.retry_randomize
    )
  end

  raise
end
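Assuming fluentd's retry-state semantics, where secondary_threshold is the fraction of retry_timeout reserved for the primary output, Float::EPSILON hands the chunk to the secondary almost immediately. A worked illustration with hypothetical numbers:

  retry_timeout       = 3600            # hypothetical retry_timeout (seconds)
  secondary_threshold = Float::EPSILON  # the value used above
  primary_budget      = retry_timeout * secondary_threshold
  # => ~8.0e-13 seconds: effectively zero, so the next attempt uses <secondary>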
write(chunk)
# File lib/fluent/plugin/out_bigquery_insert.rb, line 73
def write(chunk)
  # Rotate through the configured tables so writes are spread across them.
  table_format = @tables_mutex.synchronize do
    t = @tables_queue.shift
    @tables_queue.push t
    t
  end

  now = Time.now.utc.strftime("%Y-%m-%d %H:%M:%S.%6N") if @add_insert_timestamp

  rows = chunk.open do |io|
    io.map do |line|
      record = MultiJson.load(line)
      record[@add_insert_timestamp] = now if @add_insert_timestamp
      row = {"json" => record}
      row["insert_id"] = @get_insert_id.call(record) if @get_insert_id
      Fluent::BigQuery::Helper.deep_symbolize_keys(row)
    end
  end

  metadata = chunk.metadata
  project = extract_placeholders(@project, metadata)
  dataset = extract_placeholders(@dataset, metadata)
  table_id = extract_placeholders(table_format, metadata)
  template_suffix = @template_suffix ? extract_placeholders(@template_suffix, metadata) : nil
  schema = get_schema(project, dataset, metadata)
  insert(project, dataset, table_id, rows, schema, template_suffix)
rescue MultiJson::ParseError => e
  # Malformed JSON in a chunk can never succeed, so skip retries entirely.
  raise Fluent::UnrecoverableError.new(e)
end
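Each buffered line is one JSON record, and write wraps it into the row hash passed to insert_rows. A minimal sketch of that per-line transformation, assuming add_insert_timestamp is set to "fluentd_ts" and insert_id_field points at a top-level "uuid" key (both hypothetical):

  require "multi_json"

  line   = '{"uuid":"abc-123","message":"hello"}'  # one line from the chunk
  record = MultiJson.load(line)
  record["fluentd_ts"] = "2024-01-01 00:00:00.000000"  # @add_insert_timestamp column

  row = {"json" => record}
  row["insert_id"] = record["uuid"]  # what @get_insert_id.call(record) returns here

  # Fluent::BigQuery::Helper.deep_symbolize_keys(row) then yields:
  # {json: {uuid: "abc-123", message: "hello", fluentd_ts: "2024-01-01 00:00:00.000000"},
  #  insert_id: "abc-123"}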