class DjSplit::Split

Constants

DEFAULT_SPLIT_INDEX
OPTIMAL_SPLIT_SIZE
SLEEP_TIME
STALE_JOBS_TIMEOUT

Public Class Methods

new(options) click to toggle source
# File lib/dj_split/split.rb, line 9
def initialize(options)
  @queue_options  = options[:queue_options] || {}
  @split_group_id = get_random_split_group_id
  @queue_options.merge!(split_group_id: @split_group_id)
  @split_options = options[:split_options] || {}
end

Public Instance Methods

enqueue(object, method_name, *args) click to toggle source
# File lib/dj_split/split.rb, line 16
def enqueue(object, method_name, *args)
  splitting_index = get_splitting_index
  sliced_ids_array = get_sliced_ids(args[splitting_index])

  sliced_ids_array.each do |slice_set|
    args[splitting_index] = slice_set
    delayed_job_object = Delayed::PerformableMethod.new(object, method_name.to_sym, args)
    Delayed::Job.enqueue(delayed_job_object, @queue_options) 
  end

  wait_check_and_execute_delayed_jobs
end

Private Instance Methods

check_for_timeout?(count) click to toggle source

Check whether the timeout is reached?

# File lib/dj_split/split.rb, line 100
def check_for_timeout?(count)
  (SLEEP_TIME * count) > get_stale_job_timeout_value
end
get_processing_jobs_by_other_workers() click to toggle source
# File lib/dj_split/split.rb, line 65
def get_processing_jobs_by_other_workers
  Delayed::Job.where(split_group_id: @split_group_id, failed_at: nil).where.not(locked_by: nil)
end
get_random_split_group_id() click to toggle source

Collision is still OK. Probabilty of collision is negligible.

# File lib/dj_split/split.rb, line 95
def get_random_split_group_id
  Time.now.to_i.to_s[5..-1] + rand(1000000000).to_s
end
get_sliced_ids(ids) click to toggle source
# File lib/dj_split/split.rb, line 86
def get_sliced_ids(ids)
  ids.each_slice(get_split_size)
end
get_split_size() click to toggle source
# File lib/dj_split/split.rb, line 78
def get_split_size
  @split_options[:size] || OPTIMAL_SPLIT_SIZE
end
get_splitting_index() click to toggle source
# File lib/dj_split/split.rb, line 90
def get_splitting_index
  (@split_options[:by] || DEFAULT_SPLIT_INDEX) - 2
end
get_stale_job_timeout_value() click to toggle source
# File lib/dj_split/split.rb, line 82
def get_stale_job_timeout_value
  @split_options[:timeout] || STALE_JOBS_TIMEOUT
end
handle_failed_jobs() click to toggle source
# File lib/dj_split/split.rb, line 69
def handle_failed_jobs
  failed_jobs = Delayed::Job.where(split_group_id: @split_group_id).where.not(failed_at: nil)
  raise "Failed Delayed Jobs of Group Id(#{@split_group_id}): #{failed_jobs.pluck(:id)}" if failed_jobs.exists?
end
handle_stale_jobs() click to toggle source

Raise an error in scenario such as: Job is locked by a worker and worker got killed

# File lib/dj_split/split.rb, line 60
def handle_stale_jobs
  stale_jobs = get_processing_jobs_by_other_workers
  raise "Stale Delayed Jobs of Group Id(#{@split_group_id}): #{stale_jobs.pluck(:id)}"
end
jobs_processed_by_other_workers_currently?() click to toggle source
# File lib/dj_split/split.rb, line 74
def jobs_processed_by_other_workers_currently?
  Delayed::Job.where(split_group_id: @split_group_id, failed_at: nil).exists?
end
pending_jobs_of_group_id?() click to toggle source
# File lib/dj_split/split.rb, line 46
def pending_jobs_of_group_id?
  Delayed::Job.where(split_group_id: @split_group_id, locked_at: nil).exists?
end
pick_and_invoke_delayed_job() click to toggle source
# File lib/dj_split/split.rb, line 40
def pick_and_invoke_delayed_job
  worker_object = Delayed::Worker.new
  worker_object.split_group_id = @split_group_id
  worker_object.work_off(1)
end
wait_check_and_execute_delayed_jobs() click to toggle source
# File lib/dj_split/split.rb, line 31
def wait_check_and_execute_delayed_jobs
  while(pending_jobs_of_group_id?)
    pick_and_invoke_delayed_job
  end

  waiting_for_other_workers_to_process_jobs
  handle_failed_jobs
end
waiting_for_other_workers_to_process_jobs() click to toggle source
# File lib/dj_split/split.rb, line 50
def waiting_for_other_workers_to_process_jobs
  count = 0
  while(jobs_processed_by_other_workers_currently?)
    count += 1
    sleep(SLEEP_TIME)
    handle_stale_jobs if check_for_timeout?(count)
  end
end