class Embulk::Input::Service::JqlService

Constants

FROM_DATE_PARAM
TO_DATE_PARAM

Public Instance Methods

adjust_timezone(epoch) click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 151
# Adjust timezone offset to get UTC time: treat +epoch+ as local wall-clock
# seconds in @timezone and subtract that zone's UTC offset.
# c.f. https://mixpanel.com/docs/api-documentation/exporting-raw-data-you-inserted-into-mixpanel#export
#
# @param epoch [Integer, nil] local-time epoch seconds
# @return [Integer, nil] UTC epoch seconds, or nil when +epoch+ is blank
def adjust_timezone(epoch)
  return unless epoch.present?

  tz = TZInfo::Timezone.get(@timezone)
  begin
    epoch - tz.period_for_local(epoch, true).offset.utc_total_offset
  rescue TZInfo::PeriodNotFound
    # TZInfo::PeriodNotFound signals that there is no equivalent UTC time, e.g. during the
    # transition from standard time to daylight saving time when the clocks are moved
    # forward and an hour is skipped (2018-03-11 02:30 in a US zone).
    # Step forward one hour (3600 s) past the gap and convert that instant; the shifted
    # value must drive both the offset lookup and the subtraction, otherwise the lookup
    # hits the same nonexistent local time and raises again.
    shifted = epoch + 3600
    shifted - tz.period_for_local(shifted, true).offset.utc_total_offset
  end
end
create_task() click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 18
# Build the task hash handed to each ingest run from the plugin configuration.
#
# @return [Hash] task settings keyed by Symbol
def create_task
  {
    timezone: @config.param(:timezone, :string, default: ""),
    api_secret: @config.param(:api_secret, :string),
    jql_endpoint: endpoint,
    dates: range,
    incremental: @config.param(:incremental, :bool, default: true),
    slice_range: @config.param(:slice_range, :integer, default: 7),
    schema: @config.param(:columns, :array),
    retry_initial_wait_sec: @config.param(:retry_initial_wait_sec, :integer, default: 1),
    retry_limit: @config.param(:retry_limit, :integer, default: 5),
    incremental_column: @config.param(:incremental_column, :string, default: nil),
    latest_fetched_time: @config.param(:latest_fetched_time, :integer, default: 0),
    jql_mode: true,
    # The default must be passed as an option (like every other param call here);
    # a bare positional nil is not an options hash.
    jql_script: @config.param(:jql_script, :string, default: nil)
  }
end
endpoint() click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 179
# The JQL API endpoint: the configured jql_endpoint, falling back to the
# client's built-in default.
def endpoint
  fallback_endpoint = Embulk::Input::MixpanelApi::Client::DEFAULT_JQL_ENDPOINT
  @config.param(:jql_endpoint, :string, default: fallback_endpoint)
end
guess_columns() click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 36
# Guess the output columns by running the configured JQL script (small-dataset
# call) over the guess range and inspecting the returned sample records.
#
# Side effect: sets @incremental and @incremental_column from the config so the
# incremental-column validation can check the sample.
#
# @return [Array<Hash>] guessed column definitions
# @raise [Embulk::ConfigError] when the sample result is unsupported or lacks the
#   incremental column
def guess_columns
  giveup_when_mixpanel_is_down
  range = guess_range
  Embulk.logger.info "Guessing schema using #{range.first}..#{range.last}"

  client = create_client

  # Pass the default as an option; a positional nil is not an options hash.
  jql_script = @config.param(:jql_script, :string, default: nil)
  sample_records = client.send_jql_script_small_dataset(parameters(jql_script, range.first, range.last))

  validate_result(sample_records)

  @incremental = @config.param(:incremental, :bool, default: true)
  @incremental_column = @config.param(:incremental_column, :string, default: nil)
  validate_result_contain_incremental_column(sample_records)

  guess_from_records(sample_records)
end
guess_from_records(sample_props) click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 124
# Turn sample JQL result records into Embulk column definitions.
#
# @param sample_props [Array<Hash>] sample records returned by the JQL script
# @return [Array<Hash>] one entry per column with :name and :type; the "time" and
#   "last_seen" columns also get a "format" entry when a format was guessed
# @raise [Embulk::ConfigError] when the records cannot be guessed (DataError)
def guess_from_records(sample_props)
  validate_result(sample_props)

  begin
    schema = Guess::SchemaGuess.from_hash_records(sample_props)
    schema.map do |col|
      result = {
        name: col.name,
        type: col.type,
      }
      # Match on the column *name*: the column object itself never equals a String.
      # NOTE(review): "format" is a String key while :name/:type are Symbols — kept as-is;
      # confirm what the consumer of the guessed schema expects.
      if %w[time last_seen].include?(col.name)
        result["format"] = col.format if col.format
      end
      result
    end
  rescue DataError
    raise Embulk::ConfigError.new("Non-supported result #{sample_props}. Revise your JQL.")
  end
end
guess_range() click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 110
# Dates to sample while guessing: from_date (or the default guess start) for at
# most DEFAULT_FETCH_DAYS days. When that yields nothing, fall back to the
# default guess start through yesterday.
def guess_range
  tz = @config.param(:timezone, :string, default: "")
  start_date = @config.param(:from_date, :string, default: default_guess_start_date(tz).to_s)
  requested_days = @config.param(:fetch_days, :integer, default: DEFAULT_FETCH_DAYS)
  capped_days = [requested_days, DEFAULT_FETCH_DAYS].min

  generated = RangeGenerator.new(start_date, capped_days, tz).generate_range
  generated.empty? ? default_guess_start_date(tz)..(today(tz) - 1) : generated
end
ingest(task, page_builder) click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 54
# Run the JQL script over task[:dates] in slices of task[:slice_range] days and
# add one row per returned record to +page_builder+.
#
# In incremental mode, records whose incremental-column value is <= the
# previous high-water mark (task[:latest_fetched_time]) are skipped, and the
# largest value seen is carried into the task report. In preview mode only the
# first slice is fetched (via the small-dataset call).
#
# @param task [Hash] task produced by create_task
# @param page_builder receives each extracted row, then #finish
# @return [Hash] task report for incremental non-preview runs, otherwise {}
# @raise [Embulk::ConfigError] in incremental mode when the configured schema
#   lacks the incremental column and a record is returned
def ingest(task, page_builder)
  @dates = task[:dates]
  @schema = task[:schema]
  @timezone = task[:timezone]
  @incremental_column = task[:incremental_column]
  unless @incremental_column
    Embulk.logger.warn "incremental_column should be specified when running in incremental mode to avoid duplicated"
    Embulk.logger.warn "Use default value #{DEFAULT_TIME_COLUMN}"
    @incremental_column = DEFAULT_TIME_COLUMN
  end

  @incremental = task[:incremental]
  latest_fetched_time = task[:latest_fetched_time]
  # The schema is fixed for the whole run: check for the incremental column once
  # instead of rebuilding the column-name list for every record.
  schema_has_incremental_column = @incremental && @schema.map { |col| col["name"] }.include?(@incremental_column)

  client = create_client

  ignored_fetched_record_count = 0
  next_fetched_time = latest_fetched_time
  @dates.each_slice(task[:slice_range]) do |slice_dates|
    Embulk.logger.info "Fetching date from #{slice_dates.first}..#{slice_dates.last}"
    if preview?
      records = client.send_jql_script_small_dataset(parameters(@config.param(:jql_script, :string, default: nil), slice_dates.first, slice_dates.last))
    else
      records = client.send_jql_script(parameters(task[:jql_script], slice_dates.first, slice_dates.last))
    end
    validate_result(records)
    records.each do |record|
      if @incremental
        unless schema_has_incremental_column
          raise Embulk::ConfigError.new("Missing Incremental Field (<incremental_column>) in the returned dataset. Specify the correct Incremental Field value.")
        end

        record_incremental_column = record[@incremental_column]
        if record_incremental_column
          # Already ingested by a previous run: skip it.
          if record_incremental_column <= latest_fetched_time
            ignored_fetched_record_count += 1
            next
          end
          next_fetched_time = [record_incremental_column, next_fetched_time].max
        end
      end
      page_builder.add(extract_values(record))
    end
    break if preview?
  end
  Embulk.logger.info "Skip #{ignored_fetched_record_count} rows"
  page_builder.finish

  return create_task_report(next_fetched_time) if task[:incremental] && !preview?

  {}
end
next_from_date(task_report) click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 171
# Config diff for the next run: resume from the report's to_date (normalized
# through Date.parse) and carry the incremental high-water mark forward.
def next_from_date(task_report)
  resume_from = Date.parse(task_report[:to_date]).to_s
  carried_mark = task_report[:latest_fetched_time]
  { from_date: resume_from, latest_fetched_time: carried_mark }
end
parameters(script, from_date, to_date) click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 144
# Request payload for the JQL client: the script plus its date-range params.
def parameters(script, from_date, to_date)
  date_params = params(from_date, to_date)
  { params: date_params, script: script }
end
validate_config() click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 11
# Run the inherited config validation, then the JQL-specific checks in order:
# a non-blank jql_script, then a positive fetch_days.
def validate_config
  super
  [:validate_jql_script, :validate_fetch_days].each { |check| send(check) }
end

Private Instance Methods

create_task_report(next_fetched_time) click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 185
# Task report for an incremental run: the last processed date (or yesterday in
# @timezone when no dates were processed) and the new high-water mark as a String.
def create_task_report(next_fetched_time)
  last_date = @dates.last
  last_date ||= today(@timezone) - 1
  { to_date: last_date, latest_fetched_time: next_fetched_time.to_s }
end
extract_value(record, name) click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 199
# Extract the value for column +name+ from one JQL result +record+.
#
# "time", "last_seen" and the incremental column hold millisecond epochs and are
# converted by extract_millis_timestamp; every other column is returned as-is.
#
# @param record [Hash] one JQL result row
# @param name [String] column name
# @return [Object, nil] the extracted value
def extract_value(record, name)
  case name
  when NOT_PROPERTY_COLUMN
    record[NOT_PROPERTY_COLUMN]
  when "time", "last_seen", @incremental_column
    extract_millis_timestamp(record, name)
  else
    record[name]
  end
end

# Convert the millisecond epoch under +key+ to timezone-adjusted seconds.
# Returns nil when the value is blank and the raw value when it is not positive.
def extract_millis_timestamp(record, key)
  value = record[key]
  return unless value.present?
  return value unless value > 0

  adjust_timezone(value / 1000)
end
params(from_date, to_date) click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 192
# Date-range parameters passed to the JQL script.
def params(from_date, to_date)
  range_params = {}
  range_params[:from_date] = from_date
  range_params[:to_date] = to_date
  range_params
end
validate_fetch_days() click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 266
# fetch_days is optional, but when given it must be a positive number of days.
def validate_fetch_days
  days = @config.param(:fetch_days, :integer, default: nil)
  return if !days || days > 0

  raise Embulk::ConfigError.new("fetch_days should be larger than 0")
end
validate_jql_script() click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 259
# A JQL script is mandatory: reject a missing, empty or blank jql_script.
def validate_jql_script
  script = @config.param(:jql_script, :string, default: nil)
  return unless script.blank?

  raise Embulk::ConfigError.new("JQL script shouldn't be empty or null")
end
validate_result(records) click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 240
# Reject results that are an Array whose first element is an Integer: a
# reduce-style JQL script returns only a record count, not row objects.
def validate_result(records)
  return unless records.is_a?(Array)
  return unless records.first.is_a?(Integer)

  raise Embulk::ConfigError.new("Non-supported result. Revise your JQL.")
end
validate_result_contain_incremental_column(records) click to toggle source
# File lib/embulk/input/service/jql_service.rb, line 247
# Default @incremental_column to DEFAULT_TIME_COLUMN (with a warning) when it is
# unset. Then, in incremental mode with a non-empty sample, require the first
# record to contain that column.
def validate_result_contain_incremental_column(records)
  @incremental_column ||= begin
    Embulk.logger.warn "incremental_column should be specified when running in incremental mode to avoid duplicated"
    Embulk.logger.warn "Use default value #{DEFAULT_TIME_COLUMN}"
    DEFAULT_TIME_COLUMN
  end

  return unless @incremental
  return if records.empty?
  return if records[0].include?(@incremental_column)

  raise Embulk::ConfigError.new("Missing Incremental Field (<incremental_column>) in the returned dataset. Specify the correct Incremental Field value.")
end