class Embulk::Input::Service::JqlService
Constants
- FROM_DATE_PARAM
- TO_DATE_PARAM
Public Instance Methods
adjust_timezone(epoch)
click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 151 def adjust_timezone(epoch) # Adjust timezone offset to get UTC time # c.f. https://mixpanel.com/docs/api-documentation/exporting-raw-data-you-inserted-into-mixpanel#export tz = TZInfo::Timezone.get(@timezone) begin if epoch.present? offset = tz.period_for_local(epoch, true).offset.utc_total_offset epoch - offset end # TZInfo::PeriodNotFound signals that there is no equivalent UTC time (for example, # during the transition from standard time to daylight savings time when the clocks are moved forward and an hour is skipped). # tz.local_time(2018, 3, 11, 2, 30, 0, 0) rescue TZInfo::PeriodNotFound epoch + 1.hour offset = tz.period_for_local(epoch, true).offset.utc_total_offset epoch - offset end end
create_task()
click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 18 def create_task { timezone: @config.param(:timezone, :string, default: ""), api_secret: @config.param(:api_secret, :string), jql_endpoint: endpoint, dates: range, incremental: @config.param(:incremental, :bool, default: true), slice_range: @config.param(:slice_range, :integer, default: 7), schema: @config.param(:columns, :array), retry_initial_wait_sec: @config.param(:retry_initial_wait_sec, :integer, default: 1), retry_limit: @config.param(:retry_limit, :integer, default: 5), incremental_column: @config.param(:incremental_column, :string, default: nil), latest_fetched_time: @config.param(:latest_fetched_time, :integer, default: 0), jql_mode: true, jql_script: @config.param(:jql_script, :string, nil) } end
endpoint()
click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 179 def endpoint @config.param(:jql_endpoint, :string, default: Embulk::Input::MixpanelApi::Client::DEFAULT_JQL_ENDPOINT) end
guess_columns()
click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 36 def guess_columns giveup_when_mixpanel_is_down range = guess_range Embulk.logger.info "Guessing schema using #{range.first}..#{range.last}" client = create_client sample_records = client.send_jql_script_small_dataset(parameters(@config.param(:jql_script, :string, nil), range.first, range.last)) validate_result(sample_records) @incremental = @config.param(:incremental, :bool, default: true) @incremental_column = @config.param(:incremental_column, :string, default: nil) validate_result_contain_incremental_column(sample_records) guess_from_records(sample_records) end
guess_from_records(sample_props)
click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 124 def guess_from_records(sample_props) validate_result(sample_props) begin schema = Guess::SchemaGuess.from_hash_records(sample_props) schema.map do |col| result = { name: col.name, type: col.type, } if (col.name.eql? "time") || (col.eql? "last_seen") result["format"] = col.format if col.format end result end rescue DataError raise Embulk::ConfigError.new("Non-supported result #{sample_props}. Revise your JQL.") end end
guess_range()
click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 110 def guess_range time_zone = @config.param(:timezone, :string, default: "") from_date = @config.param(:from_date, :string, default: default_guess_start_date(time_zone).to_s) fetch_days = @config.param(:fetch_days, :integer, default: DEFAULT_FETCH_DAYS) fetch_days = [fetch_days, DEFAULT_FETCH_DAYS].min range = RangeGenerator.new(from_date, fetch_days, time_zone).generate_range if range.empty? return default_guess_start_date(time_zone)..(today(time_zone) - 1) end range end
ingest(task, page_builder)
click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 54 def ingest(task, page_builder) @dates = task[:dates] @schema = task[:schema] @timezone = task[:timezone] @incremental_column = task[:incremental_column] unless @incremental_column Embulk.logger.warn "incremental_column should be specified when running in incremental mode to avoid duplicated" Embulk.logger.warn "Use default value #{DEFAULT_TIME_COLUMN}" @incremental_column = DEFAULT_TIME_COLUMN end @incremental = task[:incremental] latest_fetched_time = task[:latest_fetched_time] client = create_client ignored_fetched_record_count = 0 next_fetched_time = latest_fetched_time @dates.each_slice(task[:slice_range]) do |slice_dates| Embulk.logger.info "Fetching date from #{slice_dates.first}..#{slice_dates.last}" if preview? records = client.send_jql_script_small_dataset(parameters(@config.param(:jql_script, :string, default: nil), slice_dates.first, slice_dates.last)) else records = client.send_jql_script(parameters(task[:jql_script], slice_dates.first, slice_dates.last)) end validate_result(records) records.each do |record| if @incremental if @schema.map {|col| col["name"]}.include?(@incremental_column) record_incremental_column = record[@incremental_column] if record_incremental_column if record_incremental_column <= latest_fetched_time ignored_fetched_record_count += 1 next else next_fetched_time = [record_incremental_column, next_fetched_time].max end end else raise Embulk::ConfigError.new("Missing Incremental Field (<incremental_column>) in the returned dataset. Specify the correct Incremental Field value.") end end values = extract_values(record) page_builder.add(values) end break if preview? end Embulk.logger.info "Skip #{ignored_fetched_record_count} rows" page_builder.finish if task[:incremental] && !preview? return create_task_report(next_fetched_time) end {} end
next_from_date(task_report)
click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 171 def next_from_date(task_report) next_to_date = Date.parse(task_report[:to_date]) { from_date: next_to_date.to_s, latest_fetched_time: task_report[:latest_fetched_time], } end
parameters(script, from_date, to_date)
click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 144 def parameters(script, from_date, to_date) { params: params(from_date, to_date), script: script } end
validate_config()
click to toggle source
Calls superclass method
Embulk::Input::Service::BaseService#validate_config
# File lib/embulk/input/service/jql_service.rb, line 11 def validate_config super validate_jql_script validate_fetch_days end
Private Instance Methods
create_task_report(next_fetched_time)
click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 185 def create_task_report(next_fetched_time) { to_date: @dates.last || today(@timezone) - 1, latest_fetched_time: next_fetched_time.to_s } end
extract_value(record, name)
click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 199 def extract_value(record, name) case name when NOT_PROPERTY_COLUMN record[NOT_PROPERTY_COLUMN] when "time" if record["time"].present? value = record["time"] if value > 0 time = record["time"] / 1000 adjust_timezone(time) else value end end when "last_seen" if record["last_seen"].present? value = record["last_seen"] if value > 0 # last_seen format in ms time = record["last_seen"] / 1000 adjust_timezone(time) else value end end when @incremental_column if record[@incremental_column].present? value = record[@incremental_column] if value > 0 # format in ms time = record[@incremental_column] / 1000 adjust_timezone(time) else value end end else record[name] end end
params(from_date, to_date)
click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 192 def params(from_date, to_date) { from_date: from_date, to_date: to_date } end
validate_fetch_days()
click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 266 def validate_fetch_days fetch_days = @config.param(:fetch_days, :integer, default: nil) if fetch_days && fetch_days <= 0 raise Embulk::ConfigError.new("fetch_days should be larger than 0") end end
validate_jql_script()
click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 259 def validate_jql_script jql_script = @config.param(:jql_script, :string, default: nil) if jql_script.blank? raise Embulk::ConfigError.new("JQL script shouldn't be empty or null") end end
validate_result(records)
click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 240 def validate_result(records) if records.is_a?(Array) && records.first.is_a?(Integer) # incase using reduce, it only return the number of records raise Embulk::ConfigError.new("Non-supported result. Revise your JQL.") end end
validate_result_contain_incremental_column(records)
click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 247 def validate_result_contain_incremental_column(records) unless @incremental_column Embulk.logger.warn "incremental_column should be specified when running in incremental mode to avoid duplicated" Embulk.logger.warn "Use default value #{DEFAULT_TIME_COLUMN}" @incremental_column = DEFAULT_TIME_COLUMN end if @incremental && records.length > 0 && !records[0].include?(@incremental_column) raise Embulk::ConfigError.new("Missing Incremental Field (<incremental_column>) in the returned dataset. Specify the correct Incremental Field value.") end end