module Lowkiq

Constants

VERSION

Attributes

build_scheduler[RW]
build_splitter[RW]
client_pool_size[RW]
dump_payload[RW]
last_words[RW]
load_payload[RW]
on_server_init[RW]
poll_interval[RW]
pool_timeout[RW]
redis[RW]
server_middlewares[RW]
threads_per_node[RW]
workers[RW]

Public Class Methods

build_by_node_splitter(number_of_nodes, node_number) click to toggle source
# File lib/lowkiq.rb, line 89
def build_by_node_splitter(number_of_nodes, node_number)
  Lowkiq::Splitters::ByNode.new(
    number_of_nodes,
    node_number,
    Lowkiq.threads_per_node,
  )
end
build_default_splitter() click to toggle source
# File lib/lowkiq.rb, line 85
def build_default_splitter
  Lowkiq::Splitters::Default.new Lowkiq.threads_per_node
end
build_lag_scheduler() click to toggle source
# File lib/lowkiq.rb, line 72
def build_lag_scheduler
  Schedulers::Lag.new(
    ->() { sleep Lowkiq.poll_interval },
    Queue::ShardMetrics.new(self.server_redis_pool)
  )
end
build_seq_scheduler() click to toggle source
# File lib/lowkiq.rb, line 79
def build_seq_scheduler
  Schedulers::Seq.new(
    ->() { sleep Lowkiq.poll_interval }
  )
end
client_redis_pool() click to toggle source
# File lib/lowkiq.rb, line 51
def client_redis_pool
  @client_redis_pool ||= ConnectionPool.new(size: client_pool_size, timeout: pool_timeout, &redis)
end
server_redis_pool() click to toggle source
# File lib/lowkiq.rb, line 47
def server_redis_pool
  @server_redis_pool ||= ConnectionPool.new(size: threads_per_node, timeout: pool_timeout, &redis)
end
server_wrapper() click to toggle source
# File lib/lowkiq.rb, line 55
def server_wrapper
  null = -> (worker, batch, &block) { block.call }
  server_middlewares.reduce(null) do |wrapper, m|
    -> (worker, batch, &block) do
      wrapper.call worker, batch do
        m.call worker, batch, &block
      end
    end
  end
end
shard_handlers() click to toggle source
# File lib/lowkiq.rb, line 66
def shard_handlers
  self.workers.flat_map do |w|
    ShardHandler.build_many w, self.server_wrapper
  end
end

Public Instance Methods

client_actions() click to toggle source
# File lib/lowkiq/worker.rb, line 33
def client_actions
  Queue::Actions.new client_queue, client_queries
end
client_queries() click to toggle source
# File lib/lowkiq/worker.rb, line 29
def client_queries
  Queue::Queries.new Lowkiq.client_redis_pool, self.queue_name
end
client_queue() click to toggle source
# File lib/lowkiq/worker.rb, line 25
def client_queue
  Queue::Queue.new Lowkiq.client_redis_pool, self.queue_name, self.shards_count
end
perform(payload) click to toggle source
# File lib/lowkiq/worker.rb, line 21
def perform(payload)
  fail "not implemented"
end
perform_async(batch) click to toggle source
# File lib/lowkiq/worker.rb, line 37
def perform_async(batch)
  client_queue.push batch
end
retry_in(retry_count) click to toggle source

i.e. 15, 16, 31, 96, 271, … seconds + a random amount of time

# File lib/lowkiq/worker.rb, line 17
def retry_in(retry_count)
  (retry_count ** 4) + 15 + (rand(30) * (retry_count + 1))
end