class Trace::ZipkinKafkaSender
This class sends information to Zipkin through Kafka. Spans are encoded using Thrift
Constants
- DEFAULT_KAFKA_TOPIC
- IP_FORMAT
Public Class Methods
new(options = {})
click to toggle source
Calls superclass method
# File lib/zipkin-tracer/zipkin_kafka_sender.rb, line 19 def initialize(options = {}) @topic = options[:topic] || DEFAULT_KAFKA_TOPIC if options[:producer] && options[:producer].respond_to?(:push) @producer = options[:producer] elsif options[:zookeepers] initialize_hermann_producer(options[:zookeepers]) else raise ArgumentError, "No (kafka) :producer option (accepting #push) and no :zookeeper option provided." end super(options) end
Public Instance Methods
flush!()
click to toggle source
# File lib/zipkin-tracer/zipkin_kafka_sender.rb, line 32 def flush! resolved_spans = ::ZipkinTracer::HostnameResolver.new.spans_with_ips(spans, IP_FORMAT) resolved_spans.each do |span| buf = '' trans = Thrift::MemoryBufferTransport.new(buf) oprot = Thrift::BinaryProtocol.new(trans) span.to_thrift.write(oprot) retval = @producer.push(buf, topic: @topic) # If @producer#push returns a promise/promise-like object, block until it # resolves retval.value! if retval.respond_to?(:value!) retval end rescue Exception # Ignore socket errors, etc end
Private Instance Methods
initialize_hermann_producer(zookeepers)
click to toggle source
# File lib/zipkin-tracer/zipkin_kafka_sender.rb, line 52 def initialize_hermann_producer(zookeepers) broker_ids = Hermann::Discovery::Zookeeper.new(zookeepers).get_brokers @producer = Hermann::Producer.new(nil, broker_ids) end