class Hoss::Transport::Worker
@api private
Attributes
adapter[W]
config[R]
connection[R]
filters[R]
name[R]
queue[R]
serializers[R]
Public Class Methods
adapter()
click to toggle source
# File lib/hoss/transport/worker.rb, line 31
# Connection class used to build each worker's connection. Defaults to
# Connection the first time it is requested; a previously assigned adapter is
# returned unchanged.
def adapter
  @adapter || (@adapter = Connection)
end
new( config, queue, serializers:, filters: )
click to toggle source
# File lib/hoss/transport/worker.rb, line 44
# Store the collaborators handed in by the caller and open a connection
# through the class-level adapter using the same config.
def initialize(config, queue, serializers:, filters:)
  @config, @queue = config, queue
  @serializers, @filters = serializers, filters
  @connection = self.class.adapter.new(config)
end
Public Instance Methods
work_forever()
click to toggle source
# File lib/hoss/transport/worker.rb, line 60
# Main worker loop. Pops messages off +queue+, groups filtered events into a
# batch bounded by config.batch_size, serializes the batch into a Report and
# writes it to +connection+. The loop ends when a StopMessage is received or
# queue.pop returns nil.
#
# An empty batch (every candidate event was filtered out, blacklisted, too
# large, or past its retry limit) now skips to the next message instead of
# `break`ing out of the loop, which previously stopped the worker for good.
# Deferred events are requeued before that check so they are never dropped.
def work_forever
  done = false
  while !done && (msg = queue.pop)
    begin
      debug 'working message', msg

      # wait if we don't have a config
      sleep 0.1 while config.agentConfig.nil?

      case msg
      when StopMessage
        debug 'Stopping worker -- %s', self
        done = true
        break
      else
        batch = []
        current_batch_size = 0

        # Use this as the first message in the batch
        event = msg.filtered ? msg : filter_resource(msg)
        unless event.nil?
          event_size = resource_size(event)
          if current_batch_size + event_size <= config.batch_size
            unless host_blacklisted(event)
              # The size is counted even when the event is then dropped for
              # exceeding its retry limit.
              current_batch_size += event_size
              if event.retries < config.max_event_retries
                batch.push(event)
              else
                debug "max retries hit for event"
              end
            end
          else
            debug "Event is too large, body needs to be truncated"
          end
        end

        # Do inner loop reading queue to build report
        requeue_messages = []
        while current_batch_size < config.batch_size && !queue.empty?
          next_msg = queue.pop
          case next_msg
          when StopMessage
            debug 'Stopping worker -- %s', self
            done = true
            break
          else
            event = next_msg.filtered ? next_msg : filter_resource(next_msg)
            unless event.nil?
              event_size = resource_size(event)
              if current_batch_size + event_size <= config.batch_size
                unless host_blacklisted(event)
                  current_batch_size += event_size
                  if event.retries < config.max_event_retries
                    batch.push(event)
                  else
                    debug "max retries hit for event"
                  end
                end
              else
                debug "Event too large for this batch, requeue"
                requeue_messages.push(event)
              end
            end
          end
        end

        # Requeue before the empty-batch check so deferred events survive it.
        debug "Requeue #{requeue_messages.length} messages" unless requeue_messages.empty?
        requeue_messages.each { |deferred| queue.push(deferred, false) }

        if batch.empty?
          # Nothing to send this round; keep serving the queue (or stop if a
          # StopMessage was seen, via the loop condition).
          debug "batch is empty, skipping"
          next
        end

        report = Report.new
        report.events = batch.map { |batched| serializers.serialize(batched) }
        debug "Finished building report"
        data = serializers.serialize(report)
        json = JSON.fast_generate(data)
        begin
          debug json
        rescue Exception
          debug 'unable to print body'
          puts json if config.debug
        end

        begin
          if config.disable_reporting
            debug "Reporting disabled, skipping"
          else
            connection.write(json)
          end
        rescue Exception => e
          error format('Failed send report: %s %s', e.inspect, e.backtrace)
          # Put the events back with an incremented retry count; events past
          # config.max_event_retries are dropped when popped again.
          batch.each do |m|
            m.retries += 1
            queue.push(m, false)
          end
          sleep 1
        end
      end
    # NOTE(review): Exception (not StandardError) is rescued here and below so
    # that nothing short of the outer handler stops the worker; confirm intent.
    rescue Exception => e
      debug "error in worker #{e.inspect}"
    end
  end
rescue Exception => e
  warn 'Worker died with exception: %s', e.inspect
  debug e.backtrace.join("\n")
end
Private Instance Methods
account_api_configuration()
click to toggle source
Return the 'accountApiConfiguration' section of the agent config, or an empty hash if the agent config or that section is missing.
# File lib/hoss/transport/worker.rb, line 304
# Account-wide API settings from the agent config; {} when the agent config
# has not been loaded or has no 'accountApiConfiguration' entry.
def account_api_configuration
  agent_config = config.agentConfig
  return {} if agent_config.nil?

  section = agent_config['accountApiConfiguration']
  section.nil? ? {} : section
end
api_configurations()
click to toggle source
Return the 'apis' section of the agent config, or an empty hash if the agent config or that section is missing.
# File lib/hoss/transport/worker.rb, line 310
# Per-API settings from the agent config; {} when the agent config has not
# been loaded or has no 'apis' entry.
def api_configurations
  agent_config = config.agentConfig
  return {} if agent_config.nil?

  apis = agent_config['apis']
  apis.nil? ? {} : apis
end
contains_error(resource)
click to toggle source
# File lib/hoss/transport/worker.rb, line 234
# True when the resource has a response whose status code is 400 or above;
# false otherwise (including when there is no response).
def contains_error(resource)
  response = resource.response
  !response.nil? && response.status_code >= 400
end
contains_string(arr, a)
click to toggle source
Do a case-insensitive search for a string in an array.
# File lib/hoss/transport/worker.rb, line 299
# Case-insensitive (String#casecmp) membership test: true when any element of
# +arr+ equals +a+ ignoring ASCII case.
#
# A nil +arr+ (e.g. a sanitization list that is absent from the config) is
# treated as empty instead of raising NoMethodError, and the search stops at
# the first match.
def contains_string(arr, a)
  return false if arr.nil?

  arr.any? { |candidate| candidate.casecmp(a) == 0 }
end
decompress_body(body, encoding)
click to toggle source
# File lib/hoss/transport/worker.rb, line 184
# Decode +body+ according to its Content-Encoding value.
#
# * "gzip" / "x-gzip" -> gunzipped string
# * "deflate"         -> inflated string; zlib-wrapped data is tried first,
#                        then raw DEFLATE (which some servers send instead)
# * anything else     -> +body+ unchanged
#
# A nil body and an empty body are returned as-is. On a decoding error the
# failure is logged via debug and nil is returned.
def decompress_body(body, encoding)
  return body unless body

  case (encoding || '').strip.downcase
  when 'gzip', 'x-gzip'
    return body if body.empty?

    # wrap with a block closes the reader when the block returns.
    Zlib::GzipReader.wrap(StringIO.new(body), &:read).to_s
  when 'deflate'
    return body if body.empty?

    begin
      # Class-level inflate closes its stream internally.
      Zlib::Inflate.inflate(body)
    rescue Zlib::DataError
      inflater = Zlib::Inflate.new(-Zlib::MAX_WBITS)
      begin
        inflater.inflate(body)
      ensure
        inflater.close
      end
    end
  else
    body
  end
rescue StandardError => e
  debug "unable to decompress body #{e.inspect}"
  nil
end
filter_body(body, fields)
click to toggle source
# File lib/hoss/transport/worker.rb, line 259
# Mask sensitive fields in a JSON body and return the re-serialized JSON.
#
# Returns nil for a nil body. If the body is not valid JSON, or masking
# raises, the body is returned exactly as captured (best effort; note that
# such a body is therefore reported unmasked).
def filter_body(body, fields)
  return nil if body.nil?

  parsed = JSON.parse(body)
  JSON.fast_generate(mask_object_fields(parsed, fields))
rescue StandardError
  body
end
filter_headers(headers, names)
click to toggle source
# File lib/hoss/transport/worker.rb, line 255
# Replace, in place, the value of every header whose name appears in +names+
# (case-insensitively) with 'xxx'. Returns +headers+.
#
# A nil/empty +names+ list or nil +headers+ means there is nothing to mask;
# previously a missing 'sanitizedHeaders' setting raised NoMethodError.
def filter_headers(headers, names)
  return headers if headers.nil? || names.nil? || names.empty?

  # Reassigning existing keys while iterating a Hash is permitted.
  headers.each { |name, _value| headers[name] = 'xxx' if contains_string(names, name) }
end
filter_resource(resource)
click to toggle source
# File lib/hoss/transport/worker.rb, line 203
# Prepare a captured request/response pair for reporting:
# 1. decompress request/response bodies per their content-encoding header,
# 2. pick the configuration for the request's host,
# 3. mask configured headers, body fields (or drop bodies entirely per
#    'bodyCapture') and query parameters,
# 4. mark the resource as filtered so a retry does not filter it again.
# Returns the (mutated) resource.
def filter_resource(resource)
  request = resource.request
  response = resource.response

  # Make sure the body is not compressed
  request.body = decompress_body(request.body, get_header(request.headers, "content-encoding")) if request
  response.body = decompress_body(response.body, get_header(response.headers, "content-encoding")) if response

  # Header names are case-insensitive: a plain headers['host'] lookup missed
  # 'Host' and silently fell back to the account-wide rules.
  host = request ? get_header(request.headers, 'host') : nil
  host_config = get_configuration_for_host(host)
  debug "Using config for event #{host_config.to_s}"

  # Filter the headers
  filter_headers(request.headers, host_config['sanitizedHeaders']) if request
  filter_headers(response.headers, host_config['sanitizedHeaders']) if response

  # Filter the body
  capture = host_config['bodyCapture']
  if capture == 'Off' || (capture == 'OnError' && !contains_error(resource))
    request.body = nil if request
    response.body = nil if response
  else
    request.body = filter_body(request.body, host_config['sanitizedBodyFields']) if request
    response.body = filter_body(response.body, host_config['sanitizedBodyFields']) if response
  end

  # Filter the query params
  request.url = filter_url(request.url, host_config['sanitizedQueryParams']) if request

  # Record this event as filtered in case of retry
  resource.filtered = true
  resource
end
filter_url(url, config)
click to toggle source
# File lib/hoss/transport/worker.rb, line 241
# Replace the value of every query parameter whose name appears in +fields+
# (case-insensitively) with 'xxx' and return the rebuilt URL string.
#
# The URL is returned untouched when +fields+ is nil/empty or the URL has no
# query. If the URL cannot be parsed, the error is logged and the original
# URL is returned.
#
# Uses URI.decode_www_form instead of CGI.parse: it preserves parameter
# order and valueless parameters, and does not depend on the `cgi` library
# (dropped from Ruby's default gems in newer releases).
def filter_url(url, fields)
  return url if fields.nil? || fields.empty?

  uri = URI(url)
  return url unless uri.query

  pairs = URI.decode_www_form(uri.query).map do |name, value|
    [name, contains_string(fields, name) ? 'xxx' : value]
  end
  uri.query = URI.encode_www_form(pairs)
  uri.to_s
rescue StandardError => e
  error "Error filtering url", e
  url
end
get_account_host_blacklist()
click to toggle source
Return the account-level host blacklist ('hostBlacklist'), or nil if it is not set.
# File lib/hoss/transport/worker.rb, line 294
# The 'hostBlacklist' entry of the account API configuration (nil if unset).
def get_account_host_blacklist
  settings = account_api_configuration
  settings['hostBlacklist']
end
get_configuration_for_host(host)
click to toggle source
Get the configuration for a host: the configuration of the first defined API whose hosts match it, otherwise the default account configuration.
# File lib/hoss/transport/worker.rb, line 287
# Configuration to apply to traffic for +host+: the 'configuration' of the
# first API whose 'hosts' list contains +host+ (case-insensitively), or the
# account-wide configuration when no API matches or the matching API has no
# configuration of its own (previously that case returned nil and made the
# caller raise on config['...']).
def get_configuration_for_host(host)
  api_config = api_configurations.find { |api| contains_string(api['hosts'], host) }
  (api_config && api_config['configuration']) || account_api_configuration
end
get_header(headers, name)
click to toggle source
# File lib/hoss/transport/worker.rb, line 199
# Case-insensitive header lookup: value of the first key in +headers+ that
# matches +name+ ignoring ASCII case. Returns headers[nil] (normally nil) when
# nothing matches, and nil when +headers+ itself is nil.
def get_header(headers, name)
  return nil if headers.nil?

  key = headers.keys.find { |header_name| header_name.casecmp(name) == 0 }
  headers[key]
end
host_blacklisted(event)
click to toggle source
# File lib/hoss/transport/worker.rb, line 315
# True when the event's request URL host equals a blacklisted host or is a
# subdomain of one (".<entry>" suffix). Host names are compared
# case-insensitively.
#
# Returns false when there is no blacklist, no request/URL, the URL has no
# host (relative URL; previously nil.end_with? raised and took the whole
# batch down with it), or the URL cannot be parsed.
def host_blacklisted(event)
  blacklist = get_account_host_blacklist
  return false if blacklist.nil?
  return false if event.request.nil? || event.request.url.nil?

  host = URI(event.request.url).host
  return false if host.nil?

  host = host.downcase
  blacklist.any? do |entry|
    blocked = entry.to_s.downcase
    next false if blocked.empty?

    host == blocked || host.end_with?(".#{blocked}")
  end
rescue URI::InvalidURIError
  false
end
mask_object_fields(obj, fields)
click to toggle source
# File lib/hoss/transport/worker.rb, line 267
# Recursively replace, in place, the value of every Hash key whose name
# matches one of the configured fields (each field's 'value', compared
# case-insensitively) with 'xxx'. Arrays are walked element by element.
# Returns +obj+.
#
# Field names are extracted once rather than at every recursion level; a nil
# +fields+ list or nil field names are ignored instead of raising.
def mask_object_fields(obj, fields)
  names = fields.nil? ? [] : fields.map { |f| f['value'] }.compact

  mask = lambda do |node|
    case node
    when Hash
      node.keys.each do |key|
        node[key] = contains_string(names, key) ? 'xxx' : mask.call(node[key])
      end
    when Array
      node.each_index do |i|
        node[i] = mask.call(node[i]) if node[i].is_a?(Hash) || node[i].is_a?(Array)
      end
    end
    node
  end

  mask.call(obj)
end
resource_size(resource)
click to toggle source
# File lib/hoss/transport/worker.rb, line 175
# Estimated size of a resource for batching: a fixed 20 plus the length of
# each present request/response header set serialized as JSON, plus the
# length of each present request/response body (String#length, i.e.
# characters).
def resource_size(resource)
  parts = [resource.request, resource.response].select { |part| part }
  header_total = parts.inject(0) do |acc, part|
    part.headers ? acc + JSON.fast_generate(part.headers).length : acc
  end
  body_total = parts.inject(0) do |acc, part|
    part.body ? acc + part.body.length : acc
  end
  20 + header_total + body_total
end