class Fluent::BigQueryOutput
TODO: error classes for each API error response
class BigQueryAPIError < StandardError
end
Constants
- REGEXP_MAX_NUM
- RETRYABLE_ERROR_REASON

TODO: record field stream inserts don't work well?
At table creation, table type json + field type record -> field type validation fails.
At streaming inserts, schema cannot be specified.

config_param :field_record, :string, default: nil
config_param :optional_data_field, :string, default: nil
developers.google.com/bigquery/browser-tool-quickstart
developers.google.com/bigquery/bigquery-api-quickstart
Public Class Methods
Table types
developers.google.com/bigquery/docs/tables
type - The following data types are supported; see Data Formats for details on each data type:
- STRING
- INTEGER
- FLOAT
- BOOLEAN
- RECORD - A JSON object, used when importing nested records. This type is only available when using JSON source files.
mode - Whether a field can be null. The following values are supported:
- NULLABLE - The cell can be null.
- REQUIRED - The cell cannot be null.
- REPEATED - Zero or more repeated simple or nested subfields. This mode is only supported when using JSON source files.
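For illustration, here is a minimal sketch of a schema that exercises these types and modes (field names are hypothetical); the same structure, serialized as JSON, is what a file referenced by schema_path would contain and what the plugin loads via RecordSchema#load_schema in configure below.

# A minimal sketch with hypothetical field names; the equivalent JSON file
# can be pointed to by schema_path.
schema = [
  { "name" => "time",    "type" => "TIMESTAMP", "mode" => "REQUIRED" },
  { "name" => "status",  "type" => "INTEGER",   "mode" => "NULLABLE" },
  { "name" => "tags",    "type" => "STRING",    "mode" => "REPEATED" },
  { "name" => "request", "type" => "RECORD",    "mode" => "NULLABLE",
    "fields" => [
      { "name" => "path",   "type" => "STRING",  "mode" => "NULLABLE" },
      { "name" => "secure", "type" => "BOOLEAN", "mode" => "NULLABLE" }
    ] }
]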
# File lib/fluent/plugin/out_bigquery.rb, line 165
def initialize
  super
  require 'json'
  require 'google/apis/bigquery_v2'
  require 'googleauth'
  require 'active_support/json'
  require 'active_support/core_ext/hash'
  require 'active_support/core_ext/object/json'

  # MEMO: signet-0.6.1 depends on Faraday.default_connection
  Faraday.default_connection.options.timeout = 60
end
Public Instance Methods
# File lib/fluent/plugin/out_bigquery.rb, line 285
def client
  return @cached_client if @cached_client && @cached_client_expiration > Time.now

  client = Google::Apis::BigqueryV2::BigqueryService.new

  scope = "https://www.googleapis.com/auth/bigquery"

  case @auth_method
  when 'private_key'
    require 'google/api_client/auth/key_utils'
    key = Google::APIClient::KeyUtils.load_from_pkcs12(@private_key_path, @private_key_passphrase)
    auth = Signet::OAuth2::Client.new(
            token_credential_uri: "https://accounts.google.com/o/oauth2/token",
            audience: "https://accounts.google.com/o/oauth2/token",
            scope: scope,
            issuer: @email,
            signing_key: key)
  when 'compute_engine'
    auth = Google::Auth::GCECredentials.new
  when 'json_key'
    if File.exist?(@json_key)
      auth = File.open(@json_key) do |f|
        Google::Auth::ServiceAccountCredentials.make_creds(json_key_io: f, scope: scope)
      end
    else
      key = StringIO.new(@json_key)
      auth = Google::Auth::ServiceAccountCredentials.make_creds(json_key_io: key, scope: scope)
    end
  when 'application_default'
    auth = Google::Auth.get_application_default([scope])
  else
    raise ConfigError, "Unknown auth method: #{@auth_method}"
  end

  client.authorization = auth

  @cached_client_expiration = Time.now + 1800
  @cached_client = client
end
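Note that json_key is accepted in two forms, as the File.exist? branch above shows: if the value names an existing file it is opened and handed to make_creds, otherwise it is treated as the key's JSON content and wrapped in a StringIO. A sketch of the two forms (values are hypothetical):

# json_key as a path to a downloaded service account key file
json_key = "/etc/td-agent/bigquery_service_account.json"
# json_key as the literal JSON content, e.g. injected through an environment
# variable; File.exist? returns false, so it goes through StringIO
json_key = '{"client_email": "foo@developer.gserviceaccount.com", "private_key": "..."}'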
# File lib/fluent/plugin/out_bigquery.rb, line 183
def configure(conf)
  if conf["method"] == "load"
    configure_for_load(conf)
  else
    configure_for_insert(conf)
  end
  super

  if @method == "insert"
    extend(InsertImplementation)
  elsif @method == "load"
    require 'digest/sha1'
    extend(LoadImplementation)
  else
    raise Fluent::ConfigError, "'method' must be 'insert' or 'load'"
  end

  case @auth_method
  when 'private_key'
    unless @email && @private_key_path
      raise Fluent::ConfigError, "'email' and 'private_key_path' must be specified if auth_method == 'private_key'"
    end
  when 'compute_engine'
    # Do nothing
  when 'json_key'
    unless @json_key
      raise Fluent::ConfigError, "'json_key' must be specified if auth_method == 'json_key'"
    end
  when 'application_default'
    # Do nothing
  else
    raise Fluent::ConfigError, "unrecognized 'auth_method': #{@auth_method}"
  end

  unless @table.nil? ^ @tables.nil?
    raise Fluent::ConfigError, "'table' or 'tables' must be specified, and both are invalid"
  end

  @tablelist = @tables ? @tables.split(',') : [@table]

  @fields = RecordSchema.new('record')
  if @schema_path
    @fields.load_schema(JSON.parse(File.read(@schema_path)))
  end

  types = %w(string integer float boolean timestamp)
  types.each do |type|
    raw_fields = instance_variable_get("@field_#{type}")
    next unless raw_fields
    raw_fields.split(',').each do |field|
      @fields.register_field field.strip, type.to_sym
    end
  end

  @regexps = {}
  (1..REGEXP_MAX_NUM).each do |i|
    next unless conf["replace_record_key_regexp#{i}"]
    regexp, replacement = conf["replace_record_key_regexp#{i}"].split(/ /, 2)
    raise ConfigError, "replace_record_key_regexp#{i} does not contain 2 parameters" unless replacement
    raise ConfigError, "replace_record_key_regexp#{i} contains a duplicated key, #{regexp}" if @regexps[regexp]
    @regexps[regexp] = replacement
  end

  @localtime = false if @localtime.nil? && @utc

  @timef = TimeFormatter.new(@time_format, @localtime)

  if @time_field
    keys = @time_field.split('.')
    last_key = keys.pop
    @add_time_field = ->(record, time) {
      keys.inject(record) { |h, k| h[k] ||= {} }[last_key] = @timef.format(time)
      record
    }
  else
    @add_time_field = ->(record, time) { record }
  end

  if @insert_id_field
    insert_id_keys = @insert_id_field.split('.')
    @get_insert_id = ->(record) {
      insert_id_keys.inject(record) { |h, k| h[k] }
    }
  else
    @get_insert_id = nil
  end
end
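As a rough illustration of the dot-separated time_field handling above (names hypothetical), the generated @add_time_field lambda materializes any missing intermediate hashes and writes the formatted time into the innermost key:

# time_field "log.timestamp" (hypothetical); the @timef output is shown as a literal
keys = "log.timestamp".split('.')
last_key = keys.pop
record = { "message" => "ok" }
keys.inject(record) { |h, k| h[k] ||= {} }[last_key] = "2016-03-14T00:00:00Z"
record  # => { "message" => "ok", "log" => { "timestamp" => "2016-03-14T00:00:00Z" } }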
default for insert
# File lib/fluent/plugin/out_bigquery.rb, line 24
def configure_for_insert(conf)
  raise ConfigError unless conf["method"] != "load"

  conf["buffer_type"]                = "lightening"  unless conf["buffer_type"]
  conf["flush_interval"]             = 0.25          unless conf["flush_interval"]
  conf["try_flush_interval"]         = 0.05          unless conf["try_flush_interval"]
  conf["buffer_chunk_limit"]         = 1 * 1024 ** 2 unless conf["buffer_chunk_limit"] # 1MB
  conf["buffer_queue_limit"]         = 1024          unless conf["buffer_queue_limit"]
  conf["buffer_chunk_records_limit"] = 500           unless conf["buffer_chunk_records_limit"]
end
default for loads
# File lib/fluent/plugin/out_bigquery.rb, line 36
def configure_for_load(conf)
  raise ConfigError unless conf["method"] == "load"

  # buffer_type, flush_interval and try_flush_interval keep the TimeSlicedOutput defaults
  conf["buffer_chunk_limit"] = 1 * 1024 ** 3 unless conf["buffer_chunk_limit"] # 1GB
  conf["buffer_queue_limit"] = 64 unless conf["buffer_queue_limit"]
end
# File lib/fluent/plugin/out_bigquery.rb, line 342
def create_table(table_id)
  client.insert_table(@project, @dataset, {
    table_reference: {
      table_id: table_id,
    },
    schema: {
      fields: @fields.to_a,
    }
  }, {})
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
  # api_error? -> client cache clear
  @cached_client = nil

  message = e.message
  if e.status_code == 409 && /Already Exists:/ =~ message
    # ignore 'Already Exists' error
    return
  end
  log.error "tables.insert API", :project_id => @project, :dataset => @dataset, :table => table_id, :code => e.status_code, :message => message
  raise "failed to create table in bigquery" # TODO: error class
end
# File lib/fluent/plugin/out_bigquery.rb, line 388
def fetch_schema(allow_overwrite = true)
  table_id = nil
  @fetch_schema_mutex.synchronize do
    if Fluent::Engine.now - @last_fetch_schema_time > @schema_cache_expire
      table_id_format = @tablelist[0]
      table_id = generate_table_id(table_id_format, Time.at(Fluent::Engine.now))
      res = client.get_table(@project, @dataset, table_id)

      schema = res.schema.fields.as_json
      log.debug "Load schema from BigQuery: #{@project}:#{@dataset}.#{table_id} #{schema}"
      if allow_overwrite
        fields = RecordSchema.new("record")
        fields.load_schema(schema, allow_overwrite)
        @fields = fields
      else
        @fields.load_schema(schema, allow_overwrite)
      end
      @last_fetch_schema_time = Fluent::Engine.now
    end
  end
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
  # api_error? -> client cache clear
  @cached_client = nil
  message = e.message
  log.error "tables.get API", project_id: @project, dataset: @dataset, table: table_id, code: e.status_code, message: message

  if @fields.empty?
    raise "failed to fetch schema from bigquery" # TODO: error class
  else
    log.warn "Use previous schema"
    @last_fetch_schema_time = Fluent::Engine.now
  end
end
# File lib/fluent/plugin/out_bigquery.rb, line 329
def generate_table_id(table_id_format, current_time, chunk = nil)
  table_id = current_time.strftime(table_id_format)
  if chunk
    table_id.gsub(%r(%{time_slice})) { |expr| chunk.key }
  else
    table_id.gsub(%r(%{time_slice})) { |expr| current_time.strftime(@time_slice_format) }
  end
end
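A quick usage sketch (format strings hypothetical): strftime expands the date directives, while a literal %{time_slice} placeholder passes through strftime unchanged and is then substituted, either with the buffer chunk's key or with the time formatted by @time_slice_format:

t = Time.utc(2016, 3, 14)
t.strftime("access_%Y%m%d")                                            # => "access_20160314"
# without a chunk, "%{time_slice}" is filled from @time_slice_format (here "%Y%m%d")
"access_%{time_slice}".gsub("%{time_slice}") { t.strftime("%Y%m%d") }  # => "access_20160314"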
# File lib/fluent/plugin/out_bigquery.rb, line 364
def replace_record_key(record)
  new_record = {}
  record.each do |key, _|
    new_key = key
    @regexps.each do |regexp, replacement|
      new_key = new_key.gsub(/#{regexp}/, replacement)
    end
    new_key = new_key.gsub(/\W/, '')
    new_record.store(new_key, record[key])
  end
  new_record
end
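For example, a single hypothetical replace_record_key_regexp1 of "- _" yields @regexps == { "-" => "_" }, and record keys are rewritten as below; note that the final \W scrub also drops characters such as spaces:

# a self-contained sketch of the same transformation, with hypothetical keys
regexps = { "-" => "_" }
record  = { "user-agent" => "curl/7.43", "remote host" => "127.0.0.1" }
new_record = record.each_with_object({}) do |(key, value), acc|
  new_key = regexps.inject(key) { |k, (regexp, replacement)| k.gsub(/#{regexp}/, replacement) }
  acc[new_key.gsub(/\W/, '')] = value
end
new_record  # => { "user_agent" => "curl/7.43", "remotehost" => "127.0.0.1" }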
# File lib/fluent/plugin/out_bigquery.rb, line 271
def start
  super

  @cached_client = nil
  @cached_client_expiration = nil

  @tables_queue = @tablelist.dup.shuffle
  @tables_mutex = Mutex.new
  @fetch_schema_mutex = Mutex.new

  @last_fetch_schema_time = 0
  fetch_schema(false) if @fetch_schema
end
# File lib/fluent/plugin/out_bigquery.rb, line 377
def write(chunk)
  table_id_format = @tables_mutex.synchronize do
    t = @tables_queue.shift
    @tables_queue.push t
    t
  end
  table_id = generate_table_id(table_id_format, Time.at(Fluent::Engine.now), chunk)
  template_suffix = @template_suffix ? generate_table_id(@template_suffix, Time.at(Fluent::Engine.now), chunk) : nil
  _write(chunk, table_id, template_suffix)
end
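The shift/push pair above rotates @tables_queue under the mutex, so successive flushes cycle through the configured tables in round-robin order; a toy illustration with hypothetical table formats:

queue = ["access_%Y%m%d", "error_%Y%m%d"]
3.times.map { t = queue.shift; queue.push(t); t }
# => ["access_%Y%m%d", "error_%Y%m%d", "access_%Y%m%d"]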