class Embulk::Input::InputBigquery

Public Class Methods

transaction(config) { |task, columns, task_count| ... } click to toggle source
# File lib/embulk/input/big-query-async.rb, line 22
# Builds the task definition and the output schema from +config+ and hands
# them to the caller's block.
#
# The query text is config[:sql]. When that is unset, config[:sql_erb] is
# rendered as an ERB template; each value of config[:erb_params] is first
# evaluated as Ruby code and stored in +params+ under the same key.
# A missing config[:erb_params] is treated as "no parameters".
#
# config  - configuration object, read through #[] and #param.
# control - the block; it is invoked once as yield(task, columns, 1).
#
# Returns an empty Hash.
def self.transaction(config, &control)
  # The local names below (sql, params, sql_erb, erb, erb_params, config) are
  # visible to the ERB template and to the evaluated parameter strings via the
  # binding, so they must not be renamed.
  sql = config[:sql]
  params = {}
  unless sql
    sql_erb = config[:sql_erb]
    erb = ERB.new(sql_erb)
    erb_params = config[:erb_params]
    # SECURITY NOTE(review): every erb_params value is executed with eval.
    # This is only acceptable while the configuration is fully trusted.
    (erb_params || {}).each do |k, v|
      params[k] = eval(v)
    end

    sql = erb.result(binding)
  end

  task = {
    project: config[:project],
    keyfile: config.param(:keyfile, LocalFile, nil),
    sql: sql,
    columns: config[:columns],
    params: params,
    synchronous_method: config[:synchronous_method],
    asynchronous_method: config[:asynchronous_method],
    dataset: config[:dataset],
    table: config[:table],
    option: {
      cache: config[:cache],
      standard_sql: config[:standard_sql],
      legacy_sql: config[:legacy_sql]
    }
  }

  # Synchronous mode is the default: it is used unless asynchronous_method is
  # set and synchronous_method is not.
  if task[:synchronous_method] || !task[:asynchronous_method]
    task[:option].merge!(
      max: config[:max],
      timeout: config[:timeout],
      dryrun: config[:dryrun]
    )
  else
    # NOTE(review): large_results is read from config[:legacy_sql], not from
    # config[:large_results] - confirm this coupling is intended.
    task[:option].merge!(
      large_results: config[:legacy_sql],
      write: config[:write]
    )
  end

  # One Column per configured entry, indexed by its position in the list.
  columns = config[:columns].each_with_index.map do |c, i|
    Column.new(i, c['name'], c['type'].to_sym)
  end

  yield(task, columns, 1)

  {}
end

Public Instance Methods

extract_record(row) click to toggle source
# File lib/embulk/input/big-query-async.rb, line 123
# Turns one result row into an array of values ordered like @task[:columns].
#
# For each column the value is looked up in +row+ by the column's 'name'.
# When the column defines an 'eval' string, that string is executed as Ruby
# and its result replaces the value.
#
# NOTE(review): 'eval' strings are executed as-is; this is only safe while
# the column configuration is trusted.
def extract_record(row)
  # The names row, c, val and columns are visible to the evaluated code
  # through the binding, so they are intentionally left unchanged.
  @task[:columns].each_with_object([]) do |c, columns|
    val = row[c['name']]
    val = eval(c['eval'], binding) if c['eval']
    columns << val
  end
end
keys_to_sym(hash) click to toggle source
# File lib/embulk/input/big-query-async.rb, line 142
# Returns a new Hash holding the same values as +hash+, with every key
# converted through #to_sym. The input is not modified.
def keys_to_sym(hash)
  hash.each_with_object({}) do |(name, payload), symbolized|
    symbolized[name.to_sym] = payload
  end
end
run() click to toggle source
# File lib/embulk/input/big-query-async.rb, line 80
# Runs the configured query and pushes the rows into @page_builder.
#
# Creates a BigQuery client from @task[:project] / @task[:keyfile], converts
# the column names in @task[:columns] to symbols, symbolizes the option keys
# and then dispatches to the synchronous or the asynchronous query method.
# In asynchronous mode with @task[:dataset] set, the destination table
# @task[:table] is looked up in that dataset and created when it is missing.
#
# Returns an empty Hash after finishing the page builder.
def run
  bq = Google::Cloud::Bigquery.new(project: @task[:project], keyfile: @task[:keyfile])
  @task[:columns] = values_to_sym(@task[:columns], 'name')
  option = keys_to_sym(@task[:option])

  # Must mirror the mode test used by transaction when it builds the option
  # set: any falsy asynchronous_method (nil OR false) means synchronous.
  # Checking only for nil would send synchronous-only options to the
  # asynchronous path when asynchronous_method is explicitly false.
  if @task[:synchronous_method] || !@task[:asynchronous_method]
    run_synchronous_query(bq, option)
  else
    if @task[:dataset]
      dataset = bq.dataset(@task[:dataset])
      # Reuse the destination table when it exists, otherwise create it.
      option[:table] = dataset.table(@task[:table]) || dataset.create_table(@task[:table])
    end
    run_asynchronous_query(bq, option)
  end

  @page_builder.finish
  {}
end
run_asynchronous_query(bq, option) click to toggle source
# File lib/embulk/input/big-query-async.rb, line 109
# Submits @task[:sql] as a query job, waits for it to complete and adds every
# resulting row (converted by extract_record) to @page_builder, walking the
# result pages through #next until no page is left.
#
# bq     - client object responding to #query_job.
# option - Hash of keyword options forwarded to #query_job.
#
# Returns an empty Hash when the job failed, otherwise nil.
def run_asynchronous_query(bq, option)
  query_job = bq.query_job(@task[:sql], **option)
  query_job.wait_until_done!
  # NOTE(review): a failed job is dropped without any error or log output,
  # so the caller cannot tell a failure from an empty result.
  return {} if query_job.failed?

  page = query_job.query_results
  while page
    page.each { |row| @page_builder.add(extract_record(row)) }
    page = page.next
  end
end
run_synchronous_query(bq, option) click to toggle source
# File lib/embulk/input/big-query-async.rb, line 101
# Runs @task[:sql] through bq.query and adds every returned row (converted by
# extract_record) to @page_builder.
#
# bq     - client object responding to #query.
# option - Hash of keyword options forwarded to #query.
#
# Returns whatever #each on the query result returns.
def run_synchronous_query(bq, option)
  bq.query(@task[:sql], **option).each do |row|
    @page_builder.add(extract_record(row))
  end
end
values_to_sym(hashs, key) click to toggle source
# File lib/embulk/input/big-query-async.rb, line 135
# Converts the value stored under +key+ to a Symbol in every hash of +hashs+.
#
# Each hash is modified in place; the returned Array is a new one containing
# those same hash objects in their original order.
def values_to_sym(hashs, key)
  hashs.map do |entry|
    entry.tap { |target| target[key] = target[key].to_sym }
  end
end