class RubyEventStore::Outbox::Repository
Constants
- RECENTLY_LOCKED_DURATION
Public Class Methods
new(database_url)
click to toggle source
# File lib/ruby_event_store/outbox/repository.rb, line 118
#
# Prepares the ActiveRecord connection used by the repository.
#
# A connection is established from +database_url+ only when
# ActiveRecord::Base does not already report being connected. When the
# active adapter is named "Mysql2", the session's
# innodb_lock_wait_timeout is set to 1.
#
# @param database_url [Object] value passed straight to
#   ActiveRecord::Base.establish_connection
def initialize(database_url)
  base = ActiveRecord::Base
  base.establish_connection(database_url) unless base.connected?

  # Only the Mysql2 adapter receives the session-level lock timeout.
  return unless base.connection.adapter_name == "Mysql2"

  base.connection.execute("SET SESSION innodb_lock_wait_timeout = 1;")
end
Public Instance Methods
delete_enqueued_older_than(fetch_specification, duration)
click to toggle source
# File lib/ruby_event_store/outbox/repository.rb, line 145
#
# Deletes records matching +fetch_specification+ whose +enqueued_at+ is
# earlier than +duration.ago+.
#
# @param fetch_specification [Object] forwarded to
#   Record.for_fetch_specification
# @param duration [#ago] object whose +ago+ yields the cutoff time
# @return whatever +delete_all+ returns for the resulting scope
def delete_enqueued_older_than(fetch_specification, duration)
  scope = Record.for_fetch_specification(fetch_specification)
  cutoff = duration.ago
  scope.where("enqueued_at < ?", cutoff).delete_all
end
get_remaining_count(fetch_specification)
click to toggle source
# File lib/ruby_event_store/outbox/repository.rb, line 129
#
# Counts the records that Record.remaining_for selects for the given
# fetch specification.
#
# @param fetch_specification [Object] forwarded to Record.remaining_for
# @return the result of calling +count+ on that scope
def get_remaining_count(fetch_specification)
  remaining = Record.remaining_for(fetch_specification)
  remaining.count
end
mark_as_enqueued(record, now)
click to toggle source
# File lib/ruby_event_store/outbox/repository.rb, line 141
#
# Stamps +record+ as enqueued by writing +now+ into its +enqueued_at+
# column via +update_column+.
#
# @param record [#update_column] the record to stamp
# @param now [Object] value stored in +enqueued_at+
# @return whatever +update_column+ returns
def mark_as_enqueued(record, now)
  record.update_column(:enqueued_at, now)
end
obtain_lock_for_process(fetch_specification, process_uuid, clock:)
click to toggle source
# File lib/ruby_event_store/outbox/repository.rb, line 133
#
# Delegates lock acquisition to Lock.obtain, passing every argument
# through unchanged.
#
# @param fetch_specification [Object] forwarded to Lock.obtain
# @param process_uuid [Object] forwarded to Lock.obtain
# @param clock [Object] forwarded as the +clock:+ keyword
# @return whatever Lock.obtain returns
def obtain_lock_for_process(fetch_specification, process_uuid, clock:)
  Lock.obtain(
    fetch_specification,
    process_uuid,
    clock: clock
  )
end
release_lock_for_process(fetch_specification, process_uuid)
click to toggle source
# File lib/ruby_event_store/outbox/repository.rb, line 137
#
# Delegates lock release to Lock.release, passing both arguments through
# unchanged.
#
# @param fetch_specification [Object] forwarded to Lock.release
# @param process_uuid [Object] forwarded to Lock.release
# @return whatever Lock.release returns
def release_lock_for_process(fetch_specification, process_uuid)
  Lock.release(
    fetch_specification,
    process_uuid
  )
end
retrieve_batch(fetch_specification, batch_size)
click to toggle source
# File lib/ruby_event_store/outbox/repository.rb, line 125
#
# Loads up to +batch_size+ records selected by Record.remaining_for,
# ordered by ascending +id+, and returns them as an Array.
#
# @param fetch_specification [Object] forwarded to Record.remaining_for
# @param batch_size [Object] value passed to +limit+
# @return [Array] the loaded records
def retrieve_batch(fetch_specification, batch_size)
  ordered = Record.remaining_for(fetch_specification).order("id ASC")
  ordered.limit(batch_size).to_a
end