class EventSourcery::DynamoDB::Tracker
Attributes
connection[R]
table_name[R]
Public Class Methods
new(connection, table_name:, obtain_processor_lock: true)
click to toggle source
# File lib/event_sourcery/dynamodb/tracker.rb, line 4
#
# Builds a tracker bound to a connection and a tracker table name.
#
# @param connection [Object] stored untouched in +@connection+
# @param table_name [#to_s] stored in +@table_name+ after conversion with +to_s+
# @param obtain_processor_lock [Boolean] stored in +@obtain_processor_lock+
#   (defaults to +true+); nothing in the visible methods reads it back
def initialize(connection, table_name:, obtain_processor_lock: true)
  @obtain_processor_lock = obtain_processor_lock
  @table_name = table_name.to_s
  @connection = connection
end
Public Instance Methods
last_processed_event_id(processor_name)
click to toggle source
# File lib/event_sourcery/dynamodb/tracker.rb, line 23
#
# Returns the id of the last event recorded for +processor_name+.
#
# @param processor_name [Object] value used as the entry's 'name' key
# @return [Integer] the stored 'last_processed_event_id' converted with +to_i+,
#   or 0 when no track entry exists for the processor
# @raise [KeyError] when an entry exists but has no 'last_processed_event_id'
def last_processed_event_id(processor_name)
  track_entry = get_track_entry(processor_name)
  # A processor that has never been set up has no entry; report the same
  # starting position (0) that a freshly created or reset entry holds instead
  # of failing with NoMethodError on nil.
  return 0 if track_entry.nil?

  track_entry.fetch('last_processed_event_id').to_i
end
processed_event(processor_name, event_id)
click to toggle source
# File lib/event_sourcery/dynamodb/tracker.rb, line 15
#
# Stores +event_id+ as the last processed event for +processor_name+ by
# delegating to +update_track_entry+.
#
# @param processor_name [Object] value used as the entry's 'name' key
# @param event_id [Object] value written as 'last_processed_event_id'
# @return [Object] whatever +update_track_entry+ returns
def processed_event(processor_name, event_id)
  update_track_entry(processor_name, event_id)
end
reset_last_processed_event_id(processor_name)
click to toggle source
# File lib/event_sourcery/dynamodb/tracker.rb, line 19
#
# Rewinds the tracked position of +processor_name+ to 0 by delegating to
# +update_track_entry+.
#
# @param processor_name [Object] value used as the entry's 'name' key
# @return [Object] whatever +update_track_entry+ returns
def reset_last_processed_event_id(processor_name)
  starting_position = 0
  update_track_entry(processor_name, starting_position)
end
setup(processor_name = nil)
click to toggle source
# File lib/event_sourcery/dynamodb/tracker.rb, line 10
#
# Prepares the tracker for use: ensures the tracker table exists and, when a
# processor name is given, ensures that processor has a track entry.
#
# @param processor_name [Object, nil] processor to create an entry for; when
#   omitted (nil) only the table step runs
# @return [Object, nil] result of +create_track_entry_if_not_exists+, or nil
#   when no processor name was given
def setup(processor_name = nil)
  create_table_if_not_exists
  # The parameter defaults to nil; a nil name cannot identify an entry, so the
  # entry lookup/creation is skipped rather than issued with a nil key.
  create_track_entry_if_not_exists(processor_name) unless processor_name.nil?
end
tracked_processors()
click to toggle source
# File lib/event_sourcery/dynamodb/tracker.rb, line 28
#
# Lists the names of all processors that have an entry in the tracker table.
#
# Entries are read with +connection.scan+; the scan is repeated with
# +exclusive_start_key+ for as long as a page reports a +last_evaluated_key+,
# so results spanning several pages are all collected.
#
# @return [Array<Object>] the 'name' value of every entry (empty when the
#   table has no entries)
def tracked_processors
  names = []
  start_key = nil

  loop do
    request = { table_name: table_name }
    request[:exclusive_start_key] = start_key if start_key

    page = connection.scan(request)
    names.concat(page.items.map { |entry| entry['name'] })

    start_key = page.last_evaluated_key
    break if start_key.nil? || start_key.empty?
  end

  names
end
Private Instance Methods
create_table_if_not_exists()
click to toggle source
# File lib/event_sourcery/dynamodb/tracker.rb, line 35
#
# Creates the tracker table through
# +EventSourcery::DynamoDB::Schema.create_projector_tracker+ unless a table
# named +table_name+ is already listed by the connection.
#
# The table listing is paginated: every page is checked, following
# +last_evaluated_table_name+ via +exclusive_start_table_name+, so an existing
# table that is not on the first page is still found and not created again.
#
# @return [Object, nil] nil when the table already exists, otherwise the
#   result of +create_projector_tracker+
def create_table_if_not_exists
  start_name = nil

  loop do
    request = start_name ? { exclusive_start_table_name: start_name } : {}
    page = connection.list_tables(request).to_h
    return if Array(page[:table_names]).include?(table_name)

    start_name = page[:last_evaluated_table_name]
    break if start_name.nil?
  end

  ::EventSourcery::DynamoDB::Schema.create_projector_tracker(db: connection, table_name: table_name)
end
create_track_entry_if_not_exists(processor_name)
click to toggle source
# File lib/event_sourcery/dynamodb/tracker.rb, line 42
#
# Writes an initial entry (position 0) for +processor_name+ when
# +get_track_entry+ finds none; does nothing otherwise.
#
# @param processor_name [Object] value used as the entry's 'name' key
# @return [Object, nil] result of +update_track_entry+ when an entry was
#   written, nil when one already existed
def create_track_entry_if_not_exists(processor_name)
  update_track_entry(processor_name, 0) if get_track_entry(processor_name).nil?
end
get_track_entry(processor_name)
click to toggle source
# File lib/event_sourcery/dynamodb/tracker.rb, line 49
#
# Fetches the track entry stored under +processor_name+.
#
# @param processor_name [Object] value used as the entry's 'name' key
# @return [Object, nil] the +item+ of the +get_item+ response; sibling code
#   treats nil as "no entry"
def get_track_entry(processor_name)
  lookup = {
    table_name: table_name,
    key: { 'name' => processor_name }
  }
  response = connection.get_item(lookup)
  response.item
end
update_track_entry(processor_name, event_id)
click to toggle source
# File lib/event_sourcery/dynamodb/tracker.rb, line 56
#
# Sends an +update_item+ request that sets 'last_processed_event_id' to
# +event_id+ on the entry keyed by +processor_name+.
#
# @param processor_name [Object] value used as the entry's 'name' key
# @param event_id [Object] value written as 'last_processed_event_id'
# @return [Object] the +update_item+ response
def update_track_entry(processor_name, event_id)
  entry_key = { 'name' => processor_name }
  # No explicit action is supplied for the attribute update, so whatever the
  # service applies by default is used -- NOTE(review): presumably PUT; confirm.
  changes = { 'last_processed_event_id' => { value: event_id } }

  connection.update_item({ table_name: table_name, key: entry_key, attribute_updates: changes })
end