class DirtyPipeline::Base

Constants

DEFAULT_CLEANUP_DELAY
DEFAULT_RETRY_DELAY

Attributes

cleanup_delay[RW]
pipeline_storage[RW]
retry_delay[RW]
transitions_map[R]
queue[R]
railway[R]
status[R]
storage[R]
subject[R]
uuid[R]

Public Class Methods

find_subject(*args) click to toggle source
# File lib/dirty_pipeline/base.rb, line 9
def find_subject(*args)
  fail NotImplemented
end
inherited(child) click to toggle source
# File lib/dirty_pipeline/base.rb, line 14
def inherited(child)
  child.instance_variable_set(
    :@transitions_map,
    transitions_map || Hash.new
  )
end
new(subject, uuid: nil) click to toggle source
# File lib/dirty_pipeline/base.rb, line 37
def initialize(subject, uuid: nil)
  @uuid = uuid || SecureRandom.uuid
  @subject = subject
  @storage = Storage.new(subject, self.class.pipeline_storage)
  @railway = Railway.new(subject, @uuid)
  @status = Status.success(subject)
end
transition(name, from:, to:, action: nil, attempts: 1) click to toggle source
# File lib/dirty_pipeline/base.rb, line 24
def transition(name, from:, to:, action: nil, attempts: 1)
  action ||= method(name) if respond_to?(name)
  action ||= const_get(name.to_s.camelcase(:upper))
  @transitions_map[name.to_s] = {
    action: action,
    from: Array(from).map(&:to_s),
    to: to.to_s,
    attempts: attempts,
  }
end

Public Instance Methods

call() click to toggle source
# File lib/dirty_pipeline/base.rb, line 74
def call
  # HANDLE ANOTHER ACTION IN PROGRESS EXPLICITLY
  return self if (enqueued_event = railway.next).nil?
  execute(load_event(enqueued_event))
end
Also aliased as: call_next
call_next()
Alias for: call
chain(*args, **kwargs) click to toggle source

FIXME operation :call - argument

# File lib/dirty_pipeline/base.rb, line 68
def chain(*args, **kwargs)
  operation = kwargs.fetch(:operation) { :call }
  railway[operation] << Event.create(*args, **kwargs.merge(tx_id: @uuid))
  self
end
clean() click to toggle source
# File lib/dirty_pipeline/base.rb, line 81
def clean
  finished = railway.queue.to_a.empty?
  finished &&= railway.queue.processing_event.nil?
  return self if finished
  railway.switch_to(:undo)
  call
end
cleanup_delay() click to toggle source
# File lib/dirty_pipeline/base.rb, line 109
def cleanup_delay; self.class.cleanup_delay || DEFAULT_CLEANUP_DELAY; end
clear!() click to toggle source
# File lib/dirty_pipeline/base.rb, line 62
def clear!
  storage.reset!
  reset!
end
find_transition!(event) click to toggle source
# File lib/dirty_pipeline/base.rb, line 45
def find_transition!(event)
  tname = event.transition
  event.source = storage.status
  self.class.transitions_map.fetch(tname.to_s).tap do |from:, **kwargs|
    next unless railway.operation.eql?("call")
    next if from == Array(event.source)
    next if from.include?(event.source.to_s)
    raise InvalidTransition, "from `#{event.source}` by `#{tname}`"
  end.tap do |to:, **|
    event.destination = to if railway.operation.eql?("call")
  end
end
reset!() click to toggle source
# File lib/dirty_pipeline/base.rb, line 58
def reset!
  railway.clear!
end
retry() click to toggle source
# File lib/dirty_pipeline/base.rb, line 89
def retry
  return self if (enqueued_event = railway.queue.processing_event).nil?
  execute(load_event(enqueued_event), attempt_retry: true)
end
retry_delay() click to toggle source
# File lib/dirty_pipeline/base.rb, line 112
def retry_delay; self.class.retry_delay || DEFAULT_RETRY_DELAY; end
schedule(operation = "call", delay = nil) click to toggle source
# File lib/dirty_pipeline/base.rb, line 94
def schedule(operation = "call", delay = nil)
  job_args = {
    "transaction_id" => @uuid,
    "enqueued_pipeline" => self.class.to_s,
    "find_subject_args" => find_subject_args,
    "operation" => operation,
  }

  if delay.nil?
    ::DirtyPipeline::Worker.perform_async(job_args)
  else
    ::DirtyPipeline::Worker.perform_in(delay, job_args)
  end
end
schedule_cleanup() click to toggle source
# File lib/dirty_pipeline/base.rb, line 110
def schedule_cleanup; schedule("cleanup", cleanup_delay); end
schedule_retry() click to toggle source
# File lib/dirty_pipeline/base.rb, line 113
def schedule_retry; schedule("retry",   retry_delay); end
when_failure(tag = status.tag) { |data, self| ... } click to toggle source
# File lib/dirty_pipeline/base.rb, line 125
def when_failure(tag = status.tag)
  yield(status.data, self) if status.failure? && status.tag == tag
  self
end
when_skipped() { |nil, self| ... } click to toggle source
# File lib/dirty_pipeline/base.rb, line 115
def when_skipped
  yield(nil, self) if railway.other_transaction_in_progress?
  self
end
when_success() { |data, self| ... } click to toggle source
# File lib/dirty_pipeline/base.rb, line 120
def when_success
  yield(status.data, self) if status.success?
  self
end

Private Instance Methods

Failure(event, cause, type:) click to toggle source
# File lib/dirty_pipeline/base.rb, line 181
def Failure(event, cause, type:)
  event.failure!
  @status = Status.failure(cause, tag: type)
end
Success(event) click to toggle source
# File lib/dirty_pipeline/base.rb, line 186
def Success(event)
  @status = Status.success(subject)
end
execute(event, attempt_retry: false) click to toggle source
# File lib/dirty_pipeline/base.rb, line 132
def execute(event, attempt_retry: false)
  attempt_retry ? event.attempt_retry! : event.start!

  Transaction.new(self, event).call do |action, *args|
    state_changes = process_action(action, event, *args)

    event.assign_changes(state_changes)
    event.complete if event.start?

    next if status.failure?
    Success(event)
  end
  call_next

  self
end
find_subject_args() click to toggle source
# File lib/dirty_pipeline/base.rb, line 177
def find_subject_args
  subject.id
end
interupt_on_error(event) { || ... } click to toggle source
# File lib/dirty_pipeline/base.rb, line 171
def interupt_on_error(event)
  return unless (fail_cause = catch(:fail_transition) { yield; nil })
  Failure(event, fail_cause, type: :error)
  throw :abort_transaction, true
end
load_event(enqueued_event) click to toggle source
# File lib/dirty_pipeline/base.rb, line 149
def load_event(enqueued_event)
  storage.find_event(enqueued_event.id) || enqueued_event
end
process_action(action, event, *args) click to toggle source
# File lib/dirty_pipeline/base.rb, line 153
def process_action(action, event, *args)
  catch(:success) do
    return if interupt_on_error(event) do
      throw :success, run_operation(action, event, *args)
    end
    nil
  end
rescue => exception
  Failure(event, exception, type: :exception)
  raise
end
run_operation(action, event, *args) click to toggle source
# File lib/dirty_pipeline/base.rb, line 165
def run_operation(action, event, *args)
  raise ArgumentError unless action
  return unless action.respond_to?(operation = railway.active)
  action.public_send(operation, event, self, *args)
end