class Atatus::Transport::Worker
@api private
Attributes
adapter[W]
connection[R]
filters[R]
name[R]
queue[R]
serializers[R]
Public Class Methods
adapter()
click to toggle source
# File lib/atatus/transport/worker.rb, line 27 def adapter @adapter ||= Connection end
new( config, queue, serializers:, filters: )
click to toggle source
# File lib/atatus/transport/worker.rb, line 40 def initialize( config, queue, serializers:, filters: ) @config = config @queue = queue @serializers = serializers @filters = filters @connection = self.class.adapter.new(config) end
Public Instance Methods
process(resource)
click to toggle source
# File lib/atatus/transport/worker.rb, line 72 def process(resource) return unless (json = serialize_and_filter(resource)) connection.write(json) end
work_forever()
click to toggle source
# File lib/atatus/transport/worker.rb, line 56 def work_forever while (msg = queue.pop) case msg when StopMessage debug 'Stopping worker -- %s', self connection.flush(:halt) break else process msg end end rescue Exception => e warn 'Worker died with exception: %s', e.inspect debug e.backtrace.join("\n") end
Private Instance Methods
serialize_and_filter(resource)
click to toggle source
# File lib/atatus/transport/worker.rb, line 79 def serialize_and_filter(resource) if resource.respond_to?(:prepare_for_serialization!) resource.prepare_for_serialization! end serialized = serializers.serialize(resource) # if a filter returns nil, it means skip the event return nil if @filters.apply!(serialized) == Filters::SKIP JSON.fast_generate(serialized) rescue Exception error format('Failed converting event to JSON: %s', resource.inspect) error serialized.inspect nil end