class Totoro::EnqueueService

Public Class Methods

new(connection, config)
# File lib/totoro/services/enqueue_service.rb, line 5
# Builds a service that publishes messages through the given connection.
#
# @param connection the broker connection; this class calls `connected?`,
#   `start` and `create_channel` on it
# @param config the queue configuration; this class calls `queue(id)` and
#   `queue_persistent?(id)` on it
def initialize(connection, config)
  @config = config
  @connection = connection
end

Public Instance Methods

enqueue(id, payload, attrs = {})
# File lib/totoro/services/enqueue_service.rb, line 10
# Publishes +payload+ (serialized as JSON) to the queue configured under +id+.
#
# The connection is started on demand, the queue is declared on a channel,
# and the message is published through the channel's default exchange using
# the queue name as routing key. The channel is always released afterwards
# and the memoized channel/exchange are reset, so the same service instance
# can be used for further calls.
#
# @param id identifier passed to `@config.queue` and `@config.queue_persistent?`
# @param payload object serialized with `JSON.dump`
# @param attrs [Hash] extra publish options; they override the defaults
#   built by `options`
# @return the result of closing the channel on success
# @raise [Totoro::ConnectionBreakError] when one of the rescued Bunny/AMQ
#   connection errors occurs; any other error propagates unchanged
def enqueue(id, payload, attrs = {})
  @connection.start unless @connection.connected?
  queue = channel.queue(*@config.queue(id))
  payload = JSON.dump payload
  exchange.publish(payload, options(id, queue.name, attrs))
  Rails.logger.debug "send message to #{queue.name}"
  STDOUT.flush
  channel.close
rescue Bunny::TCPConnectionFailedForAllHosts,
       Bunny::NetworkErrorWrapper,
       Bunny::ChannelAlreadyClosed,
       Bunny::ConnectionAlreadyClosed,
       AMQ::Protocol::EmptyResponseError => error
  raise(Totoro::ConnectionBreakError, "type: #{error.class}, message: #{error.message}")
ensure
  # Drop the memoized channel and exchange first: a closed channel must never
  # be handed out again by `channel`/`exchange` on the next call.
  stale_channel = @channel
  @channel = nil
  @exchange = nil
  begin
    # Only close a channel that is still open (e.g. after a failed publish or
    # an unexpected error); closing an already closed channel would raise.
    stale_channel.close if stale_channel&.open?
  rescue StandardError
    # Best-effort cleanup: a failure while releasing the channel must not
    # replace the error (or result) produced by the publish itself.
    nil
  end
end

Private Instance Methods

channel()
# File lib/totoro/services/enqueue_service.rb, line 33
# Returns the memoized channel, creating one on the connection the first
# time it is requested.
def channel
  return @channel if @channel

  @channel = @connection.create_channel
end
exchange()

Returns the channel's default exchange, which is a direct exchange.

# File lib/totoro/services/enqueue_service.rb, line 38
# Returns the memoized default exchange of the current channel, looking it
# up on first use.
def exchange
  return @exchange if @exchange

  @exchange = channel.default_exchange
end
options(queue_id, queue_name, attrs)
# File lib/totoro/services/enqueue_service.rb, line 29
# Builds the publish options for a message.
#
# @param queue_id identifier passed to `@config.queue_persistent?`
# @param queue_name used as the routing key
# @param attrs [Hash] caller-supplied options; they take precedence over
#   the defaults
# @return [Hash] the defaults merged with +attrs+
def options(queue_id, queue_name, attrs)
  defaults = {
    persistent: @config.queue_persistent?(queue_id),
    routing_key: queue_name
  }
  defaults.merge(attrs)
end