class Raq::Server

Constants

CONNECTION_DEFAULTS

Attributes

app[R]
connection[R]
connection_options[R]
queue_names[R]

Public Class Methods

new(options={}, app=nil, &block) click to toggle source
# File lib/raq/server.rb, line 11
def initialize(options={}, app=nil, &block)
  @queue_names = options.fetch(:queues) { raise ArgumentError, "You must provide a list of at least 1 queue to subscribe to." }
  @connection_options = options.fetch(:connection,CONNECTION_DEFAULTS)
  @app = app
  @app = Server::Builder.new(&block).to_app if block
end

Public Instance Methods

connect() click to toggle source
# File lib/raq/server.rb, line 30
def connect
  @connection = AMQP.connect(self.connection_options)
  @channel    = AMQP::Channel.new(@connection)
  #@channel.prefetch(1)
  @queues     = Array(self.queue_names).collect do |queue_name|
    queue = @channel.queue(queue_name, durable: true, auto_delete: false)
    queue.subscribe(ack: true, &method(:handle_message))
  end
end
handle_message(meta, payload) click to toggle source
# File lib/raq/server.rb, line 40
def handle_message(meta, payload)
  @app.call(meta,payload)
end
run() click to toggle source
# File lib/raq/server.rb, line 18
def run
  starter = proc do
    connect
  end

  if EventMachine.reactor_running?
    starter.call
  else
    EventMachine.run(&starter)
  end
end