class RubyEventStore::Outbox::SidekiqProcessor
Constants
- InvalidPayload
Attributes
redis[R]
Public Class Methods
new(redis)
click to toggle source
# File lib/ruby_event_store/outbox/sidekiq_processor.rb, line 10 def initialize(redis) @redis = redis @recently_used_queues = Set.new end
Public Instance Methods
after_batch()
click to toggle source
# File lib/ruby_event_store/outbox/sidekiq_processor.rb, line 29 def after_batch if !@recently_used_queues.empty? redis.sadd("queues", @recently_used_queues.to_a) @recently_used_queues.clear end end
message_format()
click to toggle source
# File lib/ruby_event_store/outbox/sidekiq_processor.rb, line 36 def message_format SIDEKIQ5_FORMAT end
process(record, now)
click to toggle source
# File lib/ruby_event_store/outbox/sidekiq_processor.rb, line 15 def process(record, now) parsed_record = JSON.parse(record.payload) queue = parsed_record["queue"] raise InvalidPayload.new("Missing queue") if queue.nil? || queue.empty? payload = JSON.generate(parsed_record.merge({ "enqueued_at" => now.to_f, })) redis.lpush("queue:#{queue}", payload) @recently_used_queues << queue end