class Embulk::Output::Vertica::OutputThreadPool

Public Class Methods

new(task, schema, size) click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 16
# Builds the pool: stores the configuration, creates the value converters
# for the schema and instantiates `size` output threads.
#
# @param task [Hash] plugin task; 'default_timezone' and 'column_options'
#   are read here and the whole task is handed to every OutputThread
# @param schema [Object] schema passed to ValueConverterFactory
# @param size [Integer] number of output threads to create
def initialize(task, schema, size)
  @task = task
  @schema = schema
  @size = size

  timezone = task['default_timezone']
  column_options = task['column_options']
  @converters = ValueConverterFactory.create_converters(schema, timezone, column_options)

  @output_threads = size.times.map { OutputThread.new(task) }
  # Index of the thread that receives the next enqueued page.
  @current_index = 0
end

Public Instance Methods

commit() click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 41
# Commits every output thread and collects the resulting task reports.
#
# The per-thread commits run while @mutex is held.
#
# @return [Array] one task report per output thread, in thread order
# @raise [CommitError] if any report's 'success' entry is falsy
def commit
  Embulk.logger.debug "embulk-output-vertica: commit"
  reports = @mutex.synchronize { @output_threads.map(&:commit) }
  any_failed = reports.any? { |report| !report['success'] }
  raise CommitError, "some of output_threads failed to commit" if any_failed
  reports
end
enqueue(page) click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 25
# Serializes every record of a page with #to_json and hands the resulting
# batch to one output thread, rotating through the threads round-robin.
#
# @param page [#each] collection of records to enqueue
# @return [Integer] the updated thread index (value of the synchronized block)
def enqueue(page)
  rows = []
  page.each { |record| rows.push(to_json(record)) }

  # Thread selection and index advance happen together under the mutex.
  @mutex.synchronize do
    target = @output_threads[@current_index]
    target.enqueue(rows)
    @current_index = @current_index.succ % @size
  end
end
start() click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 36
# Creates the mutex used by #enqueue and #commit, then starts every
# output thread.
#
# @return [Array] the value returned by each thread's #start, in thread order
def start
  @mutex = Mutex.new
  @output_threads.map(&:start)
end
to_json(record) click to toggle source
# File lib/embulk/output/vertica/output_thread.rb, line 52
# Turns one record into the payload that is handed to an output thread.
#
# When the task's 'json_payload' option is truthy, the first element of
# the record is returned untouched (presumably an already-serialized JSON
# string -- confirm against the caller). Otherwise each value is paired
# with its column name, run through that column's converter and the
# resulting hash is serialized with Hash#to_json.
#
# @param record [Array] column values in schema order
# @return [Object] record.first in json_payload mode, otherwise a JSON String
def to_json(record)
  return record.first if @task['json_payload']

  # Build the hash directly rather than via Hash[*pairs.flatten!(1)]:
  # flatten! returns nil when nothing was flattened (empty schema), and
  # splatting pushes every column through an argument list.
  row = @schema.names.zip(record).each_with_object({}) do |(column_name, value), converted|
    converted[column_name] = @converters[column_name].call(value)
  end
  row.to_json
end