class OpenTelemetry::Instrumentation::Que::Middlewares::ServerMiddleware

Server middleware to trace Que jobs

Public Class Methods

attributes_before_job_completion(job, job_class) click to toggle source
# File lib/opentelemetry/instrumentation/que/middlewares/server_middleware.rb, line 51
# Builds the span attributes that can be derived before the job has run.
#
# @param job [#que_attrs] job whose `que_attrs` hash is read for `:queue`,
#   `:priority` and `:id`
# @param job_class [Object] value stored under 'messaging.que.job_class'
# @return [Hash] attribute hash keyed by String; 'messaging.message_id' is
#   present only when `que_attrs[:id]` is truthy
def self.attributes_before_job_completion(job, job_class)
  que_attrs = job.que_attrs

  base = {
    'messaging.system' => 'que',
    # Fall back to the literal 'default' when the job carries no queue name.
    'messaging.destination' => que_attrs[:queue] || 'default',
    'messaging.destination_kind' => 'queue',
    'messaging.operation' => 'process',
    'messaging.que.job_class' => job_class,
    # Fall back to 100 when the job carries no priority.
    'messaging.que.priority' => que_attrs[:priority] || 100
  }

  message_id = que_attrs[:id]
  return base unless message_id

  base.merge('messaging.message_id' => message_id)
end
call(job, &block) click to toggle source
# File lib/opentelemetry/instrumentation/que/middlewares/server_middleware.rb, line 13
# Runs the given block (the job execution) inside a consumer span named
# "<job class> process".
#
# The context is extracted from the job's tags and made current for the
# duration of the call. What happens next depends on
# `otel_config[:propagation_style]`:
#
# * `:child` - the span is created with `tracer.in_span` while the
#   extracted context is current.
# * `:link`  - a new root span is started, with links built from the
#   extracted context by `prepare_span_links`.
# * anything else - a new root span is started with no links.
#
# After the block returns, the span is enriched by
# `enhance_span_after_job_completion`.
#
# @param job [#que_attrs] the job being processed
# @yield executes the job
# @return [nil] the return value is not important
def self.call(job, &block) # rubocop:disable Metrics/AbcSize,Metrics/MethodLength
  job_class = job_class(job)
  span_name = "#{job_class} process"
  attributes = attributes_before_job_completion(job, job_class)

  # `dig` tolerates a job whose `:data` entry is missing or nil, so tracing
  # never raises NoMethodError for a job that simply carries no tags.
  tags = job.que_attrs.dig(:data, :tags) || []
  extracted_context = extract_context_from_tags(tags)

  OpenTelemetry::Context.with_current(extracted_context) do
    if otel_config[:propagation_style] == :child
      tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span|
        block.call
        enhance_span_after_job_completion(span, job)
      end
    else
      span_links = otel_config[:propagation_style] == :link ? prepare_span_links(extracted_context) : []

      root_span = tracer.start_root_span(span_name, attributes: attributes, links: span_links, kind: :consumer)
      OpenTelemetry::Trace.with_span(root_span) do |span|
        block.call
        enhance_span_after_job_completion(span, job)
      ensure
        # The root span was started manually, so it must be finished even
        # when the job or the enrichment step raises.
        root_span.finish
      end
    end
  end

  # return value is not important
  nil
end
enhance_span_after_job_completion(span, job) click to toggle source
# File lib/opentelemetry/instrumentation/que/middlewares/server_middleware.rb, line 64
# Adds post-execution details to the span: the value of the job's
# `:error_count` attribute and, when the job exposes an error through
# `que_error`, the recorded exception plus an error status.
#
# @param span [#set_attribute, #record_exception, #status=] span to enrich
# @param job [#que_attrs, #que_error] the job that has just been run
def self.enhance_span_after_job_completion(span, job)
  attempts = job.que_attrs[:error_count]
  span.set_attribute('messaging.que.attempts', attempts)

  failure = job.que_error
  if failure
    span.record_exception(failure)
    span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{failure.class}")
  end
end
extract_context_from_tags(tags) click to toggle source

tags is an array that looks something like [“tag1”, “traceparent:…”]; only elements of the form “key:value” are used.

# File lib/opentelemetry/instrumentation/que/middlewares/server_middleware.rb, line 75
# Turns the "key:value" entries of +tags+ into a Hash and hands it to
# `OpenTelemetry.propagation.extract`.
#
# Each tag is split on the first ':' only, so the value part may itself
# contain colons. Tags without a ':' are ignored.
#
# @param tags [Array<#split>] tag list, e.g. ["tag1", "traceparent:..."]
# @return whatever `OpenTelemetry.propagation.extract` returns for the Hash
def self.extract_context_from_tags(tags)
  carrier = tags.each_with_object({}) do |tag, pairs|
    key, value = tag.split(':', 2)
    # A missing second part means the tag had no ':' separator; skip it.
    pairs[key] = value unless value.nil?
  end

  OpenTelemetry.propagation.extract(carrier)
end
job_class(job) click to toggle source
# File lib/opentelemetry/instrumentation/que/middlewares/server_middleware.rb, line 83
# Resolves the name used to identify the job's class.
#
# @param job [#que_attrs] job whose `que_attrs[:job_class]` is preferred
# @return the `:job_class` attribute when truthy, otherwise the name of the
#   job object's own class
def self.job_class(job)
  declared = job.que_attrs[:job_class]
  return declared if declared

  job.class.name
end
otel_config() click to toggle source
# File lib/opentelemetry/instrumentation/que/middlewares/server_middleware.rb, line 47
# Returns the configuration of the Que instrumentation singleton.
#
# The constant is written fully qualified, matching `tracer`, so the lookup
# does not depend on the lexical nesting in which this method is defined
# and cannot accidentally resolve to a top-level `::Que`.
#
# @return the value of `Instrumentation.instance.config`
def self.otel_config
  OpenTelemetry::Instrumentation::Que::Instrumentation.instance.config
end
tracer() click to toggle source
# File lib/opentelemetry/instrumentation/que/middlewares/server_middleware.rb, line 43
# Returns the tracer owned by the Que instrumentation singleton.
#
# @return the value of `Instrumentation.instance.tracer`
def self.tracer
  instrumentation = OpenTelemetry::Instrumentation::Que::Instrumentation.instance
  instrumentation.tracer
end