class Sqreen::Ecosystem::Messaging::Kafka
Public Instance Methods
setup()
click to toggle source
# File lib/sqreen/ecosystem/messaging/kafka.rb, line 20 def setup advice_send = wrap_for_interest(ModuleApi::Tracing::ProducerData, &method(:after_send)) advice_receive = wrap_for_interest(ModuleApi::Tracing::ConsumerData, &method(:after_receive)) instrument 'Kafka::Broker#produce', after: advice_send instrument 'Kafka::Broker#fetch_messages', after: advice_receive end
Private Instance Methods
after_receive(call, _ball)
click to toggle source
@param [Sqreen::Graft::CallbackCall] call
# File lib/sqreen/ecosystem/messaging/kafka.rb, line 45 def after_receive(call, _ball) return if call.raised if call.args.length != 1 logger.info "Expected 1 arguments to Kafka::Broker#fetch_messages" return end options = call.args.first topics = options[:topics].keys create_signal_data(ModuleApi::Tracing::ConsumerData, call.instance, topics) end
after_send(call, _ball)
click to toggle source
@param [Sqreen::Graft::CallbackCall] call
# File lib/sqreen/ecosystem/messaging/kafka.rb, line 30 def after_send(call, _ball) return if call.raised if call.args.length != 1 logger.info "Expected 1 arguments to Kafka::Broker#produce" return end options = call.args.first topics = options[:messages_for_topics].keys create_signal_data(ModuleApi::Tracing::ProducerData, call.instance, topics) end
create_signal_data(clazz, broker, topics)
click to toggle source
@param [Class] clazz @param [Kafka::Broker] broker @param [Array<String>] topics
# File lib/sqreen/ecosystem/messaging/kafka.rb, line 62 def create_signal_data(clazz, broker, topics) host = broker.instance_variable_get :@host topics.map do |top| clazz.new( message_type: :kafka, host: host, topic: top, ) end end