class SnowplowTracker::Emitter

Public Class Methods

new(endpoint, config={}) click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 50
# Builds an emitter bound to one collector endpoint.
#
# The supplied options are layered over the class-wide defaults held in
# @@DefaultConfig. Keys read here: :protocol, :port, :method, :buffer_size,
# :on_success and :on_failure.
#
# endpoint - collector host, interpolated into the collector URI.
# config   - Hash of overrides for the default configuration.
def initialize(endpoint, config={})
  settings = @@DefaultConfig.merge(config)
  @lock = Monitor.new
  @collector_uri = as_collector_uri(endpoint, settings[:protocol], settings[:port], settings[:method])
  @buffer = []

  # An explicit :buffer_size always wins; otherwise 'get' emitters use a
  # buffer of one event and every other method uses a buffer of ten.
  requested_size = settings[:buffer_size]
  @buffer_size =
    if !requested_size.nil?
      requested_size
    elsif settings[:method] == 'get'
      1
    else
      10
    end

  @method = settings[:method]
  @on_success = settings[:on_success]
  @on_failure = settings[:on_failure]
  LOGGER.info("#{self.class} initialized with endpoint #{@collector_uri}")

  self
end

Public Instance Methods

flush(async=true) click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 98
# Sends everything currently buffered and then resets the buffer.
#
# The send and the reset both happen while holding @lock.
#
# async - accepted but not read by this implementation.
#
# Returns nil.
def flush(async=true)
  @lock.synchronize do
    pending = @buffer
    send_requests(pending)
    # Replace (rather than clear) the buffer, so the array handed to
    # send_requests is left untouched.
    @buffer = []
  end

  nil
end
input(payload) click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 83
# Queues one event payload, flushing once the buffer is full.
#
# Every value in the payload is converted with #to_s in place, so the
# caller's object is modified.
#
# payload - key/value collection describing one event.
#
# Returns nil.
def input(payload)
  payload.each_pair { |key, value| payload[key] = value.to_s }

  @lock.synchronize do
    @buffer << payload
    flush if @buffer.size >= @buffer_size
  end

  nil
end
is_good_status_code(status_code) click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 217
# Reports whether an HTTP status code counts as a successful delivery.
#
# status_code - any object responding to #to_i (e.g. "200" or 200).
#
# Returns true when the integer value is from 200 up to but not including
# 400, false otherwise.
def is_good_status_code(status_code)
  (200...400).cover?(status_code.to_i)
end
send_requests(evts) click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 109
# Delivers a batch of events to the collector and reports the outcome
# through the configured callbacks.
#
# Each event is stamped with an 'stm' entry (current time in milliseconds,
# as a string) before sending, replacing any existing value.
#
# With @method 'post' the whole batch goes out in one request:
# @on_success receives the batch size, or @on_failure receives 0 and the
# full batch. With @method 'get' each event is sent individually:
# @on_success receives the number sent when all succeed, otherwise
# @on_failure receives the number sent and the events that were not.
# For any other @method nothing is sent and no callback fires.
#
# StandardError raised while sending is logged as a warning and counted as
# a failed delivery.
#
# evts - Array of event payloads.
#
# Returns nil.
def send_requests(evts)
  if evts.empty?
    LOGGER.info("Skipping sending events since buffer is empty")
    return
  end

  plural_suffix = evts.size == 1 ? '' : 's'
  LOGGER.info("Attempting to send #{evts.size} request#{plural_suffix}")

  # add the sent timestamp, overwrite if already exists
  evts.each { |event| event['stm'] = (Time.now.to_f * 1000).to_i.to_s }

  case @method
  when 'post'
    delivered = false
    begin
      envelope = SelfDescribingJson.new(
        'iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4',
        evts
      ).to_json
      response = http_post(envelope)
      delivered = is_good_status_code(response.code)
    rescue StandardError => error
      LOGGER.warn(error)
    end

    if delivered
      @on_success.call(evts.size) unless @on_success.nil?
    else
      @on_failure.call(0, evts) unless @on_failure.nil?
    end
  when 'get'
    # Split the batch by delivery outcome, preserving the original order.
    sent, unsent = evts.partition do |evt|
      begin
        is_good_status_code(http_get(evt).code)
      rescue StandardError => error
        LOGGER.warn(error)
        false
      end
    end

    if unsent.empty?
      @on_success.call(sent.size) unless @on_success.nil?
    else
      @on_failure.call(sent.size, unsent) unless @on_failure.nil?
    end
  end

  nil
end

Private Instance Methods

as_collector_uri(endpoint, protocol, port, method) click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 73
# Assembles the full collector URI string.
#
# endpoint - collector host.
# protocol - URI scheme placed before "://".
# port     - port to append as ":<port>", or nil to omit it.
# method   - 'get' selects the "/i" path; anything else selects the
#            "/com.snowplowanalytics.snowplow/tp2" path.
#
# Returns the URI as a String.
def as_collector_uri(endpoint, protocol, port, method)
  port_suffix = port.nil? ? '' : ":#{port}"
  path =
    case method
    when 'get' then '/i'
    else '/com.snowplowanalytics.snowplow/tp2'
    end

  "#{protocol}://#{endpoint}#{port_suffix}#{path}"
end
http_get(payload) click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 175
# Sends one event to the collector as a GET request, with the payload
# form-encoded into the query string.
#
# TLS is enabled when the collector URI scheme is 'https'. The outcome is
# logged at INFO for a good status code and WARN otherwise. Nothing is
# rescued here, so errors raised while building or sending the request
# reach the caller.
#
# payload - key/value collection accepted by URI.encode_www_form.
#
# Returns the response object from Net::HTTP#request.
def http_get(payload)
  query = URI.encode_www_form(payload)
  destination = URI(@collector_uri + '?' + query)
  LOGGER.info("Sending GET request to #{@collector_uri}...")
  LOGGER.debug("Payload: #{payload}")

  request = Net::HTTP::Get.new(destination.request_uri)
  http = Net::HTTP.new(destination.host, destination.port)
  http.use_ssl = true if destination.scheme == 'https'

  response = http.request(request)
  level = is_good_status_code(response.code) ? Logger::INFO : Logger::WARN
  LOGGER.add(level) do
    "GET request to #{@collector_uri} finished with status code #{response.code}"
  end

  response
end
http_post(payload) click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 195
# Sends a payload to the collector as a JSON POST request.
#
# The request body is payload.to_json and the content type is
# 'application/json; charset=utf-8'. TLS is enabled when the collector URI
# scheme is 'https'. The outcome is logged at INFO for a good status code
# and WARN otherwise. Nothing is rescued here, so errors raised while
# building or sending the request reach the caller.
#
# payload - object responding to #to_json.
#
# Returns the response object from Net::HTTP#request.
def http_post(payload)
  LOGGER.info("Sending POST request to #{@collector_uri}...")
  LOGGER.debug("Payload: #{payload}")

  destination = URI(@collector_uri)
  http = Net::HTTP.new(destination.host, destination.port)
  http.use_ssl = true if destination.scheme == 'https'

  request = Net::HTTP::Post.new(destination.request_uri)
  request.body = payload.to_json
  request.set_content_type('application/json; charset=utf-8')

  response = http.request(request)
  level = is_good_status_code(response.code) ? Logger::INFO : Logger::WARN
  LOGGER.add(level) do
    "POST request to #{@collector_uri} finished with status code #{response.code}"
  end

  response
end