class RubyEventStore::Outbox::SidekiqProcessor

Constants

InvalidPayload

Attributes

redis[R]

Public Class Methods

new(redis)
# File lib/ruby_event_store/outbox/sidekiq_processor.rb, line 10
# Sets up a processor around the given Redis client.
#
# @param redis [Object] client kept in @redis; the other methods in this
#   class call +lpush+ and +sadd+ on it
def initialize(redis)
  @redis = redis
  # Queue names seen since the last flush; starts out empty.
  @recently_used_queues = Set[]
end

Public Instance Methods

after_batch()
# File lib/ruby_event_store/outbox/sidekiq_processor.rb, line 29
# Flushes the queue names collected so far into the Redis "queues" set,
# then forgets them. Does nothing when no queue name has been collected.
#
# @return [Set, nil] the (now empty) set of queue names, or nil when there
#   was nothing to flush
def after_batch
  return if @recently_used_queues.empty?

  queue_names = @recently_used_queues.to_a
  redis.sadd("queues", queue_names)
  @recently_used_queues.clear
end
message_format()
# File lib/ruby_event_store/outbox/sidekiq_processor.rb, line 36
# Returns the SIDEKIQ5_FORMAT constant (defined outside this view).
# NOTE(review): presumably identifies which record format this processor
# handles — confirm against the caller.
def message_format
  SIDEKIQ5_FORMAT
end
process(record, now)
# File lib/ruby_event_store/outbox/sidekiq_processor.rb, line 15
# Pushes one record's job onto its Redis queue list.
#
# The record payload is parsed as JSON, stamped with "enqueued_at" (the
# float value of +now+), re-serialized and LPUSHed onto "queue:<queue>".
# The queue name is then remembered so that after_batch can add it to the
# "queues" set.
#
# @param record [#payload] object whose +payload+ is a JSON string
# @param now [#to_f] timestamp written into the job as "enqueued_at"
# @return [Set] the set of queue names used since the last after_batch
# @raise [InvalidPayload] when the payload is not a JSON object, or its
#   "queue" entry is missing, empty, or not a String
# @raise [JSON::ParserError] when the payload is not valid JSON
def process(record, now)
  parsed_record = JSON.parse(record.payload)
  # Valid JSON need not be an object ("[]", "null", "1"); indexing those by
  # "queue" would fail with an unrelated TypeError/NoMethodError.
  raise InvalidPayload, "Payload is not a JSON object" unless parsed_record.is_a?(Hash)

  queue = parsed_record["queue"]
  raise InvalidPayload, "Missing queue" if queue.nil? || queue == ""
  # A non-String queue (number, array, ...) cannot name a Redis list.
  raise InvalidPayload, "Invalid queue" unless queue.is_a?(String)

  payload = JSON.generate(parsed_record.merge("enqueued_at" => now.to_f))

  redis.lpush("queue:#{queue}", payload)

  @recently_used_queues << queue
end