class Sqreen::Ecosystem::Messaging::Kinesis

Public Instance Methods

setup() click to toggle source
# File lib/sqreen/ecosystem/messaging/kinesis.rb, line 21
def setup
  advice_send = wrap_for_interest(ModuleApi::Tracing::ProducerData, &method(:after_send_advice))
  advice_receive = wrap_for_interest(ModuleApi::Tracing::ConsumerData, &method(:after_receive_advice))
  instrument 'Aws::Kinesis::Client#put_record',  after: advice_send
  instrument 'Aws::Kinesis::Client#put_records', after: advice_send
  # more sophisticated usages (register_stream_consumer, possibly with AsyncClient)
  # are not supported. They are more difficult to test, as kinesalite doesn't support them
  instrument 'Aws::Kinesis::Client#get_shard_iterator', after: advice_receive
end

Private Instance Methods

after_receive_advice(call, _ball) click to toggle source

@param [Sqreen::Graft::CallbackCall] call

# File lib/sqreen/ecosystem/messaging/kinesis.rb, line 45
def after_receive_advice(call, _ball)
  return if call.raised
  unless call.args.length > 0 && call.args[0].is_a?(Hash)
    logger.info "Unexpected arguments to get_shared_iterator"
    return
  end

  create_signal(call.instance, call.args, ModuleApi::Tracing::ConsumerData)
end
after_send_advice(call, _ball) click to toggle source

@param [Sqreen::Graft::CallbackCall] call

# File lib/sqreen/ecosystem/messaging/kinesis.rb, line 34
def after_send_advice(call, _ball)
  return if call.raised
  unless call.args.length > 0 && call.args[0].is_a?(Hash)
    logger.info "Unexpected arguments to put_record(s)"
    return
  end

  create_signal(call.instance, call.args, ModuleApi::Tracing::ProducerData)
end
create_signal(client, args, clazz) click to toggle source
# File lib/sqreen/ecosystem/messaging/kinesis.rb, line 55
def create_signal(client, args, clazz)
  hash = args[0]
  stream_name = hash[:stream_name] || hash['stream_name']
  return unless stream_name

  endpoint = client.instance_variable_get(:@config).endpoint

  clazz.new(
    message_type: :'aws-kinesis',
    host: endpoint.host,
    topic: stream_name,
  )
end