class Sqreen::Ecosystem::Messaging::Kafka

Public Instance Methods

setup() click to toggle source
# File lib/sqreen/ecosystem/messaging/kafka.rb, line 20
# Registers "after" advice on the two Kafka::Broker methods this class
# observes: #produce is paired with after_send and #fetch_messages with
# after_receive.
#
# Both advice objects are built before either instrumentation is installed.
# NOTE(review): wrap_for_interest and instrument are defined outside this
# block; their exact semantics are not visible here.
def setup
  producer_advice = wrap_for_interest(ModuleApi::Tracing::ProducerData, &method(:after_send))
  consumer_advice = wrap_for_interest(ModuleApi::Tracing::ConsumerData, &method(:after_receive))

  instrument('Kafka::Broker#produce', after: producer_advice)
  instrument('Kafka::Broker#fetch_messages', after: consumer_advice)
end

Private Instance Methods

after_receive(call, _ball) click to toggle source

@param [Sqreen::Graft::CallbackCall] call

# File lib/sqreen/ecosystem/messaging/kafka.rb, line 45
# After-advice for Kafka::Broker#fetch_messages.
#
# Does nothing when the instrumented call raised. When the call did not
# receive exactly one argument, logs an informational message and stops.
# Otherwise treats that single argument as an options hash and hands the
# keys of its :topics entry to create_signal_data, tagged as ConsumerData.
#
# @param [Sqreen::Graft::CallbackCall] call
# @param _ball unused
# @return the result of create_signal_data, or nil when the call is skipped
def after_receive(call, _ball)
  return if call.raised

  unless call.args.length == 1
    logger.info "Expected 1 arguments to Kafka::Broker#fetch_messages"
    return
  end

  fetched_topics = call.args.first[:topics].keys
  create_signal_data(ModuleApi::Tracing::ConsumerData, call.instance, fetched_topics)
end
after_send(call, _ball) click to toggle source

@param [Sqreen::Graft::CallbackCall] call

# File lib/sqreen/ecosystem/messaging/kafka.rb, line 30
# After-advice for Kafka::Broker#produce.
#
# Does nothing when the instrumented call raised. When the call did not
# receive exactly one argument, logs an informational message and stops.
# Otherwise treats that single argument as an options hash and hands the
# keys of its :messages_for_topics entry to create_signal_data, tagged as
# ProducerData.
#
# @param [Sqreen::Graft::CallbackCall] call
# @param _ball unused
# @return the result of create_signal_data, or nil when the call is skipped
def after_send(call, _ball)
  return if call.raised

  unless call.args.length == 1
    logger.info "Expected 1 arguments to Kafka::Broker#produce"
    return
  end

  produced_topics = call.args.first[:messages_for_topics].keys
  create_signal_data(ModuleApi::Tracing::ProducerData, call.instance, produced_topics)
end
create_signal_data(clazz, broker, topics) click to toggle source

@param [Class] clazz
@param [Kafka::Broker] broker
@param [Array<String>] topics

# File lib/sqreen/ecosystem/messaging/kafka.rb, line 62
# Builds one clazz instance per topic, each carrying the :kafka message
# type, the broker's host and the topic itself.
#
# The host is read straight from the broker's @host instance variable.
#
# @param [Class] clazz class instantiated with message_type:, host: and topic:
# @param [Kafka::Broker] broker object whose @host is reported
# @param [Array<String>] topics
# @return [Array] one clazz instance per entry of topics, in the same order
def create_signal_data(clazz, broker, topics)
  broker_host = broker.instance_variable_get(:@host)
  topics.map { |topic_name| clazz.new(message_type: :kafka, host: broker_host, topic: topic_name) }
end