class Embulk::Input::Lkqd
Constants
- DEFAULT_COLUMNS
Public Class Methods
guess(config)
click to toggle source
# File lib/embulk/input/lkqd.rb, line 68
# Guess hook. Nothing is inferred: the result is an empty hash
# regardless of what +config+ contains.
def self.guess(config)
  {}
end
guess_column_type(metrics, column_name)
click to toggle source
# File lib/embulk/input/lkqd.rb, line 76
# Builds the column description (type and name) for one report header.
#
# A single leading non-word character is removed from the header before it is
# looked up in DEFAULT_COLUMNS. Headers without an entry there are typed as
# :string.
#
# metrics     - accepted for interface compatibility; not used by this method.
# column_name - String header as read from the report. It is not modified.
#
# Returns a Hash with :type (Symbol) and :name (the cleaned header String).
def self.guess_column_type(metrics, column_name)
  # Non-destructive gsub (same call try_convert uses): the caller's string is
  # left untouched and frozen strings are accepted.
  clean_name = column_name.gsub(/^\W/, '')
  column_option = DEFAULT_COLUMNS[clean_name]
  type = column_option ? column_option['type'].to_sym : :string
  {type: type, name: clean_name}
end
measurable_impressions?(task)
click to toggle source
# File lib/embulk/input/lkqd.rb, line 109
# Truthy when the 'measurable_impressions' option is set on +task+ and both
# the 'viewability_measured_rate_index' and 'impressions_index' entries are
# present. The value of the last entry checked is returned as-is.
def self.measurable_impressions?(task)
  enabled = task['measurable_impressions']
  enabled &&
    task['viewability_measured_rate_index'] &&
    task['impressions_index']
end
request_lkqd(config)
click to toggle source
# File lib/embulk/input/lkqd.rb, line 72
# POSTs the report parameters as JSON to the configured endpoint using HTTP
# Basic authorization.
#
# config - Hash with :authorization (already-encoded credential string),
#          :endpoint (URL) and :report_parameters (JSON request body).
#
# Returns whatever the HTTP client's +post+ returns.
def self.request_lkqd(config)
  auth_header = "Basic #{config[:authorization]}"
  client = ::HTTP.auth(auth_header)
  client.post(config[:endpoint], json: config[:report_parameters])
end
resume(task, columns, count) { |task, columns, count| ... }
click to toggle source
# File lib/embulk/input/lkqd.rb, line 61
# Hands +task+, +columns+ and +count+ to the given block, then returns an
# empty hash as the next config diff. The block's result is not used.
def self.resume(task, columns, count, &control)
  yield(task, columns, count)
  {}
end
transaction(config, &control)
click to toggle source
# File lib/embulk/input/lkqd.rb, line 14
# Builds the task from +config+, downloads the report into a temp file,
# derives the column list from the report's header row and passes everything
# to +resume+.
#
# config  - plugin configuration; read through config.param.
# control - block forwarded to +resume+.
#
# Returns the value returned by +resume+.
# Raises RuntimeError when the downloaded report has no header row.
def self.transaction(config, &control)
  task = {
    "secret_key_id" => config.param("secret_key_id", :string),
    "secret_key" => config.param("secret_key", :string),
    "endpoint" => config.param("endpoint", :string, default: 'https://api.lkqd.com/reports'),
    "copy_temp_to" => config.param("copy_temp_to", :string, default: nil),
    "measurable_impressions" => config.param("measurable_impressions", :bool, default: false),
    "viewable_impressions" => config.param("viewable_impressions", :bool, default: false),
    "report_parameters" => config.param("report_parameters", :hash, default: {}),
  }
  task['authorization'] = Base64.urlsafe_encode64("#{task['secret_key_id']}:#{task['secret_key']}")

  response = request_lkqd({authorization: task['authorization'], endpoint: task['endpoint'], report_parameters: task['report_parameters']})

  # NOTE: the Tempfile object stays referenced by this local until the method
  # returns; per the Tempfile documentation the file may be unlinked once the
  # object is garbage-collected.
  tempfile = Tempfile.new('embulk-input-lkqd_')
  begin
    while (chunk = response.body.readpartial)
      tempfile.write chunk
    end
  ensure
    # Close the handle even when the download fails part-way through.
    tempfile.close
  end
  task['tempfile_path'] = tempfile.path
  FileUtils.cp(task['tempfile_path'], task['copy_temp_to']) if task['copy_temp_to']

  header = ::CSV.foreach(tempfile.path).first
  # An empty body yields no rows at all; fail with a clear message instead of
  # a NoMethodError on nil.
  raise "LKQD report response has no header row (empty response body?)" if header.nil?

  columns = []
  header.each_with_index do |column_name, index|
    column_type = guess_column_type(task['report_parameters']['metrics'], column_name)
    column = Column.new({index: index}.merge(column_type))
    # Remember where the source columns for the derived metrics live.
    case column.name
    when 'Viewability Measured Rate'
      task['viewability_measured_rate_index'] = index
    when 'Viewability Rate'
      task['viewability_rate_index'] = index
    when 'Impressions'
      task['impressions_index'] = index
    end
    columns << column
  end

  # Derived columns are appended after the report's own columns.
  if measurable_impressions?(task)
    columns << Column.new({index: columns.size, type: :double, name: 'Measurable Impressions'})
  end
  if viewable_impressions?(task)
    columns << Column.new({index: columns.size, type: :double, name: 'Viewable Impressions'})
  end

  resume(task, columns, 1, &control)
end
try_convert(row, options={})
click to toggle source
# File lib/embulk/input/lkqd.rb, line 86
# Converts the cells of one report row according to DEFAULT_COLUMNS.
#
# row     - enumerable of [header, value] pairs.
# options - accepted for interface compatibility; currently unused (only the
#           disabled timestamp branch below refers to it).
#
# Returns an Array with one converted value per pair:
# * header not in DEFAULT_COLUMNS, or value is nil -> value unchanged
# * 'long'   -> Integer, thousands separators removed
# * 'double' -> Float; a trailing '%' divides by 100, a '$' prefix is dropped
# * any other type -> value unchanged
def self.try_convert(row, options={})
  row.map do |(name, value)|
    column_option = DEFAULT_COLUMNS[name.gsub(/^\W/, '')]
    # Empty CSV cells arrive as nil; pass them through instead of calling
    # String methods on nil.
    next value if column_option.nil? || value.nil?

    case column_option['type']
    #when 'timestamp'
    #  Time.strptime(value + " " + options[:timezone], column_option['format'] + " %Z").to_i
    when 'long'
      value.gsub(',','').to_i
    when 'double'
      if value =~ /%$/
        # handle x,xxxx.yy%
        value.gsub(',','').to_f / 100.0
      elsif value =~ /^\$[\d\.,]+$/
        # handle $x,xxxx.yy
        value.gsub(/[$,]/,'').to_f
      else
        value.gsub(',','').to_f
      end
    else
      value
    end
  end
end
viewable_impressions?(task)
click to toggle source
# File lib/embulk/input/lkqd.rb, line 113
# Truthy when the 'viewable_impressions' option is set on +task+ and the
# 'viewability_rate_index', 'viewability_measured_rate_index' and
# 'impressions_index' entries are all present. The value of the last entry
# checked is returned as-is.
def self.viewable_impressions?(task)
  enabled = task['viewable_impressions']
  enabled &&
    task['viewability_rate_index'] &&
    task['viewability_measured_rate_index'] &&
    task['impressions_index']
end
Public Instance Methods
init()
click to toggle source
# File lib/embulk/input/lkqd.rb, line 118 def init @task = task end
run()
click to toggle source
# File lib/embulk/input/lkqd.rb, line 122
# Reads the downloaded report from @task['tempfile_path'], converts each data
# row with Lkqd.try_convert, appends the derived impression columns that are
# enabled for the task, and feeds the rows to the page builder. The temp file
# is removed once all rows have been added.
#
# Derived values:
# * measurable impressions = impressions * viewability measured rate
# * viewable impressions   = impressions * viewability measured rate * viewability rate
# A derived value is nil when any of its source cells is nil.
#
# Returns an empty Hash as the task report.
def run
  convert_options = {timezone: @task['report_parameters']['timezone']}
  impressions_index = @task['impressions_index']
  measured_rate_index = @task['viewability_measured_rate_index']
  rate_index = @task['viewability_rate_index']

  # @task does not change while rows are read, so decide once which derived
  # columns are needed.
  add_measurable = Lkqd.measurable_impressions?(@task)
  add_viewable = Lkqd.viewable_impressions?(@task)

  # Left-to-right product of the cells, or nil if any cell is missing.
  multiply = lambda do |cells|
    cells.include?(nil) ? nil : cells.inject(:*)
  end

  # Options are passed as keywords: a positional hash is not accepted as
  # options by CSV.foreach on Ruby 3.
  CSV.foreach(@task['tempfile_path'], headers: true) do |csv_row|
    record = Lkqd.try_convert(csv_row, convert_options)
    # The indices refer to the report's own columns, which keep their
    # positions because derived values are only appended at the end.
    if add_measurable
      record << multiply.call(record.values_at(impressions_index, measured_rate_index))
    end
    if add_viewable
      record << multiply.call(record.values_at(impressions_index, measured_rate_index, rate_index))
    end
    page_builder.add(record)
  end
  page_builder.finish

  FileUtils.rm_rf(@task['tempfile_path'])

  {}
end