class Lowkiq::ShardHandler
Attributes
queue_name[R]
shard_index[R]
worker[R]
Public Class Methods
build_many(worker, wrapper)
click to toggle source
# File lib/lowkiq/shard_handler.rb, line 3 def self.build_many(worker, wrapper) (0...worker.shards_count).map do |shard_index| new shard_index, worker, wrapper end end
new(shard_index, worker, wrapper)
click to toggle source
# File lib/lowkiq/shard_handler.rb, line 11 def initialize(shard_index, worker, wrapper) @shard_index = shard_index @queue_name = worker.queue_name @worker = worker @wrapper = wrapper @timestamp = Utils::Timestamp.method(:now) @queue = Queue::Queue.new Lowkiq.server_redis_pool, worker.queue_name, worker.shards_count end
Public Instance Methods
process()
click to toggle source
# File lib/lowkiq/shard_handler.rb, line 22 def process data = @queue.pop @shard_index, limit: @worker.batch_size return false if data.empty? begin batch = batch_from_data data @wrapper.call @worker, batch do @worker.perform batch end @queue.ack @shard_index, data, :success true rescue => ex fail! data, ex back, morgue = separate data @queue.push_back back @queue.push_to_morgue morgue @queue.ack @shard_index, data, :fail false end end
restore()
click to toggle source
# File lib/lowkiq/shard_handler.rb, line 47 def restore data = @queue.processing_data @shard_index return if data.nil? @queue.push_back data @queue.ack @shard_index, data end
Private Instance Methods
batch_from_data(data)
click to toggle source
# File lib/lowkiq/shard_handler.rb, line 56 def batch_from_data(data) data.each_with_object({}) do |job, h| id = job.fetch(:id) payloads = job.fetch(:payloads).map(&:first) h[id] = payloads end end
fail!(data, ex)
click to toggle source
# File lib/lowkiq/shard_handler.rb, line 64 def fail!(data, ex) data.map! do |job| job[:retry_count] += 1 job[:perform_in] = @timestamp.call + @worker.retry_in(job[:retry_count]) job[:error] = ex.message job end end
separate(data)
click to toggle source
# File lib/lowkiq/shard_handler.rb, line 73 def separate(data) back = [] morgue = [] data.each do |job| id = job.fetch(:id) payloads = job.fetch(:payloads) retry_count = job.fetch(:retry_count) perform_in = job.fetch(:perform_in) error = job.fetch(:error, nil) morgue_payload = payloads.shift if retry_count >= @worker.max_retry_count if payloads.any? job = { id: id, payloads: payloads, retry_count: morgue_payload ? 0 : retry_count, perform_in: morgue_payload ? @timestamp.call : perform_in, error: error, }.compact back << job end if morgue_payload job = { id: id, payloads: [morgue_payload], error: error, }.compact morgue << job end end [back, morgue] end