class DirtyPipeline::Redis::Railway

Constants

DEFAULT_OPERATIONS

Public Class Methods

new(subject, transaction_id) click to toggle source
# File lib/dirty_pipeline/redis/railway.rb, line 6
# Builds the railway state for one subject / transaction pair.
#
# Stores the transaction id, the stringified class and id of +subject+,
# the key prefix used by this object's Redis keys, and one queue per
# entry of DEFAULT_OPERATIONS (built through #create_queue).
#
# @param subject [#id] object whose class name and id namespace the keys
# @param transaction_id [Object] identifier kept in @tx_id
def initialize(subject, transaction_id)
  @tx_id = transaction_id
  @subject_class = subject.class.to_s
  @subject_id = subject.id.to_s
  # The prefix ends with ":" and the key helpers prepend another ":",
  # so the resulting keys contain "::" before their suffix.
  @root = "dirty-pipeline-rail:#{@subject_class}:#{@subject_id}:"
  @queues = DEFAULT_OPERATIONS.each_with_object({}) do |operation, queues|
    queues[operation] = create_queue(operation)
  end
end

Public Instance Methods

[](operation_name = active)
Alias for: queue
active() click to toggle source
# File lib/dirty_pipeline/redis/railway.rb, line 48
# Reads the value stored under #active_operation_key.
#
# @return [Object] whatever the Redis client's +get+ returns for that key
#   (presumably a String, or nil when the key is unset -- confirm against
#   the client in use)
def active
  DirtyPipeline.with_redis do |redis|
    redis.get(active_operation_key)
  end
end
Also aliased as: operation
clear!() click to toggle source
# File lib/dirty_pipeline/redis/railway.rb, line 18
# Resets this railway: calls +clear!+ on every queue held in @queues,
# then deletes the active-operation and active-transaction keys inside
# a single +multi+ block.
#
# @return [Object] the value returned by DirtyPipeline.with_redis
def clear!
  @queues.each_value { |operation_queue| operation_queue.clear! }

  DirtyPipeline.with_redis do |redis|
    redis.multi do |transaction|
      transaction.del(active_operation_key)
      transaction.del(active_transaction_key)
    end
  end
end
next() click to toggle source
# File lib/dirty_pipeline/redis/railway.rb, line 28
# Pops the next event from the active operation's queue.
#
# Returns nil without touching the queue when another transaction is in
# progress. Starts a transaction when none is running. When the queue
# yields nil, #finish_transaction! is invoked before returning.
#
# @return [Object, nil] the popped event, or nil
def next
  return if other_transaction_in_progress?

  start_transaction! unless running_transaction

  event = queue.pop
  finish_transaction! if event.nil?
  event
end
operation()
Alias for: active
other_transaction_in_progress?() click to toggle source
# File lib/dirty_pipeline/redis/railway.rb, line 57
# Tells whether a transaction other than this railway's own (@tx_id) is
# currently recorded as running.
#
# The running transaction is read exactly once: reading it twice costs an
# extra Redis round trip and lets the value change between the nil check
# and the comparison, which could report a foreign transaction that does
# not exist.
#
# @return [Boolean] false when no transaction is recorded or the recorded
#   one equals @tx_id, true otherwise
def other_transaction_in_progress?
  current = running_transaction
  !current.nil? && current != @tx_id
end
queue(operation_name = active) click to toggle source
# File lib/dirty_pipeline/redis/railway.rb, line 35
# Returns the queue for +operation_name+, creating and caching it on first
# use.
#
# The cache is keyed by the string form of the name for both lookup and
# storage, so "call" and :call resolve to the same queue. Storing under
# the raw name would never match the stringified lookup for non-String
# names, and a new queue would be built on every call.
#
# @param operation_name [#to_s] operation to look up; defaults to the
#   value of #active at call time
# @return [Object] the queue built by #create_queue for that operation
def queue(operation_name = active)
  key = operation_name.to_s
  @queues.fetch(key) do
    @queues.store(key, create_queue(operation_name))
  end
end
Also aliased as: []
running_transaction() click to toggle source
# File lib/dirty_pipeline/redis/railway.rb, line 53
# Reads the value stored under #active_transaction_key.
#
# @return [Object] whatever the Redis client's +get+ returns for that key
#   (presumably the running transaction id or nil -- confirm against the
#   client in use)
def running_transaction
  DirtyPipeline.with_redis do |redis|
    redis.get(active_transaction_key)
  end
end
switch_to(name) click to toggle source
# File lib/dirty_pipeline/redis/railway.rb, line 42
# Makes +name+ the active operation.
#
# Does nothing (returns nil) when +name+ already equals the value of
# #active; otherwise writes +name+ under #active_operation_key.
#
# @param name [#to_s] operation to activate
# @raise [ArgumentError] when the string form of +name+ is not listed in
#   DEFAULT_OPERATIONS
# @return [Object, nil] the result of the Redis +set+, or nil when no
#   write was needed
def switch_to(name)
  requested = name.to_s
  raise ArgumentError unless DEFAULT_OPERATIONS.include?(requested)
  return if requested == active

  DirtyPipeline.with_redis do |redis|
    redis.set(active_operation_key, name)
  end
end

Private Instance Methods

active_operation_key() click to toggle source
# File lib/dirty_pipeline/redis/railway.rb, line 72
# Builds the key under which the active operation name is stored.
#
# @return [String] @root followed by ":active_operation"
def active_operation_key
  format("%s:active_operation", @root)
end
active_transaction_key() click to toggle source
# File lib/dirty_pipeline/redis/railway.rb, line 68
# Builds the key under which the running transaction id is stored.
#
# @return [String] @root followed by ":active_transaction"
def active_transaction_key
  format("%s:active_transaction", @root)
end
create_queue(operation_name) click to toggle source
# File lib/dirty_pipeline/redis/railway.rb, line 64
# Instantiates a Queue for one operation of this railway, passing along
# the subject class name, subject id and transaction id captured by the
# constructor.
#
# @param operation_name [Object] operation the queue belongs to
# @return [Queue] the new queue instance
def create_queue(operation_name)
  Queue.new(
    operation_name,
    @subject_class,
    @subject_id,
    @tx_id
  )
end
finish_transaction!() click to toggle source
# File lib/dirty_pipeline/redis/railway.rb, line 81
# Clears the railway, but only when the recorded running transaction is
# this railway's own (@tx_id).
#
# @return [Object, nil] the result of #clear!, or nil when nothing was
#   cleared
def finish_transaction!
  return unless running_transaction == @tx_id

  clear!
end
start_transaction!() click to toggle source
# File lib/dirty_pipeline/redis/railway.rb, line 76
# Marks this railway's transaction as the running one.
#
# When no operation is active yet, the first entry of DEFAULT_OPERATIONS
# is activated first. Then @tx_id is written under
# #active_transaction_key.
#
# @return [Object] the value returned by DirtyPipeline.with_redis
def start_transaction!
  active || switch_to(DEFAULT_OPERATIONS.first)

  DirtyPipeline.with_redis do |redis|
    redis.set(active_transaction_key, @tx_id)
  end
end