class Sqreen::Ecosystem::Messaging::Kinesis
Public Instance Methods
setup()
click to toggle source
# File lib/sqreen/ecosystem/messaging/kinesis.rb, line 21 def setup advice_send = wrap_for_interest(ModuleApi::Tracing::ProducerData, &method(:after_send_advice)) advice_receive = wrap_for_interest(ModuleApi::Tracing::ConsumerData, &method(:after_receive_advice)) instrument 'Aws::Kinesis::Client#put_record', after: advice_send instrument 'Aws::Kinesis::Client#put_records', after: advice_send # more sophisticated usages (register_stream_consumer, possibly with AsyncClient) # are not supported. They are more difficult to test, as kinesalite doesn't support them instrument 'Aws::Kinesis::Client#get_shard_iterator', after: advice_receive end
Private Instance Methods
after_receive_advice(call, _ball)
click to toggle source
@param [Sqreen::Graft::CallbackCall] call
# File lib/sqreen/ecosystem/messaging/kinesis.rb, line 45 def after_receive_advice(call, _ball) return if call.raised unless call.args.length > 0 && call.args[0].is_a?(Hash) logger.info "Unexpected arguments to get_shared_iterator" return end create_signal(call.instance, call.args, ModuleApi::Tracing::ConsumerData) end
after_send_advice(call, _ball)
click to toggle source
@param [Sqreen::Graft::CallbackCall] call
# File lib/sqreen/ecosystem/messaging/kinesis.rb, line 34 def after_send_advice(call, _ball) return if call.raised unless call.args.length > 0 && call.args[0].is_a?(Hash) logger.info "Unexpected arguments to put_record(s)" return end create_signal(call.instance, call.args, ModuleApi::Tracing::ProducerData) end
create_signal(client, args, clazz)
click to toggle source
# File lib/sqreen/ecosystem/messaging/kinesis.rb, line 55 def create_signal(client, args, clazz) hash = args[0] stream_name = hash[:stream_name] || hash['stream_name'] return unless stream_name endpoint = client.instance_variable_get(:@config).endpoint clazz.new( message_type: :'aws-kinesis', host: endpoint.host, topic: stream_name, ) end