class DirtyPipeline::PG::Railway

Constants

DEFAULT_OPERATIONS
DELETE_OPERATION
DELETE_TRANSACTION
SELECT_OPERATION
SELECT_TRANSACTION
SWITCH_OPERATION
SWITCH_TRANSACTION

Public Class Methods

create!(connection) click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 6
      def self.create!(connection)
        connection.exec <<~SQL
          CREATE TABLE dp_active_operations (
            key TEXT CONSTRAINT primary_dp_active_operations_key PRIMARY KEY,
            name TEXT,
            created_at TIMESTAMP NOT NULL DEFAULT now()
          );
          CREATE TABLE dp_active_transactions (
            key TEXT CONSTRAINT primary_dp_active_tx_key PRIMARY KEY,
            name TEXT,
            created_at TIMESTAMP NOT NULL DEFAULT now()
          );
        SQL
      end
destroy!(connection) click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 21
      def self.destroy!(connection)
        connection.exec <<~SQL
          DROP TABLE IF EXISTS dp_active_operations;
          DROP TABLE IF EXISTS dp_active_transactions;
        SQL
      end
new(subject, transaction_id) click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 28
def initialize(subject, transaction_id)
  @tx_id = transaction_id
  @subject_class = subject.class.to_s
  @subject_id = subject.id.to_s
  @root = "dirty-pipeline-rail:#{subject.class}:#{subject.id}:"
  @queues = Hash[
    DEFAULT_OPERATIONS.map do |operation|
      [operation, create_queue(operation)]
    end
  ]
end

Public Instance Methods

[](operation_name = active)
Alias for: queue
active() click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 93
def active
  with_postgres do |c|
    PG.single c.exec(SELECT_OPERATION, [active_operation_key])
  end
end
Also aliased as: operation
clear!() click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 50
def clear!
  @queues.values.each(&:clear!)
  with_postgres do |c|
    c.transaction do |tc|
      tc.exec DELETE_OPERATION, [active_operation_key]
      tc.exec DELETE_TRANSACTION, [active_transaction_key]
    end
  end
end
next() click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 60
def next
  return if other_transaction_in_progress?
  start_transaction! unless running_transaction

  queue.pop.tap { |event| finish_transaction! if event.nil? }
end
operation()
Alias for: active
other_transaction_in_progress?() click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 109
def other_transaction_in_progress?
  return false if running_transaction.nil?
  running_transaction != @tx_id
end
queue(operation_name = active) click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 67
def queue(operation_name = active)
  @queues.fetch(operation_name.to_s) do
    @queues.store(operation_name, create_queue(operation_name))
  end
end
Also aliased as: []
running_transaction() click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 103
def running_transaction
  with_postgres do |c|
    PG.single c.exec(SELECT_TRANSACTION, [active_transaction_key])
  end
end
switch_to(name) click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 79
def switch_to(name)
  raise ArgumentError unless DEFAULT_OPERATIONS.include?(name.to_s)
  return if name.to_s == active

  with_postgres do |c|
    # c.exec('START TRANSACTION;')
    c.exec(SWITCH_OPERATION, [active_operation_key, name])
    # c.exec('COMMIT;')
  end
end
with_postgres(&block) click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 40
def with_postgres(&block)
  DirtyPipeline.with_postgres(&block)
end

Private Instance Methods

active_operation_key() click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 124
def active_operation_key
  "#{@root}:active_operation"
end
active_transaction_key() click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 120
def active_transaction_key
  "#{@root}:active_transaction"
end
create_queue(operation_name) click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 116
def create_queue(operation_name)
  Queue.new(operation_name, @subject_class, @subject_id, @tx_id)
end
finish_transaction!() click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 142
def finish_transaction!
  clear! if running_transaction == @tx_id
end
start_transaction!() click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 133
def start_transaction!
  switch_to(DEFAULT_OPERATIONS.first) unless active
  with_postgres do |c|
    # c.exec('START TRANSACTION;')
    c.exec(SWITCH_TRANSACTION, [active_transaction_key, @tx_id])
    # c.exec('COMMIT;')
  end
end