class RubyEventStore::Outbox::Consumer
Constants
- MAXIMUM_BATCH_FETCHES_IN_ONE_LOCK
Attributes
batch_size[R]
cleanup_strategy[R]
consumer_uuid[R]
logger[R]
metrics[R]
processor[R]
repository[R]
sleep_on_empty[R]
split_keys[R]
Public Class Methods
new(consumer_uuid, configuration, clock: Time, logger:, metrics:)
click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 51 def initialize(consumer_uuid, configuration, clock: Time, logger:, metrics:) @split_keys = configuration.split_keys @clock = clock @logger = logger @metrics = metrics @batch_size = configuration.batch_size @sleep_on_empty = configuration.sleep_on_empty @consumer_uuid = consumer_uuid raise "Unknown format" if configuration.message_format != SIDEKIQ5_FORMAT @processor = SidekiqProcessor.new(Redis.new(url: configuration.redis_url)) @gracefully_shutting_down = false prepare_traps @repository = Repository.new(configuration.database_url) @cleanup_strategy = case configuration.cleanup when :none CleanupStrategies::None.new else CleanupStrategies::CleanOldEnqueued.new(repository, ActiveSupport::Duration.parse(configuration.cleanup)) end end
Public Instance Methods
handle_split(fetch_specification)
click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 101 def handle_split(fetch_specification) obtained_lock = obtain_lock_for_process(fetch_specification) return false unless obtained_lock something_processed = false MAXIMUM_BATCH_FETCHES_IN_ONE_LOCK.times do batch = retrieve_batch(fetch_specification) if batch.empty? break end failed_record_ids = [] updated_record_ids = [] batch.each do |record| begin now = @clock.now.utc processor.process(record, now) repository.mark_as_enqueued(record, now) something_processed |= true updated_record_ids << record.id rescue => e failed_record_ids << record.id e.full_message.split($/).each {|line| logger.error(line) } end end metrics.write_point_queue( enqueued: updated_record_ids.size, failed: failed_record_ids.size, format: fetch_specification.message_format, split_key: fetch_specification.split_key, remaining: get_remaining_count(fetch_specification) ) logger.info "Sent #{updated_record_ids.size} messages from outbox table" refresh_successful = refresh_lock_for_process(obtained_lock) break unless refresh_successful end metrics.write_point_queue( format: fetch_specification.message_format, split_key: fetch_specification.split_key, remaining: get_remaining_count(fetch_specification) ) unless something_processed release_lock_for_process(fetch_specification) cleanup_strategy.call(fetch_specification) processor.after_batch something_processed end
init()
click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 75 def init logger.info("Initiated RubyEventStore::Outbox v#{VERSION}") logger.info("Handling split keys: #{split_keys ? split_keys.join(", ") : "(all of them)"}") end
one_loop()
click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 91 def one_loop remaining_split_keys = @split_keys.dup was_something_changed = false while (split_key = remaining_split_keys.shift) was_something_changed |= handle_split(FetchSpecification.new(processor.message_format, split_key)) end was_something_changed end
run()
click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 80 def run while !@gracefully_shutting_down do was_something_changed = one_loop if !was_something_changed STDOUT.flush sleep sleep_on_empty end end logger.info "Gracefully shutting down" end
Private Instance Methods
get_remaining_count(fetch_specification)
click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 238 def get_remaining_count(fetch_specification) repository.get_remaining_count(fetch_specification) end
initiate_graceful_shutdown()
click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 230 def initiate_graceful_shutdown @gracefully_shutting_down = true end
obtain_lock_for_process(fetch_specification)
click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 161 def obtain_lock_for_process(fetch_specification) result = repository.obtain_lock_for_process(fetch_specification, consumer_uuid, clock: @clock) case result when :deadlocked logger.warn "Obtaining lock for split_key '#{fetch_specification.split_key}' failed (deadlock)" metrics.write_operation_result("obtain", "deadlocked") return false when :lock_timeout logger.warn "Obtaining lock for split_key '#{fetch_specification.split_key}' failed (lock timeout)" metrics.write_operation_result("obtain", "lock_timeout") return false when :taken logger.debug "Obtaining lock for split_key '#{fetch_specification.split_key}' unsuccessful (taken)" metrics.write_operation_result("obtain", "taken") return false else return result end end
prepare_traps()
click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 221 def prepare_traps Signal.trap("INT") do initiate_graceful_shutdown end Signal.trap("TERM") do initiate_graceful_shutdown end end
refresh_lock_for_process(lock)
click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 199 def refresh_lock_for_process(lock) result = lock.refresh(clock: @clock) case result when :ok return true when :deadlocked logger.warn "Refreshing lock for split_key '#{lock.split_key}' failed (deadlock)" metrics.write_operation_result("refresh", "deadlocked") return false when :lock_timeout logger.warn "Refreshing lock for split_key '#{lock.split_key}' failed (lock timeout)" metrics.write_operation_result("refresh", "lock_timeout") return false when :stolen logger.debug "Refreshing lock for split_key '#{lock.split_key}' unsuccessful (stolen)" metrics.write_operation_result("refresh", "stolen") return false else raise "Unexpected result #{result}" end end
release_lock_for_process(fetch_specification)
click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 181 def release_lock_for_process(fetch_specification) result = repository.release_lock_for_process(fetch_specification, consumer_uuid) case result when :ok when :deadlocked logger.warn "Releasing lock for split_key '#{fetch_specification.split_key}' failed (deadlock)" metrics.write_operation_result("release", "deadlocked") when :lock_timeout logger.warn "Releasing lock for split_key '#{fetch_specification.split_key}' failed (lock timeout)" metrics.write_operation_result("release", "lock_timeout") when :not_taken_by_this_process logger.debug "Releasing lock for split_key '#{fetch_specification.split_key}' failed (not taken by this process)" metrics.write_operation_result("release", "not_taken_by_this_process") else raise "Unexpected result #{result}" end end
retrieve_batch(fetch_specification)
click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 234 def retrieve_batch(fetch_specification) repository.retrieve_batch(fetch_specification, batch_size) end