class Sidekiq::Paquet::Bundle
Public Class Methods
append(item)
click to toggle source
# File lib/sidekiq/paquet/bundle.rb, line 5
#
# Buffers one job's arguments in Redis so they can later be enqueued as part
# of a bundle.
#
# Inside a single MULTI transaction this:
# * adds the worker name to the 'bundles' sorted set (score 0), and
# * appends the JSON-encoded arguments to the "bundle:<worker name>" list.
#
# @param item [Hash] job payload; reads the 'class' key and the optional
#   'args' key (defaults to an empty array when absent).
# @return whatever the Sidekiq.redis block returns (the result of `multi`).
def self.append(item)
  worker_name = item['class'.freeze]
  args = item.fetch('args'.freeze, [])

  Sidekiq.redis do |conn|
    # Send the commands through the object yielded by `multi` rather than
    # through the outer connection: newer Redis clients only queue commands
    # into the transaction when they are issued on the yielded object.
    conn.multi do |transaction|
      transaction.zadd('bundles'.freeze, 0, worker_name)
      transaction.rpush("bundle:#{worker_name}", Sidekiq.dump_json(args))
    end
  end
end
enqueue_jobs()
click to toggle source
# File lib/sidekiq/paquet/bundle.rb, line 17
#
# Flushes the buffered arguments of every worker listed in the 'bundles'
# sorted set into regular Sidekiq jobs.
#
# For each worker name it:
# 1. resolves the class and reads its Sidekiq options;
# 2. when the 'minimum_execution_interval' option is set, skips the worker
#    unless a SET with nx/ex on "bundle:<worker>:next" succeeds;
# 3. reads and JSON-decodes the whole "bundle:<worker>" list;
# 4. pushes one job per slice of 'bundle_size' entries (falling back to
#    Paquet.options[:default_bundle_size]), each job receiving the slice as
#    its 'args';
# 5. trims the entries that were read off the front of the list.
#
# @return whatever the Sidekiq.redis block returns (the worker names array).
def self.enqueue_jobs
  Sidekiq.redis do |redis|
    redis.zrange('bundles'.freeze, 0, -1).each do |worker|
      list_key = "bundle:#{worker}"
      settings = Object.const_get(worker).get_sidekiq_options

      interval = settings['minimum_execution_interval'.freeze]
      next if interval && !redis.set("bundle:#{worker}:next", 'queue'.freeze, nx: true, ex: interval)

      pending = redis.lrange(list_key, 0, -1).map { |raw| Sidekiq.load_json(raw) }
      per_job = settings['bundle_size'.freeze] || Paquet.options[:default_bundle_size]

      pending.each_slice(per_job) do |batch|
        Sidekiq::Client.push(
          'class' => worker,
          'queue' => settings['queue'.freeze],
          'args'  => batch
        )
      end

      # Drop only as many entries as were read above; anything appended to
      # the list in the meantime sits past that offset and is kept.
      redis.ltrim(list_key, pending.size, -1)
    end
  end
end
new(name)
click to toggle source
# File lib/sidekiq/paquet/bundle.rb, line 47
#
# Builds a handle on the Redis list that stores the bundle called +name+.
#
# @param name [#to_s] bundle name; stored internally as the list key
#   "bundle:<name>".
def initialize(name)
  @lname = "bundle:#{name}"
end
Public Instance Methods
clear()
click to toggle source
# File lib/sidekiq/paquet/bundle.rb, line 67
#
# Deletes this bundle's Redis list.
#
# @return the result of the DEL command issued on the list key.
def clear
  Sidekiq.redis do |conn|
    conn.del(@lname)
  end
end
items()
click to toggle source
# File lib/sidekiq/paquet/bundle.rb, line 63
#
# Reads every entry currently stored in this bundle's Redis list.
#
# @return the result of LRANGE 0..-1 on the list key; entries are returned
#   as stored (no JSON decoding is performed here).
def items
  Sidekiq.redis do |conn|
    conn.lrange(@lname, 0, -1)
  end
end
queue()
click to toggle source
# File lib/sidekiq/paquet/bundle.rb, line 51
#
# Looks up the queue configured for this bundle's worker class.
#
# The class is resolved with Object.const_get (as enqueue_jobs does) rather
# than String#constantize, so the lookup does not depend on ActiveSupport
# being loaded.
#
# @return the 'queue' entry of the worker's Sidekiq options.
# @raise [NameError] if no constant matches the worker name.
def queue
  Object.const_get(worker_name).get_sidekiq_options['queue'.freeze]
end
size()
click to toggle source
# File lib/sidekiq/paquet/bundle.rb, line 59
#
# Counts the entries currently stored in this bundle's Redis list.
#
# @return the result of LLEN on the list key.
def size
  Sidekiq.redis do |conn|
    conn.llen(@lname)
  end
end
worker_name()
click to toggle source
# File lib/sidekiq/paquet/bundle.rb, line 55
#
# Recovers the worker name from the list key built in the constructor
# ("bundle:<name>").
#
# Only the first ':' is treated as the separator, so namespaced worker names
# such as "Reports::DailyWorker" are returned whole instead of being cut
# down to their last segment.
#
# @return [String] the part of the list key that follows "bundle:".
def worker_name
  @lname.split(':', 2).last
end