class RubyEventStore::InMemoryRepository
Attributes
mutex[R]
serializer[R]
storage[R]
streams[R]
Public Class Methods
new(serializer: NULL, ensure_supported_any_usage: false)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 26
# Builds an empty repository.
#
# serializer:: object passed to records whenever they are serialized or
#              deserialized; defaults to NULL.
# ensure_supported_any_usage:: when true, the private
#                              ensure_supported_any_usage check is enforced
#                              on writes.
def initialize(serializer: NULL, ensure_supported_any_usage: false)
  @ensure_supported_any_usage = ensure_supported_any_usage
  @serializer = serializer
  @mutex = Mutex.new
  @storage = {}
  # Looking up an unknown stream name through #[] stores a fresh array for it.
  @streams = Hash.new { |hash, name| hash[name] = [] }
end
Public Instance Methods
append_to_stream(records, stream, expected_version)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 34
# Serializes +records+ and appends them to +stream+.
#
# Records are serialized before the synchronized section is entered. Inside
# it, the resolved expected version (when not nil) must be identical to the
# stream's last version, otherwise WrongExpectedEventVersion is raised.
# Each record is then stored and added to the stream in turn;
# EventDuplicatedInStream is raised as soon as a record's event id is already
# present in storage (records handled before it stay stored).
#
# Returns self.
def append_to_stream(records, stream, expected_version)
  serialized_records = records.map { |record| record.serialize(serializer) }

  with_synchronize(expected_version, stream) do |resolved_version|
    ensure_supported_any_usage(resolved_version, stream)

    version_matches = resolved_version.nil? || last_stream_version(stream).equal?(resolved_version)
    raise WrongExpectedEventVersion unless version_matches

    serialized_records.each.with_index do |serialized_record, index|
      raise EventDuplicatedInStream if has_event?(serialized_record.event_id)

      storage[serialized_record.event_id] = serialized_record
      add_to_stream(stream, serialized_record, resolved_version, index)
    end
  end

  self
end
count(spec)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 103
# Returns how many records match +spec+, using the same selection as #read.
def count(spec)
  matching_records = read_scope(spec)
  matching_records.count
end
delete_stream(stream)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 69
# Removes the entry list kept for +stream+. Records in storage are left
# untouched. Returns whatever Hash#delete returns for the stream name.
def delete_stream(stream)
  stream_name = stream.name
  streams.delete(stream_name)
end
event_in_stream?(event_id, stream)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 139
# Tells whether +stream+ contains an entry for +event_id+.
#
# The stream is looked up with +fetch+ and an empty fallback rather than
# +streams[...]+: the streams hash has a default block that stores a new
# array on every miss, so a plain lookup would create an entry for a stream
# that was only being queried.
#
# Returns true or false.
def event_in_stream?(event_id, stream)
  entries = streams.fetch(stream.name) { [] }
  entries.any? { |entry| entry.event_id.eql?(event_id) }
end
global_position(event_id)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 135
# Returns the zero-based index of +event_id+ among the storage keys.
#
# Raises EventNotFound when storage has no such key.
def global_position(event_id)
  position = storage.keys.index(event_id)
  raise EventNotFound.new(event_id) unless position

  position
end
has_event?(event_id)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 73
# Tells whether a record is stored under +event_id+.
def has_event?(event_id)
  storage.key?(event_id)
end
last_stream_event(stream)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 77
# Returns the deserialized record for the last entry of +stream+, or nil
# when the stream has no entries.
def last_stream_event(stream)
  newest_id = event_ids_of_stream(stream).last
  return unless newest_id

  storage.fetch(newest_id).deserialize(serializer)
end
link_to_stream(event_ids, stream, expected_version)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 52
# Adds already stored events, identified by +event_ids+, to +stream+.
#
# Every id is looked up first through read_event, before the synchronized
# section is entered. Inside it, the resolved expected version (when not nil)
# must be identical to the stream's last version, otherwise
# WrongExpectedEventVersion is raised. EventDuplicatedInStream is raised as
# soon as an event is already present in the target stream (entries added
# before it stay in the stream).
#
# Returns self.
def link_to_stream(event_ids, stream, expected_version)
  serialized_records = event_ids.map { |id| read_event(id) }

  with_synchronize(expected_version, stream) do |resolved_version|
    ensure_supported_any_usage(resolved_version, stream)

    version_matches = resolved_version.nil? || last_stream_version(stream).equal?(resolved_version)
    raise WrongExpectedEventVersion unless version_matches

    serialized_records.each.with_index do |serialized_record, index|
      already_linked = has_event_in_stream?(serialized_record.event_id, stream.name)
      raise EventDuplicatedInStream if already_linked

      add_to_stream(stream, serialized_record, resolved_version, index)
    end
  end

  self
end
position_in_stream(event_id, stream)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 129
# Returns the position recorded for +event_id+ within +stream+.
#
# The stream is looked up with +fetch+ and an empty fallback rather than
# +streams[...]+: the streams hash has a default block that stores a new
# array on every miss, so a plain lookup would create an entry for a stream
# that was only being queried.
#
# Raises EventNotFoundInStream when the stream has no entry for the event
# (including when the stream itself is unknown).
def position_in_stream(event_id, stream)
  entries = streams.fetch(stream.name) { [] }
  entry = entries.find { |candidate| candidate.event_id.eql?(event_id) }
  raise EventNotFoundInStream if entry.nil?

  entry.position
end
read(spec)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 82
# Reads the records selected by +spec+ and deserializes them.
#
# The shape of the result depends on the spec:
# * batched — the result of BatchEnumerator#each, fed by a reader that
#   slices the selected records by offset and limit;
# * first / last — a single deserialized record, or nil when nothing matched;
# * otherwise — an Enumerator that deserializes records as it is consumed.
def read(spec)
  serialized_records = read_scope(spec)
  deserialize = ->(serialized_record) { serialized_record.deserialize(serializer) }

  if spec.batched?
    batch_reader = lambda do |offset, limit|
      serialized_records.drop(offset).take(limit).map { |serialized_record| deserialize.call(serialized_record) }
    end
    return BatchEnumerator.new(spec.batch_size, serialized_records.size, batch_reader).each
  end

  return serialized_records.first&.deserialize(serializer) if spec.first?
  return serialized_records.last&.deserialize(serializer) if spec.last?

  Enumerator.new do |yielder|
    serialized_records.each { |serialized_record| yielder << deserialize.call(serialized_record) }
  end
end
streams_of(event_id)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 125
# Returns an Array with one Stream per known stream that contains +event_id+.
def streams_of(event_id)
  matching_names = streams.keys.select { |name| has_event_in_stream?(event_id, name) }
  matching_names.map { |name| Stream.new(name) }
end
update_messages(records)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 107
# Replaces the stored version of each record in +records+.
#
# Event type, data, metadata and valid_at come from the given record; the
# timestamp is carried over from the record currently in storage. An id that
# is not in storage makes read_event raise EventNotFound.
#
# Returns +records+ (the result of +each+).
def update_messages(records)
  records.each do |record|
    stored_record = read_event(record.event_id)
    replacement = Record.new(
      event_id: record.event_id,
      event_type: record.event_type,
      data: record.data,
      metadata: record.metadata,
      # The stored timestamp is kept; it is parsed back from its ISO 8601 form.
      timestamp: Time.iso8601(stored_record.timestamp),
      valid_at: record.valid_at
    )
    storage[record.event_id] = replacement.serialize(serializer)
  end
end
Private Instance Methods
add_to_stream(stream, serialized_record, resolved_version, index)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 233
# Appends an EventInStream entry for +serialized_record+ to +stream+, with
# the position derived from +resolved_version+ and +index+.
def add_to_stream(stream, serialized_record, resolved_version, index)
  entries = streams[stream.name]
  position = compute_position(resolved_version, index)
  entries << EventInStream.new(serialized_record.event_id, position)
end
compute_position(resolved_version, index)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 229
# Returns the position of the record at +index+ within a write that starts
# after +resolved_version+, or nil when +resolved_version+ is nil.
def compute_position(resolved_version, index)
  return nil if resolved_version.nil?

  resolved_version + index + 1
end
ensure_supported_any_usage(resolved_version, stream)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 237
# Guards against mixing writes with and without a resolved version on one
# stream. Does nothing unless the repository was built with
# ensure_supported_any_usage enabled.
#
# Raises UnsupportedVersionAnyUsage when
# * +resolved_version+ is nil but the stream already has entries with a
#   position, or
# * +resolved_version+ is not nil but the stream has an entry without a
#   position.
def ensure_supported_any_usage(resolved_version, stream)
  return unless @ensure_supported_any_usage

  stream_positions = streams.fetch(stream.name) { [] }.map(&:position)
  mixed_usage =
    if resolved_version.nil?
      !stream_positions.compact.empty?
    else
      stream_positions.include?(nil)
    end
  raise UnsupportedVersionAnyUsage if mixed_usage
end
event_ids_of_stream(stream)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 181
# Returns the event ids of +stream+ in entry order; an unknown stream gives
# an empty Array.
def event_ids_of_stream(stream)
  entries = streams.fetch(stream.name) { [] }
  entries.map(&:event_id)
end
has_event_in_stream?(event_id, stream_name)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 218
# Tells whether the stream named +stream_name+ has an entry for +event_id+.
# An unknown stream name gives false.
def has_event_in_stream?(event_id, stream_name)
  entries = streams.fetch(stream_name) { [] }
  entries.any? { |entry| entry.event_id.eql?(event_id) }
end
index_of(source, event_id)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 222
# Returns the index of the first item in +source+ whose event_id is
# +event_id+.
#
# Raises EventNotFound when no item matches.
def index_of(source, event_id)
  source.index { |item| item.event_id.eql?(event_id) } || raise(EventNotFound.new(event_id))
end
last_stream_version(stream)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 200
# Returns the number of entries in +stream+ minus one, so an empty or
# unknown stream gives -1.
def last_stream_version(stream)
  entries = streams.fetch(stream.name) { [] }
  entries.length - 1
end
ordered(serialized_records, spec)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 189
# Sorts +serialized_records+ according to +spec.time_sort_by+:
# :as_at sorts by timestamp, :as_of sorts by valid_at. Any other value
# returns the records unchanged.
def ordered(serialized_records, spec)
  sort_field = { as_at: :timestamp, as_of: :valid_at }[spec.time_sort_by]
  return serialized_records unless sort_field

  serialized_records.sort_by(&sort_field)
end
read_event(event_id)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 177
# Returns the record stored under +event_id+.
#
# Raises EventNotFound when storage has no such key.
def read_event(event_id)
  storage.fetch(event_id) do |missing_id|
    raise EventNotFound.new(missing_id)
  end
end
read_scope(spec)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 145
# Returns the serialized records selected by +spec+ as an Array.
#
# Steps, in order:
# 1. take the records of the spec's stream and apply the time ordering;
# 2. keep only the requested ids and event types, when given;
# 3. reverse for backward reads;
# 4. cut everything up to and including +spec.start+, and everything from
#    +spec.stop+ on (index_of raises EventNotFound for an unknown id);
# 5. apply the older/newer-than time filters;
# 6. apply +spec.limit+.
#
# The limit is applied last on purpose: taking the first N records before
# the time filters run would drop matching records that sit further down
# and could return fewer than N results (or none) although enough match.
def read_scope(spec)
  serialized_records = serialized_records_of_stream(spec.stream)
  serialized_records = ordered(serialized_records, spec)

  if spec.with_ids?
    serialized_records = serialized_records.select { |e| spec.with_ids.any? { |x| x.eql?(e.event_id) } }
  end
  if spec.with_types?
    serialized_records = serialized_records.select { |e| spec.with_types.any? { |x| x.eql?(e.event_type) } }
  end

  serialized_records = serialized_records.reverse if spec.backward?
  serialized_records = serialized_records.drop(index_of(serialized_records, spec.start) + 1) if spec.start
  serialized_records = serialized_records.take(index_of(serialized_records, spec.stop)) if spec.stop

  # Time fields are stored as ISO 8601 strings; parse them for comparison.
  time_of = ->(sr) { Time.iso8601(time_comparison_field(spec, sr)) }
  if spec.older_than
    serialized_records = serialized_records.select { |sr| time_of.call(sr) < spec.older_than }
  end
  if spec.older_than_or_equal
    serialized_records = serialized_records.select { |sr| time_of.call(sr) <= spec.older_than_or_equal }
  end
  if spec.newer_than
    serialized_records = serialized_records.select { |sr| time_of.call(sr) > spec.newer_than }
  end
  if spec.newer_than_or_equal
    serialized_records = serialized_records.select { |sr| time_of.call(sr) >= spec.newer_than_or_equal }
  end

  serialized_records = serialized_records.take(spec.limit) if spec.limit?
  serialized_records
end
serialized_records_of_stream(stream)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 185
# Returns the stored records of +stream+: every value in storage for the
# global stream, otherwise the records for the stream's event ids, in
# stream order.
def serialized_records_of_stream(stream)
  return storage.values if stream.global?

  storage.fetch_values(*event_ids_of_stream(stream))
end
time_comparison_field(spec, sr)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 169
# Picks the time field of +sr+ used by the time filters: valid_at when the
# spec sorts "as of", timestamp otherwise.
def time_comparison_field(spec, sr)
  spec.time_sort_by_as_of? ? sr.valid_at : sr.timestamp
end
with_synchronize(expected_version, stream, &block)
click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 204
# Resolves +expected_version+ for +stream+, then calls the block with the
# resolved version while holding the repository mutex. Returns the block's
# result.
def with_synchronize(expected_version, stream, &block)
  # The version is resolved (which may read the stream) before the lock is
  # taken.
  version = expected_version.resolve_for(stream, method(:last_stream_version))

  # expected_version :auto assumes an external lock is used, which makes
  # reading the stream before writing safe.
  #
  # To emulate potential concurrency issues of the :auto strategy without
  # such an external lock, Thread.pass is used to make race conditions more
  # likely. The mutex is held only for writing, not for the whole
  # read+write algorithm.
  Thread.pass
  mutex.synchronize do
    block.call(version)
  end
end