module SidekiqBulkJob

Constants

VERSION

Attributes

async_delay[RW]
batch_size[RW]
logger[RW]
prefix[RW]
process_fail[RW]
queue[RW]
redis[RW]
scheduled_delay[RW]

Public Class Methods

bulk_run(job_class_name, key, queue: self.queue, async: true) click to toggle source
# File lib/sidekiq_bulk_job.rb, line 202
# Drains the arguments stored under +key+ and hands them to a single bulk job.
#
# @param job_class_name [String] name of the worker class the arguments belong to
# @param key [String] Redis list key that holds the pending arguments
# @param queue [Object] queue name used for the asynchronous push
# @param async [Boolean] when truthy, push a SidekiqBulkJob::BulkJob; otherwise
#   run SidekiqBulkJob::BulkJob#perform inline
# @return [Object, nil] nil when nothing was pending, otherwise whatever the
#   push or the inline perform returns
def bulk_run(job_class_name, key, queue: self.queue, async: true)
  pending = flush(key)
  return if pending.nil? || pending.empty?

  if async
    SidekiqBulkJob::BulkJob.client_push(
      "queue" => queue,
      "class" => SidekiqBulkJob::BulkJob,
      "args" => [job_class_name, pending]
    )
  else
    SidekiqBulkJob::BulkJob.new.perform(job_class_name, pending)
  end
end
clear(key) click to toggle source
# File lib/sidekiq_bulk_job.rb, line 194
# Evaluates a small Lua script against +key+: it reads the list length and
# issues a DEL when that length is zero.
#
# @param key [String] Redis list key to clean up
# @return [Object] whatever the Redis client's +eval+ call returns
def clear(key)
  lua_source = %Q{
    local size = redis.call('llen', KEYS[1])
    if size == 0 then redis.call('del', KEYS[1]) end
  }
  client.eval(lua_source, [key])
end
client() click to toggle source
# File lib/sidekiq_bulk_job.rb, line 164
# Returns the configured Redis connection.
#
# @return [Object] the value of the +redis+ attribute
# @raise [ArgumentError] when +redis+ has not been set yet
def client
  raise ArgumentError.new("Please initialize redis first!") if redis.nil?

  redis
end
config(redis: , logger: , process_fail: , async_delay: 60, scheduled_delay: 10, queue: :default, batch_size: 3000, prefix: "SidekiqBulkJob") click to toggle source
# File lib/sidekiq_bulk_job.rb, line 69
# Validates and stores the module-level settings.
#
# @param redis [Object] Redis connection; must not be nil
# @param logger [Object] logger; must not be nil
# @param process_fail [#call] failure callback (see fail_callback); must not be nil
# @param async_delay [#to_f] seconds; must be within 2..300
# @param scheduled_delay [#to_f] seconds; must be within 2..30
# @param queue [Object] default queue name
# @param batch_size [Integer] list length at which need_flush? reports true
# @param prefix [String] prefix used by generate_key
# @raise [ArgumentError] when a required value is nil or a delay is out of range
def config(redis: , logger: , process_fail: , async_delay: 60, scheduled_delay: 10, queue: :default, batch_size: 3000, prefix: "SidekiqBulkJob")
  # Checked in this order so the first missing dependency is the one reported.
  { "redis" => redis, "logger" => logger, "process_fail" => process_fail }.each do |name, value|
    raise ArgumentError, "#{name} not allow nil" if value.nil?
  end

  async_seconds = async_delay.to_f
  if async_seconds < 2
    raise ArgumentError, "async_delay not allow less than 2 seconds."
  elsif async_seconds > 5 * 60
    raise ArgumentError, "async_delay not allow greater than 5 minutes."
  end

  scheduled_seconds = scheduled_delay.to_f
  if scheduled_seconds < 2
    raise ArgumentError, "scheduled_delay not allow less than 2 seconds."
  elsif scheduled_seconds > 30
    # The message now states the limit that is actually enforced (30 seconds).
    raise ArgumentError, "scheduled_delay not allow greater than 30 seconds."
  end

  self.redis = redis
  self.queue = queue
  self.batch_size = batch_size
  self.prefix = prefix
  self.logger = logger
  self.process_fail = process_fail
  self.async_delay = async_seconds
  self.scheduled_delay = scheduled_seconds
end
fail_callback(job_class_name: , args:, exception: ) click to toggle source
# File lib/sidekiq_bulk_job.rb, line 223
# Forwards a failure to the configured +process_fail+ callable.
#
# @param job_class_name [Object] passed through as the first argument
# @param args [Object] passed through as the second argument
# @param exception [Object] passed through as the third argument
# @return [Object] whatever the callable returns
def fail_callback(job_class_name: , args:, exception: )
  handler = process_fail
  handler.call(job_class_name, args, exception)
end
flush(key) click to toggle source
# File lib/sidekiq_bulk_job.rb, line 180
# Empties the Redis list at +key+ and returns everything that was in it.
#
# Each round reads indexes 0..batch_size and trims them off inside one MULTI
# block, repeating until LLEN reports zero. The collected entries are reversed
# before being returned, and +clear+ is called on the key once the list is empty.
#
# @param key [String] Redis list key to drain
# @return [Array] the drained entries, in reverse of the order they were read
def flush(key)
  drained = []
  loop do
    batch, _trim_reply = client.multi do |tx|
      tx.lrange(key, 0, batch_size)
      tx.ltrim(key, batch_size + 1, -1)
    end
    drained.concat(batch)
    break unless client.llen(key) > 0
  end
  clear(key)
  drained.reverse
end
generate_key(job_class_name) click to toggle source
# File lib/sidekiq_bulk_job.rb, line 160
# Builds the Redis key for a worker class as "<prefix>:<job_class_name>".
#
# @param job_class_name [#to_s] worker class name
# @return [String] the prefixed key
def generate_key(job_class_name)
  format("%s:%s", prefix, job_class_name)
end
monitor(job_class_name, queue: self.queue) click to toggle source
# File lib/sidekiq_bulk_job.rb, line 208
# Ensures a SidekiqBulkJob::Monitor job is scheduled for +job_class_name+.
#
# Scans Sidekiq's scheduled set for a Monitor job whose second argument equals
# +job_class_name+. When none is found, one is pushed to run +async_delay+
# seconds from now, with the current timestamp and the class name as arguments.
#
# @param job_class_name [String] worker class name to watch
# @param queue [Object] queue name for the Monitor job
# @return [Object, nil] nil when a Monitor is already scheduled, otherwise the
#   result of the push
def monitor(job_class_name, queue: self.queue)
  existing = Sidekiq::ScheduledSet.new.find do |job|
    next false unless job.klass == SidekiqBulkJob::Monitor.to_s

    _scheduled_at, watched_class_name = job.args
    watched_class_name == job_class_name
  end
  # TODO: debug log when a monitor is already scheduled.
  return unless existing.nil?

  run_at = (time_now + self.async_delay).to_f
  SidekiqBulkJob::Monitor.client_push(
    "queue" => queue,
    "at" => run_at,
    "class" => SidekiqBulkJob::Monitor,
    "args" => [time_now.to_f, job_class_name]
  )
end
need_flush?(key) click to toggle source
# File lib/sidekiq_bulk_job.rb, line 175
# Reports whether the list at +key+ has reached the configured batch size.
#
# @param key [String] Redis list key to measure
# @return [Boolean] true when LLEN of +key+ is at least +batch_size+, false otherwise
def need_flush?(key)
  # Return the comparison itself so the predicate yields false rather than nil.
  client.llen(key) >= batch_size
end
perform_async(job_class, *perfrom_args) click to toggle source

无法定义具体执行时间,相当于perform_async的批量执行 example:

SidekiqBulkJob.perform_async(SomeWorkerClass, *args)
# File lib/sidekiq_bulk_job.rb, line 108
# Queues +perfrom_args+ for +job_class+ without a specific run time by
# delegating to +process+.
#
# @param job_class [#to_s] worker class (its +to_s+ is used as the class name)
# @param perfrom_args [Array] arguments for one execution of the worker
# @return [nil]
def perform_async(job_class, *perfrom_args)
  class_name = job_class.to_s
  process(job_class_name: class_name, perfrom_args: perfrom_args)
  nil
end
perform_at(at, job_class, *perfrom_args)
Alias for: perform_in
perform_in(at, job_class, *perfrom_args) click to toggle source

延迟一段时间执行 example:

SidekiqBulkJob.perform_at(Date.parse("2020-12-01"), SomeWorkerClass, *args)
# File lib/sidekiq_bulk_job.rb, line 116
# Queues +perfrom_args+ for +job_class+ to run at, or after, a given time.
#
# @param at [#to_f, #to_time] either an interval in seconds or an absolute
#   time. Values whose float form is below 1_000_000_000 are treated as an
#   interval from now; anything else as an epoch timestamp. Objects without
#   +to_f+ (such as stdlib Date) are converted through +to_time+ first.
# @param job_class [#to_s] worker class (its +to_s+ is used as the class name)
# @param perfrom_args [Array] arguments for one execution of the worker
# @return [Object] whatever +process+ returns
def perform_in(at, job_class, *perfrom_args)
  # Stdlib Date/DateTime have no to_f, so go through Time for those.
  int = at.respond_to?(:to_f) ? at.to_f : at.to_time.to_f
  now = time_now
  ts = (int < 1_000_000_000 ? now + int : int).to_f
  class_name = job_class.to_s

  # Optimization to enqueue something now that is scheduled to go out now or in the past
  if ts <= now.to_f
    process(job_class_name: class_name, perfrom_args: perfrom_args)
  else
    process(at: ts, job_class_name: class_name, perfrom_args: perfrom_args)
  end
end
Also aliased as: perform_at
process(job_class_name: , at: nil, perfrom_args: [], queue: self.queue) click to toggle source
# File lib/sidekiq_bulk_job.rb, line 132
# Stores one set of worker arguments in Redis and makes sure a job exists that
# will eventually run them.
#
# Without +at+: the arguments are pushed onto the per-class list, a bulk run is
# triggered when need_flush? says so, and +monitor+ is called.
#
# With +at+: the arguments are appended to the list of an already scheduled
# SidekiqBulkJob::ScheduledJob for the same class whose run time lies within
# +scheduled_delay+ seconds of +at+; when there is none, a new list and a new
# scheduled job are created.
#
# @param job_class_name [String] worker class name
# @param at [Numeric, nil] epoch timestamp to run at, or nil to run as soon as batched
# @param perfrom_args [Array] arguments for one execution of the worker
# @param queue [Object] queue name for any job that gets pushed
# @return [Object] result of the last call made on the chosen path
def process(job_class_name: , at: nil, perfrom_args: [], queue: self.queue)
  if at.nil?
    list_key = generate_key(job_class_name)
    client.lpush(list_key, SidekiqBulkJob::Utils.dump(perfrom_args))
    bulk_run(job_class_name, list_key, queue: queue) if need_flush?(list_key)
    return monitor(job_class_name, queue: queue)
  end

  args_redis_key = nil
  target = Sidekiq::ScheduledSet.new.find do |job|
    next false unless job.klass == SidekiqBulkJob::ScheduledJob.to_s
    # Accept jobs scheduled within scheduled_delay seconds on either side of +at+.
    next false unless job.at.to_i.between?((at - self.scheduled_delay).to_i, (at + self.scheduled_delay).to_i)

    scheduled_class_name, args_redis_key = job.args
    scheduled_class_name == job_class_name
  end

  unless target.nil? || args_redis_key.nil? || args_redis_key.empty?
    # Append to the argument list of the job that is already scheduled.
    return client.lpush(args_redis_key, SidekiqBulkJob::Utils.dump(perfrom_args))
  end

  # No reusable job: create a new argument list and schedule a job for it.
  args_redis_key = SecureRandom.hex
  client.lpush(args_redis_key, SidekiqBulkJob::Utils.dump(perfrom_args))
  SidekiqBulkJob::ScheduledJob.client_push(
    "queue" => queue,
    "class" => SidekiqBulkJob::ScheduledJob,
    "at" => at,
    "args" => [job_class_name, args_redis_key]
  )
end
set(options) click to toggle source
# File lib/sidekiq_bulk_job.rb, line 100
# Wraps +options+ in a SidekiqBulkJob::Setter.
#
# @param options [Object] handed unchanged to SidekiqBulkJob::Setter.new
# @return [SidekiqBulkJob::Setter]
def set(options)
  setter_class = SidekiqBulkJob::Setter
  setter_class.new(options)
end
time_now() click to toggle source
# File lib/sidekiq_bulk_job.rb, line 171
# Returns the current time via Time.now.
# NOTE(review): presumably kept as a separate method so callers have a single
# place to stub the clock — confirm against the test suite.
#
# @return [Time]
def time_now
  Time.now
end