class DirtyPipeline::Base
Constants
- DEFAULT_CLEANUP_DELAY
- DEFAULT_RETRY_DELAY
Attributes
cleanup_delay[RW]
pipeline_storage[RW]
retry_delay[RW]
transitions_map[R]
queue[R]
railway[R]
status[R]
storage[R]
subject[R]
uuid[R]
Public Class Methods
find_subject(*args)
click to toggle source
# File lib/dirty_pipeline/base.rb, line 9 def find_subject(*args) fail NotImplemented end
inherited(child)
click to toggle source
# File lib/dirty_pipeline/base.rb, line 14 def inherited(child) child.instance_variable_set( :@transitions_map, transitions_map || Hash.new ) end
new(subject, uuid: nil)
click to toggle source
# File lib/dirty_pipeline/base.rb, line 37 def initialize(subject, uuid: nil) @uuid = uuid || SecureRandom.uuid @subject = subject @storage = Storage.new(subject, self.class.pipeline_storage) @railway = Railway.new(subject, @uuid) @status = Status.success(subject) end
transition(name, from:, to:, action: nil, attempts: 1)
click to toggle source
# File lib/dirty_pipeline/base.rb, line 24 def transition(name, from:, to:, action: nil, attempts: 1) action ||= method(name) if respond_to?(name) action ||= const_get(name.to_s.camelcase(:upper)) @transitions_map[name.to_s] = { action: action, from: Array(from).map(&:to_s), to: to.to_s, attempts: attempts, } end
Public Instance Methods
call()
click to toggle source
# File lib/dirty_pipeline/base.rb, line 74 def call # HANDLE ANOTHER ACTION IN PROGRESS EXPLICITLY return self if (enqueued_event = railway.next).nil? execute(load_event(enqueued_event)) end
Also aliased as: call_next
chain(*args, **kwargs)
click to toggle source
FIXME operation :call - argument
# File lib/dirty_pipeline/base.rb, line 68 def chain(*args, **kwargs) operation = kwargs.fetch(:operation) { :call } railway[operation] << Event.create(*args, **kwargs.merge(tx_id: @uuid)) self end
clean()
click to toggle source
# File lib/dirty_pipeline/base.rb, line 81 def clean finished = railway.queue.to_a.empty? finished &&= railway.queue.processing_event.nil? return self if finished railway.switch_to(:undo) call end
cleanup_delay()
click to toggle source
# File lib/dirty_pipeline/base.rb, line 109 def cleanup_delay; self.class.cleanup_delay || DEFAULT_CLEANUP_DELAY; end
clear!()
click to toggle source
# File lib/dirty_pipeline/base.rb, line 62 def clear! storage.reset! reset! end
find_transition!(event)
click to toggle source
# File lib/dirty_pipeline/base.rb, line 45 def find_transition!(event) tname = event.transition event.source = storage.status self.class.transitions_map.fetch(tname.to_s).tap do |from:, **kwargs| next unless railway.operation.eql?("call") next if from == Array(event.source) next if from.include?(event.source.to_s) raise InvalidTransition, "from `#{event.source}` by `#{tname}`" end.tap do |to:, **| event.destination = to if railway.operation.eql?("call") end end
reset!()
click to toggle source
# File lib/dirty_pipeline/base.rb, line 58 def reset! railway.clear! end
retry()
click to toggle source
# File lib/dirty_pipeline/base.rb, line 89 def retry return self if (enqueued_event = railway.queue.processing_event).nil? execute(load_event(enqueued_event), attempt_retry: true) end
retry_delay()
click to toggle source
# File lib/dirty_pipeline/base.rb, line 112 def retry_delay; self.class.retry_delay || DEFAULT_RETRY_DELAY; end
schedule(operation = "call", delay = nil)
click to toggle source
# File lib/dirty_pipeline/base.rb, line 94 def schedule(operation = "call", delay = nil) job_args = { "transaction_id" => @uuid, "enqueued_pipeline" => self.class.to_s, "find_subject_args" => find_subject_args, "operation" => operation, } if delay.nil? ::DirtyPipeline::Worker.perform_async(job_args) else ::DirtyPipeline::Worker.perform_in(delay, job_args) end end
schedule_cleanup()
click to toggle source
# File lib/dirty_pipeline/base.rb, line 110 def schedule_cleanup; schedule("cleanup", cleanup_delay); end
schedule_retry()
click to toggle source
# File lib/dirty_pipeline/base.rb, line 113 def schedule_retry; schedule("retry", retry_delay); end
when_failure(tag = status.tag) { |data, self| ... }
click to toggle source
# File lib/dirty_pipeline/base.rb, line 125 def when_failure(tag = status.tag) yield(status.data, self) if status.failure? && status.tag == tag self end
when_skipped() { |nil, self| ... }
click to toggle source
# File lib/dirty_pipeline/base.rb, line 115 def when_skipped yield(nil, self) if railway.other_transaction_in_progress? self end
when_success() { |data, self| ... }
click to toggle source
# File lib/dirty_pipeline/base.rb, line 120 def when_success yield(status.data, self) if status.success? self end
Private Instance Methods
Failure(event, cause, type:)
click to toggle source
# File lib/dirty_pipeline/base.rb, line 181 def Failure(event, cause, type:) event.failure! @status = Status.failure(cause, tag: type) end
Success(event)
click to toggle source
# File lib/dirty_pipeline/base.rb, line 186 def Success(event) @status = Status.success(subject) end
execute(event, attempt_retry: false)
click to toggle source
# File lib/dirty_pipeline/base.rb, line 132 def execute(event, attempt_retry: false) attempt_retry ? event.attempt_retry! : event.start! Transaction.new(self, event).call do |action, *args| state_changes = process_action(action, event, *args) event.assign_changes(state_changes) event.complete if event.start? next if status.failure? Success(event) end call_next self end
find_subject_args()
click to toggle source
# File lib/dirty_pipeline/base.rb, line 177 def find_subject_args subject.id end
interupt_on_error(event) { || ... }
click to toggle source
# File lib/dirty_pipeline/base.rb, line 171 def interupt_on_error(event) return unless (fail_cause = catch(:fail_transition) { yield; nil }) Failure(event, fail_cause, type: :error) throw :abort_transaction, true end
load_event(enqueued_event)
click to toggle source
# File lib/dirty_pipeline/base.rb, line 149 def load_event(enqueued_event) storage.find_event(enqueued_event.id) || enqueued_event end
process_action(action, event, *args)
click to toggle source
# File lib/dirty_pipeline/base.rb, line 153 def process_action(action, event, *args) catch(:success) do return if interupt_on_error(event) do throw :success, run_operation(action, event, *args) end nil end rescue => exception Failure(event, exception, type: :exception) raise end
run_operation(action, event, *args)
click to toggle source
# File lib/dirty_pipeline/base.rb, line 165 def run_operation(action, event, *args) raise ArgumentError unless action return unless action.respond_to?(operation = railway.active) action.public_send(operation, event, self, *args) end