class EventStoreClient::GRPC::Commands::PersistentSubscriptions::Read
Public Instance Methods
call(stream, group, options: {}) { |deserialize_event(event.event, skip_decryption: skip_decryption)| ... }
click to toggle source
Read
given persistent subscription @param [String] name of the stream to subscribe @param [String] name of the subscription group @param [Hash] options - additional settings to be set on subscription.
Refer to SettingsSchema for detailed attributes allowed
@return [Dry::Monads::Result::Success, Dry::Monads::Result::Failure]
# File lib/event_store_client/adapters/grpc/commands/persistent_subscriptions/read.rb, line 25 def call(stream, group, options: {}) count = options[:count].to_i opts = { stream_identifier: { streamName: stream }, buffer_size: count, group_name: group, uuid_option: { structured: {} } } requests = [request.new(options: opts)] # please notice that it's an array. Should be? skip_decryption = options[:skip_decryption] || false service.read(requests, metadata: metadata).each do |res| next if res.subscription_confirmation yield deserialize_event(res.event.event, skip_decryption: skip_decryption) if block_given? end Success() end
Private Instance Methods
deserialize_event(entry, skip_decryption: false)
click to toggle source
# File lib/event_store_client/adapters/grpc/commands/persistent_subscriptions/read.rb, line 51 def deserialize_event(entry, skip_decryption: false) id = entry.id.string id = SecureRandom.uuid if id.nil? || id.empty? data = (entry.data.nil? || entry.data.empty?) ? '{}' : entry.data metadata = JSON.parse(entry.custom_metadata || '{}').merge( entry.metadata.to_h || {} ).to_json config.mapper.deserialize( EventStoreClient::Event.new( id: id, title: "#{entry.stream_revision}@#{entry.stream_identifier.streamName}", type: entry.metadata['type'], data: data, metadata: metadata ), skip_decryption: skip_decryption ) end