class RubyEventStore::InMemoryRepository

Attributes

mutex[R]
serializer[R]
storage[R]
streams[R]

Public Class Methods

new(serializer: NULL, ensure_supported_any_usage: false) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 26
def initialize(serializer: NULL, ensure_supported_any_usage: false)
  @serializer = serializer
  @streams = Hash.new { |h, k| h[k] = Array.new }
  @mutex = Mutex.new
  @storage = Hash.new
  @ensure_supported_any_usage = ensure_supported_any_usage
end

Public Instance Methods

append_to_stream(records, stream, expected_version) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 34
def append_to_stream(records, stream, expected_version)
  serialized_records = records.map { |record| record.serialize(serializer) }

  with_synchronize(expected_version, stream) do |resolved_version|
    ensure_supported_any_usage(resolved_version, stream)
    unless resolved_version.nil? || last_stream_version(stream).equal?(resolved_version)
      raise WrongExpectedEventVersion
    end

    serialized_records.each_with_index do |serialized_record, index|
      raise EventDuplicatedInStream if has_event?(serialized_record.event_id)
      storage[serialized_record.event_id] = serialized_record
      add_to_stream(stream, serialized_record, resolved_version, index)
    end
  end
  self
end
count(spec) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 103
def count(spec)
  read_scope(spec).count
end
delete_stream(stream) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 69
def delete_stream(stream)
  streams.delete(stream.name)
end
event_in_stream?(event_id, stream) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 139
def event_in_stream?(event_id, stream)
  !streams[stream.name].find { |event_in_stream| event_in_stream.event_id.eql?(event_id) }.nil?
end
global_position(event_id) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 135
def global_position(event_id)
  storage.keys.index(event_id) or raise EventNotFound.new(event_id)
end
has_event?(event_id) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 73
def has_event?(event_id)
  storage.has_key?(event_id)
end
last_stream_event(stream) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 77
def last_stream_event(stream)
  last_id = event_ids_of_stream(stream).last
  storage.fetch(last_id).deserialize(serializer) if last_id
end
position_in_stream(event_id, stream) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 129
def position_in_stream(event_id, stream)
  event_in_stream = streams[stream.name].find { |event_in_stream| event_in_stream.event_id.eql?(event_id) }
  raise EventNotFoundInStream if event_in_stream.nil?
  event_in_stream.position
end
read(spec) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 82
def read(spec)
  serialized_records = read_scope(spec)
  if spec.batched?
    batch_reader = ->(offset, limit) do
      serialized_records
        .drop(offset)
        .take(limit)
        .map { |serialized_record| serialized_record.deserialize(serializer) }
    end
    BatchEnumerator.new(spec.batch_size, serialized_records.size, batch_reader).each
  elsif spec.first?
    serialized_records.first&.deserialize(serializer)
  elsif spec.last?
    serialized_records.last&.deserialize(serializer)
  else
    Enumerator.new do |y|
      serialized_records.each { |serialized_record| y << serialized_record.deserialize(serializer) }
    end
  end
end
streams_of(event_id) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 125
def streams_of(event_id)
  streams.select { |name,| has_event_in_stream?(event_id, name) }.map { |name,| Stream.new(name) }
end
update_messages(records) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 107
def update_messages(records)
  records.each do |record|
    read_event(record.event_id)
    serialized_record =
      Record
        .new(
          event_id: record.event_id,
          event_type: record.event_type,
          data: record.data,
          metadata: record.metadata,
          timestamp: Time.iso8601(storage.fetch(record.event_id).timestamp),
          valid_at: record.valid_at
        )
        .serialize(serializer)
    storage[record.event_id] = serialized_record
  end
end

Private Instance Methods

add_to_stream(stream, serialized_record, resolved_version, index) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 233
def add_to_stream(stream, serialized_record, resolved_version, index)
  streams[stream.name] << EventInStream.new(serialized_record.event_id, compute_position(resolved_version, index))
end
compute_position(resolved_version, index) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 229
def compute_position(resolved_version, index)
  resolved_version + index + 1 unless resolved_version.nil?
end
ensure_supported_any_usage(resolved_version, stream) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 237
def ensure_supported_any_usage(resolved_version, stream)
  if @ensure_supported_any_usage
    stream_positions = streams.fetch(stream.name, Array.new).map(&:position)
    if resolved_version.nil?
      raise UnsupportedVersionAnyUsage if !stream_positions.compact.empty?
    else
      raise UnsupportedVersionAnyUsage if stream_positions.include?(nil)
    end
  end
end
event_ids_of_stream(stream) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 181
def event_ids_of_stream(stream)
  streams.fetch(stream.name, Array.new).map(&:event_id)
end
has_event_in_stream?(event_id, stream_name) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 218
def has_event_in_stream?(event_id, stream_name)
  streams.fetch(stream_name, Array.new).any? { |event_in_stream| event_in_stream.event_id.eql?(event_id) }
end
index_of(source, event_id) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 222
def index_of(source, event_id)
  index = source.index { |item| item.event_id.eql?(event_id) }
  raise EventNotFound.new(event_id) unless index

  index
end
last_stream_version(stream) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 200
def last_stream_version(stream)
  streams.fetch(stream.name, Array.new).size - 1
end
ordered(serialized_records, spec) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 189
def ordered(serialized_records, spec)
  case spec.time_sort_by
  when :as_at
    serialized_records.sort_by(&:timestamp)
  when :as_of
    serialized_records.sort_by(&:valid_at)
  else
    serialized_records
  end
end
read_event(event_id) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 177
def read_event(event_id)
  storage.fetch(event_id) { raise EventNotFound.new(event_id) }
end
read_scope(spec) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 145
def read_scope(spec)
  serialized_records = serialized_records_of_stream(spec.stream)
  serialized_records = ordered(serialized_records, spec)
  serialized_records = serialized_records.select { |e| spec.with_ids.any? { |x| x.eql?(e.event_id) } } if spec
    .with_ids?
  serialized_records = serialized_records.select { |e| spec.with_types.any? { |x| x.eql?(e.event_type) } } if spec
    .with_types?
  serialized_records = serialized_records.reverse if spec.backward?
  serialized_records = serialized_records.drop(index_of(serialized_records, spec.start) + 1) if spec.start
  serialized_records = serialized_records.take(index_of(serialized_records, spec.stop)) if spec.stop
  serialized_records = serialized_records.take(spec.limit) if spec.limit?
  serialized_records = serialized_records.select { |sr| Time.iso8601(time_comparison_field(spec, sr)) < spec.older_than } if spec
    .older_than
  serialized_records =
    serialized_records.select { |sr| Time.iso8601(time_comparison_field(spec, sr)) <= spec.older_than_or_equal } if spec
    .older_than_or_equal
  serialized_records = serialized_records.select { |sr| Time.iso8601(time_comparison_field(spec, sr)) > spec.newer_than } if spec
    .newer_than
  serialized_records =
    serialized_records.select { |sr| Time.iso8601(time_comparison_field(spec, sr)) >= spec.newer_than_or_equal } if spec
    .newer_than_or_equal
  serialized_records
end
serialized_records_of_stream(stream) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 185
def serialized_records_of_stream(stream)
  stream.global? ? storage.values : storage.fetch_values(*event_ids_of_stream(stream))
end
time_comparison_field(spec, sr) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 169
def time_comparison_field(spec, sr)
  if spec.time_sort_by_as_of?
    sr.valid_at
  else
    sr.timestamp
  end
end
with_synchronize(expected_version, stream, &block) click to toggle source
# File lib/ruby_event_store/in_memory_repository.rb, line 204
def with_synchronize(expected_version, stream, &block)
  resolved_version = expected_version.resolve_for(stream, method(:last_stream_version))

  # expected_version :auto assumes external lock is used
  # which makes reading stream before writing safe.
  #
  # To emulate potential concurrency issues of :auto strategy without
  # such external lock we use Thread.pass to make race
  # conditions more likely. And we only use mutex.synchronize for writing
  # not for the whole read+write algorithm.
  Thread.pass
  mutex.synchronize { block.call(resolved_version) }
end