class Embulk::Output::Bigquery::FileWriter

Attributes

num_rows [R] — read-only attribute

Public Class Methods

new(task, schema, index, converters = nil)
# File lib/embulk/output/bigquery/file_writer.rb, line 12
# Set up a per-task file writer.
#
# task       - the Embulk task hash (reads 'progress_log_interval',
#              'payload_column_index', 'source_format').
# schema     - the Embulk output schema.
# index      - the task index of this writer.
# converters - optional pre-built column converters; built from the task
#              and schema when not supplied.
def initialize(task, schema, index, converters = nil)
  @task = task
  @schema = schema
  @index = index
  @converters = converters || ValueConverterFactory.create_converters(task, schema)

  @num_rows = 0
  if @task['progress_log_interval']
    @progress_log_interval = @task['progress_log_interval']
    @progress_log_timer = Time.now
    @previous_num_rows = 0
  end

  # Choose how each record is serialized: a raw payload column wins,
  # otherwise the source_format decides between CSV and JSONL.
  @formatter_proc =
    if @task['payload_column_index']
      @payload_column_index = @task['payload_column_index']
      method(:to_payload)
    elsif @task['source_format'].downcase == 'csv'
      method(:to_csv)
    else
      method(:to_jsonl)
    end
end

Public Instance Methods

add(page)
# File lib/embulk/output/bigquery/file_writer.rb, line 98
# Format every record of the page and append it to the output file.
# Returns the cumulative number of rows written by this writer.
def add(page)
  out = io
  # NOTE: offloading the writes to a dedicated IO thread via SizedQueue
  # was tried once and measured slower, so records are written inline.
  page.each do |record|
    Embulk.logger.trace { "embulk-output-bigquery: record #{record}" }
    formatted_record = @formatter_proc.call(record)
    Embulk.logger.trace { "embulk-output-bigquery: formatted_record #{formatted_record.chomp}" }
    out.write formatted_record
    @num_rows += 1
  end
  show_progress if @task['progress_log_interval']
  @num_rows
end
close()
# File lib/embulk/output/bigquery/file_writer.rb, line 65
# Close the underlying io, swallowing any error (best-effort), and
# return the io object itself.
def close
  begin
    io.close
  rescue StandardError
    nil # a failed close must not raise to the caller
  end
  io
end
io()
# File lib/embulk/output/bigquery/file_writer.rb, line 38
# Lazily create (and memoize) the output io on first access.
# The file path embeds the process id and thread object id via the
# task's sequence_format, so concurrent writers get distinct files.
def io
  return @io if @io

  path = format(
    "#{@task['path_prefix']}#{@task['sequence_format']}#{@task['file_ext']}",
    Process.pid, Thread.current.object_id
  )
  if File.exist?(path)
    Embulk.logger.warn { "embulk-output-bigquery: unlink already existing #{path}" }
    begin
      File.unlink(path) # best-effort removal of a stale file
    rescue StandardError
      nil
    end
  end
  Embulk.logger.info { "embulk-output-bigquery: create #{path}" }

  @io = open(path, 'w')
end
num_format(number)
# File lib/embulk/output/bigquery/file_writer.rb, line 94
# Render a number with thousands separators, e.g. 1234567 -> "1,234,567".
# Works by reversing the string and inserting a comma after every group
# of three digits that is followed by another digit.
def num_format(number)
  number.to_s.reverse.gsub(/(\d{3})(?=\d)/, '\1,').reverse
end
open(path, mode = 'w')
# File lib/embulk/output/bigquery/file_writer.rb, line 54
# Open +path+ for writing, wrapping the file in a GzipWriter when the
# task's 'compression' option is "gzip" (case-insensitive).
# Returns the io (File or Zlib::GzipWriter) to write records into.
def open(path, mode = 'w')
  raw = File.open(path, mode)
  if @task['compression'].downcase == 'gzip'
    Zlib::GzipWriter.new(raw)
  else
    raw
  end
end
reopen()
# File lib/embulk/output/bigquery/file_writer.rb, line 70
# Re-open the file backing the current io in append mode, replacing the
# memoized io so later writes continue at the end of the same file.
def reopen
  current_path = io.path
  @io = open(current_path, 'a')
end
to_csv(record)
# File lib/embulk/output/bigquery/file_writer.rb, line 78
# Serialize one record as a CSV line, running each value through the
# converter registered for its column.
def to_csv(record)
  converted = record.each_with_index.map do |value, column_index|
    @converters[column_index].call(value)
  end
  converted.to_csv
end
to_jsonl(record)
# File lib/embulk/output/bigquery/file_writer.rb, line 84
# Serialize one record as a JSON object (one line), keyed by the schema's
# column names, with each value passed through its column converter.
def to_jsonl(record)
  names = @schema.names
  row = record.each_with_index.each_with_object({}) do |(value, column_index), acc|
    acc[names[column_index]] = @converters[column_index].call(value)
  end
  "#{row.to_json}\n"
end
to_payload(record)
# File lib/embulk/output/bigquery/file_writer.rb, line 74
# Emit the pre-rendered payload column of the record verbatim,
# newline-terminated; no per-column conversion is applied.
def to_payload(record)
  record[@payload_column_index].to_s + "\n"
end

Private Instance Methods

show_progress()
# File lib/embulk/output/bigquery/file_writer.rb, line 115
# Log the row count and write rate, at most once per progress_log_interval
# seconds; no-op when the interval has not yet elapsed.
def show_progress
  now = Time.now
  elapsed = now - @progress_log_timer
  return unless elapsed > @progress_log_interval

  rows_per_sec = ((@num_rows - @previous_num_rows) / elapsed.to_f).round(1)
  @progress_log_timer = now
  @previous_num_rows = @num_rows
  Embulk.logger.info { "embulk-output-bigquery: num_rows #{num_format(@num_rows)} (#{num_format(rows_per_sec)} rows/sec)" }
end