module Bunny::Hop

Constants

VERSION

Public Class Methods

new() click to toggle source
# File lib/bunny/hop.rb, line 5
def initialize
  @connection = Bunny.new(
    host: ENV.fetch('RABBITMQ_HOST', 'rabbitmq'), 
    user: ENV.fetch('RABBITMQ_USER', 'rabbitmq'), 
    password: ENV.fetch('RABBITMQ_PASSWORD', 'rabbitmq')
  )
  @connection.start
  @channel = @connection.create_channel
end

Public Instance Methods

publish(queue, msg) click to toggle source
# File lib/bunny/hop.rb, line 15
def publish(queue, msg)
  lock = Mutex.new
  condition = ConditionVariable.new
  response = nil
  call_id = "#{rand}#{rand}#{rand}"
  exchange = @channel.default_exchange

  reply_queue = @channel.queue('', exclusive: true)
  reply_queue.subscribe do |_delivery_info, properties, payload|
    if properties[:correlation_id] == call_id
      response = payload

      lock.synchronize { condition.signal }
    end
  end

  exchange.publish(msg,
    routing_key: queue,
    correlation_id: call_id,
    reply_to: reply_queue.name)

  Timeout::timeout(5) {
    lock.synchronize { condition.wait(lock) }
  }

  @channel.close
  @connection.close

  response
rescue Timeout::Error => e
  @channel.close
  @connection.close
  raise e
end
subscribe(queue) { || ... } click to toggle source
# File lib/bunny/hop.rb, line 50
def subscribe(queue)
  queue = @channel.queue(queue)
  exchange = @channel.default_exchange

  queue.subscribe do |_delivery_info, properties, payload|
    puts _delivery_info
    puts properties
    puts payload
    
    result = yield

    exchange.publish(
      result,
      routing_key: properties.reply_to,
      correlation_id: properties.correlation_id
    )
  end
end