class EventStoreClient::CatchUpSubscriptions
Constants
- FILTER_DEFAULT_CHECKPOINT_INTERVAL_MULTIPLIER
- FILTER_DEFAULT_MAX
Attributes
connection[R]
logger[R]
subscription_store[R]
subscriptions[R]
Public Class Methods
new(connection:, subscription_store:)
click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 65 def initialize(connection:, subscription_store:) @connection = connection @subscription_store = subscription_store @subscriptions = [] @logger = EventStoreClient.config.logger end
Public Instance Methods
clean_unused()
click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 53 def clean_unused subscription_store.clean_unused(subscriptions.map(&:name)) end
create_or_load(subscriber, filter: {})
click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 10 def create_or_load(subscriber, filter: {}) filter_options = prepare_filter_options(filter) position = subscription_store.load_all_position(CatchUpSubscription.name(subscriber)) subscription = CatchUpSubscription.new(subscriber, position: position, filter: filter_options) subscription_store.add(subscription) unless position subscriptions << subscription unless @subscriptions.find { |s| s.name == subscription.name } subscription end
each() { |subscription| ... }
click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 21 def each subscriptions.each { |subscription| yield(subscription) } end
listen(subscription)
click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 25 def listen(subscription) connection.subscribe(subscription.options) do |event_data| next if recorded_event?(event_data) next if confirmation?(event_data) new_position = event_data[0] event = event_data[1] old_position = subscription.position subscription.position = new_position subscription_store.update_position(subscription) next unless event subscription.subscriber.call(event) if Thread.current.thread_variable_get(:terminate) msg = "CatchUpSubscriptions: Terminating subscription listener for #{subscription.subscriber}" logger&.info(msg) break end rescue StandardError => e subscription.position = old_position subscription_store.update_position(subscription) config.error_handler&.call(e) end end
reset()
click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 57 def reset subscription_store.reset(subscriptions) end
Private Instance Methods
confirmation?(event_data)
click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 72 def confirmation?(event_data) event_data.is_a? EventStore::Client::Streams::ReadResp::SubscriptionConfirmation end
prepare_filter_options(filter)
click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 80 def prepare_filter_options(filter) return if filter.nil? || filter.empty? { event_type: filter[:event_type], stream_identifier: filter[:stream_identifier], max: FILTER_DEFAULT_MAX, checkpointIntervalMultiplier: FILTER_DEFAULT_CHECKPOINT_INTERVAL_MULTIPLIER }.compact end
recorded_event?(event_data)
click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 76 def recorded_event?(event_data) event_data.is_a? EventStore::Client::Streams::ReadResp::ReadEvent::RecordedEvent end