class Embulk::Output::Vertica::OutputThread
Constants
- PIPE_BUF
Public Class Methods
new(task)
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 64
# Prepares the state shared between the calling thread and the output thread.
#
# @param task [Hash] plugin task configuration; only task['compress'] is read here
def initialize(task)
  @task = task
  @queue = SizedQueue.new(1)
  @outer_thread = Thread.current
  @thread_active = false

  # row counters
  @num_input_rows = 0
  @num_output_rows = 0
  @num_rejected_rows = 0

  # state used by write_buf for its periodic progress log
  @progress_log_timer = Time.now
  @previous_num_input_rows = 0

  # pick the writer once so the per-page call is a plain method call
  writer = case task['compress']
           when 'GZIP' then :write_gzip
           else :write_uncompressed
           end
  @write_proc = method(writer)
end
Public Instance Methods
abort_on_error()
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 314
# @return [String] ' ABORT ON ERROR' when task['abort_on_error'] is truthy, otherwise ''
def abort_on_error
  return ' ABORT ON ERROR' if @task['abort_on_error']

  ''
end
close(jv)
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 240
# Closes the given connection; a java.sql.SQLException is logged at debug
# level instead of being raised.
#
# @param jv [#close] connection object
def close(jv)
  jv.close
rescue java.sql.SQLException => err
  # The connection is closed
  Embulk.logger.debug "embulk-output-vertica: #{err.class} #{err.message}"
end
commit()
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 261
# Signals the output thread to finish, waits up to task['finish_timeout']
# seconds for it, and reports the row counters.
#
# @return [Hash] 'num_input_rows', 'num_output_rows', 'num_rejected_rows' and
#   'success' (false when the thread was already dead or had to be killed)
def commit
  Embulk.logger.debug "embulk-output-vertica: output_thread commit"
  @thread_active = false

  success =
    if @thread.alive?
      Embulk.logger.debug { "embulk-output-vertica: push finish with finish_timeout:#{@task['finish_timeout']}" }
      @queue.push('finish')
      Thread.pass
      @thread.join(@task['finish_timeout'])
      if @thread.alive?
        # the thread did not finish within the timeout
        @thread.kill
        Embulk.logger.error "embulk-output-vertica: finish_timeout #{@task['finish_timeout']}sec exceeded, thread is killed forcely"
        false
      else
        true
      end
    else
      Embulk.logger.error "embulk-output-vertica: thread died accidently"
      false
    end

  {
    'num_input_rows' => @num_input_rows,
    'num_output_rows' => @num_output_rows,
    'num_rejected_rows' => @num_rejected_rows,
    'success' => success
  }
end
compress()
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 306
# @return [String] task['compress'] prefixed with a single space
def compress
  mode = @task['compress']
  " #{mode}"
end
copy(jv, sql, &block)
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 163
# Streams dequeued pages into jv.copy until 'finish' is dequeued, then commits.
#
# @param jv [Object] connection; must respond to copy(sql) { |stdin, stream| } and commit
# @param sql [String] COPY statement passed to jv.copy
# @return [Array] [num_output_rows, rejected_row_nums, last_record];
#   [0, [], nil] when the very first dequeued message is 'finish'
def copy(jv, sql, &block)
  Embulk.logger.debug "embulk-output-vertica: copy, waiting a first message"

  # declared before the closure below so the last written record stays visible here
  last_record = nil
  remember = proc { |record| last_record = record }

  first_page = dequeue
  return [0, [], last_record] if first_page == 'finish'

  Embulk.logger.debug "embulk-output-vertica: #{sql}"

  loaded, rejected = jv.copy(sql) do |stdin, _stream|
    page = first_page
    until page == 'finish'
      @write_proc.call(stdin, page, &remember)
      page = dequeue
    end
  end

  @num_output_rows += loaded
  @num_rejected_rows += rejected.size

  Embulk.logger.info { "embulk-output-vertica: COMMIT!" }
  jv.commit
  Embulk.logger.debug { "embulk-output-vertica: COMMITTED!" }

  Embulk.logger.debug { "embulk-output-vertica: rejected_row_nums: #{rejected}" } if rejected.size > 0

  [loaded, rejected, last_record]
end
copy_mode()
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 310
# @return [String] task['copy_mode'] prefixed with a single space
def copy_mode
  mode = @task['copy_mode']
  " #{mode}"
end
copy_sql()
click to toggle source
private
# File lib/embulk/output/vertica/output_thread.rb, line 290
# Builds the COPY ... FROM STDIN statement once and memoizes it in @copy_sql.
#
# @return [String] the COPY statement targeting the quoted schema and temp table
def copy_sql
  return @copy_sql if @copy_sql

  target = "#{quoted_schema}.#{quoted_temp_table}"
  options = "#{compress}#{fjsonparser}#{copy_mode}#{abort_on_error}"
  @copy_sql = "COPY #{target} FROM STDIN#{options} NO COMMIT"
end
dequeue()
click to toggle source
@return [Array] the dequeued json_page, or [String] 'finish' when the finish marker is dequeued
# File lib/embulk/output/vertica/output_thread.rb, line 154
# Pops one message from @queue, waiting at most task['dequeue_timeout'] seconds.
#
# @return [Object] the popped message (a json_page, or the String 'finish')
# @raise [DequeueTimeoutError] when nothing is popped within the timeout
def dequeue
  Embulk.logger.trace { "embulk-output-vertica: @queue.pop with dequeue_timeout:#{@task['dequeue_timeout']}" }
  message = Timeout.timeout(@task['dequeue_timeout'], DequeueTimeoutError) { @queue.pop }
  Embulk.logger.trace { "embulk-output-vertica: dequeued" }
  Embulk.logger.debug { "embulk-output-vertica: dequeued finish" } if message == 'finish'
  message
end
dequeue_all()
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 233
# Drains @queue until it reports size 0.
def dequeue_all
  Embulk.logger.debug "embulk-output-vertica: dequeue all"
  # dequeue all because some might be still trying @queue.push and get blocked, need to release
  @queue.pop until @queue.size.zero?
end
enqueue(json_page)
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 96
# Pushes a page onto @queue for the output thread.
#
# @param json_page [Object] page to hand over to the output thread
# @raise [RuntimeError] when the thread is flagged inactive or is no longer alive
def enqueue(json_page)
  unless @thread_active && @thread.alive?
    Embulk.logger.info { "embulk-output-vertica: thread is dead, but still trying to enqueue" }
    thread_dump
    raise RuntimeError, "embulk-output-vertica: thread is died, but still trying to enqueue"
  end

  Embulk.logger.trace { "embulk-output-vertica: enqueue" }
  @queue.push(json_page)
end
fjsonparser()
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 318
# @return [String] the ' PARSER fjsonparser(...)' clause, with the
#   reject_on_materialized_type_error option text as its argument
def fjsonparser
  parser_args = reject_on_materialized_type_error
  " PARSER fjsonparser(#{parser_args})"
end
num_format(number)
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 148
# Formats a number with thousands separators, e.g. 1234567 => "1,234,567".
#
# Only the part before the first '.' is grouped, so the fractional digits of
# a float are left untouched (1234.56789 => "1,234.56789").
#
# @param number [#to_s] value to format
# @return [String] the formatted text
def num_format(number)
  whole, dot, fraction = number.to_s.partition('.')
  whole.gsub(/(\d)(?=(\d{3})+(?!\d))/, '\1,') + dot + fraction
end
quoted_schema()
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 294
# @return [Object] task['schema'] passed through ::Jvertica.quote_identifier
def quoted_schema
  schema_name = @task['schema']
  ::Jvertica.quote_identifier(schema_name)
end
quoted_table()
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 298
# @return [Object] task['table'] passed through ::Jvertica.quote_identifier
def quoted_table
  table_name = @task['table']
  ::Jvertica.quote_identifier(table_name)
end
quoted_temp_table()
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 302
# @return [Object] task['temp_table'] passed through ::Jvertica.quote_identifier
def quoted_temp_table
  temp_table_name = @task['temp_table']
  ::Jvertica.quote_identifier(temp_table_name)
end
reject_on_materialized_type_error()
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 322
# @return [String] 'reject_on_materialized_type_error=true' when the task
#   option of the same name is truthy, otherwise ''
def reject_on_materialized_type_error
  if @task['reject_on_materialized_type_error']
    'reject_on_materialized_type_error=true'
  else
    ''
  end
end
rollback(jv)
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 248
# Rolls back on the given connection; a java.sql.SQLException is logged at
# debug level instead of being raised.
#
# @param jv [#rollback] connection object
def rollback(jv)
  jv.rollback
rescue java.sql.SQLException => err
  # The connection is closed
  Embulk.logger.debug "embulk-output-vertica: #{err.class} #{err.message}"
end
run()
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 196
# Body of the output thread: connects, runs the COPY, and handles failures.
#
# On an error inside copy the transaction is rolled back and the error is
# re-raised to the outer handlers. A timeout stops further enqueueing, drains
# the queue, dumps threads and exits; any other exception is forwarded to
# @outer_thread.
def run
  Embulk.logger.debug { "embulk-output-vertica: thread started" }
  jv = nil
  begin
    jv = Vertica.connect(@task)
    begin
      num_output_rows, rejected_row_nums, last_record = copy(jv, copy_sql)
      Embulk.logger.debug { "embulk-output-vertica: thread finished" }
    rescue java.sql.SQLDataException => e
      # NOTE(review): rejected_row_nums and last_record are only assigned when
      # copy returns normally, so they are nil in the messages below.
      if @task['reject_on_materialized_type_error'] and e.message =~ /Rejected by user-defined parser/
        Embulk.logger.warn "embulk-output-vertica: ROLLBACK! some of column types and values types do not fit #{rejected_row_nums}"
      else
        Embulk.logger.warn "embulk-output-vertica: ROLLBACK! #{rejected_row_nums}"
      end
      Embulk.logger.info { "embulk-output-vertica: last_record: #{last_record}" }
      rollback(jv)
      raise e
    rescue => e
      Embulk.logger.warn "embulk-output-vertica: ROLLBACK! #{e.class} #{e.message} #{e.backtrace.join("\n  ")}"
      rollback(jv)
      raise e
    end
  ensure
    # When Vertica.connect itself raised there is nothing to close; calling
    # close(nil) here would raise NoMethodError and hide the connection error.
    close(jv) if jv
  end
rescue Timeout::Error => e
  # Timeout::Error rather than the top-level TimeoutError alias, which newer Rubies removed.
  Embulk.logger.error "embulk-output-vertica: UNKNOWN TIMEOUT!! #{e.class}"
  @thread_active = false # not to be enqueued any more
  dequeue_all
  thread_dump
  exit(1)
rescue Exception => e
  # Deliberately broad: every failure in this thread is handed to the outer thread.
  Embulk.logger.error "embulk-output-vertica: UNKNOWN ERROR! #{e.class} #{e.message} #{e.backtrace.join("\n  ")}"
  @thread_active = false # not to be enqueued any more
  dequeue_all
  @outer_thread.raise e
end
start()
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 256
# Spawns the output thread running #run and marks it active.
#
# @return [true]
def start
  runner = method(:run)
  @thread = Thread.new(&runner)
  @thread_active = true
end
thread_dump()
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 83
# Sends SIGQUIT to this process (logged as "kill -3", a thread dump), at most
# once per process; the global flag makes later calls a no-op.
def thread_dump
  return if $embulk_output_vertica_thread_dumped

  $embulk_output_vertica_thread_dumped = true
  Embulk.logger.debug "embulk-output-vertica: kill -3 #{Process.pid} (Thread dump)"
  begin
    Process.kill :QUIT, Process.pid
  rescue SignalException
  ensure
    sleep 1
  end
end
write_buf(buf, json_page) { |record| ... }
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 132
# Appends every record of json_page to buf, one per line, counts the rows,
# and logs throughput when more than 10 seconds passed since the last log.
#
# @param buf [#<<] destination buffer
# @param json_page [#each] records to write
# @yield [record] called for each record before it is appended
def write_buf(buf, json_page, &block)
  json_page.each do |row|
    block.call(row) if block
    Embulk.logger.trace { "embulk-output-vertica: record #{row}" }
    buf << row << "\n"
    @num_input_rows += 1
  end

  now = Time.now
  if @progress_log_timer < now - 10 # once in 10 seconds
    rows_since_last_log = @num_input_rows - @previous_num_input_rows
    seconds_since_last_log = (now - @progress_log_timer).to_f
    speed = (rows_since_last_log / seconds_since_last_log).round(1)
    @progress_log_timer = now
    @previous_num_input_rows = @num_input_rows
    Embulk.logger.info { "embulk-output-vertica: num_input_rows #{num_format(@num_input_rows)} (#{num_format(speed)} rows/sec)" }
  end
end
write_gzip(io, page, &block)
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 107
# Compresses one page with Zlib::Deflate and writes the result to io.
#
# The deflate stream is closed in an ensure block so it is released promptly
# even when write_buf or write_io raises.
#
# @param io [#write] destination stream
# @param page [#each] records to write
# @yield [record] forwarded to write_buf
def write_gzip(io, page, &block)
  deflater = Zlib::Deflate.new
  begin
    write_buf(deflater, page, &block)
    write_io(io, deflater.finish)
  ensure
    deflater.close
  end
end
write_io(io, str)
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 121
# Writes str to io in chunks of at most PIPE_BUF bytes, each write guarded by
# task['write_timeout'].
#
# The loop is bounded by the byte size, so no empty trailing chunk is written
# when str is empty or an exact multiple of PIPE_BUF. Note that str is
# re-tagged as ASCII-8BIT in place (force_encoding).
#
# @param io [#write] destination stream
# @param str [String] data to write
# @raise [WriteTimeoutError] when a single chunk write exceeds the timeout
def write_io(io, str)
  str = str.force_encoding('ASCII-8BIT')
  offset = 0
  # split str not to be blocked (max size of pipe buf is 64k bytes on Linux, Mac at default)
  while offset < str.bytesize
    chunk = str.byteslice(offset, PIPE_BUF)
    Embulk.logger.trace { "embulk-output-vertica: io.write with write_timeout:#{@task['write_timeout']}" }
    Timeout.timeout(@task['write_timeout'], WriteTimeoutError) { io.write(chunk) }
    offset += PIPE_BUF
  end
end
write_uncompressed(io, page, &block)
click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 113
# Serializes one page into a plain string buffer and writes it to io.
#
# @param io [#write] destination stream
# @param page [#each] records to write
# @yield [record] forwarded to write_buf
def write_uncompressed(io, page, &block)
  payload = +''
  write_buf(payload, page, &block)
  write_io(io, payload)
end