class Fluent::BigQueryOutput
Constants
- REGEXP_MAX_NUM
TODO: record-type fields don't work well with streaming inserts? At table creation, table type json + field type record fails field type validation. With streaming inserts, a schema cannot be specified.
config_param :field_record, :string, default: nil
config_param :optional_data_field, :string, default: nil
Public Class Methods
new()
Table types
developers.google.com/bigquery/docs/tables
type - The following data types are supported; see Data Formats for details on each data type:
- STRING
- INTEGER
- FLOAT
- BOOLEAN
- RECORD - a JSON object, used when importing nested records. This type is only available when using JSON source files.

mode - Whether a field can be null. The following values are supported:
- NULLABLE - The cell can be null.
- REQUIRED - The cell cannot be null.
- REPEATED - Zero or more repeated simple or nested subfields. This mode is only supported when using JSON source files.
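For illustration only, a small schema combining these types and modes, written as the JSON array that the plugin's `schema`/`schema_path` options accept (field names here are invented):

  [
    {"name": "user", "type": "RECORD", "mode": "NULLABLE",
     "fields": [
       {"name": "name", "type": "STRING",  "mode": "REQUIRED"},
       {"name": "age",  "type": "INTEGER", "mode": "NULLABLE"}
     ]},
    {"name": "tags", "type": "STRING",    "mode": "REPEATED"},
    {"name": "time", "type": "TIMESTAMP", "mode": "REQUIRED"}
  ]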
Calls superclass method
  # File lib/fluent/plugin/out_bigquery.rb, line 166
  def initialize
    super
    require 'multi_json'
    require 'google/apis/bigquery_v2'
    require 'googleauth'
    require 'active_support/json'
    require 'active_support/core_ext/hash'
    require 'active_support/core_ext/object/json'

    # MEMO: signet-0.6.1 depends on Faraday.default_connection
    Faraday.default_connection.options.timeout = 60
  end
Public Instance Methods
configure(conf)
Calls superclass method
  # File lib/fluent/plugin/out_bigquery.rb, line 179
  def configure(conf)
    if conf["method"] == "load"
      configure_for_load(conf)
    else
      configure_for_insert(conf)
    end
    super

    case @method
    when :insert
      extend(InsertImplementation)
    when :load
      raise Fluent::ConfigError, "'template_suffix' is for only `insert` mode, instead use 'fetch_schema_table' and formatted table name" if @template_suffix
      extend(LoadImplementation)
    else
      raise Fluent::ConfigError, "'method' must be 'insert' or 'load'"
    end

    case @auth_method
    when :private_key
      unless @email && @private_key_path
        raise Fluent::ConfigError, "'email' and 'private_key_path' must be specified if auth_method == 'private_key'"
      end
    when :compute_engine
      # Do nothing
    when :json_key
      unless @json_key
        raise Fluent::ConfigError, "'json_key' must be specified if auth_method == 'json_key'"
      end
    when :application_default
      # Do nothing
    else
      raise Fluent::ConfigError, "unrecognized 'auth_method': #{@auth_method}"
    end

    unless @table.nil? ^ @tables.nil?
      raise Fluent::ConfigError, "'table' or 'tables' must be specified, and specifying both is invalid"
    end

    @tablelist = @tables ? @tables.split(',') : [@table]

    legacy_schema_config_deprecation
    @fields = Fluent::BigQuery::RecordSchema.new('record')
    if @schema
      @fields.load_schema(@schema)
    end
    if @schema_path
      @fields.load_schema(MultiJson.load(File.read(@schema_path)))
    end

    types = %w(string integer float boolean timestamp)
    types.each do |type|
      raw_fields = instance_variable_get("@field_#{type}")
      next unless raw_fields
      raw_fields.split(',').each do |field|
        @fields.register_field field.strip, type.to_sym
      end
    end

    @regexps = {}
    (1..REGEXP_MAX_NUM).each do |i|
      next unless conf["replace_record_key_regexp#{i}"]
      regexp, replacement = conf["replace_record_key_regexp#{i}"].split(/ /, 2)
      raise ConfigError, "replace_record_key_regexp#{i} does not contain 2 parameters" unless replacement
      raise ConfigError, "replace_record_key_regexp#{i} contains a duplicated key, #{regexp}" if @regexps[regexp]
      @regexps[regexp] = replacement
    end

    @localtime = false if @localtime.nil? && @utc

    @timef = TimeFormatter.new(@time_format, @localtime)

    if @time_field
      keys = @time_field.split('.')
      last_key = keys.pop
      @add_time_field = ->(record, time) {
        keys.inject(record) { |h, k| h[k] ||= {} }[last_key] = @timef.format(time)
        record
      }
    else
      @add_time_field = ->(record, time) { record }
    end

    if @insert_id_field
      insert_id_keys = @insert_id_field.split('.')
      @get_insert_id = ->(record) {
        insert_id_keys.inject(record) { |h, k| h[k] }
      }
    else
      @get_insert_id = nil
    end

    warn "[DEPRECATION] `convert_hash_to_json` param is deprecated. If a Hash value is inserted into a string field, the plugin converts it to JSON automatically." if @convert_hash_to_json
  end
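The `@add_time_field` lambda above creates intermediate hashes on demand, so a dotted `time_field` like `a.b.c` lands in a nested record. A standalone sketch of that inject pattern (field name and time value are invented):

  require 'time'

  time_field = 'meta.imported_at'   # hypothetical dotted field
  keys = time_field.split('.')
  last_key = keys.pop

  record = { 'message' => 'hello' }
  # Walk the path, creating intermediate hashes, then assign the formatted time.
  keys.inject(record) { |h, k| h[k] ||= {} }[last_key] = Time.at(1500000000).utc.iso8601

  record #=> {"message"=>"hello", "meta"=>{"imported_at"=>"2017-07-14T02:40:00Z"}}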
configure_for_insert(conf)
Default settings for `insert` mode
  # File lib/fluent/plugin/out_bigquery.rb, line 22
  def configure_for_insert(conf)
    raise ConfigError unless conf["method"] != "load"

    conf["buffer_type"]                = "lightening"  unless conf["buffer_type"]
    conf["flush_interval"]             = 0.25          unless conf["flush_interval"]
    conf["try_flush_interval"]         = 0.05          unless conf["try_flush_interval"]
    conf["buffer_chunk_limit"]         = 1 * 1024 ** 2 unless conf["buffer_chunk_limit"] # 1MB
    conf["buffer_queue_limit"]         = 1024          unless conf["buffer_queue_limit"]
    conf["buffer_chunk_records_limit"] = 500           unless conf["buffer_chunk_records_limit"]
  end
configure_for_load(conf)
Default settings for `load` mode
  # File lib/fluent/plugin/out_bigquery.rb, line 34
  def configure_for_load(conf)
    raise ConfigError unless conf["method"] == "load"

    # buffer_type, flush_interval and try_flush_interval fall back to the TimeSlicedOutput defaults
    conf["buffer_chunk_limit"] = 1 * 1024 ** 3 unless conf["buffer_chunk_limit"] # 1GB
    conf["buffer_queue_limit"] = 32 unless conf["buffer_queue_limit"]
  end
convert_hash_to_json(record)
  # File lib/fluent/plugin/out_bigquery.rb, line 333
  def convert_hash_to_json(record)
    record.each do |key, value|
      if value.class == Hash
        record[key] = MultiJson.dump(value)
      end
    end
    record
  end
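A quick sketch of the behavior: nested Hash values become JSON strings, everything else passes through unchanged (record contents invented):

  require 'multi_json'

  record = { 'user' => { 'name' => 'alice' }, 'count' => 1 }
  record.each do |key, value|
    record[key] = MultiJson.dump(value) if value.class == Hash
  end
  record #=> {"user"=>"{\"name\":\"alice\"}", "count"=>1}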
fetch_schema(allow_overwrite = true)
  # File lib/fluent/plugin/out_bigquery.rb, line 358
  def fetch_schema(allow_overwrite = true)
    table_id = nil
    @fetch_schema_mutex.synchronize do
      if Fluent::Engine.now - @last_fetch_schema_time > @schema_cache_expire
        table_id_format = @fetch_schema_table || @tablelist[0]
        table_id = generate_table_id(table_id_format, Time.at(Fluent::Engine.now))
        schema = writer.fetch_schema(@project, @dataset, table_id)

        if schema
          if allow_overwrite
            fields = Fluent::BigQuery::RecordSchema.new("record")
            fields.load_schema(schema, allow_overwrite)
            @fields = fields
          else
            @fields.load_schema(schema, allow_overwrite)
          end
        else
          if @fields.empty?
            raise "failed to fetch schema from bigquery"
          else
            log.warn "#{table_id} uses previous schema"
          end
        end

        @last_fetch_schema_time = Fluent::Engine.now
      end
    end
  end
generate_table_id(table_id_format, current_time, row = nil, chunk = nil)
  # File lib/fluent/plugin/out_bigquery.rb, line 293
  def generate_table_id(table_id_format, current_time, row = nil, chunk = nil)
    format, col = table_id_format.split(/@/)
    time = if col && row
             keys = col.split('.')
             t = keys.inject(row[:json]) {|obj, attr| obj[attr.to_sym] }
             Time.at(t)
           else
             current_time
           end
    if row && format =~ /\$\{/
      format.gsub!(/\$\{\s*(\w+)\s*\}/) do |m|
        row[:json][$1.to_sym].to_s.gsub(/[^\w]/, '')
      end
    end
    table_id = time.strftime(format)

    if chunk
      table_id.gsub(%r(%{time_slice})) { |expr| chunk.key }
    else
      table_id.gsub(%r(%{time_slice})) { |expr| current_time.strftime(@time_slice_format) }
    end
  end
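A standalone sketch of the `${...}` substitution and strftime expansion performed above (no `@column` or chunk handling; the row value is invented):

  row = { json: { vhost: 'example.com' } }
  format = 'log_${vhost}_%Y%m'

  # Substitute ${field} from the row, stripping non-word characters,
  # then let strftime fill in the time placeholders.
  format = format.gsub(/\$\{\s*(\w+)\s*\}/) { row[:json][$1.to_sym].to_s.gsub(/[^\w]/, '') }
  Time.utc(2016, 1, 15).strftime(format) #=> "log_examplecom_201601"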
legacy_schema_config_deprecation()
  # File lib/fluent/plugin/out_bigquery.rb, line 342
  def legacy_schema_config_deprecation
    if [@field_string, @field_integer, @field_float, @field_boolean, @field_timestamp].any?
      warn "[DEPRECATION] `field_*` style schema config is deprecated. Use the `schema` config param, an array of JSON-style field definitions, instead."
    end
  end
replace_record_key(record)
  # File lib/fluent/plugin/out_bigquery.rb, line 320
  def replace_record_key(record)
    new_record = {}
    record.each do |key, _|
      new_key = key
      @regexps.each do |regexp, replacement|
        new_key = new_key.gsub(/#{regexp}/, replacement)
      end
      new_key = new_key.gsub(/\W/, '')
      new_record.store(new_key, record[key])
    end
    new_record
  end
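A sketch of the rewrite with one hypothetical rule (as if configured via `replace_record_key_regexp1 : _`, i.e. pattern `:` replaced by `_`):

  regexps = { ':' => '_' }   # hypothetical rule; same shape as @regexps

  record = { 'user:name' => 'alice' }
  new_record = {}
  record.each do |key, _|
    new_key = key
    regexps.each { |regexp, replacement| new_key = new_key.gsub(/#{regexp}/, replacement) }
    new_record[new_key.gsub(/\W/, '')] = record[key]   # strip anything left over
  end
  new_record #=> {"user_name"=>"alice"}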
start()
Calls superclass method
  # File lib/fluent/plugin/out_bigquery.rb, line 274
  def start
    super

    @tables_queue = @tablelist.dup.shuffle
    @tables_mutex = Mutex.new
    @fetch_schema_mutex = Mutex.new

    @last_fetch_schema_time = 0
    fetch_schema(false) if @fetch_schema
  end
write(chunk)
  # File lib/fluent/plugin/out_bigquery.rb, line 348
  def write(chunk)
    table_id_format = @tables_mutex.synchronize do
      t = @tables_queue.shift
      @tables_queue.push t
      t
    end
    template_suffix_format = @template_suffix
    _write(chunk, table_id_format, template_suffix_format)
  end
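The mutex-guarded shift/push above rotates through `@tables_queue`, so chunks are spread round-robin across the configured tables. A toy illustration (table names invented):

  queue = %w(log_a log_b log_c)
  mutex = Mutex.new

  4.times.map {
    mutex.synchronize do
      t = queue.shift
      queue.push t
      t
    end
  } #=> ["log_a", "log_b", "log_c", "log_a"]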
writer()
  # File lib/fluent/plugin/out_bigquery.rb, line 285
  def writer
    @writer ||= Fluent::BigQuery::Writer.new(@log, @auth_method, {
      private_key_path: @private_key_path, private_key_passphrase: @private_key_passphrase,
      email: @email,
      json_key: @json_key,
    })
  end