class DirtyPipeline::PG::Storage

Constants

FIND_EVENT
SAVE_EVENT

Attributes

field[R]
store[R]
subject[R]
subject_key[R]
to_h[R]

Public Class Methods

create!(connection) click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 13
      # Creates the +dp_events_store+ table by executing a single
      # CREATE TABLE statement on +connection+.
      #
      # connection - any object responding to #exec(sql).
      #
      # Returns whatever +connection.exec+ returns.
      def self.create!(connection)
        ddl = <<~SQL
          CREATE TABLE dp_events_store (
            uuid TEXT CONSTRAINT primary_active_operations_key PRIMARY KEY,
            context TEXT NOT NULL,
            data TEXT,
            error TEXT,
            created_at TIMESTAMP NOT NULL DEFAULT now()
          );
        SQL
        connection.exec(ddl)
      end
destroy!(connection) click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 25
      # Drops the +dp_events_store+ table if it exists by executing a single
      # DROP TABLE statement on +connection+.
      #
      # connection - any object responding to #exec(sql).
      #
      # Returns whatever +connection.exec+ returns.
      def self.destroy!(connection)
        ddl = <<~SQL
          DROP TABLE IF EXISTS dp_events_store;
        SQL
        connection.exec(ddl)
      end
new(subject, field) click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 33
# Builds a storage wrapper around +subject+, whose pipeline data is read
# from (and written to) the attribute named +field+.
#
# The store is loaded by calling +field+ on the subject and converting the
# result with #to_h; an empty store is replaced with the blank default via
# #reset. The subject key is "<subject class>:<subject id>".
#
# Raises InvalidPipelineStorage (with the store as its message argument)
# when #valid_store? is false.
def initialize(subject, field)
  @subject, @field = subject, field
  @store = subject.send(field).to_h
  reset if store.empty?
  @subject_key = "#{subject.class}:#{subject.id}"
  return if valid_store?

  raise InvalidPipelineStorage, store
end

Public Instance Methods

commit!(event) click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 61
# Applies +event+ to the in-memory store, records the event through
# SAVE_EVENT, then persists the subject.
#
# * store["status"] becomes +event.destination+ when +event.success?+.
# * +event.changes+ (converted with #to_h) is merged into store["state"]
#   when it is non-empty.
# * SAVE_EVENT is executed with
#   [event.id, subject_key, JSON of data, JSON of error]; data and error
#   fall back to {} when the value does not respond to #to_h.
#
# Returns whatever #save! returns.
def commit!(event)
  store["status"] = event.destination if event.success?

  # Convert once so the emptiness check and the merge operate on the same
  # Hash. Merging the raw value raised TypeError for hash-like values that
  # are not Hashes (e.g. an array of pairs), even though the guard accepted
  # them.
  changes = event.changes.to_h
  store["state"].merge!(changes) unless changes.empty?

  data = event.data.respond_to?(:to_h) ? event.data.to_h : {}
  error = event.error.respond_to?(:to_h) ? event.error.to_h : {}

  with_postgres do |c|
    c.exec(
      SAVE_EVENT,
      [event.id, subject_key, JSON.dump(data), JSON.dump(error)]
    )
  end
  save!
end
find_event(event_id) click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 80
# Looks up a stored event by +event_id+ for this storage's subject key.
#
# Executes FIND_EVENT with [event_id, subject_key] and passes the result to
# PG.multi, whose first two values are treated as the JSON-encoded data and
# error (NOTE(review): exact contract of PG.multi is not visible here —
# confirm).
#
# Returns nil when no data value is found, otherwise whatever
# #with_postgres returns for a block that builds
# Event.new(data: ..., error: ...) from the parsed JSON.
def find_event(event_id)
  with_postgres do |connection|
    result = connection.exec(FIND_EVENT, [event_id, subject_key])
    data_json, error_json = PG.multi(result)
    # Exits find_event itself (not just the block) with nil.
    return unless data_json

    Event.new(data: JSON.parse(data_json), error: JSON.parse(error_json))
  end
end
reset!() click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 46
# Replaces the store with its blank default (via #reset) and then persists
# the subject (via #save!). Returns whatever #save! returns.
def reset!
  reset
  save!
end
status() click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 51
# Returns the value held under the "status" key of the store.
def status
  current_store = store
  current_store["status"]
end
with_postgres(&block) click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 42
# Delegates to DirtyPipeline.with_postgres, forwarding the given block
# unchanged and returning its result.
# NOTE(review): callers in this class invoke #exec on the yielded object, so
# the block presumably receives a Postgres connection — confirm against
# DirtyPipeline.with_postgres.
def with_postgres(&block)
  DirtyPipeline.with_postgres(&block)
end

Private Instance Methods

reset() click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 103
# Replaces the store with a blank one ("status" => nil, "state" => {}),
# assigning it to the subject through the "<field>=" writer.
#
# Returns the new store Hash.
def reset
  blank = {
    "status" => nil,
    "state" => {},
  }
  # When a writer is invoked through #send its own return value is handed
  # back (unlike `obj.attr = value` syntax, which always yields the assigned
  # value), so keep our own reference instead of trusting the writer.
  subject.send("#{field}=", blank)
  @store = blank
end
save!() click to toggle source

FIXME: make the persistence method (currently hard-coded to `save!`) configurable.

# File lib/dirty_pipeline/pg/storage.rb, line 98
# Writes the current store back to the subject through the "<field>="
# writer, then calls +save!+ on the subject.
#
# Returns whatever +subject.save!+ returns.
def save!
  writer = "#{field}="
  subject.send(writer, store)
  subject.save!
end
valid_store?() click to toggle source
# File lib/dirty_pipeline/pg/storage.rb, line 93
# Returns true when the store contains both the "status" and the "state"
# keys, false otherwise.
def valid_store?
  %w(status state).all? { |required_key| store.key?(required_key) }
end