module Sqeduler::Worker::Synchronization

Module that provides common synchronization infrastructure for workers across multiple hosts (see `Sqeduler::BaseWorker.synchronize_jobs`).

Constants

SCHEDULE_COLLISION_MARKER

Callback for when the job expiration is too short, i.e. less than the time it took to perform the actual work.

Public Class Methods

prepended(base) click to toggle source
# File lib/sqeduler/worker/synchronization.rb, line 10
# Hook invoked with the class (or module) this module is being prepended to.
#
# Refuses to continue when Sqeduler::Worker::Callbacks is already among the
# ancestors of +base+; otherwise extends +base+ with ClassMethods and
# declares the class attributes that hold the synchronization settings.
#
# @param base [Module] the class this module is prepended to
# @raise [RuntimeError] if Sqeduler::Worker::Callbacks is already an ancestor
def self.prepended(base)
  callbacks_present = base.ancestors.include?(Sqeduler::Worker::Callbacks)
  raise "Sqeduler::Worker::Callbacks must be the last module that you prepend." if callbacks_present

  base.extend(ClassMethods)

  # One class_attribute declaration per setting, in the original order.
  [
    :synchronize_jobs_mode,
    :synchronize_jobs_expiration,
    :synchronize_jobs_timeout
  ].each { |setting| base.class_attribute setting }
end

Public Instance Methods

perform(*args) click to toggle source

rubocop:enable Style/Documentation

Calls superclass method
# File lib/sqeduler/worker/synchronization.rb, line 41
# Runs the job, optionally serialized behind a lock.
#
# When the class's synchronize_jobs_mode is :one_at_a_time, the superclass
# implementation is executed inside perform_timed, which itself runs inside
# perform_locked using the key from sync_lock_key(*args). For any other
# mode the superclass implementation is called directly.
#
# @param args [Array] job arguments, forwarded unchanged to super
def perform(*args)
  # Unsynchronized workers go straight to the wrapped implementation.
  return super unless self.class.synchronize_jobs_mode == :one_at_a_time

  lock_key = self.class.sync_lock_key(*args)
  perform_locked(lock_key) do
    perform_timed { super }
  end
end

Private Instance Methods

on_lock_timeout(key) click to toggle source

callback for when a lock cannot be obtained

Calls superclass method
# File lib/sqeduler/worker/synchronization.rb, line 56
# Callback for when a lock cannot be obtained.
#
# Logs a warning naming the worker class and the lock key, then passes the
# call on to a superclass implementation if one exists.
#
# @param key [String] the lock key that could not be acquired
def on_lock_timeout(key)
  message = "#{self.class.name} unable to acquire lock '#{key}'. Aborting."
  Service.logger.warn(message)
  # Only chain upwards when an ancestor actually defines this callback.
  super if defined?(super)
end
on_schedule_collision(duration) click to toggle source
Calls superclass method
# File lib/sqeduler/worker/synchronization.rb, line 66
# Callback invoked by perform_timed when the measured run time exceeded the
# class's synchronize_jobs_expiration.
#
# Logs a warning built from SCHEDULE_COLLISION_MARKER, filled in with the
# worker class name, the formatted duration and the configured expiration,
# then passes the call on to a superclass implementation if one exists.
#
# @param duration [Numeric] measured run time, as passed to time_duration
def on_schedule_collision(duration)
  worker = self.class
  message = format(
    SCHEDULE_COLLISION_MARKER,
    worker.name,
    time_duration(duration),
    worker.synchronize_jobs_expiration
  )
  Service.logger.warn(message)
  # Only chain upwards when an ancestor actually defines this callback.
  super if defined?(super)
end
perform_locked(sync_lock_key, &work) click to toggle source
# File lib/sqeduler/worker/synchronization.rb, line 83
# Runs +work+ through RedisLock.with_lock for the given key, using the
# class's configured expiration and timeout. If RedisLock raises
# LockTimeoutError, the on_lock_timeout callback is invoked with the key
# instead of propagating the error.
#
# @param sync_lock_key [String] key identifying the lock
# @param work [Proc] the block to run under the lock
def perform_locked(sync_lock_key, &work)
  worker = self.class
  lock_options = {
    :expiration => worker.synchronize_jobs_expiration,
    :timeout => worker.synchronize_jobs_timeout
  }
  # Splatted so the call passes the options exactly as the brace-less form does.
  RedisLock.with_lock(sync_lock_key, **lock_options, &work)
rescue RedisLock::LockTimeoutError
  on_lock_timeout(sync_lock_key)
end
perform_timed(&block) click to toggle source
# File lib/sqeduler/worker/synchronization.rb, line 78
# Measures how long +block+ takes with Benchmark.realtime and invokes
# on_schedule_collision when that time exceeds the class's
# synchronize_jobs_expiration.
#
# @param block [Proc] the work to time
# @return the result of on_schedule_collision, or nil when within the limit
def perform_timed(&block)
  elapsed = Benchmark.realtime(&block)
  # The limit is read after the work has run, as in the original ordering.
  return unless elapsed > self.class.synchronize_jobs_expiration

  on_schedule_collision(elapsed)
end
time_duration(timespan) click to toggle source

rubocop:disable Metrics/AbcSize

# File lib/sqeduler/worker/synchronization.rb, line 95
# Formats a duration given in seconds as a human-readable string such as
# "1 Days 2 Hours 3 Minutes 4.5 Seconds". Units whose value is zero are
# left out.
#
# @param timespan [Numeric] elapsed time in seconds
# @return [String] the formatted duration, or "0 Seconds" when the duration
#   rounds to zero
def time_duration(timespan)
  # Round before splitting into units so a fractional carry (e.g. 119.999)
  # rolls over into the minutes instead of being printed as "60.0 Seconds".
  rest, secs = timespan.round(2).divmod(60)
  rest, mins = rest.divmod(60)
  days, hours = rest.divmod(24)
  # Float#divmod can leave representation noise in the remainder; trim it.
  secs = secs.round(2)

  parts = []
  parts << "#{days} Days" if days > 0
  parts << "#{hours} Hours" if hours > 0
  parts << "#{mins} Minutes" if mins > 0
  parts << "#{secs} Seconds" if secs > 0
  # Never return an empty string: it would leave a hole in the log message.
  return "0 Seconds" if parts.empty?

  parts.join(" ")
end