class Rworkflow::SidekiqFlow

Constants

MAX_EXPECTED_DURATION
STATE_POLICY_GATED

Public Class Methods

build_flow_map() click to toggle source
# File lib/rworkflow/sidekiq_flow.rb, line 135
def build_flow_map
  flow_map = {}
  queues = SidekiqHelper.queue_sizes.keys
  queues.each do |queue_name|
    queue = Sidekiq::Queue.new(queue_name)
    queue.each do |job|
      klass = begin
        job.klass.constantize
      rescue NameError => _
        nil
      end

      if !klass.nil? && klass <= Rworkflow::Worker
        id = job.args.first
        state_name = job.args.second
        state_map = flow_map.fetch(id, {})
        state_map[state_name] = state_map.fetch(state_name, 0) + 1
        flow_map[id] = state_map
      end
    end
  end
  return flow_map
end
cleanup_broken_flows() click to toggle source
# File lib/rworkflow/sidekiq_flow.rb, line 104
def cleanup_broken_flows
  broken = []
  flows = self.all
  flows.each do |flow|
    if flow.valid?
      if flow.finished? && !flow.public?
        broken << [flow, 'finished']
      elsif !flow.started? && flow.created_at < 1.day.ago
        broken << [flow, 'never started']
      end
    else
      broken << [flow, 'invalid']
    end
  end

  broken.each do |flow_pair|
    flow_pair.first.cleanup
    puts "Cleaned up #{flow_pair.second} flow #{flow_pair.first.id}"
  end
  puts ">>> Cleaned up #{broken.size} broken flows <<<"
end
create(lifecycle, name = '', options) click to toggle source
Calls superclass method Rworkflow::Flow::create
# File lib/rworkflow/sidekiq_flow.rb, line 99
def create(lifecycle, name = '', options)
  workflow = super(lifecycle, name, options)
  return workflow
end
create_missing_jobs(flow, state_map) click to toggle source
# File lib/rworkflow/sidekiq_flow.rb, line 159
def create_missing_jobs(flow, state_map)
  counters = flow.counters
  counters.each do |state, num_objects|
    next if flow.class.terminal?(state) || state == :processing
    enqueued = state_map.fetch(state, 0) * flow.get_state_cardinality(state)
    missing = num_objects - enqueued
    if missing > 0
      flow.create_jobs(state, missing)
      puts "Created #{missing} missing jobs for state #{state} in flow #{flow.id}"
    end
  end
end
enqueue_missing_jobs() click to toggle source
# File lib/rworkflow/sidekiq_flow.rb, line 126
def enqueue_missing_jobs
  queued_flow_map = build_flow_map
  running_flows = self.all.select { |f| f.valid? && !f.finished? && !f.paused? }
  running_flows.each do |flow|
    state_map = queued_flow_map.fetch(flow.id, {})
    create_missing_jobs(flow, state_map)
  end
end
new(id) click to toggle source
Calls superclass method Rworkflow::Flow::new
# File lib/rworkflow/sidekiq_flow.rb, line 6
def initialize(id)
  super(id)
  @open_gates = RedisRds::Set.new("#{@redis_key}__open_gates")
end

Public Instance Methods

cleanup() click to toggle source
Calls superclass method Rworkflow::Flow#cleanup
# File lib/rworkflow/sidekiq_flow.rb, line 11
def cleanup
  super()
  @open_gates.delete
end
close_gate(state_name) click to toggle source
# File lib/rworkflow/sidekiq_flow.rb, line 94
def close_gate(state_name)
  @open_gates.remove(state_name)
end
continue() click to toggle source

for now assumes

# File lib/rworkflow/sidekiq_flow.rb, line 44
def continue
  return if self.finished? || !self.valid? || !self.paused?
  if @flow_data.decr(:paused) == 0
    workers = Hash[self.counters.select { |name, _| !self.class.terminal?(name) && name != :processing }]

    # enqueue jobs
    workers.each { |worker, num_objects| create_jobs(worker, num_objects) }
  end
rescue StandardError => e
  Rails.logger.error("Error continuing flow #{self.id}: #{e.message}")
end
create_jobs(state_name, num_objects) click to toggle source
# File lib/rworkflow/sidekiq_flow.rb, line 56
def create_jobs(state_name, num_objects)
  return if paused? || num_objects < 1 || self.class.terminal?(state_name) || gated?(state_name)
  state = @lifecycle.states[state_name]
  worker_class = if state.respond_to?(:worker_class)
    state.worker_class
  else
    begin
      state_name.constantize
    rescue NameError => _
      Rails.logger.error("Trying to push to a non existent worker class #{state_name} in workflow #{@id}")
      nil
    end
  end

  if !worker_class.nil?
    cardinality = get_state_cardinality(state_name)

    amount = if state.policy == State::STATE_POLICY_WAIT
      ((num_objects + get_state_list(state_name).size) / cardinality.to_f).floor
    else
      (num_objects / cardinality.to_f).ceil
    end

    amount.times { worker_class.enqueue_job(@id, state_name) }
  end
end
expected_duration() click to toggle source
# File lib/rworkflow/sidekiq_flow.rb, line 24
def expected_duration
  return MAX_EXPECTED_DURATION
end
gated?(state_name) click to toggle source
# File lib/rworkflow/sidekiq_flow.rb, line 83
def gated?(state_name)
  state = @lifecycle.states[state_name]
  return state.policy == STATE_POLICY_GATED && !@open_gates.include?(state_name)
end
open_gate(state_name) click to toggle source
# File lib/rworkflow/sidekiq_flow.rb, line 88
def open_gate(state_name)
  @open_gates.add(state_name)
  num_objects = count(state_name)
  create_jobs(state_name, num_objects)
end
pause() click to toggle source
# File lib/rworkflow/sidekiq_flow.rb, line 36
def pause
  return if self.finished?
  @flow_data.incr(:paused)
rescue StandardError => e
  Rails.logger.error("Error pausing flow #{self.id}: #{e.message}")
end
paused?() click to toggle source
# File lib/rworkflow/sidekiq_flow.rb, line 28
def paused?
  return @flow_data.get(:paused).to_i > 0
end
push(objects, name) click to toggle source
Calls superclass method Rworkflow::Flow#push
# File lib/rworkflow/sidekiq_flow.rb, line 16
def push(objects, name)
  pushed = 0
  pushed = super(objects, name)
ensure
  create_jobs(name, pushed) if pushed > 0
  return pushed
end
status() click to toggle source
Calls superclass method Rworkflow::Flow#status
# File lib/rworkflow/sidekiq_flow.rb, line 32
def status
  return paused? ? 'Paused' : super()
end