module Kafka::Tracer
Constants
- IngoreMessage
- VERSION
Attributes
ignore_message[RW]
tracer[RW]
Public Class Methods
compatible_version?()
click to toggle source
# File lib/kafka/tracer.rb, line 31 def compatible_version? # https://github.com/zendesk/ruby-kafka/pull/604 Gem::Version.new(Kafka::VERSION) >= Gem::Version.new("0.7.0") end
deliver_message(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, retries: 1)
click to toggle source
# File lib/kafka/tracer.rb, line 75 def deliver_message(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, retries: 1) if ::Kafka::Tracer.ignore_message.call(value, key, headers, topic, partition, partition_key) result = deliver_message_original(value, key: key, headers: headers, topic: topic, partition: partition, partition_key: partition_key, retries: retries) else tags = { 'component' => 'ruby-kafka', 'span.kind' => 'producer', 'message_bus.partition' => partition, 'message_bus.partition_key' => partition_key, 'message_bus.destination' => topic, 'message_bus.pending_message' => false } tracer = ::Kafka::Tracer.tracer tracer.start_active_span('kafka.producer', tags: tags) do |scope| OpenTracing.inject(scope.span.context, OpenTracing::FORMAT_TEXT_MAP, headers) begin result = deliver_message_original(value, key: key, headers: headers, topic: topic, partition: partition, partition_key: partition_key, retries: retries) rescue Kafka::Error => e scope.span.set_tag('error', true) raise end end end result end
each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes: 1, max_bytes: 1_048_576, &block)
click to toggle source
# File lib/kafka/tracer.rb, line 181 def each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes: 1, max_bytes: 1_048_576, &block) tracer = ::Kafka::Tracer.tracer wrapped_block = lambda { |message| context = tracer.extract(OpenTracing::FORMAT_TEXT_MAP, message.headers) tags = { 'component' => 'ruby-kafka', 'span.kind' => 'consumer', 'message_bus.partition' => message.partition, 'message_bus.destination' => message.topic, 'message_bus.pre_fetched_in_batch' => true } reference = OpenTracing::Reference.follows_from(context) tracer.start_active_span('kafka.consumer', references: [reference], tags: tags) do |scope| begin block.call(message) rescue StandardError scope.span.set_tag('error', true) raise end end } each_message_original( topic: topic, start_from_beginning: start_from_beginning, max_wait_time: max_wait_time, min_bytes: min_bytes, max_bytes: max_bytes, &wrapped_block ) end
instrument(tracer: OpenTracing.global_tracer, ignore_message: IngoreMessage)
click to toggle source
# File lib/kafka/tracer.rb, line 15 def instrument(tracer: OpenTracing.global_tracer, ignore_message: IngoreMessage) begin require 'kafka' rescue LoadError return end raise IncompatibleGemVersion unless compatible_version? @ignore_message = ignore_message @tracer = tracer patch_producer_produce patch_client_deliver_message patch_client_each_message patch_consumer_each_message end
patch_client_deliver_message()
click to toggle source
used to send sync messages
# File lib/kafka/tracer.rb, line 71 def patch_client_deliver_message ::Kafka::Client.class_eval do alias_method :deliver_message_original, :deliver_message def deliver_message(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, retries: 1) if ::Kafka::Tracer.ignore_message.call(value, key, headers, topic, partition, partition_key) result = deliver_message_original(value, key: key, headers: headers, topic: topic, partition: partition, partition_key: partition_key, retries: retries) else tags = { 'component' => 'ruby-kafka', 'span.kind' => 'producer', 'message_bus.partition' => partition, 'message_bus.partition_key' => partition_key, 'message_bus.destination' => topic, 'message_bus.pending_message' => false } tracer = ::Kafka::Tracer.tracer tracer.start_active_span('kafka.producer', tags: tags) do |scope| OpenTracing.inject(scope.span.context, OpenTracing::FORMAT_TEXT_MAP, headers) begin result = deliver_message_original(value, key: key, headers: headers, topic: topic, partition: partition, partition_key: partition_key, retries: retries) rescue Kafka::Error => e scope.span.set_tag('error', true) raise end end end result end end end
patch_client_each_message()
click to toggle source
# File lib/kafka/tracer.rb, line 177 def patch_client_each_message ::Kafka::Client.class_eval do alias_method :each_message_original, :each_message def each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes: 1, max_bytes: 1_048_576, &block) tracer = ::Kafka::Tracer.tracer wrapped_block = lambda { |message| context = tracer.extract(OpenTracing::FORMAT_TEXT_MAP, message.headers) tags = { 'component' => 'ruby-kafka', 'span.kind' => 'consumer', 'message_bus.partition' => message.partition, 'message_bus.destination' => message.topic, 'message_bus.pre_fetched_in_batch' => true } reference = OpenTracing::Reference.follows_from(context) tracer.start_active_span('kafka.consumer', references: [reference], tags: tags) do |scope| begin block.call(message) rescue StandardError scope.span.set_tag('error', true) raise end end } each_message_original( topic: topic, start_from_beginning: start_from_beginning, max_wait_time: max_wait_time, min_bytes: min_bytes, max_bytes: max_bytes, &wrapped_block ) end end end
patch_consumer_each_message()
click to toggle source
# File lib/kafka/tracer.rb, line 219 def patch_consumer_each_message ::Kafka::Consumer.class_eval do alias_method :each_message_original, :each_message def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true) tracer = ::Kafka::Tracer.tracer wrapped_block = lambda { |message| context = tracer.extract(OpenTracing::FORMAT_TEXT_MAP, message.headers) tags = { 'component' => 'ruby-kafka', 'span.kind' => 'consumer', 'message_bus.partition' => message.partition, 'message_bus.destination' => message.topic, 'message_bus.pre_fetched_in_batch' => true } reference = OpenTracing::Reference.follows_from(context) tracer.start_active_span('kafka.consumer', references: [reference], tags: tags) do |scope| begin yield message rescue StandardError scope.span.set_tag('error', true) raise end end } each_message_original( min_bytes: min_bytes, max_bytes: max_bytes, max_wait_time: max_wait_time, automatically_mark_as_processed: automatically_mark_as_processed, &wrapped_block ) end end end
patch_deliver_messages()
click to toggle source
TODO
# File lib/kafka/tracer.rb, line 175 def patch_deliver_messages; end
patch_producer_produce()
click to toggle source
used to send a batch of messages
# File lib/kafka/tracer.rb, line 120 def patch_producer_produce ::Kafka::Producer.class_eval do alias_method :produce_original, :produce def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now) if ::Kafka::Tracer.ignore_message.call(value, key, headers, topic, partition, partition_key) result = produce_original( value, key: key, headers: headers, topic: topic, partition: partition, partition_key: partition_key, create_time: create_time ) else tags = { 'component' => 'ruby-kafka', 'span.kind' => 'producer', 'message_bus.partition' => partition, 'message_bus.partition_key' => partition_key, 'message_bus.destination' => topic, 'message_bus.pending_message' => true } tracer = ::Kafka::Tracer.tracer tracer.start_active_span('kafka.producer', tags: tags) do |scope| OpenTracing.inject(scope.span.context, OpenTracing::FORMAT_TEXT_MAP, headers) begin result = produce_original( value, key: key, headers: headers, topic: topic, partition: partition, partition_key: partition_key, create_time: create_time ) rescue Kafka::Error => e scope.span.set_tag('error', true) raise end end end result end end end
produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now)
click to toggle source
# File lib/kafka/tracer.rb, line 124 def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now) if ::Kafka::Tracer.ignore_message.call(value, key, headers, topic, partition, partition_key) result = produce_original( value, key: key, headers: headers, topic: topic, partition: partition, partition_key: partition_key, create_time: create_time ) else tags = { 'component' => 'ruby-kafka', 'span.kind' => 'producer', 'message_bus.partition' => partition, 'message_bus.partition_key' => partition_key, 'message_bus.destination' => topic, 'message_bus.pending_message' => true } tracer = ::Kafka::Tracer.tracer tracer.start_active_span('kafka.producer', tags: tags) do |scope| OpenTracing.inject(scope.span.context, OpenTracing::FORMAT_TEXT_MAP, headers) begin result = produce_original( value, key: key, headers: headers, topic: topic, partition: partition, partition_key: partition_key, create_time: create_time ) rescue Kafka::Error => e scope.span.set_tag('error', true) raise end end end result end
remove()
click to toggle source
# File lib/kafka/tracer.rb, line 36 def remove if ::Kafka::Producer.method_defined?(:produce_original) ::Kafka::Producer.class_eval do remove_method :produce alias_method :produce, :produce_original remove_method :produce_original end end if ::Kafka::Client.method_defined?(:deliver_message_original) ::Kafka::Client.class_eval do remove_method :deliver_message alias_method :deliver_message, :deliver_message_original remove_method :deliver_message_original end end if ::Kafka::Client.method_defined?(:each_message_original) ::Kafka::Client.class_eval do remove_method :each_message alias_method :each_message, :each_message_original remove_method :each_message_original end end if ::Kafka::Consumer.method_defined?(:each_message_original) ::Kafka::Consumer.class_eval do remove_method :each_message alias_method :each_message, :each_message_original remove_method :each_message_original end end end