module SidekiqBulkJob
Constants
- VERSION
Attributes
async_delay[RW]
batch_size[RW]
logger[RW]
prefix[RW]
process_fail[RW]
queue[RW]
redis[RW]
scheduled_delay[RW]
Public Class Methods
bulk_run(job_class_name, key, queue: self.queue, async: true)
click to toggle source
# File lib/sidekiq_bulk_job.rb, line 202 def bulk_run(job_class_name, key, queue: self.queue, async: true) args_array = flush(key) return if args_array.nil? || args_array.empty? async ? SidekiqBulkJob::BulkJob.client_push("queue" => queue, "class" => SidekiqBulkJob::BulkJob, "args" => [job_class_name, args_array]) : SidekiqBulkJob::BulkJob.new.perform(job_class_name, args_array) end
clear(key)
click to toggle source
# File lib/sidekiq_bulk_job.rb, line 194 def clear(key) script = %Q{ local size = redis.call('llen', KEYS[1]) if size == 0 then redis.call('del', KEYS[1]) end } client.eval script, [key] end
client()
click to toggle source
# File lib/sidekiq_bulk_job.rb, line 164 def client if redis.nil? raise ArgumentError.new("Please initialize redis first!") end redis end
config(redis: , logger: , process_fail: , async_delay: 60, scheduled_delay: 10, queue: :default, batch_size: 3000, prefix: "SidekiqBulkJob")
click to toggle source
# File lib/sidekiq_bulk_job.rb, line 69 def config(redis: , logger: , process_fail: , async_delay: 60, scheduled_delay: 10, queue: :default, batch_size: 3000, prefix: "SidekiqBulkJob") if redis.nil? raise ArgumentError.new("redis not allow nil") end if logger.nil? raise ArgumentError.new("logger not allow nil") end if process_fail.nil? raise ArgumentError.new("process_fail not allow nil") end if async_delay.to_f < 2 raise ArgumentError.new("async_delay not allow less than 2 seconds.") elsif async_delay.to_f > 5 * 60 raise ArgumentError.new("async_delay not allow greater than 5 minutes.") end if scheduled_delay.to_f < 2 raise ArgumentError.new("scheduled_delay not allow less than 2 seconds.") elsif scheduled_delay.to_f > 30 raise ArgumentError.new("scheduled_delay not allow greater than 2 seconds.") end self.redis = redis self.queue = queue self.batch_size = batch_size self.prefix = prefix self.logger = logger self.process_fail = process_fail self.async_delay = async_delay.to_f self.scheduled_delay = scheduled_delay.to_f end
fail_callback(job_class_name: , args:, exception: )
click to toggle source
# File lib/sidekiq_bulk_job.rb, line 223 def fail_callback(job_class_name: , args:, exception: ) process_fail.call(job_class_name, args, exception) end
flush(key)
click to toggle source
# File lib/sidekiq_bulk_job.rb, line 180 def flush(key) result = [] begin _result, success = client.multi do |multi| multi.lrange(key, 0, batch_size) multi.ltrim(key, batch_size+1, -1) end result += _result count = client.llen key end while count > 0 clear(key) result.reverse end
generate_key(job_class_name)
click to toggle source
# File lib/sidekiq_bulk_job.rb, line 160 def generate_key(job_class_name) "#{prefix}:#{job_class_name}" end
monitor(job_class_name, queue: self.queue)
click to toggle source
# File lib/sidekiq_bulk_job.rb, line 208 def monitor(job_class_name, queue: self.queue) scheduled_set = Sidekiq::ScheduledSet.new _monitor = scheduled_set.find do |job| if job.klass == SidekiqBulkJob::Monitor.to_s timestamp, _job_class_name = job.args _job_class_name == job_class_name end end if !_monitor.nil? # TODO debug log else SidekiqBulkJob::Monitor.client_push("queue" => queue, "at" => (time_now + self.async_delay).to_f, "class" => SidekiqBulkJob::Monitor, "args" => [time_now.to_f, job_class_name]) end end
need_flush?(key)
click to toggle source
# File lib/sidekiq_bulk_job.rb, line 175 def need_flush?(key) count = client.llen key return true if count >= batch_size end
perform_async(job_class, *perfrom_args)
click to toggle source
无法定义具体执行时间,相当于perform_async的批量执行 example:
SidekiqBulkJob.perform_async(SomeWorkerClass, *args)
# File lib/sidekiq_bulk_job.rb, line 108 def perform_async(job_class, *perfrom_args) process(job_class_name: job_class.to_s, perfrom_args: perfrom_args) nil end
perform_in(at, job_class, *perfrom_args)
click to toggle source
延迟一段时间执行 example:
SidekiqBulkJob.perform_at(Date.parse("2020-12-01"), SomeWorkerClass, *args)
# File lib/sidekiq_bulk_job.rb, line 116 def perform_in(at, job_class, *perfrom_args) int = at.to_f now = time_now ts = (int < 1_000_000_000 ? now + int : int).to_f # Optimization to enqueue something now that is scheduled to go out now or in the past if ts <= now.to_f process(job_class_name: job_class.to_s, perfrom_args: perfrom_args) else process(at: ts, job_class_name: job_class.to_s, perfrom_args: perfrom_args) end end
Also aliased as: perform_at
process(job_class_name: , at: nil, perfrom_args: [], queue: self.queue)
click to toggle source
# File lib/sidekiq_bulk_job.rb, line 132 def process(job_class_name: , at: nil, perfrom_args: [], queue: self.queue) if at.nil? key = generate_key(job_class_name) client.lpush key, SidekiqBulkJob::Utils.dump(perfrom_args) bulk_run(job_class_name, key, queue: queue) if need_flush?(key) monitor(job_class_name, queue: queue) else scheduled_set = Sidekiq::ScheduledSet.new args_redis_key = nil target = scheduled_set.find do |job| if job.klass == SidekiqBulkJob::ScheduledJob.to_s && job.at.to_i.between?((at - self.scheduled_delay).to_i, (at + self.scheduled_delay).to_i) # 允许30秒延迟 _job_class_name, args_redis_key = job.args _job_class_name == job_class_name end end if !target.nil? && !args_redis_key.nil? && !args_redis_key.empty? # 往现有的job参数set里增加参数 client.lpush args_redis_key, SidekiqBulkJob::Utils.dump(perfrom_args) else # 新增加一个 args_redis_key = SecureRandom.hex client.lpush args_redis_key, SidekiqBulkJob::Utils.dump(perfrom_args) SidekiqBulkJob::ScheduledJob.client_push("queue" => queue, "class" => SidekiqBulkJob::ScheduledJob, "at" => at, "args" => [job_class_name, args_redis_key]) end end end
set(options)
click to toggle source
# File lib/sidekiq_bulk_job.rb, line 100 def set(options) SidekiqBulkJob::Setter.new(options) end
time_now()
click to toggle source
# File lib/sidekiq_bulk_job.rb, line 171 def time_now Time.now end