class DirtyPipeline::PG::Storage
Constants
- FIND_EVENT
- SAVE_EVENT
Attributes
field[R]
store[R]
subject[R]
subject_key[R]
to_h[R]
Public Class Methods
create!(connection)
click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 13 def self.create!(connection) connection.exec <<~SQL CREATE TABLE dp_events_store ( uuid TEXT CONSTRAINT primary_active_operations_key PRIMARY KEY, context TEXT NOT NULL, data TEXT, error TEXT, created_at TIMESTAMP NOT NULL DEFAULT now() ); SQL end
destroy!(connection)
click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 25 def self.destroy!(connection) connection.exec <<~SQL DROP TABLE IF EXISTS dp_events_store; SQL end
new(subject, field)
click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 33 def initialize(subject, field) @subject = subject @field = field @store = subject.send(@field).to_h reset if @store.empty? @subject_key = "#{subject.class}:#{subject.id}" raise InvalidPipelineStorage, store unless valid_store? end
Public Instance Methods
commit!(event)
click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 61 def commit!(event) store["status"] = event.destination if event.success? store["state"].merge!(event.changes) unless event.changes.to_h.empty? data, error = {}, {} data = event.data.to_h if event.data.respond_to?(:to_h) error = event.error.to_h if event.error.respond_to?(:to_h) with_postgres do |c| c.exec( SAVE_EVENT, [event.id, subject_key, JSON.dump(data), JSON.dump(error)] ) end save! end
find_event(event_id)
click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 80 def find_event(event_id) with_postgres do |c| found_event, found_error = PG.multi(c.exec(FIND_EVENT, [event_id, subject_key])) return unless found_event Event.new( data: JSON.parse(found_event), error: JSON.parse(found_error) ) end end
reset!()
click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 46 def reset! reset save! end
status()
click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 51 def status store["status"] end
with_postgres(&block)
click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 42 def with_postgres(&block) DirtyPipeline.with_postgres(&block) end
Private Instance Methods
reset()
click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 103 def reset @store = subject.send( "#{field}=", { "status" => nil, "state" => {}, } ) end
save!()
click to toggle source
FIXME: save! - configurable method
# File lib/dirty_pipeline/pg/storage.rb, line 98 def save! subject.send("#{field}=", store) subject.save! end
valid_store?()
click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 93 def valid_store? (store.keys & %w(status state)).size.eql?(2) end