module Insque
Constants
- DEFAULT_INBOX_TTL
- DEFAULT_PROCESSING_TTL
- VERSION
Public Class Methods
broadcast(message, params = nil, recipient = :any)
click to toggle source
# File lib/insque.rb, line 67
# Pushes a JSON-encoded message onto one or more inbox lists.
#
# message   - message name; it is sent as "#{@sender}_#{message}".
# params    - payload stored under the "params" key (default nil).
# recipient - :any  -> every inbox currently advertised through an
#                      '{insque}inbox_pointer_*' key,
#             :self -> this sender's own inbox,
#             :slow -> this sender's slow inbox,
#             anything else -> a Redis key or an Array of keys, used as given.
#
# Returns whatever @redis.multi returns.
def self.broadcast(message, params = nil, recipient = :any)
  keys =
    case recipient
    when :any
      pointers = @redis.keys('{insque}inbox_pointer_*')
      # Pointer keys are written with a TTL, so one may expire between KEYS
      # and MGET; MGET then yields nil for it. Drop those entries instead of
      # pushing the message to a nil key.
      pointers.empty? ? [] : @redis.mget(*pointers).compact
    when :self then [@inbox]
    when :slow then [@slow_inbox]
    else recipient.is_a?(Array) ? recipient : [recipient]
    end

  value = { message: "#{@sender}_#{message}", params: params, broadcasted_at: Time.now.utc }.to_json
  logger.debug event: :sending, message: value, to: keys.to_json

  # All pushes go through a single MULTI so every recipient gets the message
  # or none does.
  @redis.multi do |r|
    keys.each { |k| r.lpush k, value }
  end
end
inbox_ttl()
click to toggle source
# File lib/insque.rb, line 16
# Returns the configured inbox TTL, or DEFAULT_INBOX_TTL when none
# (nil/false) has been set.
def self.inbox_ttl
  return @inbox_ttl if @inbox_ttl

  DEFAULT_INBOX_TTL
end
inbox_ttl=(val)
click to toggle source
# File lib/insque.rb, line 12
# Stores the inbox TTL later returned by inbox_ttl.
def self.inbox_ttl=(val)
  @inbox_ttl = val
end
janitor(redis=nil)
click to toggle source
# File lib/insque.rb, line 96
# Runs real_janitor for the regular inbox/processing pair, passing the
# inbox pointer key along.
#
# redis - connection to use; a new one is created when nil.
def self.janitor(redis = nil)
  connection = redis || create_redis_connection
  real_janitor(@inbox, @processing, connection, @inbox_pointer)
end
listen(worker_name='', redis=nil)
click to toggle source
# File lib/insque.rb, line 87
# Runs do_listen on the regular inbox, passing the inbox pointer key along.
#
# worker_name - label handed through to do_listen (default '').
# redis       - connection to use; a new one is created when nil.
def self.listen(worker_name = '', redis = nil)
  connection = redis || create_redis_connection
  do_listen(@inbox, @processing, connection, worker_name, @inbox_pointer)
end
logger()
click to toggle source
# File lib/insque.rb, line 40
# Returns the current logger. When none has been assigned, a JsonLogger
# writing to STDOUT and tagged 'insque' is created and remembered.
def self.logger
  return @logger if @logger

  @logger = JsonLogger.new(STDOUT, additional_fields: { tag: 'insque' })
end
logger=(l)
click to toggle source
# File lib/insque.rb, line 36
# Replaces the logger returned by logger.
def self.logger=(l)
  @logger = l
end
processing_ttl()
click to toggle source
# File lib/insque.rb, line 24
# Returns the configured processing TTL, or DEFAULT_PROCESSING_TTL when
# none (nil/false) has been set.
def self.processing_ttl
  return @processing_ttl if @processing_ttl

  DEFAULT_PROCESSING_TTL
end
processing_ttl=(val)
click to toggle source
# File lib/insque.rb, line 20
# Stores the processing TTL later returned by processing_ttl.
def self.processing_ttl=(val)
  @processing_ttl = val
end
redis()
click to toggle source
# File lib/insque.rb, line 32
# Returns the module-level Redis connection (nil until one is assigned).
def self.redis
  @redis
end
redis=(redis)
click to toggle source
# File lib/insque.rb, line 28
# Assigns the module-level Redis connection.
def self.redis=(redis)
  @redis = redis
end
redis_class=(klass)
click to toggle source
# File lib/insque.rb, line 44
# Sets the class that create_redis_connection instantiates in place of Redis.
def self.redis_class=(klass)
  @redis_class = klass
end
redis_config()
click to toggle source
# File lib/insque.rb, line 48
# Returns the stored Redis configuration (nil until one is assigned).
def self.redis_config
  @redis_config
end
redis_config=(redis)
click to toggle source
# File lib/insque.rb, line 52
# Stores the Redis configuration and immediately opens the module-level
# connection from it.
#
# redis - configuration handed to the Redis client constructor.
def self.redis_config=(redis)
  @redis_config = redis
  # Implicit receiver: create_redis_connection is a private class method, and
  # calling a private method as `self.foo` raises NoMethodError before Ruby 2.7.
  @redis = create_redis_connection
end
sender=(sender)
click to toggle source
# File lib/insque.rb, line 57
# Sets the sender name, derives every per-sender Redis key from it and
# (re)defines the "<sender>_send_later" handler.
def self.sender=(sender)
  @sender = sender
  @inbox, @inbox_pointer, @processing, @slow_inbox, @slow_processing =
    %w[inbox inbox_pointer processing slow_inbox slow_processing].map do |suffix|
      "{insque}#{suffix}_#{sender}"
    end
  create_send_later_handler
end
slow_janitor(redis=nil)
click to toggle source
# File lib/insque.rb, line 100
# Runs real_janitor for the slow inbox/processing pair (no pointer key
# is passed).
#
# redis - connection to use; a new one is created when nil.
def self.slow_janitor(redis = nil)
  connection = redis || create_redis_connection
  real_janitor(@slow_inbox, @slow_processing, connection)
end
slow_listen(worker_name='', redis=nil)
click to toggle source
# File lib/insque.rb, line 92
# Runs do_listen on the slow inbox (no pointer key is passed).
#
# worker_name - label handed through to do_listen (default '').
# redis       - connection to use; a new one is created when nil.
def self.slow_listen(worker_name = '', redis = nil)
  connection = redis || create_redis_connection
  do_listen(@slow_inbox, @slow_processing, connection, worker_name)
end
Private Class Methods
create_redis_connection()
click to toggle source
# File lib/insque.rb, line 159
# Builds a new connection from the configured client class (Redis unless
# redis_class= was used) and the stored configuration.
#
# Returns the new client instance.
def self.create_redis_connection
  klass = @redis_class || Redis
  # No configuration may have been stored (e.g. the connection was injected
  # through redis=). Passing nil would replace the constructor's own default
  # argument, so let the client fall back to its defaults instead.
  @redis_config.nil? ? klass.new : klass.new(@redis_config)
end
create_send_later_handler()
click to toggle source
# File lib/insque.rb, line 163
# Defines the "<sender>_send_later" singleton method. The handler looks up
# the class named in msg['params']['class'], finds the record with
# msg['params']['id'] through `unscoped`, and calls the named method on it
# with the supplied args.
def self.create_send_later_handler
  handler_name = "#{@sender}_send_later"
  define_singleton_method(handler_name) do |msg|
    # NOTE(review): the class and method names come straight from the message,
    # so anything able to write to the inbox can reach any constant and any
    # method (send bypasses visibility) - confirm the queue is trusted.
    target = Kernel.const_get(msg['params']['class']).unscoped.find(msg['params']['id'])
    target.send(msg['params']['method'], *msg['params']['args'])
  end
end
do_listen(inbox, processing, redis, worker_name, pointer=nil)
click to toggle source
# File lib/insque.rb, line 105
# Worker loop: blocks on the inbox, moves each message to the processing
# list, dispatches it to the method named by its "message" field, then
# removes it from the processing list.
#
# inbox       - Redis list to pop messages from.
# processing  - Redis list holding messages while they are handled.
# redis       - connection used for all commands in this loop.
# worker_name - label included in the start-up log line.
# pointer     - optional key refreshed with the inbox name (TTL inbox_ttl)
#               before every blocking pop.
#
# Loops until an exception escapes (for example from the Redis client).
def self.do_listen(inbox, processing, redis, worker_name, pointer = nil)
  logger.info event: :starting, worker_name: worker_name, inbox: inbox
  loop do
    redis.setex(pointer, inbox_ttl, inbox) if pointer
    message = redis.brpoplpush(inbox, processing, 0)
    begin
      logger.debug event: :receiving, message: message, inbox: inbox
      parsed_message = JSON.parse message
      handler = parsed_message['message']
      # Messages without a handler are skipped silently. Checking up front
      # (instead of rescuing NoMethodError) keeps a NoMethodError raised
      # inside a handler from being swallowed: it now reaches the logger.
      # `true` includes private methods, matching what `send` can call.
      send(handler, parsed_message) if respond_to?(handler, true)
    rescue => e
      logger.error e
    ensure
      # Always clear the message from the processing list, even on failure.
      redis.lrem processing, 0, message
    end
  end
end
real_janitor(inbox, processing, redis, pointer=nil)
click to toggle source
# File lib/insque.rb, line 123
# Janitor loop: repeatedly scans the processing list for messages that have
# sat there longer than processing_ttl.
#
# * A message never restarted before is pushed back to the inbox with a
#   "restarted_at" timestamp and removed from the processing list.
# * A message already restarted once is removed for good when its
#   "restarted_at" is older than processing_ttl.
#
# inbox      - Redis list restarted messages are pushed to.
# processing - Redis list being scanned.
# redis      - connection used for all commands in this loop.
# pointer    - optional key refreshed with the inbox name (TTL inbox_ttl)
#              at the start of every pass.
#
# Loops until an exception escapes (for example from the Redis client).
def self.real_janitor(inbox, processing, redis, pointer = nil)
  loop do
    redis.setex(pointer, inbox_ttl, inbox) if pointer
    # WATCH makes the MULTI below abort if the processing list changes
    # between the scan and the clean-up.
    redis.watch processing
    errors = []
    restart = []
    delete = []
    redis.lrange(processing, 0, -1).each do |m|
      begin
        parsed_message = JSON.parse(m)
        now = Time.now.utc
        restarted_at = parsed_message['restarted_at']
        if restarted_at
          # A restarted message is judged on restarted_at only. Falling back
          # to its (necessarily old) broadcasted_at would re-queue it on every
          # pass while a worker is still handling the restarted copy.
          if now.to_i - Time.parse(restarted_at).to_i > processing_ttl
            errors << m
            delete << m
          end
        elsif now.to_i - Time.parse(parsed_message['broadcasted_at']).to_i > processing_ttl
          # String key: parsed_message is string-keyed, so this cannot end up
          # as a second "restarted_at" entry in the JSON.
          restart << parsed_message.merge('restarted_at' => now).to_json
          delete << m
        end
      rescue => e
        logger.error e
      end
    end
    result = redis.multi do |r|
      restart.each { |m| r.lpush inbox, m }
      delete.each { |m| r.lrem processing, 0, m }
    end
    if result
      errors.each { |m| logger.debug event: :deleting, message: m }
      restart.each { |m| logger.debug event: :restarting, message: m }
      logger.info event: :cleaning, status: 'success', inbox: inbox
    else
      logger.info event: :cleaning, status: 'failed', inbox: inbox
    end
    # Randomised pause between passes, scaled to a tenth of inbox_ttl.
    sleep(Random.rand((inbox_ttl.to_f / 10).ceil) + 1)
  end
end