class Embulk::Output::Bigquery
Constants
- FILE_WRITER_KEY
Public Class Methods
add_file_writer(file_writer)
# File lib/embulk/output/bigquery.rb, line 448
def self.add_file_writer(file_writer)
  @file_writers_mutex.synchronize do
    @file_writers << file_writer
  end
end
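file_writers is shared state mutated from multiple output threads, so registration is serialized with a Mutex. A minimal sketch of the same pattern, with illustrative names (Registry is not part of the plugin):

require 'thread'

# Mutex-guarded registry, mirroring @file_writers / @file_writers_mutex above.
class Registry
  def initialize
    @items = []
    @mutex = Mutex.new
  end

  # Serialize concurrent appends; Array#<< alone is not safe under concurrency.
  def add(item)
    @mutex.synchronize { @items << item }
  end

  attr_reader :items
end

registry = Registry.new
4.times.map {|i| Thread.new { registry.add(i) } }.each(&:join)
p registry.items.sort # => [0, 1, 2, 3]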
auto_create(task, bigquery)
# File lib/embulk/output/bigquery.rb, line 292
def self.auto_create(task, bigquery)
  if task['auto_create_dataset']
    bigquery.create_dataset(task['dataset'])
  else
    bigquery.get_dataset(task['dataset']) # raises NotFoundError
  end

  if task['mode'] == 'replace_backup' and task['dataset_old'] != task['dataset']
    if task['auto_create_dataset']
      bigquery.create_dataset(task['dataset_old'], reference: task['dataset'])
    else
      bigquery.get_dataset(task['dataset_old']) # raises NotFoundError
    end
  end

  temp_table_expiration = task['temporary_table_expiration']
  temp_options = {'expiration_time' => temp_table_expiration}

  case task['mode']
  when 'delete_in_advance'
    bigquery.delete_table_or_partition(task['table'])
    bigquery.create_table_if_not_exists(task['table'])
  when 'replace'
    bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
    bigquery.create_table_if_not_exists(task['table']) # needed when task['table'] is a partition
  when 'append'
    bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
    bigquery.create_table_if_not_exists(task['table']) # needed when task['table'] is a partition
  when 'replace_backup'
    bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
    bigquery.create_table_if_not_exists(task['table'])
    bigquery.create_table_if_not_exists(task['table_old'], dataset: task['dataset_old']) # needed when it is a partition
  else # append_direct
    if task['auto_create_table']
      bigquery.create_table_if_not_exists(task['table'])
    else
      bigquery.get_table(task['table']) # raises NotFoundError
    end
  end
end
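Summarizing the case statement above: which tables auto_create ensures exist for each mode (a hypothetical summary constant, not defined by the plugin):

# Task keys whose tables auto_create creates, per mode (illustrative only).
TABLES_ENSURED_BY_MODE = {
  'delete_in_advance' => %w[table],                      # recreated after deletion
  'replace'           => %w[temp_table table],
  'append'            => %w[temp_table table],
  'replace_backup'    => %w[temp_table table table_old], # table_old goes to dataset_old
  'append_direct'     => %w[table],                      # only if auto_create_table is true
}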
bigquery()
# File lib/embulk/output/bigquery.rb, line 255
def self.bigquery
  @bigquery
end
configure(config, schema, task_count)
# File lib/embulk/output/bigquery.rb, line 33
def self.configure(config, schema, task_count)
  task = {
    'mode' => config.param('mode', :string, :default => 'append'),
    'auth_method' => config.param('auth_method', :string, :default => 'application_default'),
    'json_keyfile' => config.param('json_keyfile', LocalFile, :default => nil),
    'project' => config.param('project', :string, :default => nil),
    'destination_project' => config.param('destination_project', :string, :default => nil),
    'dataset' => config.param('dataset', :string),
    'location' => config.param('location', :string, :default => nil),
    'table' => config.param('table', :string),
    'dataset_old' => config.param('dataset_old', :string, :default => nil),
    'table_old' => config.param('table_old', :string, :default => nil),
    'table_name_old' => config.param('table_name_old', :string, :default => nil), # lower version compatibility
    'auto_create_dataset' => config.param('auto_create_dataset', :bool, :default => false),
    'auto_create_table' => config.param('auto_create_table', :bool, :default => true),
    'schema_file' => config.param('schema_file', :string, :default => nil),
    'template_table' => config.param('template_table', :string, :default => nil),
    'delete_from_local_when_job_end' => config.param('delete_from_local_when_job_end', :bool, :default => true),
    'job_status_max_polling_time' => config.param('job_status_max_polling_time', :integer, :default => 3600),
    'job_status_polling_interval' => config.param('job_status_polling_interval', :integer, :default => 10),
    'is_skip_job_result_check' => config.param('is_skip_job_result_check', :bool, :default => false),
    'with_rehearsal' => config.param('with_rehearsal', :bool, :default => false),
    'rehearsal_counts' => config.param('rehearsal_counts', :integer, :default => 1000),
    'abort_on_error' => config.param('abort_on_error', :bool, :default => nil),
    'progress_log_interval' => config.param('progress_log_interval', :float, :default => nil),
    'column_options' => config.param('column_options', :array, :default => []),
    'default_timezone' => config.param('default_timezone', :string, :default => ValueConverterFactory::DEFAULT_TIMEZONE),
    'default_timestamp_format' => config.param('default_timestamp_format', :string, :default => ValueConverterFactory::DEFAULT_TIMESTAMP_FORMAT),
    'payload_column' => config.param('payload_column', :string, :default => nil),
    'payload_column_index' => config.param('payload_column_index', :integer, :default => nil),
    'description' => config.param('description', :string, :default => nil),
    'open_timeout_sec' => config.param('open_timeout_sec', :integer, :default => nil),
    'timeout_sec' => config.param('timeout_sec', :integer, :default => nil), # google-api-ruby-client < v0.11.0
    'send_timeout_sec' => config.param('send_timeout_sec', :integer, :default => nil), # google-api-ruby-client >= v0.11.0
    'read_timeout_sec' => config.param('read_timeout_sec', :integer, :default => nil), # google-api-ruby-client >= v0.11.0
    'retries' => config.param('retries', :integer, :default => 5),
    'application_name' => config.param('application_name', :string, :default => 'Embulk BigQuery plugin'),
    'sdk_log_level' => config.param('sdk_log_level', :string, :default => nil),
    'path_prefix' => config.param('path_prefix', :string, :default => nil),
    'sequence_format' => config.param('sequence_format', :string, :default => '.%d.%d'),
    'file_ext' => config.param('file_ext', :string, :default => nil),
    'skip_file_generation' => config.param('skip_file_generation', :bool, :default => false),
    'compression' => config.param('compression', :string, :default => 'NONE'),
    'gcs_bucket' => config.param('gcs_bucket', :string, :default => nil),
    'auto_create_gcs_bucket' => config.param('auto_create_gcs_bucket', :bool, :default => false),
    'source_format' => config.param('source_format', :string, :default => 'CSV'),
    'max_bad_records' => config.param('max_bad_records', :integer, :default => 0),
    'field_delimiter' => config.param('field_delimiter', :string, :default => ','),
    'encoding' => config.param('encoding', :string, :default => 'UTF-8'),
    'ignore_unknown_values' => config.param('ignore_unknown_values', :bool, :default => false),
    'allow_quoted_newlines' => config.param('allow_quoted_newlines', :bool, :default => false),
    'time_partitioning' => config.param('time_partitioning', :hash, :default => nil),
    'clustering' => config.param('clustering', :hash, :default => nil), # google-api-ruby-client >= v0.21.0
    'schema_update_options' => config.param('schema_update_options', :array, :default => nil),
    'temporary_table_expiration' => config.param('temporary_table_expiration', :integer, :default => nil),
    # for debug
    'skip_load' => config.param('skip_load', :bool, :default => false),
    'temp_table' => config.param('temp_table', :string, :default => nil),
    'rehearsal_table' => config.param('rehearsal_table', :string, :default => nil),
  }

  now = Time.now

  task['mode'] = task['mode'].downcase
  unless %w[append append_direct replace delete_in_advance replace_backup].include?(task['mode'])
    raise ConfigError.new "`mode` must be one of append, append_direct, replace, delete_in_advance, replace_backup"
  end
  if %w[append replace delete_in_advance replace_backup].include?(task['mode']) and !task['auto_create_table']
    raise ConfigError.new "`mode: #{task['mode']}` requires `auto_create_table: true`"
  end

  if task['mode'] == 'replace_backup'
    task['table_old'] ||= task['table_name_old'] # for lower version compatibility
    if task['dataset_old'].nil? and task['table_old'].nil?
      raise ConfigError.new "`mode: replace_backup` requires either of `dataset_old` or `table_old`"
    end
    task['dataset_old'] ||= task['dataset']
    task['table_old'] ||= task['table']
  end

  if task['table_old']
    task['table_old'] = now.strftime(task['table_old'])
  end
  if task['table']
    task['table'] = now.strftime(task['table'])
  end

  task['auth_method'] = task['auth_method'].downcase
  unless %w[json_key service_account authorized_user compute_engine application_default].include?(task['auth_method'])
    raise ConfigError.new "`auth_method` must be one of service_account (or json_key), authorized_user, compute_engine, application_default"
  end
  if (task['auth_method'] == 'service_account' or task['auth_method'] == 'json_key') and task['json_keyfile'].nil?
    raise ConfigError.new "`json_keyfile` is required for auth_method: service_account (or json_key)"
  end

  if task['json_keyfile']
    begin
      json_key = JSON.parse(task['json_keyfile'])
      task['project'] ||= json_key['project_id']
    rescue => e
      raise ConfigError.new "Parsing 'json_keyfile' failed with error: #{e.class} #{e.message}"
    end
  end
  if task['project'].nil?
    raise ConfigError.new "Required field \"project\" is not set"
  end
  task['destination_project'] ||= task['project']

  if (task['payload_column'] or task['payload_column_index']) and task['auto_create_table']
    if task['schema_file'].nil? and task['template_table'].nil?
      raise ConfigError.new "Cannot guess table schema from Embulk schema with `payload_column` or `payload_column_index`. Either of `schema_file` or `template_table` is required for auto_create_table true"
    end
  end

  if task['payload_column_index']
    if task['payload_column_index'] < 0 || schema.size <= task['payload_column_index']
      raise ConfigError.new "payload_column_index #{task['payload_column_index']} is out of schema size"
    end
  elsif task['payload_column']
    task['payload_column_index'] = schema.find_index {|c| c[:name] == task['payload_column'] }
    if task['payload_column_index'].nil?
      raise ConfigError.new "payload_column #{task['payload_column']} does not exist in schema"
    end
  end

  if task['schema_file']
    unless File.exist?(task['schema_file'])
      raise ConfigError.new "schema_file #{task['schema_file']} is not found"
    end
    begin
      JSON.parse(File.read(task['schema_file']))
    rescue => e
      raise ConfigError.new "Parsing 'schema_file' #{task['schema_file']} failed with error: #{e.class} #{e.message}"
    end
  end

  if task['path_prefix'].nil?
    task['path_prefix'] = Tempfile.create('embulk_output_bigquery_') {|fp| fp.path }
  end

  task['source_format'] = task['source_format'].upcase
  if task['source_format'] == 'JSONL'
    task['source_format'] = 'NEWLINE_DELIMITED_JSON'
  end
  unless %w[CSV NEWLINE_DELIMITED_JSON].include?(task['source_format'])
    raise ConfigError.new "`source_format` must be CSV or NEWLINE_DELIMITED_JSON (JSONL)"
  end

  task['compression'] = task['compression'].upcase
  unless %w[GZIP NONE].include?(task['compression'])
    raise ConfigError.new "`compression` must be GZIP or NONE"
  end

  if task['file_ext'].nil?
    case task['source_format']
    when 'CSV'
      file_ext = '.csv'
    else # newline_delimited_json
      file_ext = '.jsonl'
    end
    case task['compression']
    when 'GZIP'
      file_ext << '.gz'
    end
    task['file_ext'] = file_ext
  end

  unique_name = SecureRandom.uuid.gsub('-', '_')

  if %w[replace replace_backup append].include?(task['mode'])
    task['temp_table'] ||= "LOAD_TEMP_#{unique_name}_#{task['table']}"
  else
    task['temp_table'] = nil
  end

  if task['with_rehearsal']
    task['rehearsal_table'] ||= "LOAD_REHEARSAL_#{unique_name}_#{task['table']}"
  end

  if task['sdk_log_level']
    Google::Apis.logger.level = eval("::Logger::#{task['sdk_log_level'].upcase}")
  end

  if task['abort_on_error'].nil?
    task['abort_on_error'] = (task['max_bad_records'] == 0)
  end

  if task['time_partitioning']
    unless task['time_partitioning']['type']
      raise ConfigError.new "`time_partitioning` must have `type` key"
    end
  elsif Helper.has_partition_decorator?(task['table'])
    task['time_partitioning'] = {'type' => 'DAY'}
  end

  if task['clustering']
    unless task['clustering']['fields']
      raise ConfigError.new "`clustering` must have `fields` key"
    end
  end

  if task['schema_update_options']
    task['schema_update_options'].each do |schema_update_option|
      unless %w[ALLOW_FIELD_ADDITION ALLOW_FIELD_RELAXATION].include?(schema_update_option)
        raise ConfigError.new "`schema_update_options` must contain either of ALLOW_FIELD_ADDITION or ALLOW_FIELD_RELAXATION or both"
      end
    end
  end

  task
end
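Two behaviors of configure worth calling out: table and table_old are passed through Time#strftime, so names may embed the run date, and the staged modes derive a uniquely named temp table. A short sketch with hypothetical table settings:

require 'securerandom'

now = Time.now
table = now.strftime('events_%Y%m%d') # hypothetical config value; e.g. "events_20240115"

# Temp table naming for append/replace/replace_backup, as in configure above:
unique_name = SecureRandom.uuid.gsub('-', '_')
temp_table  = "LOAD_TEMP_#{unique_name}_#{table}"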
converters()
# File lib/embulk/output/bigquery.rb, line 259
def self.converters
  @converters
end
file_writers()
# File lib/embulk/output/bigquery.rb, line 444
def self.file_writers
  @file_writers
end
new(task, schema, index)
An instance is created for each task.
Calls superclass method
# File lib/embulk/output/bigquery.rb, line 472
def initialize(task, schema, index)
  super
  if task['with_rehearsal'] and @index == 0
    @rehearsaled = false
  end
end
rehearsal_thread()
# File lib/embulk/output/bigquery.rb, line 263
def self.rehearsal_thread
  @rehearsal_thread
end
rehearsal_thread=(rehearsal_thread)
# File lib/embulk/output/bigquery.rb, line 267
def self.rehearsal_thread=(rehearsal_thread)
  @rehearsal_thread = rehearsal_thread
end
reset_file_writers()
# File lib/embulk/output/bigquery.rb, line 440
def self.reset_file_writers
  @file_writers = Array.new
end
transaction(config, schema, task_count) { |task| ... }
# File lib/embulk/output/bigquery.rb, line 334
def self.transaction(config, schema, task_count, &control)
  task = self.configure(config, schema, task_count)
  @task = task
  @schema = schema
  @bigquery = BigqueryClient.new(task, schema)
  @converters = ValueConverterFactory.create_converters(task, schema)

  self.auto_create(@task, @bigquery)

  begin
    paths = []
    if task['skip_file_generation']
      yield(task) # does nothing, but seems it has to be called
      path_pattern = "#{task['path_prefix']}*#{task['file_ext']}"
      Embulk.logger.info { "embulk-output-bigquery: Skip file generation. Get paths from `#{path_pattern}`" }
      paths = Dir.glob(path_pattern)
    else
      task_reports = yield(task) # generates local files

      ios = file_writers.map(&:io)
      paths = ios.map(&:path)
      ios.each do |io|
        Embulk.logger.debug { "close #{io.path}" }
        io.close rescue nil
      end
    end

    if rehearsal_thread
      rehearsal_thread.join
    end

    if task['skip_load'] # only for debug
      Embulk.logger.info { "embulk-output-bigquery: Skip load" }
    else
      if !paths.empty?
        target_table = task['temp_table'] ? task['temp_table'] : task['table']
        if bucket = task['gcs_bucket']
          gcs = GcsClient.new(task)
          gcs.insert_temporary_bucket(bucket) if task['auto_create_gcs_bucket']
          objects = paths.size.times.map { SecureRandom.uuid.to_s }
          gcs.insert_objects(paths, objects: objects, bucket: bucket)
          object_uris = objects.map {|object| URI.join("gs://#{bucket}", object).to_s }
          responses = bigquery.load_from_gcs(object_uris, target_table)
          objects.each {|object| gcs.delete_object(object, bucket: bucket) }
        else
          responses = bigquery.load_in_parallel(paths, target_table)
        end
      else
        responses = []
      end

      transaction_report = self.transaction_report(task, responses)
      Embulk.logger.info { "embulk-output-bigquery: transaction_report: #{transaction_report.to_json}" }

      if task['abort_on_error'] && !task['is_skip_job_result_check']
        if transaction_report['num_input_rows'] != transaction_report['num_output_rows']
          raise Error, "ABORT: `num_input_rows (#{transaction_report['num_input_rows']})` and " \
            "`num_output_rows (#{transaction_report['num_output_rows']})` does not match"
        end
      end

      if task['mode'] == 'replace_backup'
        begin
          bigquery.get_table_or_partition(task['table'])
          bigquery.copy(task['table'], task['table_old'], task['dataset_old'])
        rescue NotFoundError
        end
      end

      if task['temp_table']
        if task['mode'] == 'append'
          bigquery.copy(task['temp_table'], task['table'], write_disposition: 'WRITE_APPEND')
        else # replace or replace_backup
          bigquery.copy(task['temp_table'], task['table'], write_disposition: 'WRITE_TRUNCATE')
        end
      end
    end
  ensure
    begin
      if task['temp_table'] # append or replace or replace_backup
        bigquery.delete_table(task['temp_table'])
      end
    ensure
      if task['delete_from_local_when_job_end']
        paths.each do |path|
          Embulk.logger.info { "embulk-output-bigquery: delete #{path}" }
          File.unlink(path) rescue nil
        end
      else
        paths.each do |path|
          if File.exist?(path)
            Embulk.logger.info { "embulk-output-bigquery: keep #{path}" }
          end
        end
      end
    end
  end

  # this is for the -c next_config option; add some parameters for the next execution if wanted
  next_config_diff = {}
  return next_config_diff
end
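For the staged modes, transaction finishes with a table copy whose write disposition depends on the mode; append_direct and delete_in_advance load straight into the target and skip the copy. A restatement of the branch above as an illustrative constant (not defined by the plugin):

# Write disposition used when copying temp_table onto the target (illustrative).
WRITE_DISPOSITION_BY_MODE = {
  'append'         => 'WRITE_APPEND',   # temp rows appended to the target
  'replace'        => 'WRITE_TRUNCATE', # target replaced atomically by the copy
  'replace_backup' => 'WRITE_TRUNCATE', # same, after the target is copied to table_old
}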
transaction_report(task, responses)
# File lib/embulk/output/bigquery.rb, line 271
def self.transaction_report(task, responses)
  num_input_rows = file_writers.empty? ? 0 : file_writers.map(&:num_rows).inject(:+)
  return {'num_input_rows' => num_input_rows} if task['is_skip_job_result_check']

  num_response_rows = responses.inject(0) do |sum, response|
    sum + (response ? response.statistics.load.output_rows.to_i : 0)
  end
  if task['temp_table']
    num_output_rows = bigquery.get_table_or_partition(task['temp_table']).num_rows.to_i
  else
    num_output_rows = num_response_rows
  end
  num_rejected_rows = num_input_rows - num_output_rows
  transaction_report = {
    'num_input_rows' => num_input_rows,
    'num_response_rows' => num_response_rows,
    'num_output_rows' => num_output_rows,
    'num_rejected_rows' => num_rejected_rows,
  }
end
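The report is plain arithmetic over the counters: rejected rows are whatever was written locally but never arrived in BigQuery. A worked example with hypothetical counts:

# Hypothetical counts illustrating transaction_report's arithmetic.
num_input_rows    = 1_000             # rows written by all file writers
num_response_rows = 998               # summed output_rows of the load jobs
num_output_rows   = num_response_rows # equal when no temp table is used
num_rejected_rows = num_input_rows - num_output_rows # => 2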
Public Instance Methods
abort()
# File lib/embulk/output/bigquery.rb, line 524
def abort
end
add(page)
Called for each page in each task.
# File lib/embulk/output/bigquery.rb, line 485
def add(page)
  return if task['skip_file_generation']
  num_rows = file_writer.add(page)
  if task['with_rehearsal'] and @index == 0 and !@rehearsaled
    if num_rows >= task['rehearsal_counts']
      load_rehearsal
      @rehearsaled = true
    end
  end
end
close()
Called once when a task finishes, to release resources. A no-op here: the shared FileWriter objects are closed in transaction instead.
# File lib/embulk/output/bigquery.rb, line 481
def close
end
commit()
Called after all pages in a task have been processed; returns a task report.
# File lib/embulk/output/bigquery.rb, line 528
def commit
  {}
end
file_writer()
Creates one FileWriter object per output thread; that is, the writer is shared among tasks. These shared objects are closed in transaction. This is mainly to suppress (or control, via -X max_threads) the number of files generated, which equals the number of parallel load jobs, when there are many input tasks.

file_writer must be called only from add, because the other methods run on different (non-output) threads. Note also that the add method of the same task instance may be called from different output threads.
# File lib/embulk/output/bigquery.rb, line 464
def file_writer
  return Thread.current[FILE_WRITER_KEY] if Thread.current[FILE_WRITER_KEY]
  file_writer = FileWriter.new(@task, @schema, @index, self.class.converters)
  self.class.add_file_writer(file_writer)
  Thread.current[FILE_WRITER_KEY] = file_writer
end
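Memoization is keyed on Thread.current, so each output thread lazily builds its own writer on first use and reuses it afterwards. The same pattern in isolation (KEY and build_writer are illustrative stand-ins):

KEY = :demo_writer

def build_writer
  "writer-for-thread-#{Thread.current.object_id}" # stand-in for FileWriter.new
end

def writer
  Thread.current[KEY] ||= build_writer # one instance per thread, built lazily
end

threads = 2.times.map { Thread.new { writer } }
p threads.map(&:value).uniq.size # => 2, each thread got its own writer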
finish()
# File lib/embulk/output/bigquery.rb, line 521
def finish
end
load_rehearsal()
# File lib/embulk/output/bigquery.rb, line 497
def load_rehearsal
  bigquery = self.class.bigquery
  Embulk.logger.info { "embulk-output-bigquery: Rehearsal started" }

  io = file_writer.close # need to close once for gzip
  rehearsal_path = "#{io.path}.rehearsal"
  Embulk.logger.debug { "embulk_output_bigquery: cp #{io.path} #{rehearsal_path}" }
  FileUtils.cp(io.path, rehearsal_path)
  file_writer.reopen

  self.class.rehearsal_thread = Thread.new do
    begin
      bigquery.create_table_if_not_exists(task['rehearsal_table'])
      response = bigquery.load(rehearsal_path, task['rehearsal_table'])
      num_output_rows = response ? response.statistics.load.output_rows.to_i : 0
      Embulk.logger.info { "embulk-output-bigquery: Loaded rehearsal #{num_output_rows}" }
    ensure
      Embulk.logger.debug { "embulk_output_bigquery: delete #{rehearsal_path}" }
      File.unlink(rehearsal_path) rescue nil
      bigquery.delete_table(task['rehearsal_table'])
    end
  end
end
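The close-then-reopen step exists for gzip output: a gzip stream only becomes a valid archive once the writer is closed and the trailer flushed, so the rehearsal copy must be taken from a closed file. A minimal sketch of that constraint using only the standard library:

require 'zlib'
require 'fileutils'

# Closing the writer flushes the gzip trailer; only then is the file readable,
# hence the "need to close once for gzip" comment above.
Zlib::GzipWriter.open('sample.jsonl.gz') {|gz| gz.write(%Q[{"id":1}\n]) }

FileUtils.cp('sample.jsonl.gz', 'sample.jsonl.gz.rehearsal') # valid snapshot
puts Zlib::GzipReader.open('sample.jsonl.gz.rehearsal', &:read) # => {"id":1}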