module SourMix::Perform

Constants

PAGE_SIZE

Public Instance Methods

perform() click to toggle source
# File lib/sourmix/perform.rb, line 18
# Runs the whole pipeline: download the raw Mixpanel export, turn it into
# pages of normalized events, gzip every page, then upload the pages.
#
# Returns whatever report_results returns.
def perform
  client = Mixpanel::Client.new(
    api_key: options.mixpanel_api_key,
    api_secret: options.mixpanel_api_secret
  )
  pages = process_dataset(download_dataset(client))
  gzip_results(pages)
  report_results(pages)
end

Private Instance Methods

download_dataset(mixpanel) click to toggle source
# File lib/sourmix/perform.rb, line 32
# Fetches the Mixpanel 'export' endpoint for options.date (as both from_date
# and to_date) into a new Tempfile using curl, then logs curl's exit status
# and the elapsed seconds.
#
# mixpanel - client object responding to #request_uri(endpoint, params).
#
# Returns the Tempfile the export was written to; the caller is responsible
# for closing/unlinking it.
def download_dataset mixpanel
  log.debug event: :download_dataset

  target = Tempfile.new('dataset')
  export_url = mixpanel.request_uri('export',
    from_date: options.date,
    to_date: options.date)

  # Both the URL and the path go through the shell, so escape them.
  escaped_url  = Shellwords.escape(export_url)
  escaped_path = Shellwords.escape(target.path)
  fetch_cmd    = "curl -Lf -o #{escaped_path} #{escaped_url}"

  elapsed_s = timed { sh(fetch_cmd) }.last

  log.info event: :download_dataset, exited: $?.exitstatus, elapsed_s: elapsed_s

  target
end
gzip_results(results) click to toggle source
# File lib/sourmix/perform.rb, line 103
# Compresses every result page in parallel with `gzip -k`. Each page keeps its
# original file and gains a "<page.path>.gz" companion. Logs per-page and
# total elapsed time and compressed size.
#
# results - Array of Tempfile pages (process_dataset returns them closed).
def gzip_results results
  log.debug event: :gzip_results
  started = Time.now.to_f

  # Each thread returns its own compressed size, and the sizes are summed
  # after joining. Mutating one shared counter from several threads
  # (`total_size += size`) is a read-modify-write and is not atomic.
  # Thread#value joins and re-raises a thread's exception, just like #join.
  sizes = results.map do |page|
    Thread.new do
      gzip_result  = 'gzip -k %s' % Shellwords.escape(page.path)
      _, elapsed_s = timed { sh gzip_result }

      size = File.size(page.path + '.gz')

      log.debug event: :gzip_result, exited: $?.exitstatus, elapsed_s: elapsed_s, size: size
      size
    end
  end.map(&:value)

  log.info event: :gzip_result, total_elapsed_s: (Time.now.to_f - started), total_size: sizes.sum
end
normalize_event(e) click to toggle source
# File lib/sourmix/perform.rb, line 159
# Flattens one Mixpanel export record into the document shape that is uploaded.
#
# The record's 'properties' hash is merged into the top level, which mutates
# +e+ in place. Every key has '.' replaced by '_' and leading non-word
# characters stripped. Values that are nil, '', [] or {} are dropped. Three
# keys are then set:
# - '@timestamp': Time.at('time') rendered as UTC ISO-8601 with milliseconds
# - '@id': MD5 hex digest of the record as received
# - 'type': 'mixpanel'
#
# e - Hash parsed from one export line (expects a 'properties' Hash).
#
# Returns a new Hash.
def normalize_event e
  # Fingerprint the record before anything is mutated.
  # NOTE(review): Hash#to_s formatting differs across Ruby versions (Ruby 3.4
  # changed Hash#inspect), so these ids may change after a Ruby upgrade.
  fingerprint = Digest::MD5.hexdigest(e.to_s)

  e.merge!(e.delete('properties'))

  flat = e.each_with_object({}) do |(key, value), acc|
    next if value.nil? || value == '' || value == [] || value == {}
    acc[key.gsub('.', '_').gsub(/^\W+/, '')] = value
  end

  flat.merge(
    '@timestamp' => Time.at(flat['time']).utc.iso8601(3),
    '@id'        => fingerprint,
    'type'       => 'mixpanel'
  )
end
process_dataset(dataset) click to toggle source
# File lib/sourmix/perform.rb, line 53
# Converts the raw export into result pages of normalized events.
#
# For each line of +dataset+:
# 1. Parse it as JSON.
# 2. Pass it through normalize_event and re-serialize it.
# 3. Write it to the current page, prefixed with ':' when v2_api? is true.
# A page holds at most PAGE_SIZE events. Lines that fail are logged together
# with the raw input line and counted; they are never written.
#
# dataset - Tempfile holding the export, read line by line. It is closed and
#           unlinked before this method returns, even when an error escapes.
#
# Returns an Array of closed Tempfile pages (empty when no line succeeded).
def process_dataset dataset
  log.debug event: :process_dataset

  results = []
  nok, nerr = 0, 0
  page_fill = 0  # events written to results.last so far
  v2 = v2_api?   # loop-invariant, so evaluate it once

  _, elapsed_s = timed do
    dataset.each_line do |line|
      begin
        event = JSON.generate(normalize_event(JSON.parse(line)))

        # Open a page only right before a write needs one. This way a failing
        # line can never leave an empty page behind; a counter based on
        # successful events alone would reopen a page on every failure at a
        # page boundary.
        if results.empty? || page_fill >= PAGE_SIZE
          log.debug event: :process_dataset_page, okay: nok
          results << Tempfile.new('results')
          page_fill = 0
        end

        results.last.puts(v2 ? ":#{event}" : event)
        page_fill += 1
        nok += 1
      rescue => err
        # Use a name distinct from +line+ so the offending input is what gets logged.
        log.error event: :process_dataset_error, error: err.inspect, line: line
        nerr += 1
      end
    end
  end

  results.each(&:close)

  # A zero elapsed time (tiny or empty dataset) would otherwise give NaN/Infinity.
  ops = elapsed_s.positive? ? (nok + nerr).fdiv(elapsed_s) : 0.0

  info = {
    event: :process_dataset,
    elapsed_s: elapsed_s,
    ops: ops,
    pages: results.size
  }

  info[:okay] = nok unless nok.zero?
  info[:errors] = nerr unless nerr.zero?

  log.info info

  results
ensure
  dataset.close
  dataset.unlink
end
report_results(results) click to toggle source
# File lib/sourmix/perform.rb, line 123
# Uploads every gzipped result page in parallel with curl. Each upload is a
# POST of "<page.path>.gz" via --data-binary with the header
# "Content-Encoding: gzip". Afterwards both the page and its .gz companion
# are removed, whether or not the upload succeeded.
#
# Credentials come from the userinfo part of options.theon_uri and are passed
# with `curl -u`. The URL given to curl is rebuilt from scheme, host, port,
# path and query, so it carries no credentials.
#
# results - Array of Tempfile pages already compressed by gzip_results.
def report_results results
  log.debug event: :report_results

  theon_uri  = URI options.theon_uri
  # Authenticate only when the URI actually carries a user. Joining nil parts
  # would otherwise yield ":" and send an empty Basic auth header.
  # NOTE(review): user/password are used as they appear in the URI (still
  # percent-encoded) - confirm no credential contains reserved characters.
  theon_auth = theon_uri.user && [ theon_uri.user, theon_uri.password ].join(':')
  theon_url  = '%s://%s:%s%s' % [
    theon_uri.scheme, theon_uri.host, theon_uri.port, theon_uri.path
  ]
  # Keep any query string rather than silently dropping it.
  theon_url += "?#{theon_uri.query}" if theon_uri.query

  auth_opt = theon_auth.nil? ? nil : '-u %s' % Shellwords.escape(theon_auth)
  started  = Time.now.to_f

  results.map do |page|
    Thread.new do
      gz_path = page.path + '.gz'
      begin
        report_result = \
          'curl %{auth} -Lf %{url} -XPOST %{headers} --data-binary %{data}' % {
            auth: auth_opt,
            url: Shellwords.escape(theon_url),
            data: Shellwords.escape('@' + gz_path),
            headers: "-H 'Content-Encoding: gzip'"
          }

        _, elapsed_s = timed { sh report_result }

        log.debug event: :report_result, exited: $?.exitstatus, elapsed_s: elapsed_s
      ensure
        # Remove both files even if the upload raised. `gzip -k` leaves the
        # .gz next to the page, so it must be deleted explicitly.
        page.unlink
        File.delete(gz_path) if File.exist?(gz_path)
      end
    end
  end.each(&:join)

  log.info event: :report_results, total_elapsed_s: (Time.now.to_f - started)
end
sh(command) click to toggle source
# File lib/sourmix/perform.rb, line 174
# Runs +command+ through the shell with its stdout redirected to stderr. When
# options.debug is set, the command is echoed to stderr first.
#
# Returns the value of Kernel#system in debug mode. Otherwise returns the
# backtick capture, which is always "" because stdout is redirected.
# Raises RuntimeError, after logging a fatal entry, when the command does not
# succeed. This includes a child that was killed by a signal.
#
# NOTE(review): commands can embed credentials or signed URLs, which then
# appear in the fatal log entry (and on stderr in debug mode).
def sh command
  output = if options.debug
    $stderr.puts command
    system "#{command} 1>&2"
  else
    `#{command} 1>&2`
  end
  status = $?

  # Process::Status#exitstatus is nil for a signal-terminated child, so
  # `exitstatus.zero?` would raise NoMethodError instead of reporting failure.
  unless status.success?
    log.fatal \
      error: 'Command failed',
      command: command,
      status: status.exitstatus,
      output: output
    raise 'Command "%s" failed' % command
  end

  output
end
timed() { || ... } click to toggle source
# File lib/sourmix/perform.rb, line 195
# Runs the given block and measures how long it took.
#
# Uses the monotonic clock, so the measurement is unaffected by wall-clock
# adjustments (NTP corrections, manual changes) and can never be negative.
#
# Returns a two-element Array: the block's return value and the elapsed time
# in seconds as a Float.
def timed
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  rvalue  = yield
  [rvalue, Process.clock_gettime(Process::CLOCK_MONOTONIC) - started]
end
v2_api?() click to toggle source
# File lib/sourmix/perform.rb, line 156
# True when the configured theon URI contains the substring "/v2/" anywhere.
# In process_dataset this selects the ':'-prefixed result line format.
def v2_api?
  theon_uri = options.theon_uri
  theon_uri.include?('/v2/')
end