class Embulk::Input::Service::ExportService

Constants

KNOWN_KEYS

Sources: https://mixpanel.com/help/questions/articles/special-or-reserved-properties and https://mixpanel.com/help/questions/articles/what-properties-do-mixpanels-libraries-store-by-default

JavaScript to extract the key names from those HTML pages: run it in the Chrome DevTools console while the documentation page is open.
> Array.from(document.querySelectorAll("strong")).map(function(s){ return s.textContent.match(/[A-Z]/) ? s.parentNode.textContent.match(/\((.*?)\)/)[1] : s.textContent.split(",").join(" ") }).join(" ")
> Array.from(document.querySelectorAll("li")).map(function(s){ m = s.textContent.match(/\((.*?)\)/); return m && m[1] }).filter(function(k) { return k && !k.match("utm") }).join(" ")

Public Instance Methods

create_task() click to toggle source
# File lib/embulk/input/service/export_service.rb, line 37
# Builds the task hash shared by every Embulk task of this input.
#
# Options are read from @config with their defaults. The job start time and the incremental
# upper limit are captured in milliseconds since the epoch. Entries are filled in one at a time,
# in the same order as the keys appear, so the config lookups and helper calls run in a fixed
# order.
#
# @return [Hash{Symbol => Object}] the task consumed by #ingest and #fetch
def create_task
  config = @config
  task = {}
  task[:params] = export_params
  task[:dates] = range
  task[:timezone] = config.param(:timezone, :string, default: "")
  task[:export_endpoint] = endpoint
  task[:api_secret] = config.param(:api_secret, :string)
  task[:schema] = config.param(:columns, :array)
  task[:fetch_unknown_columns] = config.param(:fetch_unknown_columns, :bool, default: false)
  task[:fetch_custom_properties] = config.param(:fetch_custom_properties, :bool, default: true)
  task[:retry_initial_wait_sec] = config.param(:retry_initial_wait_sec, :integer, default: 1)
  task[:incremental_column] = config.param(:incremental_column, :string, default: nil)
  task[:retry_limit] = config.param(:retry_limit, :integer, default: 5)
  task[:latest_fetched_time] = config.param(:latest_fetched_time, :integer, default: 0)
  task[:incremental] = config.param(:incremental, :bool, default: true)
  task[:slice_range] = config.param(:slice_range, :integer, default: 7)
  # Milliseconds since the epoch.
  task[:job_start_time] = Time.now.to_i * 1000
  task[:incremental_column_upper_limit] = incremental_column_upper_limit
  task[:allow_partial_import] = config.param(:allow_partial_import, :bool, default: true)
  task
end
create_task_report(current_latest_fetched_time, to_date, timezone) click to toggle source
# File lib/embulk/input/service/export_service.rb, line 146
# Builds the task report returned to Embulk at the end of #ingest.
#
# @param current_latest_fetched_time the largest incremental time value observed during the run
# @param to_date the last requested date; when it is nil, `today(timezone) - 1` is used instead
# @param timezone [String] passed to #today for that fallback
# @return [Hash] with :latest_fetched_time and :to_date
def create_task_report(current_latest_fetched_time, to_date, timezone)
  report = { latest_fetched_time: current_latest_fetched_time }
  report[:to_date] = to_date || (today(timezone) - 1)
  report
end
endpoint() click to toggle source
# File lib/embulk/input/service/export_service.rb, line 233
# Export API endpoint: the configured `export_endpoint`, or the client's default when it is unset.
#
# @return [String]
def endpoint
  fallback = Embulk::Input::MixpanelApi::Client::DEFAULT_EXPORT_ENDPOINT
  @config.param(:export_endpoint, :string, default: fallback)
end
export_params() click to toggle source
# File lib/embulk/input/service/export_service.rb, line 178
# Collects the Mixpanel export query parameters from the plugin config.
#
# `event` is configured as an array and is sent JSON-encoded. `where` and `bucket` are passed
# through as strings. Options that are not configured stay nil.
#
# @return [Hash{Symbol => String, nil}] with the keys :event, :where and :bucket
def export_params
  event_names = @config.param(:event, :array, default: nil)
  encoded_events = event_names.to_json unless event_names.nil?
  where_clause = @config.param(:where, :string, default: nil)
  bucket_name = @config.param(:bucket, :string, default: nil)
  { event: encoded_events, where: where_clause, bucket: bucket_name }
end
fetch(dates, last_fetch_time, task, &block) click to toggle source
# File lib/embulk/input/service/export_service.rb, line 205
# Requests export records for the date range spanned by +dates+.
#
# When task[:incremental_column] is set, Mixpanel cannot filter on it directly. The incremental
# window (last_fetch_time, task[:incremental_column_upper_limit]) is therefore ANDed into the
# `where` expression, with any user-supplied `where` kept in parentheses.
#
# @param dates [Array] dates to fetch; only the first and last entries are sent
# @param last_fetch_time exclusive lower bound for the incremental column (nil is treated as 0)
# @param task [Hash] the task built by #create_task
# @return the small-dataset export result in preview mode, otherwise an Enumerator over the
#   records streamed by the client's #export
def fetch(dates, last_fetch_time, task, &block)
  params = task[:params].merge("from_date" => dates.first, "to_date" => dates.last)

  column = task[:incremental_column]
  unless column.nil?
    lower_bound = last_fetch_time || 0
    upper_bound = task[:incremental_column_upper_limit]
    window = "properties[\"#{column}\"] > #{lower_bound} and properties[\"#{column}\"] < #{upper_bound}"
    user_where = params['where']
    combined_where = user_where.nil? ? window : "(#{user_where}) and #{window}"
    params = params.merge("where" => combined_where)
  end
  Embulk.logger.info "Where params is #{params["where"]}"

  client = create_client
  return client.export_for_small_dataset(params) if preview?

  Enumerator.new do |yielder|
    client.export(params) { |record| yielder << record }
  end
end
guess_columns() click to toggle source
# File lib/embulk/input/service/export_service.rb, line 153
# Guesses the output column list from a small sample export.
#
# Calls giveup_when_mixpanel_is_down first (presumably it raises when the API is unavailable —
# confirm). It then exports the dates returned by guess_range through the small-dataset
# endpoint and passes the sampled records to guess_from_records.
#
# @return [Array<Hash>] column definitions produced by guess_from_records
def guess_columns
  giveup_when_mixpanel_is_down
  sample_dates = guess_range
  first_day = sample_dates.first
  last_day = sample_dates.last
  Embulk.logger.info "Guessing schema using #{first_day}..#{last_day} records"

  query = export_params.merge("from_date" => first_day, "to_date" => last_day)
  sample_records = create_client.export_for_small_dataset(query)
  guess_from_records(sample_records)
end
guess_from_records(records) click to toggle source
# File lib/embulk/input/service/export_service.rb, line 188
# Turns sampled export records into Embulk column definitions.
#
# The "properties" hashes are fed to Guess::SchemaGuess. Any guessed "time" column is dropped and
# re-added as the first column with type :long. NOT_PROPERTY_COLUMN follows as a :string column,
# then the remaining guessed property columns. A guessed format is stored under the "format" key.
#
# @param records [Array<Hash>] export records, each with a "properties" hash
# @return [Array<Hash>] column definitions
def guess_from_records(records)
  property_samples = records.map { |record| record["properties"] }
  guessed = Guess::SchemaGuess.from_hash_records(property_samples)
  property_columns = guessed.reject { |column| column.name == "time" }.map do |column|
    definition = { name: column.name, type: column.type }
    definition["format"] = column.format if column.format
    definition
  end
  [{ name: "time", type: :long }, { name: NOT_PROPERTY_COLUMN, type: :string }] + property_columns
end
guess_range() click to toggle source
# File lib/embulk/input/service/export_service.rb, line 167
# Date range sampled by #guess_columns.
#
# Uses the configured `from_date` (default: default_guess_start_date) and `fetch_days`
# (default: DEFAULT_FETCH_DAYS). If RangeGenerator yields nothing, it falls back to the range from
# default_guess_start_date through yesterday, as computed by today(timezone) - 1.
#
# @return [Enumerable] the generated range, or a Range as the fallback
def guess_range
  tz = @config.param(:timezone, :string, default: "")
  start = @config.param(:from_date, :string, default: default_guess_start_date(tz).to_s)
  days = @config.param(:fetch_days, :integer, default: DEFAULT_FETCH_DAYS)
  generated = RangeGenerator.new(start, days, tz).generate_range
  return generated unless generated.empty?

  default_guess_start_date(tz)..(today(tz) - 1)
end
ingest(task, page_builder) click to toggle source
# File lib/embulk/input/service/export_service.rb, line 67
# Streams Mixpanel export records into +page_builder+ and returns the task report for the next run.
#
# task[:dates] is processed in slices of task[:slice_range] entries. Each slice is requested
# through #fetch, and every accepted record becomes one row. The row is extract_values(record),
# plus the unknown-columns JSON when fetch_unknown_columns is set, plus collect_custom_properties
# when fetch_custom_properties is set.
#
# Incremental handling (only when task[:incremental] is true):
# * Without an incremental_column, records whose DEFAULT_TIME_COLUMN value is <= the previous
#   latest fetched time are skipped here, on the client side.
# * With an incremental_column, #fetch narrows the export with a `where` clause, so no
#   client-side skipping happens.
# * In both cases the largest time value seen is carried into the returned task report.
#
# @param task [Hash] the task built by #create_task
# @param page_builder receives one #add per row and a final #finish
# @return [Hash] from create_task_report: :latest_fetched_time and :to_date (dates.last, or
#   create_task_report's fallback when that is nil)
# @raise [Embulk::ConfigError] when incremental is on and a record lacks the time/incremental column
# @raise [MixpanelApi::IncompleteExportResponseError] re-raised when task[:allow_partial_import] is false
def ingest(task, page_builder)
  giveup_when_mixpanel_is_down

  # Kept on the instance because collect_custom_properties and extract_unknown_values read @schema.
  # @timezone is presumably read by adjust_timezone (not visible here) — confirm.
  @schema = task[:schema]
  @timezone = task[:timezone]

  Embulk.logger.info "Job start time is #{task[:job_start_time]}"

  dates = task[:dates]
  prev_latest_fetched_time = task[:latest_fetched_time] || 0
  # Only used in the skipped-records warning. Time.at treats the value as seconds.
  # NOTE(review): if the incremental column is in milliseconds, this string will be wrong — confirm.
  prev_latest_fetched_time_format = Time.at(prev_latest_fetched_time).strftime("%F %T %z")
  # Running maximum of record times; reported back via create_task_report.
  current_latest_fetched_time = prev_latest_fetched_time
  incremental_column = task[:incremental_column]
  incremental = task[:incremental]
  fetch_unknown_columns = task[:fetch_unknown_columns]

  dates.each_slice(task[:slice_range]) do |slice_dates|
    ignored_fetched_record_count = 0
    # There is the issue with Mixpanel time field during the transition from standard to daylight saving time
    # in the US timezone i.e. 11 Mar 2018 2AM - 2:59AM, time within that period must not be existed,
    # due to daylight saving, time will be forwarded 1 hour from 2AM to 3AM.
    #
    # All of records with wrong timezone will be ignored instead of throw exception out
    ignored_wrong_daylight_tz_record_count = 0
    unless preview?
      Embulk.logger.info "Fetching data from #{slice_dates.first} to #{slice_dates.last} ..."
    end
    # The property used for incremental bookkeeping: the configured column, else DEFAULT_TIME_COLUMN.
    record_time_column = incremental_column || DEFAULT_TIME_COLUMN
    begin
      # Every slice uses prev_latest_fetched_time (not the running maximum) as fetch's lower bound.
      fetch(slice_dates, prev_latest_fetched_time, task).each do |record|
        if incremental
          if !record["properties"].include?(record_time_column)
            raise Embulk::ConfigError.new("Incremental column not exists in fetched data #{record_time_column}")
          end
          record_time = record["properties"][record_time_column]
          # Client-side de-duplication only for the default time column. With an incremental_column,
          # fetch has already restricted the range through its `where` clause.
          if incremental_column.nil?
            if record_time <= prev_latest_fetched_time
              ignored_fetched_record_count += 1
              next
            end
          end

          # NOTE(review): this runs before the row is built. A record later dropped for
          # TZInfo::PeriodNotFound still advances the high-water mark.
          current_latest_fetched_time = [
            current_latest_fetched_time,
            record_time,
          ].max
        end
        begin
          values = extract_values(record)
          if fetch_unknown_columns
            unknown_values = extract_unknown_values(record)
            values << unknown_values.to_json
          end
          if task[:fetch_custom_properties]
            values << collect_custom_properties(record)
          end
          page_builder.add(values)
        rescue TZInfo::PeriodNotFound
          ignored_wrong_daylight_tz_record_count += 1
        end
      end
    rescue MixpanelApi::IncompleteExportResponseError
      # The export stream ended early. Rows already added for this slice are kept, and processing
      # moves on to the next slice unless partial imports are disallowed.
      if !task[:allow_partial_import]
        # Re-raise the exception if we don't allow partial import
        raise
      end
    end
    if ignored_fetched_record_count > 0
      Embulk.logger.warn "Skipped already loaded #{ignored_fetched_record_count} records. These record times are older or equal than previous fetched record time (#{prev_latest_fetched_time} @ #{prev_latest_fetched_time_format})."
    end
    if ignored_wrong_daylight_tz_record_count > 0
      Embulk.logger.warn "Skipped #{ignored_wrong_daylight_tz_record_count} records due to corrupted Mixpanel time transition from standard to daylight saving"
    end
    # Preview only needs the first slice.
    break if preview?
  end
  page_builder.finish
  create_task_report(current_latest_fetched_time, dates.last, task[:timezone])
end
next_from_date(task_report) click to toggle source
# File lib/embulk/input/service/export_service.rb, line 59
# Derives the config diff for the next run from a finished run's task report.
#
# The next run starts at the previous run's to_date. It also carries forward latest_fetched_time,
# which #ingest uses to skip records that were already loaded.
#
# @param task_report [Hash] report produced by create_task_report
# @return [Hash] with :from_date (ISO-8601 String) and :latest_fetched_time
def next_from_date(task_report)
  # to_date normally arrives as an ISO-8601 string. create_task_report's fallback
  # (today(timezone) - 1) can produce a Date object instead, which Date.parse would reject with a
  # TypeError. Normalizing through to_s accepts both.
  next_to_date = Date.parse(task_report[:to_date].to_s)
  {
    from_date: next_to_date.to_s,
    latest_fetched_time: task_report[:latest_fetched_time],
  }
end
validate_config() click to toggle source
# File lib/embulk/input/service/export_service.rb, line 20
# Validates option combinations that per-parameter types cannot express.
#
# @raise [Embulk::ConfigError] when incremental_column is set and the upper limit (job start
#   time minus incremental_column_upper_limit_delay_in_seconds) is <= latest_fetched_time
# @raise [Embulk::ConfigError] when fetch_unknown_columns and fetch_custom_properties are both
#   true. fetch_custom_properties defaults to true, so enabling fetch_unknown_columns also
#   requires explicitly disabling fetch_custom_properties.
# @raise [Embulk::ConfigError] when slice_range is not a positive integer
def validate_config
  super

  incremental_column = @config.param(:incremental_column, :string, default: nil)
  latest_fetched_time = @config.param(:latest_fetched_time, :integer, default: 0)
  fetch_custom_properties = @config.param(:fetch_custom_properties, :bool, default: true)
  fetch_unknown_columns = @config.param(:fetch_unknown_columns, :bool, default: false)

  if !incremental_column.nil? && !latest_fetched_time.nil? && (incremental_column_upper_limit <= latest_fetched_time)
    raise Embulk::ConfigError.new("Incremental column upper limit (job_start_time - incremental_column_upper_limit_delay_in_seconds) can't be smaller or equal latest fetched time #{latest_fetched_time}")
  end

  if fetch_unknown_columns && fetch_custom_properties
    raise Embulk::ConfigError.new("Don't set true both `fetch_unknown_columns` and `fetch_custom_properties`.")
  end

  # ingest splits the date list with each_slice(slice_range). That raises a bare ArgumentError
  # mid-run for sizes below 1, so reject such values up front as a configuration error.
  slice_range = @config.param(:slice_range, :integer, default: 7)
  if slice_range.nil? || slice_range < 1
    raise Embulk::ConfigError.new("`slice_range` must be a positive integer, got #{slice_range.inspect}")
  end
end

Private Instance Methods

collect_custom_properties(record) click to toggle source
# File lib/embulk/input/service/export_service.rb, line 257
# Collects the record's properties that are neither in KNOWN_KEYS nor declared in the schema.
#
# Keys are compared by their to_s form. The result keeps the properties' original key order.
#
# @param record [Hash] export record with a "properties" hash
# @return [Hash] the custom properties, mapping key to value
def collect_custom_properties(record)
  specified_columns = @schema.map { |col| col["name"] }
  # This runs once per record. A single Hash#reject pass avoids the per-key copy that the
  # previous inject/merge accumulation made, which was quadratic in the number of custom keys.
  record["properties"].reject do |key, _value|
    name = key.to_s
    KNOWN_KEYS.include?(name) || specified_columns.include?(name)
  end
end
extract_unknown_values(record) click to toggle source
# File lib/embulk/input/service/export_service.rb, line 267
# Returns the values of record fields that are not declared in @schema.
#
# The candidate fields are every property key plus NOT_PROPERTY_COLUMN. A warning listing the
# undeclared names is logged whenever any are found.
#
# @param record [Hash] export record with a "properties" hash
# @return [Hash] undeclared field name => value, as extracted by extract_value
def extract_unknown_values(record)
  candidate_names = record["properties"].keys + [NOT_PROPERTY_COLUMN]
  declared_names = @schema.map { |column| column["name"] }
  undeclared = candidate_names - declared_names

  Embulk.logger.warn("Unknown columns exists in record: #{undeclared.join(', ')}") unless undeclared.empty?

  undeclared.each_with_object({}) do |name, values|
    values[name] = extract_value(record, name)
  end
end
extract_value(record, name) click to toggle source
# File lib/embulk/input/service/export_service.rb, line 245
# Reads a single output value from an export record.
#
# NOTE: NOT_PROPERTY_COLUMN is read from the top level of the record. "time" is read from the
# properties and passed through adjust_timezone. Any other name is looked up in the properties.
#
# @param record [Hash] export record with a "properties" hash
# @param name [String] column name
# @return the extracted value (nil when the property is absent)
def extract_value(record, name)
  return record[NOT_PROPERTY_COLUMN] if NOT_PROPERTY_COLUMN == name

  properties = record["properties"]
  return adjust_timezone(properties["time"]) if "time" == name

  properties[name]
end
incremental_column_upper_limit() click to toggle source
# File lib/embulk/input/service/export_service.rb, line 239
# Exclusive upper bound for the incremental column, in milliseconds since the epoch.
#
# This is the current time minus incremental_column_upper_limit_delay_in_seconds (default 0). The
# clock is sampled on every call.
#
# @return [Integer]
def incremental_column_upper_limit
  now_ms = Time.now.to_i * 1000
  delay_ms = @config.param(:incremental_column_upper_limit_delay_in_seconds, :integer, default: 0) * 1000
  now_ms - delay_ms
end