class Embulk::Input::Service::ExportService
Constants
- KNOWN_KEYS
Sources: https://mixpanel.com/help/questions/articles/special-or-reserved-properties and https://mixpanel.com/help/questions/articles/what-properties-do-mixpanels-libraries-store-by-default
JavaScript to extract key names from the HTML: run it in Chrome DevTools while viewing their documentation > Array.from(document.querySelectorAll("strong")).map(function(s){ return s.textContent.match(//) ? s.parentNode.textContent.match(/\((.*?)\)/)[1] : s.textContent.split(",").join(" ") }).join(" ") > Array.from(document.querySelectorAll("li")).map(function(s){ m = s.textContent.match(/\((.*?)\)/); return m && m[1] }).filter(function(k) { return k && !k.match("utm") }).join(" ")
Public Instance Methods
create_task()
click to toggle source
# File lib/embulk/input/service/export_service.rb, line 37
# Assembles the task hash for an export run.
#
# Entries are read from @config (with the defaults shown below) or produced
# by export_params, range, endpoint and incremental_column_upper_limit.
# :job_start_time is the current time expressed in epoch milliseconds.
#
# @return [Hash] task settings keyed by symbol, in the order assigned below
def create_task
  task = {}
  task[:params] = export_params
  task[:dates] = range
  task[:timezone] = @config.param(:timezone, :string, default: "")
  task[:export_endpoint] = endpoint
  task[:api_secret] = @config.param(:api_secret, :string)
  task[:schema] = @config.param(:columns, :array)
  task[:fetch_unknown_columns] = @config.param(:fetch_unknown_columns, :bool, default: false)
  task[:fetch_custom_properties] = @config.param(:fetch_custom_properties, :bool, default: true)
  task[:retry_initial_wait_sec] = @config.param(:retry_initial_wait_sec, :integer, default: 1)
  task[:incremental_column] = @config.param(:incremental_column, :string, default: nil)
  task[:retry_limit] = @config.param(:retry_limit, :integer, default: 5)
  task[:latest_fetched_time] = @config.param(:latest_fetched_time, :integer, default: 0)
  task[:incremental] = @config.param(:incremental, :bool, default: true)
  task[:slice_range] = @config.param(:slice_range, :integer, default: 7)
  task[:job_start_time] = Time.now.to_i * 1000
  task[:incremental_column_upper_limit] = incremental_column_upper_limit
  task[:allow_partial_import] = @config.param(:allow_partial_import, :bool, default: true)
  task
end
create_task_report(current_latest_fetched_time, to_date, timezone)
click to toggle source
# File lib/embulk/input/service/export_service.rb, line 146
# Builds the report returned at the end of an ingest run.
#
# @param current_latest_fetched_time the newest record time seen so far
# @param to_date the last date of the run; when nil/false the value of
#   today(timezone) - 1 is used instead
# @param timezone passed to today when to_date is absent
# @return [Hash] with :latest_fetched_time and :to_date
def create_task_report(current_latest_fetched_time, to_date, timezone)
  report_to_date = to_date || (today(timezone) - 1)

  {
    latest_fetched_time: current_latest_fetched_time,
    to_date: report_to_date,
  }
end
endpoint()
click to toggle source
# File lib/embulk/input/service/export_service.rb, line 233
# Export endpoint taken from the :export_endpoint config entry, falling back
# to the client's DEFAULT_EXPORT_ENDPOINT constant.
def endpoint
  fallback = Embulk::Input::MixpanelApi::Client::DEFAULT_EXPORT_ENDPOINT
  @config.param(:export_endpoint, :string, default: fallback)
end
export_params()
click to toggle source
# File lib/embulk/input/service/export_service.rb, line 178
# Request parameters derived from the :event, :where and :bucket config
# entries. The event list is JSON-encoded when present and left nil otherwise.
#
# @return [Hash] with :event, :where and :bucket
def export_params
  events = @config.param(:event, :array, default: nil)
  # Stays nil when no event list was configured.
  encoded_events = events.to_json unless events.nil?

  {
    event: encoded_events,
    where: @config.param(:where, :string, default: nil),
    bucket: @config.param(:bucket, :string, default: nil),
  }
end
fetch(dates, last_fetch_time, task, &block)
click to toggle source
# File lib/embulk/input/service/export_service.rb, line 205
# Requests the records for one slice of dates.
#
# task[:params] is extended with "from_date"/"to_date" taken from the first
# and last element of dates. When task[:incremental_column] is set, the
# "where" expression is additionally restricted to values strictly between
# last_fetch_time (0 when nil) and task[:incremental_column_upper_limit].
#
# @param dates the dates of the slice; only first and last are used
# @param last_fetch_time lower bound for the incremental column, may be nil
# @param task [Hash] task settings (:params, :incremental_column,
#   :incremental_column_upper_limit)
# @param block accepted but not used by this method
# @return the result of export_for_small_dataset in preview mode, otherwise
#   an Enumerator that yields each record produced by client.export
def fetch(dates, last_fetch_time, task, &block)
  from_date = dates.first
  to_date = dates.last
  params = task[:params].merge("from_date"=>from_date, "to_date"=>to_date)

  incremental_column = task[:incremental_column]
  unless incremental_column.nil?
    # can't do filter on time column, time column need to be filter manually.
    existing_filter = params['where'].nil? ? '' : "(#{params['where']}) and "
    lower_bound = "properties[\"#{incremental_column}\"] > #{last_fetch_time || 0}"
    upper_bound = "properties[\"#{incremental_column}\"] < #{task[:incremental_column_upper_limit]}"
    params = params.merge("where"=>"#{existing_filter}#{lower_bound} and #{upper_bound}")
  end
  Embulk.logger.info "Where params is #{params["where"]}"

  client = create_client
  return client.export_for_small_dataset(params) if preview?

  Enumerator.new do |yielder|
    client.export(params) { |record| yielder << record }
  end
end
guess_columns()
click to toggle source
# File lib/embulk/input/service/export_service.rb, line 153
# Guesses the column list from a small export covering guess_range.
#
# Calls giveup_when_mixpanel_is_down first, then feeds the records returned
# by export_for_small_dataset into guess_from_records.
#
# @return the column definitions produced by guess_from_records
def guess_columns
  giveup_when_mixpanel_is_down

  sample_range = guess_range
  Embulk.logger.info "Guessing schema using #{sample_range.first}..#{sample_range.last} records"

  params = export_params.merge(
    "from_date"=>sample_range.first,
    "to_date"=>sample_range.last,
  )

  sample_records = create_client.export_for_small_dataset(params)
  guess_from_records(sample_records)
end
guess_from_records(records)
click to toggle source
# File lib/embulk/input/service/export_service.rb, line 188
# Derives column definitions from sample records.
#
# The "properties" hash of every record is passed to
# Guess::SchemaGuess.from_hash_records. The guessed "time" column is dropped
# and replaced by a fixed :long "time" column placed first, followed by the
# NOT_PROPERTY_COLUMN string column, then the remaining guessed columns.
#
# @param records records that each respond to ["properties"]
# @return [Array<Hash>] column definitions
def guess_from_records(records)
  sample_props = records.map { |record| record["properties"] }
  guessed = Guess::SchemaGuess.from_hash_records(sample_props)

  columns = guessed.each_with_object([]) do |col, collected|
    next if col.name == "time"

    column = { name: col.name, type: col.type }
    column["format"] = col.format if col.format
    collected << column
  end

  columns.unshift(name: NOT_PROPERTY_COLUMN, type: :string)
  # Shift incremental column to top
  columns.unshift(name: "time", type: :long)
end
guess_range()
click to toggle source
# File lib/embulk/input/service/export_service.rb, line 167
# Date range used for schema guessing.
#
# Built by RangeGenerator from the :from_date, :fetch_days and :timezone
# config entries. When that range is empty, falls back to
# default_guess_start_date(time_zone)..(today(time_zone) - 1).
def guess_range
  time_zone = @config.param(:timezone, :string, default: "")
  from_date = @config.param(:from_date, :string, default: default_guess_start_date(time_zone).to_s)
  fetch_days = @config.param(:fetch_days, :integer, default: DEFAULT_FETCH_DAYS)

  generated = RangeGenerator.new(from_date, fetch_days, time_zone).generate_range
  return generated unless generated.empty?

  default_guess_start_date(time_zone)..(today(time_zone) - 1)
end
ingest(task, page_builder)
click to toggle source
# File lib/embulk/input/service/export_service.rb, line 67
# Fetches records slice by slice and adds them to page_builder.
#
# task[:dates] is walked in slices of task[:slice_range] dates. In
# incremental mode every record must carry the record time column
# (task[:incremental_column], or DEFAULT_TIME_COLUMN when none is set); the
# largest value seen is tracked and returned in the task report. When no
# incremental column is configured, records whose time is not newer than
# task[:latest_fetched_time] are skipped and counted. In preview mode only
# the first slice is processed.
#
# @param task [Hash] task settings as built by create_task
# @param page_builder receives one add(values) per record and a final finish
# @return [Hash] the result of create_task_report
# @raise [Embulk::ConfigError] when a record lacks the record time column in
#   incremental mode
# @raise [MixpanelApi::IncompleteExportResponseError] re-raised only when
#   task[:allow_partial_import] is falsy
def ingest(task, page_builder)
  giveup_when_mixpanel_is_down

  @schema = task[:schema]
  @timezone = task[:timezone]

  Embulk.logger.info "Job start time is #{task[:job_start_time]}"

  dates = task[:dates]
  prev_latest_fetched_time = task[:latest_fetched_time] || 0
  prev_latest_fetched_time_format = Time.at(prev_latest_fetched_time).strftime("%F %T %z")
  current_latest_fetched_time = prev_latest_fetched_time
  incremental_column = task[:incremental_column]
  incremental = task[:incremental]
  fetch_unknown_columns = task[:fetch_unknown_columns]

  dates.each_slice(task[:slice_range]) do |slice_dates|
    ignored_fetched_record_count = 0
    # There is the issue with Mixpanel time field during the transition from standard to daylight saving time
    # in the US timezone i.e. 11 Mar 2018 2AM - 2:59AM, time within that period must not be existed,
    # due to daylight saving, time will be forwarded 1 hour from 2AM to 3AM.
    #
    # All of records with wrong timezone will be ignored instead of throw exception out
    ignored_wrong_daylight_tz_record_count = 0

    Embulk.logger.info "Fetching data from #{slice_dates.first} to #{slice_dates.last} ..." unless preview?

    record_time_column = incremental_column || DEFAULT_TIME_COLUMN
    begin
      fetch(slice_dates, prev_latest_fetched_time, task).each do |record|
        if incremental
          unless record["properties"].include?(record_time_column)
            raise Embulk::ConfigError.new("Incremental column not exists in fetched data #{record_time_column}")
          end

          record_time = record["properties"][record_time_column]
          # Only the default time column is filtered here; a configured
          # incremental column is not re-checked in this method.
          if incremental_column.nil? && record_time <= prev_latest_fetched_time
            ignored_fetched_record_count += 1
            next
          end

          current_latest_fetched_time = [current_latest_fetched_time, record_time].max
        end

        begin
          values = extract_values(record)
          values << extract_unknown_values(record).to_json if fetch_unknown_columns
          values << collect_custom_properties(record) if task[:fetch_custom_properties]
          page_builder.add(values)
        rescue TZInfo::PeriodNotFound
          ignored_wrong_daylight_tz_record_count += 1
        end
      end
    rescue MixpanelApi::IncompleteExportResponseError
      # re raise the exception if we don't allow partial import
      raise unless task[:allow_partial_import]
    end

    if ignored_fetched_record_count > 0
      Embulk.logger.warn "Skipped already loaded #{ignored_fetched_record_count} records. These record times are older or equal than previous fetched record time (#{prev_latest_fetched_time} @ #{prev_latest_fetched_time_format})."
    end

    if ignored_wrong_daylight_tz_record_count > 0
      Embulk.logger.warn "Skipped #{ignored_wrong_daylight_tz_record_count} records due to corrupted Mixpanel time transition from standard to daylight saving"
    end

    break if preview?
  end

  page_builder.finish
  create_task_report(current_latest_fetched_time, dates.last, task[:timezone])
end
next_from_date(task_report)
click to toggle source
# File lib/embulk/input/service/export_service.rb, line 59
# Computes the settings for the next run from a finished run's task report.
#
# The report's :to_date becomes the next :from_date, rendered via Date#to_s.
# A String is parsed with Date.parse, exactly as before. A Date or DateTime
# is now accepted too, where Date.parse previously raised TypeError.
# NOTE(review): create_task_report falls back to `today(timezone) - 1`,
# which looks like a Date; confirm against today's implementation.
#
# @param task_report [Hash] with :to_date (String, Date or DateTime) and
#   :latest_fetched_time
# @return [Hash] with :from_date (String) and :latest_fetched_time
# @raise [TypeError] when :to_date is neither a Date nor a String (e.g. nil)
# @raise [ArgumentError] when a String :to_date cannot be parsed as a date
def next_from_date(task_report)
  raw_to_date = task_report[:to_date]
  # to_date also reduces a DateTime to its calendar date.
  next_to_date = raw_to_date.is_a?(Date) ? raw_to_date.to_date : Date.parse(raw_to_date)

  {
    from_date: next_to_date.to_s,
    latest_fetched_time: task_report[:latest_fetched_time],
  }
end
validate_config()
click to toggle source
Calls superclass method
Embulk::Input::Service::BaseService#validate_config
# File lib/embulk/input/service/export_service.rb, line 20
# Validates export-specific configuration after the superclass checks.
#
# @raise [Embulk::ConfigError] when an incremental column is configured and
#   incremental_column_upper_limit is not greater than :latest_fetched_time
# @raise [Embulk::ConfigError] when both :fetch_unknown_columns and
#   :fetch_custom_properties are true
def validate_config
  super

  incremental_column = @config.param(:incremental_column, :string, default: nil)
  latest_fetched_time = @config.param(:latest_fetched_time, :integer, default: 0)
  fetch_custom_properties = @config.param(:fetch_custom_properties, :bool, default: true)
  fetch_unknown_columns = @config.param(:fetch_unknown_columns, :bool, default: false)

  # The upper limit is only computed when both values are present.
  limit_applies = !(incremental_column.nil? || latest_fetched_time.nil?)
  if limit_applies && incremental_column_upper_limit <= latest_fetched_time
    raise Embulk::ConfigError.new("Incremental column upper limit (job_start_time - incremental_column_upper_limit_delay_in_seconds) can't be smaller or equal latest fetched time #{latest_fetched_time}")
  end

  if fetch_unknown_columns && fetch_custom_properties
    raise Embulk::ConfigError.new("Don't set true both `fetch_unknown_columns` and `fetch_custom_properties`.")
  end
end
Private Instance Methods
collect_custom_properties(record)
click to toggle source
# File lib/embulk/input/service/export_service.rb, line 257
# Collects the properties of a record that are neither listed in KNOWN_KEYS
# nor named by a column of @schema.
#
# Keys are compared by their to_s form but stored in the result unchanged,
# in the order they appear in record["properties"]. The record itself is not
# modified.
#
# @param record responds to ["properties"] with a Hash of property values
# @return [Hash] the custom properties (empty when there are none)
def collect_custom_properties(record)
  specified_columns = @schema.map { |col| col["name"] }

  # Single pass into one accumulator; the previous inject + merge allocated a
  # fresh Hash for every custom key.
  record["properties"].each_with_object({}) do |(key, value), custom|
    name = key.to_s
    next if KNOWN_KEYS.include?(name) || specified_columns.include?(name)

    custom[key] = value
  end
end
extract_unknown_values(record)
click to toggle source
# File lib/embulk/input/service/export_service.rb, line 267
# Extracts the values of every record key that has no column in @schema.
#
# The candidate keys are the record's property keys plus NOT_PROPERTY_COLUMN.
# A warning listing the unknown keys is logged when there are any.
#
# @param record responds to ["properties"] with a Hash of property values
# @return [Hash] unknown key => value obtained through extract_value
def extract_unknown_values(record)
  schema_keys = @schema.map { |column| column["name"] }
  candidate_keys = record["properties"].keys + [NOT_PROPERTY_COLUMN]
  unknown_keys = candidate_keys - schema_keys

  Embulk.logger.warn("Unknown columns exists in record: #{unknown_keys.join(', ')}") unless unknown_keys.empty?

  unknown_keys.each_with_object({}) do |key, values|
    values[key] = extract_value(record, key)
  end
end
extract_value(record, name)
click to toggle source
# File lib/embulk/input/service/export_service.rb, line 245
# Reads one column value from a record.
#
# NOT_PROPERTY_COLUMN is read from the record itself, "time" is read from
# the properties and passed through adjust_timezone, and any other name is
# read from the properties as is.
#
# @param record the record hash, with a "properties" hash inside
# @param name the column name to read
def extract_value(record, name)
  # === mirrors the comparison a case/when on name performs.
  return record[NOT_PROPERTY_COLUMN] if NOT_PROPERTY_COLUMN === name

  properties = record["properties"]
  return adjust_timezone(properties["time"]) if "time" === name

  properties[name]
end
incremental_column_upper_limit()
click to toggle source
# File lib/embulk/input/service/export_service.rb, line 239
# Upper bound for the incremental column: the current time in epoch
# milliseconds minus the configured
# :incremental_column_upper_limit_delay_in_seconds (default 0), itself
# converted to milliseconds.
#
# @return [Integer] epoch milliseconds
def incremental_column_upper_limit
  now_in_ms = Time.now.to_i * 1000
  delay_in_sec = @config.param(:incremental_column_upper_limit_delay_in_seconds, :integer, default: 0)
  delay_in_ms = delay_in_sec * 1000

  now_in_ms - delay_in_ms
end