class Tumugi::Plugin::BigqueryExportTask
Public Instance Methods
run()
click to toggle source
# File lib/tumugi/plugin/task/bigquery_export.rb, line 27 def run unless output.is_a?(Tumugi::Plugin::FileSystemTarget) raise Tumugi::TumugiError.new("BigqueryExportTask#output must be return a instance of Tumugi::Plugin::FileSystemTarget") end client = Tumugi::Plugin::Bigquery::Client.new(config) table = Tumugi::Plugin::Bigquery::Table.new(project_id: client.project_id, dataset_id: dataset_id, table_id: table_id) job_project_id = client.project_id if job_project_id.nil? log "Source: #{table}" log "Destination: #{output}" if is_gcs?(output) export_to_gcs(client) else if destination_format.upcase == 'AVRO' raise Tumugi::TumugiError.new("destination_format='AVRO' is only supported when export to Google Cloud Storage") end if compression.upcase == 'GZIP' logger.warn("compression parameter is ignored, it's only supported when export to Google Cloud Storage") end export_to_file_system(client) end end
Private Instance Methods
config()
click to toggle source
# File lib/tumugi/plugin/task/bigquery_export.rb, line 103 def config cfg = Tumugi.config.section('bigquery').to_h unless project_id.nil? cfg[:project_id] = project_id end cfg end
export_to_file_system(client)
click to toggle source
# File lib/tumugi/plugin/task/bigquery_export.rb, line 71 def export_to_file_system(client) schema ||= client.table(dataset_id, table_id, project_id: client.project_id).schema.fields field_names = schema.map{|f| f.respond_to?(:[]) ? (f["name"] || f[:name]) : f.name } start_index = 0 page_token = nil options = { max_result: page_size, project_id: client.project_id, } output.open('w') do |file| file.puts field_names.join(field_delimiter) if destination_format == 'CSV' && print_header begin table_data_list = client.list_tabledata(dataset_id, table_id, options.merge(start_index: start_index, page_token: page_token)) start_index += page_size page_token = table_data_list[:next_token] table_data_list[:rows].each do |row| file.puts line(field_names, row, destination_format) end end while not page_token.nil? end end
export_to_gcs(client)
click to toggle source
# File lib/tumugi/plugin/task/bigquery_export.rb, line 58 def export_to_gcs(client) options = { compression: compression.upcase, destination_format: destination_format.upcase, field_delimiter: field_delimiter, print_header: print_header, project_id: client.project_id, job_project_id: job_project_id || client.project_id, wait: wait } client.extract(dataset_id, table_id, output.to_s, options) end
is_gcs?(target)
click to toggle source
# File lib/tumugi/plugin/task/bigquery_export.rb, line 54 def is_gcs?(target) not target.to_s.match(/^gs:\/\/[^\/]+\/.+$/).nil? end
line(field_names, row, format)
click to toggle source
# File lib/tumugi/plugin/task/bigquery_export.rb, line 94 def line(field_names, row, format) case format when 'CSV' row.map{|v| v[1]}.join(field_delimiter) when 'NEWLINE_DELIMITED_JSON' JSON.generate(row.to_h) end end