class Embulk::Input::InputBigquery

Public Class Methods

determine_columns_by_query_results(sql, option, bigquery_client) click to toggle source
# File lib/embulk/input/bigquery.rb, line 108
def self.determine_columns_by_query_results(sql, option, bigquery_client)
  Embulk.logger.info 'determine columns using the getQueryResults API instead of the config.yml'

  query_option = option.dup
  query_option.delete(:max)
  query_option.delete(:location)
  job = bigquery_client.query_job(sql, **query_option) do |query|
    query.location = option[:location] if option[:location]
  end

  Embulk.logger.info 'waiting for the query job to complete to get schema from query results'
  job.wait_until_done!

  Embulk.logger.info "completed: job_id=#{job.job_id}"
  result = job.query_results(max: 0)

  columns = result.fields.map do |f|
    {
      'name' => f.name,
      'type' => embulk_column_type(f.type)
    }
  end
  Embulk.logger.info "determined columns: #{columns.inspect}"

  [job.job_id, columns]
end
embulk_column_type(bq_data_type) click to toggle source
# File lib/embulk/input/bigquery.rb, line 135
def self.embulk_column_type(bq_data_type)
  case bq_data_type
  when 'BOOLEAN', 'BOOL'
    :boolean
  when 'INTEGER', 'INT64'
    :long
  when 'FLOAT', 'FLOAT64'
    :double
  when 'STRING', 'DATETIME', 'DATE', 'TIME'
    :string
  when 'TIMESTAMP'
    :timestamp
  when 'RECORD', 'BYTES'
    raise "unsupported type #{bq_data_type.inspect}"
  else
    raise "unknown type #{bq_data_type.inspect}"
  end
end
resume(task, columns, count) { |task, columns, count| ... } click to toggle source
# File lib/embulk/input/bigquery.rb, line 66
def self.resume(task, columns, count, &control)
  task_reports = yield(task, columns, count)

  next_config_diff = {}
end
transaction(config, &control) click to toggle source
# File lib/embulk/input/bigquery.rb, line 23
def self.transaction(config, &control)
  sql = config[:sql]
  params = {}
  unless sql
    sql_erb = config[:sql_erb]
    erb = ERB.new(sql_erb)
    erb_params = config[:erb_params] || {}
    erb_params.each do |k, v|
      params[k] = eval(v)
    end

    sql = erb.result(binding)
  end

  task = {
    project: config[:project],
    keyfile: config.param(:keyfile, LocalFile, nil),
    sql: sql,
    params: params,
    option: {
      max: config[:max],
      cache: config[:cache],
      standard_sql: config[:standard_sql],
      legacy_sql: config[:legacy_sql],
      location: config[:location],
    }
  }

  if config[:columns]
    task[:columns] = config[:columns]
  else
    bq = Google::Cloud::Bigquery.new(project: task[:project], keyfile: task[:keyfile])
    task[:job_id], task[:columns] = determine_columns_by_query_results(sql, task[:option], bq)
  end

  columns = []
  task[:columns].each_with_index do |c, i|
    columns << Column.new(i, c['name'], c['type'].to_sym)
  end

  resume(task, columns, 1, &control)
end

Public Instance Methods

as_serializable(v) click to toggle source
# File lib/embulk/input/bigquery.rb, line 169
def as_serializable(v)
  case v
  when ::Google::Cloud::Bigquery::Time
    v.value
  when DateTime
    v.strftime('%Y-%m-%d %H:%M:%S.%6N')
  when Date
    v.strftime('%Y-%m-%d')
  else
    v
  end
end
keys_to_sym(hash) click to toggle source
# File lib/embulk/input/bigquery.rb, line 154
def keys_to_sym(hash)
  ret = {}
  hash.each do |key, value|
    ret[key.to_sym] = value
  end
  ret
end
run() click to toggle source
# File lib/embulk/input/bigquery.rb, line 72
def run
  bq = Google::Cloud::Bigquery.new(project: task[:project], keyfile: task[:keyfile])
  params = @task[:params]
  option = keys_to_sym(@task[:option])

  rows = if @task[:job_id].nil?
           query_option = option.dup
           query_option.delete(:location)

           bq.query(@task[:sql], **query_option) do |job_updater|
             job_updater.location = option[:location] if option[:location]
           end
         else
           job_option = {}
           job_option[:location] = option[:location] if option[:location]

           bq.job(@task[:job_id], **job_option).query_results(max: option[:max])
         end

  @task[:columns] = values_to_sym(@task[:columns], 'name')

  rows.all do |row|
    columns = []
    @task[:columns].each do |c|
      val = row[c['name'].to_sym]
      val = eval(c['eval'], binding) if c['eval']

      columns << as_serializable(val)
    end

    @page_builder.add(columns)
  end
  @page_builder.finish
  {}
end
values_to_sym(hashs, key) click to toggle source
# File lib/embulk/input/bigquery.rb, line 162
def values_to_sym(hashs, key)
  hashs.map do |h|
    h[key] = h[key].to_sym
    h
  end
end