class RapidsRivers::RabbitMqRapids

Understands an event bus based on RabbitMQ

Constants

RAPIDS

Public Class Methods

new(host_ip, port) click to toggle source
# File lib/rapids_rivers/rabbit_mq/rabbit_mq_rapids.rb, line 17
# Builds (but does not start) the Bunny connection used by this rapids.
#
# @param host_ip [String, nil] RabbitMQ host; when nil, the RABBITMQ_IP
#   environment variable is used instead.
# @param port [Integer, String, nil] RabbitMQ port; when nil, the
#   RABBITMQ_PORT environment variable is used, falling back to 5672.
# @raise [ArgumentError] if no host is given and RABBITMQ_IP is not set.
def initialize(host_ip, port)
  host_ip ||= ENV['RABBITMQ_IP']
  # `raise`, not `throw`: throw is for catch/throw control flow and would
  # surface here as an UncaughtThrowError with a mangled message.
  raise ArgumentError, "Need IP address for RabbitMQ" unless host_ip

  port ||= ENV['RABBITMQ_PORT'] || 5672
  @connection = Bunny.new(
    host: host_ip,
    port: port.to_i, # environment values arrive as strings
    automatically_recover: false
  )
end

Public Instance Methods

close() click to toggle source
# File lib/rapids_rivers/rabbit_mq/rabbit_mq_rapids.rb, line 36
# Closes the channel and then the underlying connection.
#
# The connection is closed in an `ensure` so that an error raised while
# closing the channel does not leave the connection open; that error
# still propagates to the caller.
def close
  channel.close
ensure
  @connection.close
end
publish(packet) click to toggle source
# File lib/rapids_rivers/rabbit_mq/rabbit_mq_rapids.rb, line 26
# Serializes the packet with #to_json and publishes it on the exchange.
#
# @param packet [#to_json] the message to send.
# @return whatever the exchange's #publish returns.
def publish(packet)
  target = exchange
  target.publish(packet.to_json)
end
queue(queue_name = "") click to toggle source
# File lib/rapids_rivers/rabbit_mq/rabbit_mq_rapids.rb, line 30
# Declares an exclusive, auto-delete queue on the channel and binds it to
# the exchange.
#
# @param queue_name [String, nil] name to declare; nil is treated as "".
# @return the declared queue, after it has been bound.
def queue(queue_name = "")
  name = queue_name || ""
  declared = channel.queue(name, exclusive: true, auto_delete: true)
  declared.bind(exchange)
  declared
end

Private Instance Methods

channel() click to toggle source
# File lib/rapids_rivers/rabbit_mq/rabbit_mq_rapids.rb, line 43
# Memoized channel. On first use the connection is started and a channel
# is created on it; later calls return the same channel.
def channel
  @channel ||= begin
    @connection.start
    @connection.create_channel
  end
end
exchange() click to toggle source
# File lib/rapids_rivers/rabbit_mq/rabbit_mq_rapids.rb, line 49
# Memoized fanout exchange named by the RAPIDS constant, declared on the
# channel with durable and auto_delete both set to true.
def exchange
  return @exchange if @exchange

  @exchange = channel.fanout(RAPIDS, durable: true, auto_delete: true)
end