class EventStoreClient::CatchUpSubscriptions

Constants

FILTER_DEFAULT_CHECKPOINT_INTERVAL_MULTIPLIER
FILTER_DEFAULT_MAX

Attributes

connection[R]
logger[R]
subscription_store[R]
subscriptions[R]

Public Class Methods

new(connection:, subscription_store:) click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 65
def initialize(connection:, subscription_store:)
  @connection = connection
  @subscription_store = subscription_store
  @subscriptions = []
  @logger = EventStoreClient.config.logger
end

Public Instance Methods

clean_unused() click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 53
def clean_unused
  subscription_store.clean_unused(subscriptions.map(&:name))
end
create_or_load(subscriber, filter: {}) click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 10
def create_or_load(subscriber, filter: {})
  filter_options = prepare_filter_options(filter)
  position = subscription_store.load_all_position(CatchUpSubscription.name(subscriber))

  subscription = CatchUpSubscription.new(subscriber, position: position, filter: filter_options)
  subscription_store.add(subscription) unless position

  subscriptions << subscription unless @subscriptions.find { |s| s.name == subscription.name }
  subscription
end
each() { |subscription| ... } click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 21
def each
  subscriptions.each { |subscription| yield(subscription) }
end
listen(subscription) click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 25
def listen(subscription)
  connection.subscribe(subscription.options) do |event_data|
    next if recorded_event?(event_data)
    next if confirmation?(event_data)

    new_position = event_data[0]
    event = event_data[1]

    old_position = subscription.position
    subscription.position = new_position
    subscription_store.update_position(subscription)
    next unless event

    subscription.subscriber.call(event)

    if Thread.current.thread_variable_get(:terminate)
      msg =
        "CatchUpSubscriptions: Terminating subscription listener for #{subscription.subscriber}"
      logger&.info(msg)
      break
    end
  rescue StandardError => e
    subscription.position = old_position
    subscription_store.update_position(subscription)
    config.error_handler&.call(e)
  end
end
reset() click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 57
def reset
  subscription_store.reset(subscriptions)
end

Private Instance Methods

confirmation?(event_data) click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 72
def confirmation?(event_data)
  event_data.is_a? EventStore::Client::Streams::ReadResp::SubscriptionConfirmation
end
prepare_filter_options(filter) click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 80
def prepare_filter_options(filter)
  return if filter.nil? || filter.empty?

  {
    event_type: filter[:event_type],
    stream_identifier: filter[:stream_identifier],
    max: FILTER_DEFAULT_MAX,
    checkpointIntervalMultiplier: FILTER_DEFAULT_CHECKPOINT_INTERVAL_MULTIPLIER
  }.compact
end
recorded_event?(event_data) click to toggle source
# File lib/event_store_client/catch_up_subscriptions.rb, line 76
def recorded_event?(event_data)
  event_data.is_a? EventStore::Client::Streams::ReadResp::ReadEvent::RecordedEvent
end