class EventSourcery::DynamoDB::Tracker

Attributes

connection[R]
table_name[R]

Public Class Methods

new(connection, table_name:, obtain_processor_lock: true)
# File lib/event_sourcery/dynamodb/tracker.rb, line 4
# Builds a tracker around a connection and the name of its tracker table.
#
# @param connection [Object] client object used for all table and item calls
# @param table_name [#to_s] name of the tracker table; stored as a String
# @param obtain_processor_lock [Boolean] flag kept on the instance as given
def initialize(connection, table_name:, obtain_processor_lock: true)
  @obtain_processor_lock = obtain_processor_lock
  # Normalised once here so later comparisons and requests see a String.
  @table_name = table_name.to_s
  @connection = connection
end

Public Instance Methods

last_processed_event_id(processor_name)
# File lib/event_sourcery/dynamodb/tracker.rb, line 23
# Returns the ID of the last event recorded for the given processor.
#
# A processor that has no track entry has not processed anything yet, so 0
# is returned rather than failing on the missing item. 0 is also the value
# written when an entry is first created or reset.
#
# @param processor_name [Object] key of the track entry to read
# @return [Integer] the stored ID converted with to_i, or 0 when no entry exists
# @raise [KeyError] if the entry exists but has no 'last_processed_event_id'
def last_processed_event_id(processor_name)
  track_entry = get_track_entry(processor_name)
  return 0 if track_entry.nil?

  track_entry.fetch('last_processed_event_id').to_i
end
processed_event(processor_name, event_id)
# File lib/event_sourcery/dynamodb/tracker.rb, line 15
# Records event_id as the last event handled by the named processor.
#
# Delegates to update_track_entry, passing both arguments through unchanged.
#
# @param processor_name [Object] key of the track entry to update
# @param event_id [Object] value stored as the entry's last processed event ID
# @return [Object] whatever update_track_entry returns
def processed_event(processor_name, event_id)
  update_track_entry(processor_name, event_id)
end
reset_last_processed_event_id(processor_name)
# File lib/event_sourcery/dynamodb/tracker.rb, line 19
# Sets the named processor's last processed event ID back to 0.
#
# Delegates to update_track_entry with a fixed ID of 0.
#
# @param processor_name [Object] key of the track entry to reset
# @return [Object] whatever update_track_entry returns
def reset_last_processed_event_id(processor_name)
  update_track_entry(processor_name, 0)
end
setup(processor_name = nil)
# File lib/event_sourcery/dynamodb/tracker.rb, line 10
# Prepares storage for tracking: ensures the tracker table exists and, when
# a processor name is given, ensures that processor has a track entry.
#
# @param processor_name [Object, nil] processor to create an entry for; when
#   omitted (nil) only the table is ensured
# @return [Object, nil] result of the entry step, or nil when it is skipped
def setup(processor_name = nil)
  create_table_if_not_exists
  # The nil default means "table only". Passing nil on would use it as the
  # item key for the lookup and write, which is not a usable key value.
  create_track_entry_if_not_exists(processor_name) if processor_name
end
tracked_processors()
# File lib/event_sourcery/dynamodb/tracker.rb, line 28
# Placeholder: no lookup is performed here and the method always returns nil.
#
# NOTE(review): the name suggests this should list the processors that have
# track entries; confirm the intended contract before relying on it.
#
# @return [nil]
def tracked_processors
  nil
end

Private Instance Methods

create_table_if_not_exists()
# File lib/event_sourcery/dynamodb/tracker.rb, line 35
# Creates the tracker table unless a table with that name already exists.
#
# ListTables hands back table names one page at a time, so every page is
# checked. Looking only at the first page would miss an existing table when
# there are more tables than fit on one page, and creation would then be
# attempted for a table that is already there.
#
# @return [Object, nil] result of Schema.create_projector_tracker, or nil
#   when the table is already present
def create_table_if_not_exists
  pages = connection.list_tables.each_page
  return if pages.any? { |page| page.table_names.include?(table_name) }

  ::EventSourcery::DynamoDB::Schema.create_projector_tracker(db: connection, table_name: table_name)
end
create_track_entry_if_not_exists(processor_name)
# File lib/event_sourcery/dynamodb/tracker.rb, line 42
# Gives the named processor a track entry starting at event ID 0, leaving an
# entry that is already present untouched.
#
# @param processor_name [Object] key of the track entry to ensure
# @return [Object, nil] result of update_track_entry when an entry is
#   written, nil when one already exists
def create_track_entry_if_not_exists(processor_name)
  return unless get_track_entry(processor_name).nil?

  update_track_entry(processor_name, 0)
end
get_track_entry(processor_name)
# File lib/event_sourcery/dynamodb/tracker.rb, line 49
# Looks up the track entry stored under the given processor name.
#
# @param processor_name [Object] value used for the 'name' key
# @return [Object, nil] the `item` of the get_item response; callers in this
#   class treat nil as "no entry"
def get_track_entry(processor_name)
  lookup = {
    table_name: table_name,
    key: { 'name' => processor_name }
  }
  response = connection.get_item(lookup)
  response.item
end
update_track_entry(processor_name, event_id)
# File lib/event_sourcery/dynamodb/tracker.rb, line 56
# Writes event_id into the 'last_processed_event_id' attribute of the entry
# keyed by processor_name.
#
# @param processor_name [Object] value used for the 'name' key
# @param event_id [Object] value to store
# @return [Object] the response returned by connection.update_item
def update_track_entry(processor_name, event_id)
  # Only a value is supplied for the attribute; no explicit action is set.
  changes = { 'last_processed_event_id' => { value: event_id } }
  request = {
    table_name: table_name,
    key: { 'name' => processor_name },
    attribute_updates: changes
  }
  connection.update_item(request)
end