class Sidekiq::Paquet::Bundle

Public Class Methods

append(item) click to toggle source
# File lib/sidekiq/paquet/bundle.rb, line 5
# Buffers the arguments of one job so they can later be flushed as part of a
# bundle.
#
# In a single MULTI/EXEC transaction this registers the worker name in the
# 'bundles' sorted set (score 0) and appends the JSON-encoded arguments to the
# "bundle:<worker name>" list.
#
# @param item [Hash] job payload; 'class' is read as the worker name and
#   'args' as the job arguments (an empty array is used when 'args' is absent)
# @return [Object] whatever the Redis client returns for the transaction
def self.append(item)
  worker_name = item['class'.freeze]
  args = item.fetch('args'.freeze, [])
  # Encode before checking out a connection so the transaction block only
  # contains Redis commands.
  payload = Sidekiq.dump_json(args)

  Sidekiq.redis do |conn|
    # Issue the commands on the object yielded by `multi`, not on `conn`:
    # recent Redis clients only queue commands sent to the yielded
    # transaction, while older redis-rb versions yield the client itself, so
    # this form works with both.
    conn.multi do |transaction|
      transaction.zadd('bundles'.freeze, 0, worker_name)
      transaction.rpush("bundle:#{worker_name}", payload)
    end
  end
end
enqueue_jobs() click to toggle source
# File lib/sidekiq/paquet/bundle.rb, line 17
# Flushes every buffered bundle into regular Sidekiq jobs.
#
# For each worker name found in the 'bundles' sorted set:
# * the worker class is looked up and its Sidekiq options are read;
# * when the 'minimum_execution_interval' option is set, the worker is skipped
#   unless the "bundle:<worker>:next" key could be created (SET NX with that
#   interval as expiry);
# * all entries of the "bundle:<worker>" list are read and JSON-decoded;
# * the entries are pushed in slices of 'bundle_size' (falling back to
#   Paquet.options[:default_bundle_size]), one Sidekiq job per slice;
# * the entries that were read are trimmed off the front of the list.
#
# @return [Object] the value returned by the Sidekiq.redis block
def self.enqueue_jobs
  Sidekiq.redis do |conn|
    conn.zrange('bundles'.freeze, 0, -1).each do |worker|
      options  = Object.const_get(worker).get_sidekiq_options
      interval = options['minimum_execution_interval'.freeze]

      # The NX/EX key is only created when no unexpired one exists, so a
      # failed SET means this worker was flushed less than `interval` ago.
      next if interval && !conn.set("bundle:#{worker}:next", 'queue'.freeze, nx: true, ex: interval)

      payloads = conn.lrange("bundle:#{worker}", 0, -1).map { |raw| Sidekiq.load_json(raw) }
      slice_size = options['bundle_size'.freeze] || Paquet.options[:default_bundle_size]

      payloads.each_slice(slice_size) do |batch|
        Sidekiq::Client.push(
          'class' => worker,
          'queue' => options['queue'.freeze],
          'args'  => batch
        )
      end

      # Drop only what was read above; anything appended in the meantime
      # sits after index payloads.size and is kept.
      conn.ltrim("bundle:#{worker}", payloads.size, -1)
    end
  end
end
new(name) click to toggle source
# File lib/sidekiq/paquet/bundle.rb, line 47
# Creates a handle on the bundle stored under the Redis key "bundle:<name>".
#
# @param name [#to_s] bundle name, used verbatim as the key suffix
def initialize(name)
  @lname = format('bundle:%s', name)
end

Public Instance Methods

clear() click to toggle source
# File lib/sidekiq/paquet/bundle.rb, line 67
# Deletes this bundle's Redis key, discarding every buffered entry.
#
# @return [Object] the result of the Redis DEL command
def clear
  Sidekiq.redis do |conn|
    conn.del(@lname)
  end
end
items() click to toggle source
# File lib/sidekiq/paquet/bundle.rb, line 63
# Reads every entry currently stored in this bundle's Redis list.
#
# @return [Object] the result of LRANGE over the whole list (index 0 to -1);
#   entries are returned as stored, without any decoding
def items
  Sidekiq.redis do |conn|
    conn.lrange(@lname, 0, -1)
  end
end
queue() click to toggle source
# File lib/sidekiq/paquet/bundle.rb, line 51
# Name of the Sidekiq queue configured for this bundle's worker class.
#
# The class is resolved with Object.const_get, the same lookup enqueue_jobs
# uses, so this does not depend on ActiveSupport's String#constantize being
# loaded.
#
# @return [Object] the 'queue' entry of the worker's Sidekiq options
# @raise [NameError] when worker_name does not name a defined constant
def queue
  Object.const_get(worker_name).get_sidekiq_options['queue'.freeze]
end
size() click to toggle source
# File lib/sidekiq/paquet/bundle.rb, line 59
# Number of entries currently buffered in this bundle's Redis list.
#
# @return [Object] the result of the Redis LLEN command
def size
  Sidekiq.redis do |conn|
    conn.llen(@lname)
  end
end
worker_name() click to toggle source
# File lib/sidekiq/paquet/bundle.rb, line 55
# Worker name this bundle was created for, i.e. the list key without its
# "bundle:" prefix.
#
# Only the first ':' is treated as a separator so that namespaced worker
# classes keep their full name ("bundle:Jobs::Mailer" => "Jobs::Mailer").
#
# @return [String] the part of the key after the first ':'
def worker_name
  @lname.split(':', 2).last
end