class PikaQue::Handlers::DelayHandler
Constants
- DEFAULT_DELAY_OPTS
default delays are 1min, 10min, 1hr, 24hr
Public Class Methods
new(opts = {})
click to toggle source
# File lib/pika_que/handlers/delay_handler.rb, line 21 def initialize(opts = {}) @opts = PikaQue.config.merge(DEFAULT_DELAY_OPTS).merge(opts) @connection = opts[:connection] || PikaQue.connection @channel = @connection.create_channel @delay_monitor = Monitor.new @root_monitor = Monitor.new # make sure it is in descending order @delay_periods = @opts[:delay_periods].sort!{ |x,y| y <=> x } @backoff_multiplier = @opts[:delay_backoff_multiplier] # This is for example/dev/test @delay_name = "#{@opts[:exchange]}-delay" @requeue_name = "#{@opts[:exchange]}-delay-requeue" @root_name = @opts[:exchange] setup_exchanges setup_queues end
Public Instance Methods
bind_queue(queue, routing_key)
click to toggle source
# File lib/pika_que/handlers/delay_handler.rb, line 40 def bind_queue(queue, routing_key) # bind the worker queue to requeue exchange queue.bind(@requeue_exchange, :routing_key => routing_key) end
close()
click to toggle source
# File lib/pika_que/handlers/delay_handler.rb, line 63 def close @channel.close unless @channel.closed? end
handle(response_code, channel, delivery_info, metadata, msg, error = nil)
click to toggle source
# File lib/pika_que/handlers/delay_handler.rb, line 45 def handle(response_code, channel, delivery_info, metadata, msg, error = nil) delay_period = next_delay_period(metadata[:headers]) if delay_period > 0 # We will publish the message to the delay exchange PikaQue.logger.info "DelayHandler msg=delaying, delay=#{delay_period}, headers=#{metadata[:headers]}" publish_delay(delivery_info, msg, metadata[:headers].merge({ 'delay' => delay_period })) channel.acknowledge(delivery_info.delivery_tag, false) else # Publish the original message with the routing_key to the root exchange work_queue = metadata[:headers]['work_queue'] PikaQue.logger.info "DelayHandler msg=publishing, queue=#{work_queue}, headers=#{metadata[:headers]}" publish_work(work_queue, msg) channel.acknowledge(delivery_info.delivery_tag, false) end end
Private Instance Methods
exchange_durable?()
click to toggle source
# File lib/pika_que/handlers/delay_handler.rb, line 99 def exchange_durable? @opts.fetch(:exchange_options, {}).fetch(:durable, false) end
next_delay_period(headers)
click to toggle source
# File lib/pika_que/handlers/delay_handler.rb, line 115 def next_delay_period(headers) work_at = headers['work_at'] t = (work_at - Time.now.to_f).round # greater check is to ignore remainder of time (seconds) smaller than the last delay @delay_periods.bsearch{ |e| t >= e && (t / e.to_f).round > 0 } || 0 end
publish_delay(delivery_info, msg, headers)
click to toggle source
# File lib/pika_que/handlers/delay_handler.rb, line 103 def publish_delay(delivery_info, msg, headers) @delay_monitor.synchronize do @delay_exchange.publish(msg, routing_key: delivery_info.routing_key, headers: headers) end end
publish_work(routing_key, msg)
click to toggle source
# File lib/pika_que/handlers/delay_handler.rb, line 109 def publish_work(routing_key, msg) @root_monitor.synchronize do @root_exchange.publish(msg, routing_key: routing_key) end end
queue_durable?()
click to toggle source
# File lib/pika_que/handlers/delay_handler.rb, line 95 def queue_durable? @opts.fetch(:queue_options, {}).fetch(:durable, false) end
setup_exchanges()
click to toggle source
# File lib/pika_que/handlers/delay_handler.rb, line 69 def setup_exchanges PikaQue.logger.debug "DelayHandler creating exchange=#{@delay_name}" @delay_exchange = @channel.exchange(@delay_name, :type => 'headers', :durable => exchange_durable?) PikaQue.logger.debug "DelayHandler creating exchange=#{@requeue_name}" @requeue_exchange = @channel.exchange(@requeue_name, :type => 'topic', :durable => exchange_durable?) PikaQue.logger.debug "DelayHandler getting exchange=#{@root_name}" @root_exchange = @channel.exchange(@root_name, :type => 'direct', :durable => exchange_durable?) end
setup_queues()
click to toggle source
# File lib/pika_que/handlers/delay_handler.rb, line 80 def setup_queues @delay_periods.each do |t| # Create the queues and bindings PikaQue.logger.debug "DelayHandler creating queue=#{@delay_name}-#{t} x-dead-letter-exchange=#{@requeue_name}" delay_queue = @channel.queue("#{@delay_name}-#{t}", :durable => queue_durable?, :arguments => { :'x-dead-letter-exchange' => @requeue_name, :'x-message-ttl' => t * @backoff_multiplier }) delay_queue.bind(@delay_exchange, :arguments => { :delay => t }) end end