class EventStoreClient::HTTP::Client

Attributes

connection[R]

Public Class Methods

new() click to toggle source
# File lib/event_store_client/adapters/http/client.rb, line 128
# Builds the HTTP connection for this client and stores it in @connection.
# The target URL and the SSL verification flag are both read from `config`.
def initialize
  base_url = config.eventstore_url
  ssl_settings = { verify: config.verify_ssl }
  @connection = Connection.new(base_url, ssl: ssl_settings)
end

Public Instance Methods

append_to_stream(stream_name, events, options: {}) click to toggle source

Appends the given events to the stream. @param [String] stream name to append events to. @param [Array] (each: EventStoreClient::DeserializedEvent) list of events to publish. @return Dry::Monads::Result::Success or Dry::Monads::Result::Failure

# File lib/event_store_client/adapters/http/client.rb, line 16
# Appends the given events to a stream by delegating to the Append command.
#
# @param stream_name [String] name of the stream to append events to
# @param events [Array<EventStoreClient::DeserializedEvent>] events to publish
# @param options [Hash] additional options, forwarded untouched to the command
# @return whatever the Append command returns (documented as a
#   Dry::Monads::Result Success or Failure)
def append_to_stream(stream_name, events, options: {})
  command = Commands::Streams::Append.new(connection)
  command.call(stream_name, events, options: options)
end
delete_stream(stream_name, options: {}) click to toggle source

Softly deletes the given stream @param [String] Stream name to delete @param options [Hash] additional options to the request @return Dry::Monads::Result::Success or Dry::Monads::Result::Failure

# File lib/event_store_client/adapters/http/client.rb, line 27
# Soft-deletes the given stream by delegating to the Delete command.
#
# @param stream_name [String] name of the stream to delete
# @param options [Hash] additional options, forwarded untouched to the command
# @return whatever the Delete command returns (documented as a
#   Dry::Monads::Result Success or Failure)
def delete_stream(stream_name, options: {})
  command = Commands::Streams::Delete.new(connection)
  command.call(stream_name, options: options)
end
listen(subscription, options: {}) { |event| ... } click to toggle source

Runs the persistent subscription indefinitely. @param [EventStoreClient::Subscription] subscription to observe @param options [Hash] additional options to the request @return nothing; this is a blocking operation that yields each event to the given block instead

# File lib/event_store_client/adapters/http/client.rb, line 111
# Polls the persistent subscription in an endless loop, yielding each event
# to the caller's block (when one is given).
#
# Any StandardError raised while consuming the feed is handed to
# `config.error_handler` if one is configured and otherwise dropped, so a
# failing poll never terminates the loop.
#
# @param subscription [EventStoreClient::Subscription] subscription to observe
# @param options [Hash] only :interval is read here: the number of seconds to
#   sleep between polls (defaults to 5)
# @yield [event] each event produced by #consume_feed
# @return does not return under normal operation (blocking call)
def listen(subscription, options: {})
  loop do
    begin
      consume_feed(subscription) { |event| yield event if block_given? }
    rescue StandardError => error
      handler = config.error_handler
      handler.call(error) unless handler.nil?
    end

    # Pause between polls so the events have time to be processed.
    pause = options[:interval] || 5
    sleep(pause)
  end
end
read(stream_name, options: {}) click to toggle source

Reads a page of events from the given stream @param [String] Stream name to read events from @param options [Hash] additional options to the request @return Dry::Monads::Result::Success with returned events or Dry::Monads::Result::Failure

# File lib/event_store_client/adapters/http/client.rb, line 49
# Reads a single page of events from the given stream by delegating to the
# Read command.
#
# @param stream_name [String] name of the stream to read events from
# @param options [Hash] additional options, forwarded untouched to the command
# @return whatever the Read command returns (documented as a
#   Dry::Monads::Result Success wrapping the events, or a Failure)
def read(stream_name, options: {})
  command = Commands::Streams::Read.new(connection)
  command.call(stream_name, options: options)
end
read_all_from_stream(stream_name, options: {}) click to toggle source

Reads all events from the given stream @param [String] Stream name to read events from @param options [Hash] additional options to the request @return Dry::Monads::Result::Success with returned events or Dry::Monads::Result::Failure

# File lib/event_store_client/adapters/http/client.rb, line 58
# Reads every event from the given stream, one page at a time.
#
# Pages are requested through #read, starting at options[:start] (default 0)
# and advancing by options[:count] (default 20) after each successful page.
# Reading stops at the first empty page. A failed page is retried at the same
# offset; three failures in a row abort the whole read.
#
# @param stream_name [String] name of the stream to read events from
# @param options [Hash] additional request options; :start and :count control
#   paging, and every entry is forwarded to #read (the hash is not mutated)
# @return Success(events) with all collected events, or
#   Failure(:connection_failed) after three consecutive failed reads
def read_all_from_stream(stream_name, options: {})
  max_consecutive_failures = 3
  start = options[:start] || 0
  count = options[:count] || 20
  events = []
  failed_requests_count = 0

  while failed_requests_count < max_consecutive_failures
    res = read(stream_name, options: options.merge(start: start, count: count))
    if res.failure?
      failed_requests_count += 1
      next # retry the same offset
    end

    page = res.value!
    break if page.empty?

    # Append in place; `events += page` would copy the whole array per page.
    events.concat(page)
    failed_requests_count = 0
    start += count
  end
  return Failure(:connection_failed) if failed_requests_count >= max_consecutive_failures

  Success(events)
end
subscribe_to_stream(subscription, options: {}) click to toggle source

Creates the subscription for the given stream @param [EventStoreClient::Subscription] subscription to observe @param options [Hash] additional options to the request @return Dry::Monads::Result::Success or Dry::Monads::Result::Failure

# File lib/event_store_client/adapters/http/client.rb, line 85
# Creates a persistent subscription for the given subscription object.
#
# First joins the subscription's observed streams under the subscription's
# name via #join_streams, then issues the persistent-subscription Create
# command for the subscription's stream.
#
# NOTE(review): the result of #join_streams is discarded and `options` is not
# passed to it; only the Create command's result is returned.
#
# @param subscription [EventStoreClient::Subscription] subscription to create
# @param options [Hash] additional options, forwarded to the Create command
# @return whatever the Create command returns (documented as a
#   Dry::Monads::Result Success or Failure)
def subscribe_to_stream(subscription, options: {})
  join_streams(subscription.name, subscription.observed_streams)

  creator = Commands::PersistentSubscriptions::Create.new(connection)
  creator.call(subscription.stream, subscription.name, options: options)
end
tombstone_stream(stream_name, options: {}) click to toggle source

Completely removes the given stream @param [String] Stream name to delete @param options [Hash] additional options to the request @return Dry::Monads::Result::Success or Dry::Monads::Result::Failure

# File lib/event_store_client/adapters/http/client.rb, line 38
# Completely removes the given stream by delegating to the Tombstone command.
#
# @param stream_name [String] name of the stream to remove
# @param options [Hash] additional options, forwarded untouched to the command
# @return whatever the Tombstone command returns (documented as a
#   Dry::Monads::Result Success or Failure)
def tombstone_stream(stream_name, options: {})
  command = Commands::Streams::Tombstone.new(connection)
  command.call(stream_name, options: options)
end

Private Instance Methods

consume_feed(subscription, options: {}) { |event| ... } click to toggle source

@api private Consumes the new events from the subscription @param [EventStoreClient::Subscription] subscription to observe @param options [Hash] additional options to the request @return Dry::Monads::Result::Success or Dry::Monads::Result::Failure

# File lib/event_store_client/adapters/http/client.rb, line 152
# @api private
# Consumes new events from the subscription by delegating to the
# persistent-subscription Read command, re-yielding every event the command
# yields to the caller's block (events are ignored when no block is given).
#
# @param subscription [EventStoreClient::Subscription] subscription to observe
# @param options [Hash] additional options, forwarded untouched to the command
# @yield [event] each event yielded by the Read command
# @return whatever the Read command returns (documented as a
#   Dry::Monads::Result Success or Failure)
def consume_feed(subscription, options: {})
  reader = Commands::PersistentSubscriptions::Read.new(connection)
  reader.call(subscription.stream, subscription.name, options: options) do |event|
    yield event if block_given?
  end
end
join_streams(name, streams, options: {}) click to toggle source

@api private Joins multiple streams into the new one under the given name @param [String] Name of the stream containing the ones to join @param [Array] (each: String) list of streams to join together @return Dry::Monads::Result::Success or Dry::Monads::Result::Failure

# File lib/event_store_client/adapters/http/client.rb, line 139
# @api private
# Joins multiple streams into a new one under the given name.
#
# Tries the projection Create command first; when its result does not report
# success, falls back to the projection Update command with the same
# arguments.
#
# @param name [String] name of the stream containing the joined ones
# @param streams [Array<String>] streams to join together
# @param options [Hash] additional options, forwarded to both commands
# @return the Create result when it succeeded, otherwise the Update result
#   (documented as a Dry::Monads::Result Success or Failure)
def join_streams(name, streams, options: {})
  created = Commands::Projections::Create.new(connection).call(name, streams, options: options)

  if created.success?
    created
  else
    Commands::Projections::Update.new(connection).call(name, streams, options: options)
  end
end