class RubyEventStore::Outbox::Consumer

Constants

MAXIMUM_BATCH_FETCHES_IN_ONE_LOCK

Attributes

batch_size[R]
cleanup_strategy[R]
consumer_uuid[R]
logger[R]
metrics[R]
processor[R]
repository[R]
sleep_on_empty[R]
split_keys[R]

Public Class Methods

new(consumer_uuid, configuration, clock: Time, logger:, metrics:) click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 51
def initialize(consumer_uuid, configuration, clock: Time, logger:, metrics:)
  @split_keys = configuration.split_keys
  @clock = clock
  @logger = logger
  @metrics = metrics
  @batch_size = configuration.batch_size
  @sleep_on_empty = configuration.sleep_on_empty
  @consumer_uuid = consumer_uuid

  raise "Unknown format" if configuration.message_format != SIDEKIQ5_FORMAT
  @processor = SidekiqProcessor.new(Redis.new(url: configuration.redis_url))

  @gracefully_shutting_down = false
  prepare_traps

  @repository = Repository.new(configuration.database_url)
  @cleanup_strategy = case configuration.cleanup
  when :none
    CleanupStrategies::None.new
  else
    CleanupStrategies::CleanOldEnqueued.new(repository, ActiveSupport::Duration.parse(configuration.cleanup))
  end
end

Public Instance Methods

handle_split(fetch_specification) click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 101
def handle_split(fetch_specification)
  obtained_lock = obtain_lock_for_process(fetch_specification)
  return false unless obtained_lock

  something_processed = false

  MAXIMUM_BATCH_FETCHES_IN_ONE_LOCK.times do
    batch = retrieve_batch(fetch_specification)
    if batch.empty?
      break
    end

    failed_record_ids = []
    updated_record_ids = []
    batch.each do |record|
      begin
        now = @clock.now.utc
        processor.process(record, now)

        repository.mark_as_enqueued(record, now)
        something_processed |= true
        updated_record_ids << record.id
      rescue => e
        failed_record_ids << record.id
        e.full_message.split($/).each {|line| logger.error(line) }
      end
    end

    metrics.write_point_queue(
      enqueued: updated_record_ids.size,
      failed: failed_record_ids.size,
      format: fetch_specification.message_format,
      split_key: fetch_specification.split_key,
      remaining: get_remaining_count(fetch_specification)
    )

    logger.info "Sent #{updated_record_ids.size} messages from outbox table"

    refresh_successful = refresh_lock_for_process(obtained_lock)
    break unless refresh_successful
  end

  metrics.write_point_queue(
    format: fetch_specification.message_format,
    split_key: fetch_specification.split_key,
    remaining: get_remaining_count(fetch_specification)
  ) unless something_processed

  release_lock_for_process(fetch_specification)

  cleanup_strategy.call(fetch_specification)

  processor.after_batch

  something_processed
end
init() click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 75
def init
  logger.info("Initiated RubyEventStore::Outbox v#{VERSION}")
  logger.info("Handling split keys: #{split_keys ? split_keys.join(", ") : "(all of them)"}")
end
one_loop() click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 91
def one_loop
  remaining_split_keys = @split_keys.dup

  was_something_changed = false
  while (split_key = remaining_split_keys.shift)
    was_something_changed |= handle_split(FetchSpecification.new(processor.message_format, split_key))
  end
  was_something_changed
end
run() click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 80
def run
  while !@gracefully_shutting_down do
    was_something_changed = one_loop
    if !was_something_changed
      STDOUT.flush
      sleep sleep_on_empty
    end
  end
  logger.info "Gracefully shutting down"
end

Private Instance Methods

get_remaining_count(fetch_specification) click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 238
def get_remaining_count(fetch_specification)
  repository.get_remaining_count(fetch_specification)
end
initiate_graceful_shutdown() click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 230
def initiate_graceful_shutdown
  @gracefully_shutting_down = true
end
obtain_lock_for_process(fetch_specification) click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 161
def obtain_lock_for_process(fetch_specification)
  result = repository.obtain_lock_for_process(fetch_specification, consumer_uuid, clock: @clock)
  case result
  when :deadlocked
    logger.warn "Obtaining lock for split_key '#{fetch_specification.split_key}' failed (deadlock)"
    metrics.write_operation_result("obtain", "deadlocked")
    return false
  when :lock_timeout
    logger.warn "Obtaining lock for split_key '#{fetch_specification.split_key}' failed (lock timeout)"
    metrics.write_operation_result("obtain", "lock_timeout")
    return false
  when :taken
    logger.debug "Obtaining lock for split_key '#{fetch_specification.split_key}' unsuccessful (taken)"
    metrics.write_operation_result("obtain", "taken")
    return false
  else
    return result
  end
end
prepare_traps() click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 221
def prepare_traps
  Signal.trap("INT") do
    initiate_graceful_shutdown
  end
  Signal.trap("TERM") do
    initiate_graceful_shutdown
  end
end
refresh_lock_for_process(lock) click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 199
def refresh_lock_for_process(lock)
  result = lock.refresh(clock: @clock)
  case result
  when :ok
    return true
  when :deadlocked
    logger.warn "Refreshing lock for split_key '#{lock.split_key}' failed (deadlock)"
    metrics.write_operation_result("refresh", "deadlocked")
    return false
  when :lock_timeout
    logger.warn "Refreshing lock for split_key '#{lock.split_key}' failed (lock timeout)"
    metrics.write_operation_result("refresh", "lock_timeout")
    return false
  when :stolen
    logger.debug "Refreshing lock for split_key '#{lock.split_key}' unsuccessful (stolen)"
    metrics.write_operation_result("refresh", "stolen")
    return false
  else
    raise "Unexpected result #{result}"
  end
end
release_lock_for_process(fetch_specification) click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 181
def release_lock_for_process(fetch_specification)
  result = repository.release_lock_for_process(fetch_specification, consumer_uuid)
  case result
  when :ok
  when :deadlocked
    logger.warn "Releasing lock for split_key '#{fetch_specification.split_key}' failed (deadlock)"
    metrics.write_operation_result("release", "deadlocked")
  when :lock_timeout
    logger.warn "Releasing lock for split_key '#{fetch_specification.split_key}' failed (lock timeout)"
    metrics.write_operation_result("release", "lock_timeout")
  when :not_taken_by_this_process
    logger.debug "Releasing lock for split_key '#{fetch_specification.split_key}' failed (not taken by this process)"
    metrics.write_operation_result("release", "not_taken_by_this_process")
  else
    raise "Unexpected result #{result}"
  end
end
retrieve_batch(fetch_specification) click to toggle source
# File lib/ruby_event_store/outbox/consumer.rb, line 234
def retrieve_batch(fetch_specification)
  repository.retrieve_batch(fetch_specification, batch_size)
end