class RubyEventStore::Client
Constants
- EMPTY_HASH
Attributes
Public Class Methods
# File lib/ruby_event_store/client.rb, line 7 def initialize( repository: InMemoryRepository.new, mapper: Mappers::Default.new, subscriptions: Subscriptions.new, dispatcher: Dispatcher.new, clock: default_clock, correlation_id_generator: default_correlation_id_generator, event_type_resolver: EventTypeResolver.new ) @repository = repository @mapper = mapper @subscriptions = subscriptions @broker = Broker.new(subscriptions: subscriptions, dispatcher: dispatcher) @clock = clock @metadata = Concurrent::ThreadLocalVar.new @correlation_id_generator = correlation_id_generator @event_type_resolver = event_type_resolver end
Public Instance Methods
Persists new event(s) without notifying any subscribed handlers
@param (see publish
) @return [self]
# File lib/ruby_event_store/client.rb, line 48 def append(events, stream_name: GLOBAL_STREAM, expected_version: :any) append_records_to_stream( transform(enrich_events_metadata(events)), stream_name: stream_name, expected_version: expected_version ) self end
Deletes a stream. All events from the stream remain intact but they are no longer linked to the stream.
@param stream_name [String] name of the stream to be cleared. @return [self]
# File lib/ruby_event_store/client.rb, line 75 def delete_stream(stream_name) repository.delete_stream(Stream.new(stream_name)) self end
Deserialize event which was serialized for async event handlers {railseventstore.org/docs/subscribe/#async-handlers Read more}
@return [Event] deserialized event
# File lib/ruby_event_store/client.rb, line 273 def deserialize(serializer:, event_type:, event_id:, data:, metadata:, timestamp: nil, valid_at: nil) extract_timestamp = lambda { |m| (m[:timestamp] || Time.parse(m.fetch("timestamp"))).iso8601 } mapper.record_to_event( SerializedRecord .new( event_type: event_type, event_id: event_id, data: data, metadata: metadata, timestamp: timestamp || timestamp_ = extract_timestamp[serializer.load(metadata)], valid_at: valid_at || timestamp_ ) .deserialize(serializer) ) end
Checks whether event is linked in given stream
@param event_id [String] @param stream_name [String] @return [Boolean] true if event is linked to given stream, false otherwise
# File lib/ruby_event_store/client.rb, line 127 def event_in_stream?(event_id, stream_name) stream = Stream.new(stream_name) stream.global? ? repository.has_event?(event_id) : repository.event_in_stream?(event_id, stream) end
Gets position of the event in global stream
The position is always nonnegative. Global position may have gaps, meaning, there may be event at position 40, but no event at position 39.
@param event_id [String] @raise [EventNotFound] @return [Integer] nonnegno ative integer position of event in global stream
# File lib/ruby_event_store/client.rb, line 118 def global_position(event_id) repository.global_position(event_id) end
# File lib/ruby_event_store/client.rb, line 332 def inspect "#<#{self.class}:0x#{__id__.to_s(16)}>" end
Links already persisted event(s) to a different stream. Does not notify any subscribed handlers.
@param event_ids [String, Array<String>] ids of events @param stream_name (see publish
) @param expected_version (see publish
) @return [self]
# File lib/ruby_event_store/client.rb, line 64 def link(event_ids, stream_name:, expected_version: :any) repository.link_to_stream(Array(event_ids), Stream.new(stream_name), ExpectedVersion.new(expected_version)) self end
Read additional metadata which will be added for published events {railseventstore.org/docs/request_metadata#passing-your-own-metadata-using-with_metadata-method Read more}
@return [Hash]
# File lib/ruby_event_store/client.rb, line 294 def metadata @metadata.value || EMPTY_HASH end
Overwrite existing event(s) with the same ID.
Does not notify any subscribed handlers. Does not enrich with additional current metadata. Does not allow changing which streams these events are in. {railseventstore.org/docs/migrating_messages Read more}
@example Add data and metadata to existing events
events = event_store.read.limit(10).to_a events.each do |ev| ev.data[:tenant_id] = 1 ev.metadata[:server_id] = "eu-west-2" end event_store.overwrite(events)
@example Change event type
events = event_store.read.limit(10).each.select{|ev| OldType === ev }.map do |ev| NewType.new( event_id: ev.event_id, data: ev.data, metadata: ev.metadata, ) end event_store.overwrite(events)
@param events [Array<Event>, Event] event(s) to serialize and overwrite again @return [self]
# File lib/ruby_event_store/client.rb, line 327 def overwrite(events_or_event) repository.update_messages(transform(Array(events_or_event))) self end
Gets position of the event in given stream
The position is always nonnegative. Returns nil if the event has no specific position in stream. Raise error if event is not present in stream.
@param event_id [String] @param stream_name [String] @return [Integer] nonnegative integer position of event in stream @raise [EventNotInStream]
# File lib/ruby_event_store/client.rb, line 105 def position_in_stream(event_id, stream_name) repository.position_in_stream(event_id, Stream.new(stream_name)) end
Persists events and notifies subscribed handlers about them
@param events [Array<Event>, Event] event(s) @param stream_name [String] name of the stream for persisting events. @param expected_version [:any, :auto, :none, Integer] controls optimistic locking strategy. {railseventstore.org/docs/expected_version/ Read more} @return [self]
# File lib/ruby_event_store/client.rb, line 32 def publish(events, stream_name: GLOBAL_STREAM, expected_version: :any) enriched_events = enrich_events_metadata(events) records = transform(enriched_events) append_records_to_stream(records, stream_name: stream_name, expected_version: expected_version) enriched_events.zip(records) do |event, record| with_metadata(correlation_id: event.metadata.fetch(:correlation_id), causation_id: event.event_id) do broker.(event, record) end end self end
Starts building a query specification for reading events. {railseventstore.org/docs/read/ More info.}
@return [Specification]
# File lib/ruby_event_store/client.rb, line 84 def read Specification.new(SpecificationReader.new(repository, mapper)) end
Gets list of streams where event is stored or linked
@return [Array<Stream>] where event is stored or linked
# File lib/ruby_event_store/client.rb, line 91 def streams_of(event_id) repository.streams_of(event_id) end
Subscribes a handler (subscriber) that will be invoked for published events of provided type.
@overload subscribe(subscriber, to:)
@param to [Array<Class>] types of events to subscribe @param subscriber [Object, Class] handler @return [Proc] - unsubscribe proc. Call to unsubscribe. @raise [ArgumentError, SubscriberNotExist]
@overload subscribe(to:, &subscriber)
@param to [Array<Class>] types of events to subscribe @param subscriber [Proc] handler @return [Proc] - unsubscribe proc. Call to unsubscribe. @raise [ArgumentError, SubscriberNotExist]
# File lib/ruby_event_store/client.rb, line 144 def subscribe(subscriber = nil, to:, &proc) raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc subscriber ||= proc broker.add_subscription(subscriber, to.map { |event_klass| event_type_resolver.call(event_klass) }) end
Subscribes a handler (subscriber) that will be invoked for all published events
@overload subscribe_to_all_events
(subscriber)
@param subscriber [Object, Class] handler @return [Proc] - unsubscribe proc. Call to unsubscribe. @raise [ArgumentError, SubscriberNotExist]
@overload subscribe_to_all_events
(&subscriber)
@param subscriber [Proc] handler @return [Proc] - unsubscribe proc. Call to unsubscribe. @raise [ArgumentError, SubscriberNotExist]
# File lib/ruby_event_store/client.rb, line 160 def subscribe_to_all_events(subscriber = nil, &proc) raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc broker.add_global_subscription(subscriber || proc) end
Get list of handlers subscribed to an event
@param to [Class, String] type of events to get list of sybscribed handlers @return [Array<Object, Class>]
# File lib/ruby_event_store/client.rb, line 169 def subscribers_for(event_class) subscriptions.all_for(event_type_resolver.call(event_class)) end
Set additional metadata for all events published within the provided block {railseventstore.org/docs/request_metadata#passing-your-own-metadata-using-with_metadata-method Read more}
@param metadata [Hash] metadata to set for events @param block [Proc] block of code during which the metadata will be added @return [Object] last value returned by the provided block
# File lib/ruby_event_store/client.rb, line 261 def with_metadata(metadata_for_block, &block) previous_metadata = metadata self.metadata = previous_metadata.merge(metadata_for_block) block.call if block_given? ensure self.metadata = previous_metadata end
Use for starting temporary subscriptions. {railseventstore.org/docs/subscribe/#temporary-subscriptions Read more}
@param block [Proc] block of code during which the temporary subscriptions will be active @return [Within] builder object which collects temporary subscriptions
# File lib/ruby_event_store/client.rb, line 250 def within(&block) raise ArgumentError if block.nil? Within.new(block, broker, event_type_resolver) end
Protected Instance Methods
# File lib/ruby_event_store/client.rb, line 368 def default_clock -> { Time.now.utc.round(TIMESTAMP_PRECISION) } end
# File lib/ruby_event_store/client.rb, line 372 def default_correlation_id_generator -> { SecureRandom.uuid } end
# File lib/ruby_event_store/client.rb, line 364 def metadata=(value) @metadata.value = value end
Private Instance Methods
# File lib/ruby_event_store/client.rb, line 358 def append_records_to_stream(records, stream_name:, expected_version:) repository.append_to_stream(records, Stream.new(stream_name), ExpectedVersion.new(expected_version)) end
# File lib/ruby_event_store/client.rb, line 351 def enrich_event_metadata(event) metadata.each { |key, value| event.metadata[key] ||= value } event.metadata[:timestamp] ||= clock.call event.metadata[:valid_at] ||= event.metadata.fetch(:timestamp) event.metadata[:correlation_id] ||= correlation_id_generator.call end
# File lib/ruby_event_store/client.rb, line 345 def enrich_events_metadata(events) events = Array(events) events.each { |event| enrich_event_metadata(event) } events end
# File lib/ruby_event_store/client.rb, line 341 def transform(events) events.map { |ev| mapper.event_to_record(ev) } end