class Totoro::EnqueueService
Public Class Methods
new(connection, config)
click to toggle source
# File lib/totoro/services/enqueue_service.rb, line 5
#
# Stores the collaborators that the other methods of this service read.
#
# @param connection object kept in @connection (asked for #connected?, #start
#   and #create_channel by the rest of the class)
# @param config object kept in @config (asked for #queue and #queue_persistent?)
def initialize(connection, config)
  @connection, @config = connection, config
end
Public Instance Methods
enqueue(id, payload, attrs = {})
click to toggle source
# File lib/totoro/services/enqueue_service.rb, line 10
#
# Serializes +payload+ with JSON.dump and publishes it through the memoized
# exchange, routed to the queue that @config.queue(id) describes. The channel
# is closed once the message has been handed over.
#
# @param id queue identifier passed to @config.queue and to #options
# @param payload object serialized with JSON.dump before publishing
# @param attrs [Hash] extra publish options, merged over the defaults built by #options
# @raise [Totoro::ConnectionBreakError] when one of the Bunny / AMQ errors
#   listed in the rescue clause is raised while connecting, publishing or closing
def enqueue(id, payload, attrs = {})
  @connection.start unless @connection.connected?
  queue = channel.queue(*@config.queue(id))
  exchange.publish(JSON.dump(payload), options(id, queue.name, attrs))
  Rails.logger.debug "send message to #{queue.name}"
  STDOUT.flush
  channel.close
rescue Bunny::TCPConnectionFailedForAllHosts,
       Bunny::NetworkErrorWrapper,
       Bunny::ChannelAlreadyClosed,
       Bunny::ConnectionAlreadyClosed,
       AMQ::Protocol::EmptyResponseError => error
  # Close only a channel that is still open: closing an already-closed channel
  # raises again and would replace the ConnectionBreakError raised below.
  @channel.close if @channel && @channel.open?
  raise(Totoro::ConnectionBreakError, "type: #{error.class}, message: #{error.message}")
ensure
  # A closed channel (and the exchange obtained from it) cannot publish again,
  # so forget the memoized handles; the next call then opens a fresh channel.
  # A channel that is still open is kept, as before.
  if @channel && !@channel.open?
    @channel = nil
    @exchange = nil
  end
end
Private Instance Methods
channel()
click to toggle source
# File lib/totoro/services/enqueue_service.rb, line 33
#
# Memoized channel: the first call asks @connection to create one, later
# calls return the object stored in @channel.
def channel
  return @channel if @channel

  @channel = @connection.create_channel
end
exchange()
click to toggle source
Returns the channel's default exchange, which is a direct exchange.
# File lib/totoro/services/enqueue_service.rb, line 38
#
# Memoized exchange: looked up once via channel.default_exchange and then
# served from @exchange on later calls.
def exchange
  return @exchange if @exchange

  @exchange = channel.default_exchange
end
options(queue_id, queue_name, attrs)
click to toggle source
# File lib/totoro/services/enqueue_service.rb, line 29
#
# Builds the options hash handed to exchange.publish.
#
# @param queue_id identifier passed to @config.queue_persistent?
# @param queue_name value used for :routing_key
# @param attrs [Hash] caller-supplied options; on a key clash they win over
#   the defaults because they are merged last
# @return [Hash] :persistent and :routing_key defaults merged with +attrs+
def options(queue_id, queue_name, attrs)
  defaults = {
    persistent: @config.queue_persistent?(queue_id),
    routing_key: queue_name
  }
  defaults.merge(attrs)
end