class Embulk::Output::Vertica::OutputThread

Constants

PIPE_BUF

Public Class Methods

new(task) click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 64
# Prepares the state shared between the caller and the output thread: a
# one-slot queue for page hand-off, the row counters, the progress-log
# bookkeeping and the page writer matching the configured compression.
#
# @param task [Hash] task configuration; only 'compress' is read here
def initialize(task)
  @task = task
  @queue = SizedQueue.new(1)
  @outer_thread = Thread.current
  @thread_active = false

  # Row counters, reported back by #commit.
  @num_input_rows = 0
  @num_output_rows = 0
  @num_rejected_rows = 0

  # Bookkeeping for the periodic progress log emitted by #write_buf.
  @progress_log_timer = Time.now
  @previous_num_input_rows = 0

  # Anything other than 'GZIP' falls back to the uncompressed writer.
  writer_name = 'GZIP' == task['compress'] ? :write_gzip : :write_uncompressed
  @write_proc = method(writer_name)
end

Public Instance Methods

abort_on_error() click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 314
# COPY option fragment for the 'abort_on_error' task setting.
#
# @return [String] ' ABORT ON ERROR' when the setting is truthy, otherwise ''
def abort_on_error
  return '' unless @task['abort_on_error']

  ' ABORT ON ERROR'
end
close(jv) click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 240
# Closes the given connection, logging (not raising) a java.sql.SQLException.
#
# @param jv [#close] connection object to close
def close(jv)
  jv.close
rescue java.sql.SQLException => e # The connection is closed
  Embulk.logger.debug "embulk-output-vertica: #{e.class} #{e.message}"
end
commit() click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 261
# Asks the output thread to finish and waits for it.
#
# Pushes the 'finish' marker onto the queue and joins the thread for at most
# 'finish_timeout' seconds; a thread still alive after that is killed.
#
# @return [Hash] task report with the row counters and a 'success' flag
def commit
  Embulk.logger.debug "embulk-output-vertica: output_thread commit"
  @thread_active = false

  success =
    if !@thread.alive?
      Embulk.logger.error "embulk-output-vertica: thread died accidently"
      false
    else
      Embulk.logger.debug { "embulk-output-vertica: push finish with finish_timeout:#{@task['finish_timeout']}" }
      @queue.push('finish')
      Thread.pass
      @thread.join(@task['finish_timeout'])
      if @thread.alive?
        # The thread did not finish in time: give up on it.
        @thread.kill
        Embulk.logger.error "embulk-output-vertica: finish_timeout #{@task['finish_timeout']}sec exceeded, thread is killed forcely"
        false
      else
        true
      end
    end

  {
    'num_input_rows' => @num_input_rows,
    'num_output_rows' => @num_output_rows,
    'num_rejected_rows' => @num_rejected_rows,
    'success' => success
  }
end
compress() click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 306
# COPY option fragment naming the input compression.
#
# @return [String] the 'compress' task setting prefixed with a single space
def compress
  ' ' + @task['compress'].to_s
end
copy(jv, sql, &block) click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 163
# Streams queued pages into a COPY statement and commits the result.
#
# Waits for a first page before issuing the COPY, so nothing is executed
# when the very first dequeued item is the 'finish' marker. Every dequeued
# page is passed to the configured writer until 'finish' is dequeued.
#
# @param jv [#copy, #commit] connection used to run the COPY and to commit
# @param sql [String] COPY statement to execute
# @return [Array] [num_output_rows, rejected_row_nums, last_record]
def copy(jv, sql, &block)
  Embulk.logger.debug "embulk-output-vertica: copy, waiting a first message"

  last_record = nil
  # Remembers the most recent record handed to the writer.
  track_last = proc { |record| last_record = record }

  page = dequeue
  return [0, [], last_record] if page == 'finish'

  Embulk.logger.debug "embulk-output-vertica: #{sql}"

  copied_rows, rejected_rows = jv.copy(sql) do |stdin, stream|
    until page == 'finish'
      @write_proc.call(stdin, page, &track_last)
      page = dequeue
    end
  end

  @num_output_rows += copied_rows
  @num_rejected_rows += rejected_rows.size
  Embulk.logger.info { "embulk-output-vertica: COMMIT!" }
  jv.commit
  Embulk.logger.debug { "embulk-output-vertica: COMMITTED!" }

  Embulk.logger.debug { "embulk-output-vertica: rejected_row_nums: #{rejected_rows}" } if rejected_rows.size > 0

  [copied_rows, rejected_rows, last_record]
end
copy_mode() click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 310
# COPY option fragment naming the load method.
#
# @return [String] the 'copy_mode' task setting prefixed with a single space
def copy_mode
  ' ' + @task['copy_mode'].to_s
end
copy_sql() click to toggle source

private

# File lib/embulk/output/vertica/output_thread.rb, line 290
# Builds (once) the COPY statement that loads STDIN into the temp table.
# The statement is memoized in @copy_sql.
#
# @return [String] the COPY ... FROM STDIN ... NO COMMIT statement
def copy_sql
  @copy_sql ||= begin
    target = "#{quoted_schema}.#{quoted_temp_table}"
    options = [compress, fjsonparser, copy_mode, abort_on_error].join
    "COPY #{target} FROM STDIN#{options} NO COMMIT"
  end
end
dequeue() click to toggle source

@return [Array] the dequeued json_page, or [String] 'finish' when the finish marker is dequeued

# File lib/embulk/output/vertica/output_thread.rb, line 154
# Pops the next item from the queue, waiting at most 'dequeue_timeout'
# seconds.
#
# @return [Object] the popped item; the string 'finish' marks the end
# @raise [DequeueTimeoutError] when nothing arrives within the timeout
def dequeue
  Embulk.logger.trace { "embulk-output-vertica: @queue.pop with dequeue_timeout:#{@task['dequeue_timeout']}" }
  popped = Timeout.timeout(@task['dequeue_timeout'], DequeueTimeoutError) { @queue.pop }
  Embulk.logger.trace { "embulk-output-vertica: dequeued" }
  if popped == 'finish'
    Embulk.logger.debug { "embulk-output-vertica: dequeued finish" }
  end
  popped
end
dequeue_all() click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 233
# Drains the queue so that producers blocked in @queue.push are released.
def dequeue_all
  Embulk.logger.debug "embulk-output-vertica: dequeue all"
  # Some callers might still be blocked trying to push; popping frees them.
  @queue.pop until @queue.empty?
end
enqueue(json_page) click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 96
# Hands a page to the output thread.
#
# @param json_page [Object] page to push onto the queue
# @raise [RuntimeError] when the thread is inactive or no longer alive
def enqueue(json_page)
  if @thread_active && @thread.alive?
    Embulk.logger.trace { "embulk-output-vertica: enqueue" }
    return @queue.push(json_page)
  end

  Embulk.logger.info { "embulk-output-vertica: thread is dead, but still trying to enqueue" }
  thread_dump
  raise RuntimeError, "embulk-output-vertica: thread is died, but still trying to enqueue"
end
fjsonparser() click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 318
# COPY option fragment selecting fjsonparser with its parser options.
#
# @return [String] ' PARSER fjsonparser(<options>)'
def fjsonparser
  parser_options = reject_on_materialized_type_error
  " PARSER fjsonparser(#{parser_options})"
end
num_format(number) click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 148
# Formats a number with thousands separators for log output.
#
# Only the part before the first '.' is grouped, so fractional digits are
# left untouched (1234.56789 => "1,234.56789", not "1,234.56,789").
#
# @param number [#to_s] value to format, typically an Integer or Float
# @return [String] the textual form with ',' inserted every three integer digits
def num_format(number)
  whole, dot, fraction = number.to_s.partition('.')
  whole.gsub(/(\d)(?=(\d{3})+(?!\d))/, '\1,') + dot + fraction
end
quoted_schema() click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 294
# @return the 'schema' task setting passed through Jvertica.quote_identifier
def quoted_schema
  schema_name = @task['schema']
  ::Jvertica.quote_identifier(schema_name)
end
quoted_table() click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 298
# @return the 'table' task setting passed through Jvertica.quote_identifier
def quoted_table
  table_name = @task['table']
  ::Jvertica.quote_identifier(table_name)
end
quoted_temp_table() click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 302
# @return the 'temp_table' task setting passed through Jvertica.quote_identifier
def quoted_temp_table
  temp_table_name = @task['temp_table']
  ::Jvertica.quote_identifier(temp_table_name)
end
reject_on_materialized_type_error() click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 322
# fjsonparser option text for the 'reject_on_materialized_type_error' setting.
#
# @return [String] 'reject_on_materialized_type_error=true' when the setting
#   is truthy, otherwise ''
def reject_on_materialized_type_error
  return '' unless @task['reject_on_materialized_type_error']

  'reject_on_materialized_type_error=true'
end
rollback(jv) click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 248
# Rolls back on the given connection, logging (not raising) a
# java.sql.SQLException.
#
# @param jv [#rollback] connection object to roll back
def rollback(jv)
  jv.rollback
rescue java.sql.SQLException => e # The connection is closed
  Embulk.logger.debug "embulk-output-vertica: #{e.class} #{e.message}"
end
run() click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 196
# Body of the output thread: connects, runs the COPY and always closes the
# connection afterwards.
#
# Failures inside the COPY are logged, rolled back and re-raised. A
# TimeoutError reaching the outer handler drains the queue, requests a
# thread dump and exits the process with status 1; any other exception
# drains the queue and is re-raised in the thread that created this object.
def run
  Embulk.logger.debug { "embulk-output-vertica: thread started" }
  jv = nil
  begin
    jv = Vertica.connect(@task)
    begin
      num_output_rows, rejected_row_nums, last_record = copy(jv, copy_sql)
      Embulk.logger.debug { "embulk-output-vertica: thread finished" }
    rescue java.sql.SQLDataException => e
      # NOTE: rejected_row_nums and last_record are only assigned when #copy
      # returns normally, so they are nil whenever this branch is reached.
      if @task['reject_on_materialized_type_error'] and e.message =~ /Rejected by user-defined parser/
        Embulk.logger.warn "embulk-output-vertica: ROLLBACK! some of column types and values types do not fit #{rejected_row_nums}"
      else
        Embulk.logger.warn "embulk-output-vertica: ROLLBACK! #{rejected_row_nums}"
      end
      Embulk.logger.info { "embulk-output-vertica: last_record: #{last_record}" }
      rollback(jv)
      raise e
    rescue => e
      Embulk.logger.warn "embulk-output-vertica: ROLLBACK! #{e.class} #{e.message} #{e.backtrace.join("\n  ")}"
      rollback(jv)
      raise e
    end
  ensure
    # When connect itself failed there is nothing to close; calling close(nil)
    # would raise from this ensure and replace the original connection error.
    close(jv) if jv
  end
rescue TimeoutError => e
  Embulk.logger.error "embulk-output-vertica: UNKNOWN TIMEOUT!! #{e.class}"
  @thread_active = false # not to be enqueued any more
  dequeue_all
  thread_dump
  exit(1)
rescue Exception => e
  # Deliberately broad: whatever killed this thread is forwarded to the
  # thread that created this object.
  Embulk.logger.error "embulk-output-vertica: UNKNOWN ERROR! #{e.class} #{e.message} #{e.backtrace.join("\n  ")}"
  @thread_active = false # not to be enqueued any more
  dequeue_all
  @outer_thread.raise e
end
start() click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 256
# Spawns the output thread running #run and marks it as accepting pages.
def start
  @thread = Thread.new { run }
  @thread_active = true
end
thread_dump() click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 83
# Sends SIGQUIT to the current process, at most once per process (tracked
# by a global flag), then sleeps for one second.
# NOTE(review): presumably QUIT makes the JVM print a thread dump, as the
# log message suggests — confirm for the target runtime.
def thread_dump
  return if $embulk_output_vertica_thread_dumped

  $embulk_output_vertica_thread_dumped = true
  Embulk.logger.debug "embulk-output-vertica: kill -3 #{$$} (Thread dump)"
  begin
    Process.kill(:QUIT, Process.pid)
  rescue SignalException
    # Ignored on purpose: the signal is only sent for its side effect.
  ensure
    sleep 1
  end
end
write_buf(buf, json_page) { |record| ... } click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 132
# Appends every record of a page to the buffer, one per line, and counts it.
# Logs throughput when more than 10 seconds passed since the last report.
#
# @param buf [#<<] sink receiving each record followed by "\n"
# @param json_page [#each] records of one page
# @yield [record] called for each record before it is appended, if a block
#   is given
def write_buf(buf, json_page, &block)
  json_page.each do |record|
    block.call(record) if block
    Embulk.logger.trace { "embulk-output-vertica: record #{record}" }
    buf << record << "\n"
    @num_input_rows += 1
  end

  now = Time.now
  return if @progress_log_timer >= now - 10 # report once in 10 seconds

  rows_since_last = @num_input_rows - @previous_num_input_rows
  seconds_since_last = (now - @progress_log_timer).to_f
  speed = (rows_since_last / seconds_since_last).round(1)
  @progress_log_timer = now
  @previous_num_input_rows = @num_input_rows
  Embulk.logger.info { "embulk-output-vertica: num_input_rows #{num_format(@num_input_rows)} (#{num_format(speed)} rows/sec)" }
end
write_gzip(io, page, &block) click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 107
# Compresses one page with Zlib::Deflate and writes the result to io.
#
# The deflate stream is closed when the page is done (or on failure) so its
# native zlib state is released immediately instead of waiting for GC.
#
# @param io [#write] destination passed on to #write_io
# @param page [#each] records of one page
# @yield [record] forwarded to #write_buf
def write_gzip(io, page, &block)
  deflater = Zlib::Deflate.new
  begin
    write_buf(deflater, page, &block)
    write_io(io, deflater.finish)
  ensure
    deflater.close
  end
end
write_io(io, str) click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 121
# Writes a string to io in chunks of at most PIPE_BUF bytes, each write
# guarded by the 'write_timeout' task setting.
#
# Works on a binary-encoded copy, so the caller's string keeps its encoding,
# and issues no write at all once every byte has been sent (no trailing
# empty write for empty input or sizes that are a multiple of PIPE_BUF).
#
# @param io [#write] destination
# @param str [String] data to send
# @raise [WriteTimeoutError] when a single chunk write exceeds the timeout
def write_io(io, str)
  bytes = str.dup.force_encoding('ASCII-8BIT')
  total = bytes.bytesize
  offset = 0
  # split str not to be blocked (max size of pipe buf is 64k bytes on Linux, Mac at default)
  while offset < total
    chunk = bytes[offset, PIPE_BUF]
    Embulk.logger.trace { "embulk-output-vertica: io.write with write_timeout:#{@task['write_timeout']}" }
    Timeout.timeout(@task['write_timeout'], WriteTimeoutError) { io.write(chunk) }
    offset += PIPE_BUF
  end
end
write_uncompressed(io, page, &block) click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 113
# Serializes one page into a plain string buffer and writes it to io.
#
# @param io [#write] destination passed on to #write_io
# @param page [#each] records of one page
# @yield [record] forwarded to #write_buf
def write_uncompressed(io, page, &block)
  payload = ''
  write_buf(payload, page, &block)
  write_io(io, payload)
end