class Embulk::Input::Service::BaseService

Constants

DEFAULT_FETCH_DAYS
DEFAULT_TIME_COLUMN
NOT_PROPERTY_COLUMN

Public Class Methods

new(config) click to toggle source
# File lib/embulk/input/service/base_service.rb, line 18
def initialize(config)
  @config = config
end

Public Instance Methods

default_guess_start_date(timezone) click to toggle source
# File lib/embulk/input/service/base_service.rb, line 22
def default_guess_start_date(timezone)
  today(timezone) - DEFAULT_FETCH_DAYS - 1
end

Protected Instance Methods

adjust_timezone(epoch) click to toggle source
# File lib/embulk/input/service/base_service.rb, line 43
def adjust_timezone(epoch)
  # Adjust timezone offset to get UTC time
  # c.f. https://mixpanel.com/docs/api-documentation/exporting-raw-data-you-inserted-into-mixpanel#export
  if epoch.present?
    tz = TZInfo::Timezone.get(@timezone)
    offset = tz.period_for_local(epoch, true).offset.utc_total_offset
    epoch - offset
  end
end
create_client() click to toggle source
# File lib/embulk/input/service/base_service.rb, line 76
def create_client
  if @client.present?
    @client
  else
    retryer = perfect_retry({
      # retry_initial_wait_sec: @config[:retry_initial_wait_sec] ? @config[:retry_initial_wait_sec] : 1,
      # retry_limit: @config[:retry_limit] ?  @config[:retry_limit] : 5,
      retry_initial_wait_sec: @config.param(:retry_initial_wait_sec, :integer, default: 1),
      retry_limit: @config.param(:retry_limit, :integer, default: 5),
    })
    MixpanelApi::Client.new(@config.param(:api_secret, :string), endpoint, retryer)
  end
end
extract_values(record) click to toggle source
# File lib/embulk/input/service/base_service.rb, line 62
def extract_values(record)
  @schema.map do |column|
    extract_value(record, column["name"])
  end
end
giveup_when_mixpanel_is_down() click to toggle source
# File lib/embulk/input/service/base_service.rb, line 37
def giveup_when_mixpanel_is_down
  unless MixpanelApi::Client.mixpanel_available?(endpoint)
    raise Embulk::DataError.new("Mixpanel service is down. Please retry later.")
  end
end
perfect_retry(task) click to toggle source
# File lib/embulk/input/service/base_service.rb, line 90
def perfect_retry(task)
  PerfectRetry.new do |config|
    config.limit = task[:retry_limit]
    config.sleep = proc {|n| task[:retry_initial_wait_sec] * (2 * (n - 1))}
    config.dont_rescues = [Embulk::ConfigError, MixpanelApi::IncompleteExportResponseError]
    config.rescues = [RuntimeError]
    config.log_level = nil
    config.logger = Embulk.logger
  end
end
preview?() click to toggle source
# File lib/embulk/input/service/base_service.rb, line 68
def preview?
  begin
    org.embulk.spi.Exec.isPreview()
  rescue java.lang.NullPointerException=>e
    false
  end
end
range() click to toggle source
# File lib/embulk/input/service/base_service.rb, line 101
def range
  timezone = @config.param(:timezone, :string, default: "")
  from_date = @config.param(:from_date, :string, default: (today(timezone) - 2).to_s)
  incremental = @config.param(:incremental, :bool, default: true)
  incremental_column = @config.param(:incremental_column, :string, default: nil)
  latest_fetched_time =  @config.param(:latest_fetched_time, :integer, default: 0)
  fetch_days = @config.param(:fetch_days, :integer, default: nil)

  # Backfill from date if incremental and an incremental field is set and we are in incremental run
  if incremental && incremental_column && latest_fetched_time !=0
    back_fill_days = @config.param(:back_fill_days, :integer, default: 5)
    Embulk.logger.info "Backfill days #{back_fill_days}"
    from_date = (Date.parse(from_date) - back_fill_days).to_s
    fetch_days = fetch_days.nil? ? nil : fetch_days + back_fill_days
  end

  RangeGenerator.new(from_date, fetch_days, timezone).generate_range
end
today(timezone) click to toggle source
# File lib/embulk/input/service/base_service.rb, line 53
def today(timezone)
  if timezone.nil?
    Date.today
  else
    zone = ActiveSupport::TimeZone[timezone]
    zone.nil? ? Date.today : zone.today
  end
end
validate_config() click to toggle source
# File lib/embulk/input/service/base_service.rb, line 28
def validate_config
  timezone = @config.param(:timezone, :string)
  validate_timezone(timezone)
end
validate_timezone(timezone) click to toggle source
# File lib/embulk/input/service/base_service.rb, line 33
def validate_timezone(timezone)
  TimezoneValidator.new(timezone).validate
end