class SnowplowTracker::Emitter
Public Class Methods
new(endpoint, config={})
click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 50
# Sets up an emitter that sends events to the given collector endpoint.
#
# The caller's config is merged over @@DefaultConfig, so any key left out
# falls back to the class-wide default. Keys read here are :protocol, :port,
# :method, :buffer_size, :on_success and :on_failure.
#
# When :buffer_size is nil after the merge, the buffer size becomes 1 for the
# 'get' method and 10 for anything else.
def initialize(endpoint, config={})
  settings = @@DefaultConfig.merge(config)

  @lock = Monitor.new
  @collector_uri = as_collector_uri(endpoint, settings[:protocol], settings[:port], settings[:method])
  @buffer = []

  # An explicit size wins; otherwise the default depends on the HTTP method.
  @buffer_size = settings[:buffer_size]
  @buffer_size = (settings[:method] == 'get' ? 1 : 10) if @buffer_size.nil?

  @method = settings[:method]
  @on_success = settings[:on_success]
  @on_failure = settings[:on_failure]

  LOGGER.info("#{self.class} initialized with endpoint #{@collector_uri}")

  self
end
Public Instance Methods
flush(async=true)
click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 98
# Sends everything currently buffered and then starts a fresh, empty buffer.
#
# The send and the buffer reset both happen while holding @lock. The +async+
# argument is accepted but not consulted by this implementation.
#
# Always returns nil.
def flush(async=true)
  @lock.synchronize do
    pending = @buffer
    send_requests(pending)
    # Replaced only after send_requests returns, so an exception raised while
    # sending leaves the old buffer in place.
    @buffer = []
  end

  nil
end
input(payload)
click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 83
# Queues one event payload, flushing once the buffer is full.
#
# Every value in +payload+ is converted to a String in place (the caller's
# object is mutated). The payload is then appended to @buffer under @lock,
# and flush is invoked when the buffer holds at least @buffer_size items.
#
# Always returns nil.
def input(payload)
  payload.each do |key, value|
    payload[key] = value.to_s
  end

  @lock.synchronize do
    @buffer << payload
    flush if @buffer.size >= @buffer_size
  end

  nil
end
is_good_status_code(status_code)
click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 217
# Reports whether an HTTP status code counts as a success.
#
# The argument is coerced with to_i, so both Integers and numeric Strings are
# accepted. Codes from 200 up to and including 399 are treated as good.
def is_good_status_code(status_code)
  code = status_code.to_i
  code.between?(200, 399)
end
send_requests(evts)
click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 109
# Delivers a batch of events to the collector and reports the outcome through
# the @on_success / @on_failure callbacks (each is skipped when nil).
#
# Every event is stamped with an 'stm' (sent timestamp, milliseconds as a
# String) before sending, replacing any existing value.
#
# * 'post': the whole batch goes out in a single request. On success
#   @on_success receives the batch size; otherwise @on_failure receives 0 and
#   the full batch.
# * 'get': each event is sent individually. @on_success receives the number
#   sent if all succeeded; otherwise @on_failure receives the number that
#   succeeded and the events that did not.
# * Any other @method value sends nothing and triggers no callback.
#
# StandardError raised while sending is logged as a warning and counted as a
# failed request. Always returns nil.
def send_requests(evts)
  if evts.empty?
    LOGGER.info("Skipping sending events since buffer is empty")
    return
  end

  LOGGER.info("Attempting to send #{evts.size} request#{evts.size == 1 ? '' : 's'}")

  # add the sent timestamp, overwrite if already exists
  evts.each { |evt| evt['stm'] = (Time.now.to_f * 1000).to_i.to_s }

  case @method
  when 'post'
    batch_ok =
      begin
        envelope = SelfDescribingJson.new(
          'iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4',
          evts
        ).to_json
        is_good_status_code(http_post(envelope).code)
      rescue StandardError => error
        LOGGER.warn(error)
        false
      end

    if batch_ok
      @on_success.call(evts.size) unless @on_success.nil?
    else
      @on_failure.call(0, evts) unless @on_failure.nil?
    end
  when 'get'
    # Split the batch into delivered / undelivered, preserving event order.
    delivered, unsent = evts.partition do |evt|
      begin
        is_good_status_code(http_get(evt).code)
      rescue StandardError => error
        LOGGER.warn(error)
        false
      end
    end

    if unsent.empty?
      @on_success.call(delivered.size) unless @on_success.nil?
    else
      @on_failure.call(delivered.size, unsent) unless @on_failure.nil?
    end
  end

  nil
end
Private Instance Methods
as_collector_uri(endpoint, protocol, port, method)
click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 73
# Assembles the full collector URI from its parts.
#
# The port segment is omitted when +port+ is nil. The path is '/i' for the
# 'get' method and '/com.snowplowanalytics.snowplow/tp2' for any other value.
#
# Returns the URI as a String.
def as_collector_uri(endpoint, protocol, port, method)
  port_suffix = port.nil? ? '' : ":#{port}"

  path =
    if method == 'get'
      '/i'
    else
      '/com.snowplowanalytics.snowplow/tp2'
    end

  "#{protocol}://#{endpoint}#{port_suffix}#{path}"
end
http_get(payload)
click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 175
# Sends a single event to the collector as a GET request.
#
# +payload+ is form-encoded into the query string appended to @collector_uri.
# SSL is switched on when the resulting URI uses the 'https' scheme. The
# outcome is logged at INFO for a good status code and at WARN otherwise.
#
# Returns the response object from Net::HTTP#request.
def http_get(payload)
  query = URI.encode_www_form(payload)
  destination = URI(@collector_uri + '?' + query)

  LOGGER.info("Sending GET request to #{@collector_uri}...")
  LOGGER.debug("Payload: #{payload}")

  http = Net::HTTP.new(destination.host, destination.port)
  http.use_ssl = true if destination.scheme == 'https'

  request = Net::HTTP::Get.new(destination.request_uri)
  response = http.request(request)

  level = is_good_status_code(response.code) ? Logger::INFO : Logger::WARN
  LOGGER.add(level) do
    "GET request to #{@collector_uri} finished with status code #{response.code}"
  end

  response
end
http_post(payload)
click to toggle source
# File lib/snowplow-tracker/emitters.rb, line 195
# Sends a batch payload to the collector as a JSON POST request.
#
# The request body is +payload.to_json+ and the content type is set to
# 'application/json; charset=utf-8'. SSL is switched on when @collector_uri
# uses the 'https' scheme. The outcome is logged at INFO for a good status
# code and at WARN otherwise.
#
# Returns the response object from Net::HTTP#request.
def http_post(payload)
  LOGGER.info("Sending POST request to #{@collector_uri}...")
  LOGGER.debug("Payload: #{payload}")

  destination = URI(@collector_uri)
  http = Net::HTTP.new(destination.host, destination.port)
  http.use_ssl = true if destination.scheme == 'https'

  request = Net::HTTP::Post.new(destination.request_uri)
  request.body = payload.to_json
  request.set_content_type('application/json; charset=utf-8')

  response = http.request(request)

  level = is_good_status_code(response.code) ? Logger::INFO : Logger::WARN
  LOGGER.add(level) do
    "POST request to #{@collector_uri} finished with status code #{response.code}"
  end

  response
end