class KafkaSession

Constants

VERSION

Attributes

clock[W]
producer[W]
id[R]
producer[RW]

Public Class Methods

clock()
# File lib/kafka_session.rb, line 36
def self.clock
  @clock ||= proc { Time.now.utc }
end
configure(options = {})
# File lib/kafka_session.rb, line 15
def self.configure(options = {})
  producer_options = {
    name:    options.fetch(:name),
    brokers: options.fetch(:brokers)
  }

  @producer = Producer.new(producer_options)
end
new(id: self.class.session_id, producer: self.class.producer)
# File lib/kafka_session.rb, line 47
def initialize(id: self.class.session_id, producer: self.class.producer)
  @id       = id
  @producer = producer
end
now()
# File lib/kafka_session.rb, line 40
def self.now
  clock.call
end
producer()
# File lib/kafka_session.rb, line 24
def self.producer
  @producer ||= MockProducer.new
end
session_id()
# File lib/kafka_session.rb, line 28
def self.session_id
  Thread.current[:kafka_session_session_id] ||= SecureRandom.uuid
end
session_id=(session_id)
# File lib/kafka_session.rb, line 32
def self.session_id=(session_id)
  Thread.current[:kafka_session_session_id] = session_id
end

Public Instance Methods

publish(topic, *message_values)
# File lib/kafka_session.rb, line 52
def publish(topic, *message_values)
  messages = message_values.map do |message_value|
    Message.new(message_value, session_id: id).to_json
  end

  producer.publish(topic: topic, messages: messages)
end