class KafkaSession
Constants
- VERSION
Attributes
clock[W]
producer[W]
id[R]
producer[RW]
Public Class Methods
clock()
click to toggle source
# File lib/kafka_session.rb, line 36
# Returns the callable used as this class's time source.
#
# The value is memoized in the class-level @clock; when nothing has been
# assigned yet, it defaults to a proc that returns the current time in UTC.
def self.clock
  @clock ||= proc { Time.now.utc }
end
configure(options = {})
click to toggle source
# File lib/kafka_session.rb, line 15
# Builds a Producer from the given options and stores it in the class-level
# @producer.
#
# Only the :name and :brokers entries are forwarded to Producer.new; any other
# keys in +options+ are ignored.
#
# Raises KeyError when :name or :brokers is missing (both are read with
# Hash#fetch and no default).
#
# Returns the newly created Producer.
def self.configure(options = {})
  producer_options = {
    name: options.fetch(:name),
    brokers: options.fetch(:brokers)
  }

  @producer = Producer.new(producer_options)
end
new(id: self.class.session_id, producer: self.class.producer)
click to toggle source
# File lib/kafka_session.rb, line 47
# Creates a session bound to an id and a producer.
#
# id::       identifier stored in @id; defaults to the class's session_id.
# producer:: object stored in @producer; defaults to the class's producer.
#
# The defaults are evaluated at call time, only when the corresponding
# keyword is omitted.
def initialize(id: self.class.session_id, producer: self.class.producer)
  @id = id
  @producer = producer
end
now()
click to toggle source
# File lib/kafka_session.rb, line 40
# Returns the current time as reported by the class-level clock, i.e. the
# result of invoking +clock+ with no arguments.
def self.now
  clock.call
end
producer()
click to toggle source
# File lib/kafka_session.rb, line 24
# Returns the class-level producer.
#
# When no producer has been stored in @producer yet, a MockProducer is created,
# memoized and returned.
def self.producer
  @producer ||= MockProducer.new
end
session_id()
click to toggle source
# File lib/kafka_session.rb, line 28
# Returns the session id held in Thread.current[:kafka_session_session_id].
#
# When that slot is empty, a fresh SecureRandom.uuid is generated, stored in
# the slot and returned, so later calls from the same context see the same id.
def self.session_id
  Thread.current[:kafka_session_session_id] ||= SecureRandom.uuid
end
session_id=(session_id)
click to toggle source
# File lib/kafka_session.rb, line 32
# Stores +session_id+ in Thread.current[:kafka_session_session_id],
# replacing whatever value was there before.
def self.session_id=(session_id)
  Thread.current[:kafka_session_session_id] = session_id
end
Public Instance Methods
publish(topic, *message_values)
click to toggle source
# File lib/kafka_session.rb, line 52
# Publishes one or more values to +topic+ through this session's producer.
#
# Each value is wrapped in a Message carrying this session's id and
# serialized with +to_json+; the resulting array is handed to the producer in
# a single +publish+ call.
#
# topic::          passed through unchanged as the producer's +topic:+ keyword.
# message_values:: zero or more values, one Message per value.
#
# Returns whatever the producer's +publish+ returns.
def publish(topic, *message_values)
  messages = message_values.map do |message_value|
    Message.new(message_value, session_id: id).to_json
  end

  producer.publish(topic: topic, messages: messages)
end