class EventStoreClient::GRPC::Commands::Streams::Subscribe
Constants
- StreamNotFound
Public Instance Methods
call(options = {}) { |prepared_response(res)| ... }
click to toggle source
# File lib/event_store_client/adapters/grpc/commands/streams/subscribe.rb, line 22 def call(options = {}) opts = options_with_defaults(options) service.read(request.new(options: opts), metadata: metadata).map do |res| raise StreamNotFound if res.stream_not_found yield prepared_response(res) if block_given? end rescue StreamNotFound Failure(:not_found) end
Private Instance Methods
default_all_options()
click to toggle source
# File lib/event_store_client/adapters/grpc/commands/streams/subscribe.rb, line 101 def default_all_options { position: { commit_position: 0, prepare_position: 0 } } end
deserialize_event(entry)
click to toggle source
# File lib/event_store_client/adapters/grpc/commands/streams/subscribe.rb, line 110 def deserialize_event(entry) data = (entry.data.nil? || entry.data.empty?) ? '{}' : entry.data metadata = JSON.parse(entry.custom_metadata || '{}').merge( entry.metadata.to_h || {} ).to_json event = EventStoreClient::Event.new( id: entry.id.string, title: "#{entry.stream_revision}@#{entry.stream_identifier.streamName}", type: entry.metadata['type'], data: data, metadata: metadata ) config.mapper.deserialize(event) end
options_with_defaults(options)
click to toggle source
# File lib/event_store_client/adapters/grpc/commands/streams/subscribe.rb, line 58 def options_with_defaults(options) options[:without_system_events] = true unless options[:without_system_events] == false opts = { subscription: {}, read_direction: read_direction(options[:direction]), resolve_links: options[:resolve_links] || true, uuid_option: { string: {} } } if options[:stream] opts[:stream] = { stream_identifier: { streamName: stream } } else opts[:all] = options[:all] || default_all_options end if options[:filter] opts[:filter] = options[:filter] elsif options[:without_system_events] opts[:filter] = { event_type: { regex: '^[^$].*' }, max: 32, checkpointIntervalMultiplier: 1000 } else opts[:no_filter] = {} end options[:start] ||= 0 return opts unless options[:stream] if options[:start].zero? opts[:stream][:start] = {} else opts[:stream][:revision] = options[:start] end opts end
position(event_or_checkpoint)
click to toggle source
# File lib/event_store_client/adapters/grpc/commands/streams/subscribe.rb, line 47 def position(event_or_checkpoint) { prepare_position: event_or_checkpoint.prepare_position, commit_position: event_or_checkpoint.commit_position } end
prepared_response(res)
click to toggle source
# File lib/event_store_client/adapters/grpc/commands/streams/subscribe.rb, line 36 def prepared_response(res) if res.event event = res.event.event [position(event), deserialize_event(event)] rescue event elsif res.checkpoint [position(res.checkpoint), nil] elsif res.confirmation res.confirmation end end
read_direction(direction)
click to toggle source
# File lib/event_store_client/adapters/grpc/commands/streams/subscribe.rb, line 54 def read_direction(direction) EventStoreClient::ReadDirection.new(direction || 'forwards').to_sym end