class Embulk::Output::Vertica
Public Class Methods
new(task, schema, index)
click to toggle source
An instance is created for each thread.
Calls superclass method
# File lib/embulk/output/vertica.rb, line 166
#
# Per-thread output instance. Does nothing of its own: it only forwards its
# arguments to the superclass constructor.
def initialize(task, schema, index)
  super(task, schema, index)
end
thread_pool()
click to toggle source
# File lib/embulk/output/vertica.rb, line 14
#
# Returns the pool used by every output instance of this class (instances reach
# it through self.class.thread_pool). It is built on first use by calling the
# factory stored in @thread_pool_proc, and reused afterwards.
def self.thread_pool
  return @thread_pool if @thread_pool
  @thread_pool = @thread_pool_proc.call
end
transaction(config, schema, task_count) { |task| ... }
click to toggle source
# File lib/embulk/output/vertica.rb, line 35
#
# Embulk output transaction.
#
# 1. Reads and validates the plugin configuration into +task+.
# 2. Creates the target table (unless json_payload) and a uniquely named temp table.
# 3. Starts the thread pool, yields +task+ so Embulk runs the tasks (pages are
#    loaded into the temp table), and commits the pool.
# 4. Moves the loaded rows into the target table: REPLACE swaps the tables,
#    other modes do INSERT ... SELECT from the temp table.
# 5. Always drops the temp table afterwards.
#
# @param config plugin configuration (read through config.param)
# @param schema schema of the incoming records
# @param task_count [Integer] number of tasks; default for the 'pool' option
# @yieldparam task [Hash] the validated task
# @return [Hash] next_config_diff (always empty)
# @raise [ConfigError] when "user" is missing or mode/copy_mode/compress is invalid
# @raise [Error] when abort_on_error is set and input/output row counts differ
def self.transaction(config, schema, task_count, &control)
  task = {
    'host'             => config.param('host',             :string,  :default => 'localhost'),
    'port'             => config.param('port',             :integer, :default => 5433),
    'user'             => config.param('user',             :string,  :default => nil),
    'username'         => config.param('username',         :string,  :default => nil), # alias to :user for backward compatibility
    'password'         => config.param('password',         :string,  :default => ''),
    'database'         => config.param('database',         :string,  :default => 'vdb'),
    'schema'           => config.param('schema',           :string,  :default => 'public'),
    'table'            => config.param('table',            :string),
    'mode'             => config.param('mode',             :string,  :default => 'insert'),
    'copy_mode'        => config.param('copy_mode',        :string,  :default => 'AUTO'),
    'abort_on_error'   => config.param('abort_on_error',   :bool,    :default => false),
    'compress'         => config.param('compress',         :string,  :default => 'UNCOMPRESSED'),
    'default_timezone' => config.param('default_timezone', :string,  :default => 'UTC'),
    'column_options'   => config.param('column_options',   :hash,    :default => {}),
    'json_payload'     => config.param('json_payload',     :bool,    :default => false),
    'resource_pool'    => config.param('resource_pool',    :string,  :default => nil),
    'reject_on_materialized_type_error' => config.param('reject_on_materialized_type_error', :bool, :default => false),
    'pool'             => config.param('pool',             :integer, :default => task_count),
    'write_timeout'    => config.param('write_timeout',    :integer, :default => nil), # like 11 * 60 sec
    'dequeue_timeout'  => config.param('dequeue_timeout',  :integer, :default => nil), # like 13 * 60 sec
    'finish_timeout'   => config.param('finish_timeout',   :integer, :default => nil), # like 3 * 60 sec
  }

  # .thread_pool memoizes the pool in @thread_pool. Forget any pool left over
  # from an earlier transaction in this process, so the pool is rebuilt from
  # this task instead of reusing an already-committed one.
  @thread_pool = nil
  @thread_pool_proc = lambda { OutputThreadPool.new(task, schema, task['pool']) }

  task['user'] ||= task['username']
  raise ConfigError.new('required field "user" is not set') unless task['user']

  task['mode'] = task['mode'].upcase
  unless %w[INSERT REPLACE DROP_INSERT].include?(task['mode'])
    raise ConfigError.new("`mode` must be one of INSERT, REPLACE, DROP_INSERT")
  end

  task['copy_mode'] = task['copy_mode'].upcase
  unless %w[AUTO DIRECT TRICKLE].include?(task['copy_mode'])
    raise ConfigError.new("`copy_mode` must be one of AUTO, DIRECT, TRICKLE")
  end

  # ToDo: Support BZIP, LZO
  task['compress'] = task['compress'].upcase
  unless %w[GZIP UNCOMPRESSED].include?(task['compress'])
    raise ConfigError.new("`compress` must be one of GZIP, UNCOMPRESSED")
  end

  unique_name = SecureRandom.uuid
  task['temp_table'] = "#{task['table']}_LOAD_TEMP_#{unique_name}"

  quoted_schema     = ::Jvertica.quote_identifier(task['schema'])
  quoted_table      = ::Jvertica.quote_identifier(task['table'])
  quoted_temp_table = ::Jvertica.quote_identifier(task['temp_table'])

  connect(task) do |jv|
    unless task['json_payload'] # ToDo: auto table creation is not supported to json_payload mode yet
      sql_schema_table = self.sql_schema_from_embulk_schema(schema, task['column_options'])

      # create the target table
      query(jv, %[DROP TABLE IF EXISTS #{quoted_schema}.#{quoted_table}]) if task['mode'] == 'DROP_INSERT'
      query(jv, %[CREATE TABLE IF NOT EXISTS #{quoted_schema}.#{quoted_table} (#{sql_schema_table})])
    end

    # create a temp table
    query(jv, %[DROP TABLE IF EXISTS #{quoted_schema}.#{quoted_temp_table}])
    if task['mode'] == 'REPLACE'
      # In REPLACE mode this temp table is swapped in for the original table,
      # so projections must be copied as well.
      query(jv, %[CREATE TABLE #{quoted_schema}.#{quoted_temp_table} LIKE #{quoted_schema}.#{quoted_table} INCLUDING PROJECTIONS])
    else
      query(jv, %[CREATE TABLE #{quoted_schema}.#{quoted_temp_table} LIKE #{quoted_schema}.#{quoted_table}])
      # Create the internal Vertica projection beforehand. Otherwise the parallel
      # copies lock the table to create a projection, and we sometimes get an
      # S Lock error. Inserting zero rows is a trick to create that projection.
      query(jv, %[INSERT INTO #{quoted_schema}.#{quoted_temp_table} SELECT * FROM #{quoted_schema}.#{quoted_table} LIMIT 0])
    end
    Embulk.logger.trace {
      result = query(jv, %[SELECT EXPORT_OBJECTS('', '#{task['schema']}.#{task['temp_table']}')])
      # You can see `CREATE PROJECTION` if the table has a projection
      "embulk-output-vertica: #{result.to_a.flatten}"
    }
  end

  begin
    # insert data into the temp table
    thread_pool.start
    yield(task)
    task_reports = thread_pool.commit
    Embulk.logger.info { "embulk-output-vertica: task_reports: #{task_reports.to_json}" }

    connect(task) do |jv|
      transaction_report = self.transaction_report(jv, task, task_reports)
      Embulk.logger.info { "embulk-output-vertica: transaction_report: #{transaction_report.to_json}" }

      if task['abort_on_error'] # double-meaning, also used for COPY statement
        if transaction_report['num_input_rows'] != transaction_report['num_output_rows']
          raise Error, "ABORT: `num_input_rows (#{transaction_report['num_input_rows']})` and " \
            "`num_output_rows (#{transaction_report['num_output_rows']})` does not match"
        end
      end

      if task['mode'] == 'REPLACE'
        # swap table and drop the old table
        quoted_old_table = ::Jvertica.quote_identifier("#{task['table']}_LOAD_OLD_#{unique_name}")
        from = "#{quoted_schema}.#{quoted_table},#{quoted_schema}.#{quoted_temp_table}"
        to = "#{quoted_old_table},#{quoted_table}"
        query(jv, %[ALTER TABLE #{from} RENAME TO #{to}])
        query(jv, %[DROP TABLE #{quoted_schema}.#{quoted_old_table}])
      else
        # insert select from the temp table
        hint = '/*+ direct */ ' if task['copy_mode'] == 'DIRECT' # I did not prepare a specific option, does anyone want?
        query(jv, %[INSERT #{hint}INTO #{quoted_schema}.#{quoted_table} SELECT * FROM #{quoted_schema}.#{quoted_temp_table}])
        jv.commit
      end
    end
  ensure
    connect(task) do |jv|
      # clean up the temp table
      query(jv, %[DROP TABLE IF EXISTS #{quoted_schema}.#{quoted_temp_table}])
      Embulk.logger.trace {
        # Best-effort sample for trace logging only; a failing SELECT must not
        # break cleanup (same StandardError scope as a `rescue nil` modifier).
        sample = begin
          query(jv, %[SELECT * FROM #{quoted_schema}.#{quoted_table} LIMIT 10]).map { |row| row.to_h }.join("\n")
        rescue StandardError
          nil
        end
        "embulk-output-vertica: select result\n#{sample}"
      }
    end
  end

  # next_config_diff for the `-o next_config` option; add parameters here if the
  # next execution ever needs any.
  {}
end
transaction_report(jv, task, task_reports)
click to toggle source
# File lib/embulk/output/vertica.rb, line 18
#
# Summarizes a load: sums the per-task reports and compares them with the
# number of rows actually present in the temp table.
#
# @param jv connection used to count the rows of the temp table
# @param task [Hash] task providing 'schema' and 'temp_table'
# @param task_reports [Array<Hash>] reports with 'num_input_rows' / 'num_output_rows'
# @return [Hash] 'num_input_rows', 'num_response_rows', 'num_output_rows'
#   (rows in the temp table) and 'num_rejected_rows' (input minus output)
def self.transaction_report(jv, task, task_reports)
  quoted_schema     = ::Jvertica.quote_identifier(task['schema'])
  quoted_temp_table = ::Jvertica.quote_identifier(task['temp_table'])

  # inject(0, :+) instead of inject(:+): an empty report list must sum to 0, not
  # nil (nil would make the subtraction below raise NoMethodError).
  num_input_rows    = task_reports.map { |report| report['num_input_rows'].to_i }.inject(0, :+)
  num_response_rows = task_reports.map { |report| report['num_output_rows'].to_i }.inject(0, :+)

  result = query(jv, %[SELECT COUNT(*) FROM #{quoted_schema}.#{quoted_temp_table}])
  num_output_rows = result.map { |row| row.values }.flatten.first.to_i

  {
    'num_input_rows'    => num_input_rows,
    'num_response_rows' => num_response_rows,
    'num_output_rows'   => num_output_rows,
    'num_rejected_rows' => num_input_rows - num_output_rows,
  }
end
Private Class Methods
connect(task) { |jv| ... }
click to toggle source
# File lib/embulk/output/vertica.rb, line 193
#
# Opens a Jvertica connection from the task's connection settings and, when
# task['resource_pool'] is set, switches the session to that resource pool.
#
# With a block: yields the connection and closes it afterwards, even if the
# block raises. Without a block, the caller owns the open connection.
#
# @param task [Hash] 'host', 'port', 'user', 'password', 'database' and an
#   optional 'resource_pool'
# @yieldparam jv the open connection
# @return the connection object (already closed when a block was given)
def self.connect(task)
  jv = ::Jvertica.connect({
    host: task['host'],
    port: task['port'],
    user: task['user'],
    password: task['password'],
    database: task['database'],
  })

  # If session setup fails, the caller never receives jv, so close it here to
  # avoid leaking the connection. A flag + ensure (rather than rescue) covers
  # any exception class; under JRuby this may include Java exceptions.
  session_ready = false
  begin
    resource_pool = task['resource_pool']
    query(jv, "SET SESSION RESOURCE_POOL = #{::Jvertica.quote(resource_pool)}") if resource_pool
    session_ready = true
  ensure
    jv.close unless session_ready
  end

  if block_given?
    begin
      yield jv
    ensure
      jv.close
    end
  end
  jv
end
query(conn, sql)
click to toggle source
# File lib/embulk/output/vertica.rb, line 242
#
# Logs +sql+ at INFO level, then executes it on +conn+.
#
# @param conn connection object responding to #query
# @param sql [String] statement to execute
# @return whatever conn.query returns
def self.query(conn, sql)
  message = "embulk-output-vertica: #{sql}"
  Embulk.logger.info(message)
  conn.query(sql)
end
sql_schema_from_embulk_schema(schema, column_options)
click to toggle source
@param [Schema] schema Embulk-defined column types
@param [Hash] column_options user-defined column types, keyed by column name (override the Embulk-derived type)
@return [String] SQL column definitions used in CREATE TABLE
# File lib/embulk/output/vertica.rb, line 219
#
# Builds the comma-separated column definition list for CREATE TABLE.
# A column's type comes from column_options[name]['type'] when present;
# otherwise it is derived from the Embulk type via sql_type_from_embulk_type.
#
# @param schema [Schema] Embulk-defined column names and types
# @param column_options [Hash] user-defined column types keyed by column name
# @return [String] e.g. "col1" INT,"col2" VARCHAR (identifiers quoted by Jvertica)
def self.sql_schema_from_embulk_schema(schema, column_options)
  column_defs = schema.names.zip(schema.types).map do |column_name, embulk_type|
    option = column_options[column_name]
    sql_type = (option && option['type']) || sql_type_from_embulk_type(embulk_type)
    "#{::Jvertica.quote_identifier(column_name)} #{sql_type}"
  end
  column_defs.join(',')
end
sql_type_from_embulk_type(type)
click to toggle source
# File lib/embulk/output/vertica.rb, line 231
#
# Maps an Embulk column type to the Vertica SQL type used for auto table creation.
#
# @param type [Symbol] one of :boolean, :long, :double, :string, :timestamp
# @return [String] Vertica SQL type name
# @raise [NotSupportedType] for any other type
def self.sql_type_from_embulk_type(type)
  {
    :boolean   => 'BOOLEAN',
    :long      => 'INT',       # BIGINT is a synonym for INT in vertica
    :double    => 'FLOAT',     # DOUBLE PRECISION is a synonym for FLOAT in vertica
    :string    => 'VARCHAR',   # LONG VARCHAR is not recommended. Default is VARCHAR(80)
    :timestamp => 'TIMESTAMP',
  }.fetch(type) do
    raise NotSupportedType, "embulk-output-vertica cannot take column type #{type}"
  end
end
Public Instance Methods
abort()
click to toggle source
# File lib/embulk/output/vertica.rb, line 182
#
# Intentionally a no-op for this output; returns nil.
def abort
  nil
end
add(page)
click to toggle source
called for each page in each thread
# File lib/embulk/output/vertica.rb, line 175
#
# Called for each page in each thread: hands the page to the class-level
# thread pool, which does the actual loading.
#
# @param page the page of records to load
# @return whatever the pool's #enqueue returns
def add(page)
  pool = self.class.thread_pool
  pool.enqueue(page)
end
close()
click to toggle source
Called when the output of each thread is closed; this plugin does nothing here.
# File lib/embulk/output/vertica.rb, line 171
#
# Nothing to release per instance; returns nil.
def close
  nil
end
commit()
click to toggle source
Called after all pages in each thread have been processed. The actual commit is done once for all pools in `transaction`, not here.
# File lib/embulk/output/vertica.rb, line 187
#
# Per-thread commit. Returns an empty task report: rows are committed for all
# pools together in .transaction, not per instance.
def commit
  Hash.new
end
finish()
click to toggle source
# File lib/embulk/output/vertica.rb, line 179
#
# Intentionally a no-op for this output; returns nil.
def finish
  nil
end
Private Instance Methods
query(conn, sql)
click to toggle source
# File lib/embulk/output/vertica.rb, line 247
#
# Instance-level shortcut that delegates to the class-level .query.
#
# @param conn connection to run the statement on
# @param sql [String] statement to execute
def query(conn, sql)
  klass = self.class
  klass.query(conn, sql)
end