class EventStoreClient::HTTP::Commands::PersistentSubscriptions::Read
Public Instance Methods
call(stream_name, subscription_name, options: {}) { |deserialize_event(entry, skip_decryption: skip_decryption)| ... }
click to toggle source
# File lib/event_store_client/adapters/http/commands/persistent_subscriptions/read.rb, line 11 def call(stream_name, subscription_name, options: {}) count = options[:count] || 20 long_poll = options[:long_poll].to_i headers = long_poll.positive? ? { 'ES-LongPoll' => long_poll.to_s } : {} headers['Content-Type'] = 'application/vnd.eventstore.competingatom+json' headers['Accept'] = 'application/vnd.eventstore.competingatom+json' headers['ES-ResolveLinktos'] = (options[:resolve_links] || true).to_s response = connection.call( :get, "/subscriptions/#{stream_name}/#{subscription_name}/#{count}", headers: headers ) return { events: [] } if response.body.nil? || response.body.empty? body = JSON.parse(response.body) ack_info = body['links'].find { |link| link['relation'] == 'ackAll' } return { events: [] } unless ack_info skip_decryption = options[:skip_decryption] || false body['entries'].map do |entry| yield deserialize_event(entry, skip_decryption: skip_decryption) end Ack.new(connection).call(ack_info['uri']) Success() end
Private Instance Methods
deserialize_event(entry, skip_decryption: false)
click to toggle source
# File lib/event_store_client/adapters/http/commands/persistent_subscriptions/read.rb, line 43 def deserialize_event(entry, skip_decryption: false) event = EventStoreClient::Event.new( id: entry['eventId'], title: entry['title'], type: entry['eventType'], data: entry['data'] || '{}', metadata: entry['isMetaData'] ? entry['metaData'] : '{}' ) config.mapper.deserialize(event, skip_decryption: skip_decryption) rescue EventStoreClient::DeserializedEvent::InvalidDataError event end