class Fluent::Plugin::BigQueryBaseOutput
This is an abstract class.
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bigquery_base.rb, line 84
#
# Validates the plugin settings once the superclass has handled +conf+,
# then prepares the table list, the static record schema and the formatter.
#
# @param conf the plugin configuration element; its first "format"
#   sub-element (if any) is handed to the formatter
# @raise [Fluent::ConfigError] when the credentials needed by the chosen
#   auth_method are missing, when auth_method is unknown, or when 'table'
#   and 'tables' are both set or both unset
def configure(conf)
  super

  case @auth_method
  when :private_key
    if !@email || !@private_key_path
      raise Fluent::ConfigError, "'email' and 'private_key_path' must be specified if auth_method == 'private_key'"
    end
  when :json_key
    raise Fluent::ConfigError, "'json_key' must be specified if auth_method == 'json_key'" unless @json_key
  when :compute_engine, :application_default
    # No extra settings are checked for these auth methods.
  else
    raise Fluent::ConfigError, "unrecognized 'auth_method': #{@auth_method}"
  end

  # Exactly one of 'table' / 'tables' has to be given.
  if @table.nil? == @tables.nil?
    raise Fluent::ConfigError, "'table' or 'tables' must be specified, and both are invalid"
  end

  @tablelist = @tables || [@table]

  @table_schema = Fluent::BigQuery::RecordSchema.new('record')
  @table_schema.load_schema(@schema) if @schema

  @formatter = formatter_create(
    usage: 'out_bigquery_for_insert',
    default_type: 'json',
    conf: conf.elements("format")[0]
  )
end
fetch_schema(metadata)
click to toggle source
# File lib/fluent/plugin/out_bigquery_base.rb, line 187
#
# Returns the record schema for the table resolved from +metadata+,
# asking the writer for a fresh copy when the cached one is older than
# @schema_cache_expire.
#
# @param metadata passed to extract_placeholders to resolve the project,
#   dataset and table names
# @return the cached schema object for "project.dataset.table"
# @raise [RuntimeError] when the writer returns no schema and nothing has
#   been cached for that table yet
def fetch_schema(metadata)
  project = extract_placeholders(@project, metadata)
  dataset = extract_placeholders(@dataset, metadata)
  table_id = fetch_schema_target_table(metadata)
  cache_key = "#{project}.#{dataset}.#{table_id}"

  if Fluent::Engine.now - @last_fetch_schema_time[cache_key] > @schema_cache_expire
    fetched = writer.fetch_schema(project, dataset, table_id)

    if fetched
      record_schema = Fluent::BigQuery::RecordSchema.new("record")
      record_schema.load_schema(fetched)
      @fetched_schemas[cache_key] = record_schema
    elsif @fetched_schemas[cache_key].nil?
      raise "failed to fetch schema from bigquery"
    else
      # Fetch failed, but an earlier schema is still cached: keep using it.
      log.warn "#{table_id} uses previous schema"
    end

    # Record the attempt time whether or not a new schema was obtained.
    @last_fetch_schema_time[cache_key] = Fluent::Engine.now
  end

  @fetched_schemas[cache_key]
end
fetch_schema_target_table(metadata)
click to toggle source
# File lib/fluent/plugin/out_bigquery_base.rb, line 214
#
# Resolves the table whose schema should be fetched: @fetch_schema_table
# when set, otherwise the first entry of @tablelist, with placeholders
# expanded from +metadata+.
#
# @param metadata passed through to extract_placeholders
# @return the result of extract_placeholders for the chosen table template
def fetch_schema_target_table(metadata)
  table_template = @fetch_schema_table || @tablelist[0]
  extract_placeholders(table_template, metadata)
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_bigquery_base.rb, line 156
#
# Converts one event into its formatted representation using the schema
# selected by the plugin settings (fetched, read from a path, or static).
#
# @param tag the event tag
# @param time the event time
# @param record the event record; a nil record is logged and skipped
# @return the formatter output, or nil when the record is nil or the
#   schema-formatted row is empty
# @raise re-raises any error from schema or formatter formatting after
#   logging it
def format(tag, time, record)
  if record.nil?
    log.warn("nil record detected. corrupted chunks? tag=#{tag}, time=#{time}")
    return
  end

  record = inject_values_to_record(tag, time, record)
  event_meta = metadata(tag, time, record)

  schema =
    if @fetch_schema
      fetch_schema(event_meta)
    elsif @schema_path
      read_schema(event_meta)
    else
      @table_schema
    end

  begin
    formatted_row = schema.format(record, is_load: (@is_load ? true : false))
    return if formatted_row.empty?

    @formatter.format(tag, time, formatted_row)
  rescue
    # Log the offending record and schema, then let the error propagate.
    log.error("format error", record: record, schema: schema)
    raise
  end
end
get_schema(project, dataset, metadata)
click to toggle source
# File lib/fluent/plugin/out_bigquery_base.rb, line 233
#
# Looks up the schema for the given project/dataset, preferring an
# already cached entry and falling back to fetch_schema / read_schema.
# With neither @fetch_schema nor @schema_path set, the static
# @table_schema is returned.
#
# @param project project name used in the fetched-schema cache key
# @param dataset dataset name used in the fetched-schema cache key
# @param metadata used to resolve the target table or schema path
# @return the schema object
def get_schema(project, dataset, metadata)
  return @table_schema unless @fetch_schema || @schema_path

  if @fetch_schema
    cache_key = "#{project}.#{dataset}.#{fetch_schema_target_table(metadata)}"
    @fetched_schemas[cache_key] || fetch_schema(metadata)
  else
    schema_path = read_schema_target_path(metadata)
    @read_schemas[schema_path] || read_schema(metadata)
  end
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_bigquery_base.rb, line 129
#
# Always answers true.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end
read_schema(metadata)
click to toggle source
# File lib/fluent/plugin/out_bigquery_base.rb, line 218
#
# Returns the schema stored in the JSON file resolved from +metadata+.
# Each path is parsed once and the result is kept in @read_schemas.
#
# @param metadata used to resolve the schema file path
# @return the schema object cached for that path
def read_schema(metadata)
  path = read_schema_target_path(metadata)

  cached = @read_schemas[path]
  return cached if cached

  loaded = Fluent::BigQuery::RecordSchema.new("record")
  loaded.load_schema(MultiJson.load(File.read(path)))
  @read_schemas[path] = loaded
end
read_schema_target_path(metadata)
click to toggle source
# File lib/fluent/plugin/out_bigquery_base.rb, line 229
#
# Expands the placeholders in @schema_path using +metadata+.
#
# @param metadata passed through to extract_placeholders
# @return the result of extract_placeholders for @schema_path
def read_schema_target_path(metadata)
  path_template = @schema_path
  extract_placeholders(path_template, metadata)
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_bigquery_base.rb, line 119
#
# Runs the superclass start-up, then initialises the per-instance state:
# a shuffled copy of the table list with its mutex, and the empty schema
# caches.
def start
  super

  # Table rotation state.
  @tables_mutex = Mutex.new
  @tables_queue = @tablelist.shuffle

  # Schema caches; a missing last-fetch time reads as 0.
  @last_fetch_schema_time = Hash.new(0)
  @fetched_schemas = {}
  @read_schemas = {}
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_bigquery_base.rb, line 184
#
# No-op in this base class; it does nothing with +chunk+ and returns nil.
# NOTE(review): the class is documented as abstract, so concrete outputs
# presumably override this method - confirm against the subclasses.
#
# @param chunk ignored here
# @return [nil]
def write(chunk)
  # Intentionally empty.
end
writer()
click to toggle source
# File lib/fluent/plugin/out_bigquery_base.rb, line 133
#
# Returns the memoised Fluent::BigQuery::Writer, building it on first use
# from the plugin's logger, auth method and option instance variables.
#
# @return the Fluent::BigQuery::Writer instance stored in @writer
def writer
  return @writer if @writer

  writer_options = {
    private_key_path: @private_key_path,
    private_key_passphrase: @private_key_passphrase,
    email: @email,
    json_key: @json_key,
    location: @location,
    source_format: @source_format,
    skip_invalid_rows: @skip_invalid_rows,
    ignore_unknown_values: @ignore_unknown_values,
    max_bad_records: @max_bad_records,
    allow_retry_insert_errors: @allow_retry_insert_errors,
    prevent_duplicate_load: @prevent_duplicate_load,
    auto_create_table: @auto_create_table,
    time_partitioning_type: @time_partitioning_type,
    time_partitioning_field: @time_partitioning_field,
    time_partitioning_expiration: @time_partitioning_expiration,
    require_partition_filter: @require_partition_filter,
    clustering_fields: @clustering_fields,
    timeout_sec: @request_timeout_sec,
    open_timeout_sec: @request_open_timeout_sec,
  }

  @writer = Fluent::BigQuery::Writer.new(@log, @auth_method, **writer_options)
end