class DirtyPipeline::PG::Railway
Constants
- DEFAULT_OPERATIONS
- DELETE_OPERATION
- DELETE_TRANSACTION
- SELECT_OPERATION
- SELECT_TRANSACTION
- SWITCH_OPERATION
- SWITCH_TRANSACTION
Public Class Methods
create!(connection)
click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 6 def self.create!(connection) connection.exec <<~SQL CREATE TABLE dp_active_operations ( key TEXT CONSTRAINT primary_dp_active_operations_key PRIMARY KEY, name TEXT, created_at TIMESTAMP NOT NULL DEFAULT now() ); CREATE TABLE dp_active_transactions ( key TEXT CONSTRAINT primary_dp_active_tx_key PRIMARY KEY, name TEXT, created_at TIMESTAMP NOT NULL DEFAULT now() ); SQL end
destroy!(connection)
click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 21 def self.destroy!(connection) connection.exec <<~SQL DROP TABLE IF EXISTS dp_active_operations; DROP TABLE IF EXISTS dp_active_transactions; SQL end
new(subject, transaction_id)
click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 28 def initialize(subject, transaction_id) @tx_id = transaction_id @subject_class = subject.class.to_s @subject_id = subject.id.to_s @root = "dirty-pipeline-rail:#{subject.class}:#{subject.id}:" @queues = Hash[ DEFAULT_OPERATIONS.map do |operation| [operation, create_queue(operation)] end ] end
Public Instance Methods
active()
click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 93 def active with_postgres do |c| PG.single c.exec(SELECT_OPERATION, [active_operation_key]) end end
Also aliased as: operation
clear!()
click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 50 def clear! @queues.values.each(&:clear!) with_postgres do |c| c.transaction do |tc| tc.exec DELETE_OPERATION, [active_operation_key] tc.exec DELETE_TRANSACTION, [active_transaction_key] end end end
next()
click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 60 def next return if other_transaction_in_progress? start_transaction! unless running_transaction queue.pop.tap { |event| finish_transaction! if event.nil? } end
other_transaction_in_progress?()
click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 109 def other_transaction_in_progress? return false if running_transaction.nil? running_transaction != @tx_id end
queue(operation_name = active)
click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 67 def queue(operation_name = active) @queues.fetch(operation_name.to_s) do @queues.store(operation_name, create_queue(operation_name)) end end
Also aliased as: []
running_transaction()
click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 103 def running_transaction with_postgres do |c| PG.single c.exec(SELECT_TRANSACTION, [active_transaction_key]) end end
switch_to(name)
click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 79 def switch_to(name) raise ArgumentError unless DEFAULT_OPERATIONS.include?(name.to_s) return if name.to_s == active with_postgres do |c| # c.exec('START TRANSACTION;') c.exec(SWITCH_OPERATION, [active_operation_key, name]) # c.exec('COMMIT;') end end
with_postgres(&block)
click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 40 def with_postgres(&block) DirtyPipeline.with_postgres(&block) end
Private Instance Methods
active_operation_key()
click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 124 def active_operation_key "#{@root}:active_operation" end
active_transaction_key()
click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 120 def active_transaction_key "#{@root}:active_transaction" end
create_queue(operation_name)
click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 116 def create_queue(operation_name) Queue.new(operation_name, @subject_class, @subject_id, @tx_id) end
finish_transaction!()
click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 142 def finish_transaction! clear! if running_transaction == @tx_id end
start_transaction!()
click to toggle source
# File lib/dirty_pipeline/pg/railway.rb, line 133 def start_transaction! switch_to(DEFAULT_OPERATIONS.first) unless active with_postgres do |c| # c.exec('START TRANSACTION;') c.exec(SWITCH_TRANSACTION, [active_transaction_key, @tx_id]) # c.exec('COMMIT;') end end