class Sidekiq::Group::Collection
Constants
- CID_EXPIRE_TTL
- LOCK_TTL
Attributes
callback_class[R]
callback_options[R]
cid[R]
Public Class Methods
new(cid = nil)
click to toggle source
# File lib/sidekiq/group/collection.rb, line 12
# Builds a collection handle for the group identified by +cid+.
#
# @param cid [String, nil] id of an existing group; when it is nil, false or
#   empty, a random URL-safe id is generated instead.
def initialize(cid = nil)
  # An empty id would map every such group onto the same Redis keys
  # ("" and "-jids"), so it is handled exactly like a missing id.
  missing = !cid || (cid.respond_to?(:empty?) && cid.empty?)
  @cid = missing ? SecureRandom.urlsafe_base64(16) : cid
end
Public Instance Methods
add(jid)
click to toggle source
# File lib/sidekiq/group/collection.rb, line 26
# Registers a child job as pending for this group.
#
# The job id is added to the "<cid>-jids" Redis set and that set's expiry is
# reset to CID_EXPIRE_TTL; both commands are sent in one MULTI transaction.
#
# @param jid [String] id of the child job being scheduled
def add(jid)
  Sidekiq::Logging.logger.info "Scheduling child job #{jid} for parent #{@cid}" if Sidekiq::Group.debug

  jids_key = "#{@cid}-jids"
  Sidekiq.redis do |r|
    # Issue the commands on the object yielded by #multi rather than on the
    # outer connection: older redis-rb yields the connection itself, newer
    # releases yield a dedicated transaction object, so this form works on both.
    r.multi do |tx|
      tx.sadd(jids_key, jid)
      tx.expire(jids_key, CID_EXPIRE_TTL)
    end
  end
end
callback_class=(value)
click to toggle source
# File lib/sidekiq/group/collection.rb, line 16
# Remembers the callback class on this instance and stores it in the group's
# Redis hash under the 'callback_class' field (via #persist).
#
# @param value [Object] the callback class (or its name) to store
def callback_class=(value)
  persist('callback_class', @callback_class = value)
end
callback_options=(value)
click to toggle source
# File lib/sidekiq/group/collection.rb, line 21
# Remembers the callback options on this instance and stores their JSON
# serialization in the group's Redis hash under 'callback_options'.
#
# @param value [Object] options for the callback; must respond to #to_json
def callback_options=(value)
  @callback_options = value
  serialized = value.to_json
  persist('callback_options', serialized)
end
spawned_jobs!()
click to toggle source
# File lib/sidekiq/group/collection.rb, line 37
# Writes the 'spawned_jobs' field of the group's Redis hash, using the group
# id as the stored value. #spawned_all_jobs? later checks this field.
def spawned_jobs!
  marker = cid
  persist('spawned_jobs', marker)
end
success(jid)
click to toggle source
# File lib/sidekiq/group/collection.rb, line 41
# Records that child job +jid+ finished and, once no jobs are pending,
# schedules the group's callback.
#
# Steps: remove the jid from the pending set; stop unless every job has been
# processed; stop if another caller already took the "finished" lock;
# otherwise read the stored callback data, enqueue Sidekiq::Group::Worker
# with it and delete the group's Redis keys.
#
# @param jid [String] id of the child job that completed
def success(jid)
  remove_processed(jid)
  # locked? is only consulted (and the lock only taken) when all jobs are done.
  return if !processed_all_jobs? || locked?

  klass, raw_options = callback_data
  options = JSON(raw_options)
  if Sidekiq::Group.debug
    Sidekiq::Logging.logger.info "Scheduling callback job #{klass} with #{options}"
  end
  Sidekiq::Group::Worker.perform_async(klass, options)
  cleanup_redis
end
Private Instance Methods
callback_data()
click to toggle source
# File lib/sidekiq/group/collection.rb, line 82
# Reads the stored callback class and callback options from the group's
# Redis hash in a single MULTI transaction.
#
# @return [Array] the transaction replies, in order:
#   the 'callback_class' field, then the 'callback_options' field
def callback_data
  Sidekiq.redis do |r|
    # Use the object yielded by #multi (the connection itself on older
    # redis-rb, a transaction object on newer releases) so both reads are
    # queued inside the transaction on either version.
    r.multi do |tx|
      tx.hget(@cid, 'callback_class')
      tx.hget(@cid, 'callback_options')
    end
  end
end
cleanup_redis()
click to toggle source
# File lib/sidekiq/group/collection.rb, line 100
# Deletes the group's Redis hash and its "<cid>-jids" set in one DEL call.
def cleanup_redis
  group_keys = [@cid, "#{@cid}-jids"]
  Sidekiq.redis do |conn|
    conn.del(*group_keys)
  end
end
locked?()
click to toggle source
# File lib/sidekiq/group/collection.rb, line 104
# Checks whether the "<cid>-finished" lock was already taken, and takes it.
#
# This predicate has a side effect: it always sets the lock key to 1 (GETSET)
# and resets its expiry to LOCK_TTL, inside one MULTI transaction.
#
# @return [String, nil] the value previously stored under the lock key
#   (the GETSET reply); nil, i.e. falsy, when the key was not set before
def locked?
  lock_key = "#{@cid}-finished"
  replies = Sidekiq.redis do |r|
    # Commands go to the object yielded by #multi (the connection itself on
    # older redis-rb, a transaction object on newer releases).
    r.multi do |tx|
      tx.getset(lock_key, 1)
      tx.expire(lock_key, LOCK_TTL)
    end
  end
  # First reply belongs to GETSET: the previous lock value.
  replies.first
end
pending()
click to toggle source
# File lib/sidekiq/group/collection.rb, line 68
# Number of job ids currently in the "<cid>-jids" Redis set (SCARD).
#
# The count is memoized on this instance after the first lookup, so later
# calls on the same object do not query Redis again.
#
# @return [Integer] cardinality reported by Redis
def pending
  return @pending if @pending

  @pending = Sidekiq.redis { |conn| conn.scard("#{@cid}-jids") }
end
persist(attribute, value)
click to toggle source
# File lib/sidekiq/group/collection.rb, line 91
# Stores one field in the group's Redis hash and resets the hash's expiry to
# CID_EXPIRE_TTL, both inside one MULTI transaction.
#
# @param attribute [String] hash field name
# @param value [Object] value to store in that field
def persist(attribute, value)
  Sidekiq.redis do |r|
    # Commands go to the object yielded by #multi: the connection itself on
    # older redis-rb, a transaction object on newer releases.
    r.multi do |tx|
      tx.hset(@cid, attribute, value)
      tx.expire(@cid, CID_EXPIRE_TTL)
    end
  end
end
processed_all_jobs?()
click to toggle source
# File lib/sidekiq/group/collection.rb, line 72
# True when the group has been marked as fully spawned and no job ids remain
# in the pending set.
#
# @return [Boolean]
def processed_all_jobs?
  if Sidekiq::Group.debug
    Sidekiq::Logging.logger.info "Pending jobs: #{pending}"
  end
  return false unless spawned_all_jobs?

  pending.zero?
end
remove_processed(jid)
click to toggle source
# File lib/sidekiq/group/collection.rb, line 58
# Removes a finished child job id from the "<cid>-jids" Redis set.
#
# If the first SREM reply is falsy, the method waits one second and issues
# the SREM once more; the outcome of that second attempt is not checked.
#
# @param jid [String] id of the completed child job
# @return [Object, nil] nil when the first attempt succeeded, otherwise the
#   reply of the second SREM
def remove_processed(jid)
  jids_key = "#{@cid}-jids"
  Sidekiq::Logging.logger.info "Child job #{jid} completed" if Sidekiq::Group.debug

  removed = Sidekiq.redis { |conn| conn.srem(jids_key, jid) }
  return if removed

  Sidekiq::Logging.logger.info "Could not remove child job #{jid} from Redis" if Sidekiq::Group.debug
  sleep 1
  Sidekiq.redis { |conn| conn.srem(jids_key, jid) }
end
spawned_all_jobs?()
click to toggle source
# File lib/sidekiq/group/collection.rb, line 78
# True when the 'spawned_jobs' field of the group's Redis hash holds a
# non-blank value (it is written by #spawned_jobs!).
#
# Implemented in plain Ruby instead of ActiveSupport's #present?, so the
# check also works in processes that do not load ActiveSupport.
#
# @return [Boolean]
def spawned_all_jobs?
  marker = Sidekiq.redis { |r| r.hget(@cid, 'spawned_jobs') }
  return false if marker.nil?

  # Same notion of "blank" as ActiveSupport's String#blank?:
  # empty or whitespace-only.
  !(marker.to_s =~ /\A[[:space:]]*\z/)
end