class Embulk::Input::MixpanelApi::Client
Constants
- DEFAULT_EXPORT_ENDPOINT
- DEFAULT_JQL_ENDPOINT
- JQL_RATE_LIMIT
- PING_RETRY_LIMIT
- PING_RETRY_WAIT
- PING_TIMEOUT_SECONDS
- SMALL_NUM_OF_RECORDS
- TIMEOUT_SECONDS
Attributes
retryer[R]
Public Class Methods
mixpanel_available?(endpoint = nil)
click to toggle source
# File lib/embulk/input/mixpanel_api/client.rb, line 22 def self.mixpanel_available?(endpoint = nil) endpoint ||= DEFAULT_EXPORT_ENDPOINT retryer = PerfectRetry.new do |config| config.limit = PING_RETRY_LIMIT config.sleep = PING_RETRY_WAIT config.logger = Embulk.logger config.log_level = nil end begin retryer.with_retry do client = HTTPClient.new client.force_basic_auth = true client.connect_timeout = PING_TIMEOUT_SECONDS client.set_auth(nil, @api_secret, nil) client.get(URI.join(endpoint, '/')) end true rescue PerfectRetry::TooManyRetry false end end
new(api_secret, endpoint, retryer = nil)
click to toggle source
# File lib/embulk/input/mixpanel_api/client.rb, line 45 def initialize(api_secret, endpoint, retryer = nil) @endpoint = endpoint @api_secret = api_secret @retryer = retryer || PerfectRetry.new do |config| # for test config.limit = 0 config.dont_rescues = [RuntimeError] config.log_level = nil config.logger = Embulk.logger config.raise_original_error = true end end
Public Instance Methods
export(params = {}, &block)
click to toggle source
# File lib/embulk/input/mixpanel_api/client.rb, line 58 def export(params = {}, &block) retryer.with_retry do request(params, &block) end end
export_for_small_dataset(params = {})
click to toggle source
# File lib/embulk/input/mixpanel_api/client.rb, line 64 def export_for_small_dataset(params = {}) yesterday = Date.today - 1 latest_tried_to_date = nil try_to_dates(params["from_date"]).each do |to_date| next if yesterday < to_date latest_tried_to_date = to_date params["to_date"] = to_date.strftime("%Y-%m-%d") records = retryer.with_retry do request_small_dataset(params, SMALL_NUM_OF_RECORDS) end next if records.first.nil? return records end raise ConfigError.new "#{params["from_date"]}..#{latest_tried_to_date} has no record." end
send_jql_script(params = {})
click to toggle source
# File lib/embulk/input/mixpanel_api/client.rb, line 81 def send_jql_script(params = {}) retryer.with_retry do response = request_jql(params) handle_error(response, response.body) begin return JSON.parse(response.body) rescue =>e raise Embulk::DataError.new(e) end end end
send_jql_script_small_dataset(params = {})
click to toggle source
# File lib/embulk/input/mixpanel_api/client.rb, line 93 def send_jql_script_small_dataset(params = {}) retryer.with_retry do response = request_jql(params) handle_error(response, response.body) begin return JSON.parse(response.body)[0..SMALL_NUM_OF_RECORDS - 1] rescue =>e raise Embulk::DataError.new(e.message) end end end
try_to_dates(from_date)
click to toggle source
# File lib/embulk/input/mixpanel_api/client.rb, line 105 def try_to_dates(from_date) try_to_dates = 5.times.map do |n| # from_date + 1, from_date + 10, from_date + 100, ... so on days = 1 * (10 ** n) Date.parse(from_date.to_s) + days end yesterday = Date.today - 1 try_to_dates << yesterday try_to_dates.find_all {|date| date <= yesterday}.uniq end
Private Instance Methods
handle_error(response, error_response)
click to toggle source
# File lib/embulk/input/mixpanel_api/client.rb, line 180 def handle_error(response, error_response) Embulk.logger.debug "response code: #{response.code}" case response.code when 429 # [429] {"error": "too many export requests in progress for this project"} Embulk.logger.info "Hit rate limit sleep for 1 hour" sleep(60 * 60) raise RuntimeError.new("[#{response.code}] #{error_response} (will retry)") when 400..499 raise ConfigError.new("[#{response.code}] #{error_response}") when 500..599 raise RuntimeError.new("[#{response.code}] #{error_response}") end end
httpclient()
click to toggle source
# File lib/embulk/input/mixpanel_api/client.rb, line 195 def httpclient @client ||= begin client = HTTPClient.new client.receive_timeout = TIMEOUT_SECONDS client.tcp_keepalive = true client.default_header = {Accept: "application/json; charset=UTF-8"} # client.debug_dev = STDERR client.force_basic_auth = true client.set_auth(nil, @api_secret, nil); client end end
query_string(prs)
click to toggle source
# File lib/embulk/input/mixpanel_api/client.rb, line 163 def query_string(prs) URI.encode_www_form({ params: JSON.generate(prs[:params]), script: prs[:script] }) end
request(params, &block)
click to toggle source
# File lib/embulk/input/mixpanel_api/client.rb, line 127 def request(params, &block) # https://mixpanel.com/docs/api-documentation/exporting-raw-data-you-inserted-into-mixpanel Embulk.logger.debug "Export param: #{params.to_s}" buf = "" error_response = '' Embulk.logger.info "Sending request to #{@endpoint}" response = httpclient.get(@endpoint, params) do |response, chunk| # Only process data if response status is 200..299 if response.status / 100 == 2 chunk.each_line do |line| begin record = JSON.parse(buf + line) block.call record buf = "" rescue JSON::ParserError=>e buf << line end end else error_response << chunk end end handle_error(response, error_response) if !buf.empty? # buffer is not empty mean the last json line is incomplete Embulk.logger.error "Received incomplete data from Mixpanel, #{buf}" raise MixpanelApi::IncompleteExportResponseError.new("Incomplete data received") end end
request_jql(parameters)
click to toggle source
# File lib/embulk/input/mixpanel_api/client.rb, line 158 def request_jql(parameters) Embulk.logger.info "Sending request to #{@endpoint} params #{parameters}" httpclient.post(@endpoint, query_string(parameters)) end
request_small_dataset(params, num_of_records)
click to toggle source
# File lib/embulk/input/mixpanel_api/client.rb, line 170 def request_small_dataset(params, num_of_records) # guess/preview # Try to fetch first number of records params["limit"] = num_of_records Embulk.logger.info "Sending request to #{@endpoint}" res = httpclient.get(@endpoint, params) handle_error(res, res.body) response_to_enum(res.body) end
response_to_enum(response_body)
click to toggle source
# File lib/embulk/input/mixpanel_api/client.rb, line 118 def response_to_enum(response_body) Enumerator.new do |y| response_body.lines.each do |json| # TODO: raise Embulk::DataError when invalid json given for Embulk 0.7+ y << JSON.parse(json) end end end