class Hoss::Transport::Worker

@api private

Attributes

adapter[W]
config[R]
connection[R]
filters[R]
name[R]
queue[R]
serializers[R]

Public Class Methods

adapter() click to toggle source
# File lib/hoss/transport/worker.rb, line 31
# Returns the class used to build each worker's connection.
# The value is memoized in @adapter; when nothing has been stored there yet,
# it defaults to Connection.
def adapter
  return @adapter if @adapter

  @adapter = Connection
end
new( config, queue, serializers:, filters: ) click to toggle source
# File lib/hoss/transport/worker.rb, line 44
# Builds a worker that reads messages from +queue+.
#
# config      - agent configuration (batch sizes, retries, agentConfig, ...)
# queue       - queue the worker pops messages from
# serializers - used to turn events and reports into serializable data
# filters     - stored for later use
#
# The connection is created eagerly from the class-level +adapter+.
def initialize(config, queue, serializers:, filters:)
  @config, @queue = config, queue
  @serializers, @filters = serializers, filters

  @connection = self.class.adapter.new(config)
end

Public Instance Methods

work_forever() click to toggle source
# File lib/hoss/transport/worker.rb, line 60
# Main loop of the transport worker.
#
# Pops messages off +queue+ until a StopMessage arrives (or +queue.pop+
# returns nil). Each other message seeds a batch. Further messages are
# drained from the queue while the batch is under +config.batch_size+.
#
# Every event is filtered (unless it was already filtered on an earlier
# attempt) and checked against the host blacklist and the retry limit. The
# resulting batch is then serialized into a Report and written to
# +connection+. Events from a failed write are pushed back onto the queue
# with their retry counter incremented.
def work_forever
  done = false
  while !done && (msg = queue.pop)
    begin
      debug 'working message', msg

      # Filtering reads the agent config, so wait until it has been loaded
      sleep 0.1 while config.agentConfig.nil?

      case msg
      when StopMessage
        debug 'Stopping worker -- %s', self
        done = true
        break
      else
        batch = []
        current_batch_size = 0

        # Use this as the first message in the batch. An event that does not
        # fit into an empty batch can never be sent, so it is dropped.
        event = msg.filtered ? msg : filter_resource(msg)
        unless event.nil?
          new_size = add_event_to_batch(event, batch, current_batch_size)
          if new_size
            current_batch_size = new_size
          else
            debug "Event is too large, body needs to be truncated"
          end
        end

        # Do inner loop reading queue to build report
        requeue_messages = []
        while current_batch_size < @config.batch_size && !queue.empty?
          next_msg = queue.pop
          case next_msg
          when StopMessage
            debug 'Stopping worker -- %s', self
            done = true
            break
          else
            event = next_msg.filtered ? next_msg : filter_resource(next_msg)
            unless event.nil?
              new_size = add_event_to_batch(event, batch, current_batch_size)
              if new_size
                current_batch_size = new_size
              else
                # Does not fit in the space left; send it in a later batch
                debug "Event too large for this batch, requeue"
                requeue_messages.push(event)
              end
            end
          end
        end

        # Requeue before the empty-batch check so deferred events are never lost
        unless requeue_messages.empty?
          debug "Requeue #{requeue_messages.length} messages"
          requeue_messages.each { |requeued| queue.push(requeued, false) }
        end

        # Everything was filtered out (blacklisted, too large, out of retries).
        # Move on to the next message; leaving the loop here would stop the
        # worker for good.
        if batch.empty?
          debug "batch is empty, skipping"
          next
        end

        report = Report.new
        report.events = batch.map { |batched| serializers.serialize(batched) }

        debug "Finished building report"
        data = serializers.serialize(report)
        json = JSON.fast_generate(data)
        begin
          debug json
        rescue Exception
          debug 'unable to print body'
          puts json if config.debug
        end
        begin
          if config.disable_reporting
            debug "Reporting disabled, skipping"
          else
            connection.write(json)
          end
        rescue Exception => e
          error format('Failed send report: %s %s', e.inspect, e.backtrace)
          # Put the whole batch back; add_event_to_batch drops events once
          # they reach config.max_event_retries
          batch.each do |failed|
            failed.retries += 1
            queue.push(failed, false)
          end
          sleep 1
        end
      end
    rescue Exception => e
      debug "error in worker #{e.inspect}"
    end
  end
rescue Exception => e
  warn 'Worker died with exception: %s', e.inspect
  debug e.backtrace.join("\n")
end

# Tries to append a filtered +event+ to +batch+.
#
# Returns nil when the event does not fit in the space left under
# +config.batch_size+. Otherwise it returns the new batch size (an Integer,
# so always truthy). Blacklisted events and events that have reached
# +config.max_event_retries+ are dropped and do not count against the size.
private def add_event_to_batch(event, batch, current_batch_size)
  event_size = resource_size(event)
  return nil if current_batch_size + event_size > @config.batch_size
  return current_batch_size if host_blacklisted(event)

  if event.retries >= @config.max_event_retries
    debug "max retries hit for event"
    return current_batch_size
  end

  batch.push(event)
  current_batch_size + event_size
end

Private Instance Methods

account_api_configuration() click to toggle source

Returns the agent config's 'accountApiConfiguration' entry, or an empty hash when it is missing.

# File lib/hoss/transport/worker.rb, line 304
# Returns the 'accountApiConfiguration' entry of the agent config, or {}
# when the agent config or that entry is nil.
def account_api_configuration
  agent_config = config.agentConfig
  account_settings = agent_config['accountApiConfiguration'] unless agent_config.nil?
  account_settings.nil? ? {} : account_settings
end
api_configurations() click to toggle source

Returns the agent config's 'apis' entry, or an empty hash when it is missing.

# File lib/hoss/transport/worker.rb, line 310
# Returns the 'apis' entry of the agent config, or {} when the agent config
# or that entry is nil.
def api_configurations
  agent_config = config.agentConfig
  return {} if agent_config.nil?

  apis = agent_config['apis']
  apis.nil? ? {} : apis
end
contains_error(resource) click to toggle source
# File lib/hoss/transport/worker.rb, line 234
# True when the resource has a response whose status code is 400 or above;
# false when there is no response or the status is below 400.
def contains_error(resource)
  response = resource.response
  return false if response.nil?

  response.status_code >= 400
end
contains_string(arr, a) click to toggle source

Does a case-insensitive search for a string in an array.

# File lib/hoss/transport/worker.rb, line 299
# Case-insensitive membership test: true when some element of +arr+ equals
# +a+ ignoring case. A nil +arr+ (e.g. a sanitization list that is missing
# from the configuration) is treated as empty instead of raising.
def contains_string(arr, a)
  return false if arr.nil?

  # any? stops at the first match instead of building a filtered array
  arr.any? { |b| b.casecmp(a) == 0 }
end
decompress_body(body, encoding) click to toggle source
# File lib/hoss/transport/worker.rb, line 184
# Decodes +body+ according to its Content-Encoding header value.
#
# Supports gzip (and its x-gzip alias) and deflate; for deflate, both the
# zlib-wrapped form and raw DEFLATE data are accepted. Any other (or
# missing) encoding returns +body+ unchanged, as does a nil/false body.
# Returns nil (after logging) when the body cannot be decoded, so
# compressed bytes are not passed on.
def decompress_body(body, encoding)
  return body unless body

  case (encoding || '').strip.downcase
  when "gzip", "x-gzip"
    # Block form closes the reader even if reading fails
    Zlib::GzipReader.wrap(StringIO.new(body)) { |gz| gz.read.to_s }
  when "deflate"
    begin
      Zlib::Inflate.inflate(body)
    rescue Zlib::DataError
      # Some servers send raw DEFLATE (no zlib header); a negative window
      # size tells zlib to expect headerless data
      raw_inflater = Zlib::Inflate.new(-Zlib::MAX_WBITS)
      begin
        raw_inflater.inflate(body)
      ensure
        raw_inflater.close
      end
    end
  else
    body
  end
rescue StandardError => e
  debug "unable to decompress body #{e.inspect}"
  nil
end
filter_body(body, fields) click to toggle source
# File lib/hoss/transport/worker.rb, line 259
# Masks sensitive fields in a JSON +body+.
#
# Returns nil for a nil body. A non-JSON body, or a body with no +fields+
# configured, is returned unchanged. Otherwise the body is parsed,
# passed through mask_object_fields, and re-serialized. If masking fails
# part-way, the body is dropped (nil) rather than risk sending
# unsanitized data.
def filter_body(body, fields)
  return nil if body.nil?
  # Nothing is configured to mask, so the body can be sent as-is
  return body if fields.nil?

  begin
    json = JSON.parse(body)
  rescue JSON::ParserError
    # Not JSON: field masking does not apply
    return body
  end
  JSON.fast_generate(mask_object_fields(json, fields))
rescue StandardError => e
  debug "unable to filter body #{e.inspect}"
  nil
end
filter_headers(headers, names) click to toggle source
# File lib/hoss/transport/worker.rb, line 255
# Replaces, in place, the value of every header whose name matches an
# entry in +names+ (case-insensitive) with 'xxx'. Returns +headers+.
# A nil +headers+ or nil +names+ leaves everything untouched.
def filter_headers(headers, names)
  return headers if headers.nil? || names.nil?

  # Iterate over a snapshot of the keys; only existing keys are reassigned
  headers.keys.each do |name|
    headers[name] = 'xxx' if contains_string(names, name)
  end
  headers
end
filter_resource(resource) click to toggle source
# File lib/hoss/transport/worker.rb, line 203
# Applies the configured privacy filters to +resource+ in place and marks it
# as filtered so a retry does not filter it again. Returns +resource+.
#
# Steps:
# 1. Decompress request/response bodies based on their Content-Encoding.
# 2. Pick the configuration for the request's Host header (the matching API
#    config, else the account config).
# 3. Mask sanitized headers.
# 4. Either drop bodies (bodyCapture 'Off', or 'OnError' with no error
#    response) or mask sanitized JSON body fields.
# 5. Mask sanitized query parameters in the request URL.
# A nil request or response is skipped at every step.
def filter_resource(resource)
  request = resource.request
  response = resource.response

  # Make sure the body is not compressed
  request.body = decompress_body(request.body, get_header(request.headers, "content-encoding")) if request
  response.body = decompress_body(response.body, get_header(response.headers, "content-encoding")) if response

  # Header names are case-insensitive, so look the host up the same way as
  # content-encoding above
  host = request ? get_header(request.headers, "host") : nil
  host_config = get_configuration_for_host(host)
  debug "Using config for event #{host_config.to_s}"

  # Filter the headers
  filter_headers(request.headers, host_config['sanitizedHeaders']) if request
  filter_headers(response.headers, host_config['sanitizedHeaders']) if response

  # Filter the body
  if host_config['bodyCapture'] == 'Off' || (host_config['bodyCapture'] == 'OnError' && !contains_error(resource))
    request.body = nil if request
    response.body = nil if response
  else
    request.body = filter_body(request.body, host_config['sanitizedBodyFields']) if request
    response.body = filter_body(response.body, host_config['sanitizedBodyFields']) if response
  end

  # Filter the query params
  request.url = filter_url(request.url, host_config['sanitizedQueryParams']) if request

  # Record this event as filtered in case of retry
  resource.filtered = true

  resource
end
filter_url(url, config) click to toggle source
# File lib/hoss/transport/worker.rb, line 241
def filter_url(url, config)
  uri = URI(url)
  if uri.query
    query = CGI::parse(uri.query)
    query.keys.each{|k| query[k] = 'xxx' if contains_string(config, k)}
    uri.query = URI.encode_www_form(query)
    return uri.to_s
  end
  url
rescue Exception => e
  error "Error filtering url", e
  url
end
get_account_host_blacklist() click to toggle source

Returns the host blacklist from the account API configuration.

# File lib/hoss/transport/worker.rb, line 294
# Returns the 'hostBlacklist' entry of the account API configuration
# (nil when it is not set).
def get_account_host_blacklist
  account_settings = account_api_configuration
  account_settings['hostBlacklist']
end
get_configuration_for_host(host) click to toggle source

Gets the configuration for a host: the configuration of a defined API whose hosts include it, otherwise the default account configuration.

# File lib/hoss/transport/worker.rb, line 287
# Returns the filter configuration to use for +host+.
#
# The first API entry whose 'hosts' list contains +host+ (case-insensitive)
# supplies its 'configuration'. When no API matches, or the matching API has
# no configuration, the account-level configuration is used instead. API
# entries without a 'hosts' list are skipped.
def get_configuration_for_host(host)
  api_config = api_configurations.find do |api|
    hosts = api['hosts']
    !hosts.nil? && contains_string(hosts, host)
  end
  api_settings = api_config && api_config['configuration']
  return api_settings if api_settings

  account_api_configuration
end
get_header(headers, name) click to toggle source
# File lib/hoss/transport/worker.rb, line 199
# Case-insensitive header lookup: returns the value of the first key in
# +headers+ that matches +name+ ignoring case, or nil when there is no such
# key or +headers+ is nil.
def get_header(headers, name)
  return nil if headers.nil?

  key = headers.keys.find { |h| h.casecmp(name) == 0 }
  key.nil? ? nil : headers[key]
end
host_blacklisted(event) click to toggle source
# File lib/hoss/transport/worker.rb, line 315
# True when the host of the event's request URL is on the account host
# blacklist, either exactly or as a subdomain (e.g. "api.example.com"
# matches "example.com"). Host names are compared case-insensitively.
#
# Returns false when there is no blacklist, no request/URL, the URL has no
# host (e.g. a relative path), or the URL cannot be parsed. Raising here
# would abort the whole batch being built in work_forever.
def host_blacklisted(event)
  blacklist = get_account_host_blacklist
  return false if blacklist.nil?
  return false if event.request.nil? || event.request.url.nil?

  host = URI(event.request.url).host
  return false if host.nil?

  host = host.downcase
  blacklist.any? do |entry|
    blocked = entry.to_s.downcase
    host == blocked || host.end_with?(".#{blocked}")
  end
rescue URI::InvalidURIError
  false
end
mask_object_fields(obj, fields) click to toggle source
# File lib/hoss/transport/worker.rb, line 267
# Recursively masks sensitive fields in parsed JSON data, in place.
#
# +fields+ is a list of hashes whose 'value' is a field name. Every Hash key
# matching one of those names (case-insensitive), at any depth, has its
# value replaced with 'xxx'. Nested Hashes/Arrays are walked; other values
# are left alone. A nil or empty +fields+ (and entries without a 'value')
# mask nothing. Returns +obj+.
def mask_object_fields(obj, fields)
  return obj if fields.nil? || fields.empty?

  # Compute the name list once instead of at every recursion level
  field_names = fields.map { |f| f['value'] }.compact
  mask_fields_by_name(obj, field_names)
end

# Recursive worker for mask_object_fields; +field_names+ is a plain list of
# names to mask.
private def mask_fields_by_name(obj, field_names)
  case obj
  when Hash
    obj.keys.each do |k|
      obj[k] = contains_string(field_names, k) ? 'xxx' : mask_fields_by_name(obj[k], field_names)
    end
  when Array
    # Scalars come back unchanged, so only nested containers are affected
    obj.map! { |item| mask_fields_by_name(item, field_names) }
  end
  obj
end
resource_size(resource) click to toggle source
# File lib/hoss/transport/worker.rb, line 175
# Estimated size of a resource for batch accounting: a fixed overhead of 20
# plus the JSON length of the request/response headers and the length of
# their bodies (any nil part is skipped).
# NOTE(review): uses character length, not bytesize; confirm which unit
# config.batch_size is meant to be in.
def resource_size(resource)
  total = 20
  request = resource.request
  response = resource.response

  if request
    total += JSON.fast_generate(request.headers).length if request.headers
    total += request.body.length if request.body
  end

  if response
    total += JSON.fast_generate(response.headers).length if response.headers
    total += response.body.length if response.body
  end

  total
end