class EventStoreClient::GRPC::Commands::Streams::Read

Constants

StreamNotFound

Public Instance Methods

call(name, options: {}) click to toggle source
# File lib/event_store_client/adapters/grpc/commands/streams/read.rb, line 22
def call(name, options: {})
  direction =
    EventStoreClient::ReadDirection.new(options[:direction] || 'forwards').to_sym
  opts = {
    stream: {
      stream_identifier: {
        streamName: name
      }
    },
    read_direction: direction,
    resolve_links: options[:resolve_links] || true,
    count: options[:count] || config.per_page,
    uuid_option: {
      string: {}
    },
    no_filter: {}
  }
  options[:start] ||= 0
  if options[:start].zero?
    opts[:stream][:start] = {}
  else
    opts[:stream][:revision] = options[:start]
  end

  skip_decryption = options[:skip_decryption] || false
  events =
    if options[:skip_deserialization]
      read_stream_raw(opts)
    else
      read_stream(opts, skip_decryption)
    end
  Success(events)
rescue StreamNotFound
  Failure(:not_found)
end

Private Instance Methods

deserialize_event(entry, skip_decryption: false) click to toggle source
# File lib/event_store_client/adapters/grpc/commands/streams/read.rb, line 84
def deserialize_event(entry, skip_decryption: false)
  data = (entry.data.nil? || entry.data.empty?) ? '{}' : entry.data

  metadata =
    JSON.parse(entry.custom_metadata || '{}').merge(
      entry.metadata.to_h || {}
    ).to_json

  event = EventStoreClient::Event.new(
    id: entry.id.string,
    title: "#{entry.stream_revision}@#{entry.stream_identifier.streamName}",
    type: entry.metadata['type'],
    data: data,
    metadata: metadata
  )

  config.mapper.deserialize(event, skip_decryption: skip_decryption)
end
read_stream(options, skip_decryption) click to toggle source
# File lib/event_store_client/adapters/grpc/commands/streams/read.rb, line 60
def read_stream(options, skip_decryption)
  retries ||= 0
  service.read(request.new(options: options), metadata: metadata).map do |res|
    raise StreamNotFound if res.stream_not_found
    deserialize_event(res.event.event, skip_decryption: skip_decryption)
  end
rescue ::GRPC::Unavailable
  sleep config.grpc_unavailable_retry_sleep
  retry if (retries += 1) <= config.grpc_unavailable_retry_count
  raise GRPCUnavailableRetryFailed
end
read_stream_raw(options) click to toggle source
# File lib/event_store_client/adapters/grpc/commands/streams/read.rb, line 72
def read_stream_raw(options)
  retries ||= 0
  service.read(request.new(options: options), metadata: metadata).map do |res|
    raise StreamNotFound if res.stream_not_found
    res.event.event
  end
rescue ::GRPC::Unavailable
  sleep config.grpc_unavailable_retry_sleep
  retry if (retries += 1) <= config.grpc_unavailable_retry_count
  raise GRPCUnavailableRetryFailed
end