class Slnky::Transport::Rabbit

Attributes

channel[R]
exchanges[R]

Public Class Methods

new() click to toggle source

# Read-only access to @queues, the hash in which #queue caches each bound
# queue under its full "service.<desc>.<exchange>" name.
attr_reader :queues

# File lib/slnky/transport.rb, line 18
# Captures the RabbitMQ settings from Slnky.config and prepares empty state.
#
# Reads host, port, user and pass from the +rabbit+ section of the config,
# assembles the AMQP URL from them, and starts with no channel and empty
# exchange/queue caches. No connection is opened here.
def initialize
  @config = Slnky.config
  rabbit = @config.rabbit
  @host, @port = rabbit.host, rabbit.port
  @user, @pass = rabbit.user, rabbit.pass

  # Credentials are only embedded in the URL when a user is configured.
  credentials = ""
  credentials = "#{@user}:#{@pass}@" if @user
  @url = "amqp://#{credentials}#{@host}:#{@port}"

  @exchanges = {}
  @queues = {}
  @channel = nil
end

Public Instance Methods

connected?() click to toggle source
# File lib/slnky/transport.rb, line 71
# Reports whether a channel has been assigned to this transport.
#
# Returns true once @channel holds anything other than nil, false otherwise.
def connected?
  !@channel.nil?
end
exchange(desc, type) click to toggle source
# File lib/slnky/transport.rb, line 75
# Obtains an exchange from the current channel and remembers it.
#
# desc - short exchange name; the broker-side name is "slnky.<desc>" and
#        the exchange is stored in @exchanges under +desc+.
# type - :fanout or :direct.
#
# Returns the exchange object handed back by the channel.
# Raises RuntimeError when there is no channel yet, or when +type+ is not
# one of the supported kinds.
def exchange(desc, type)
  raise 'attempting to create exchange without channel' unless @channel
  name = "slnky.#{desc}"
  @exchanges[desc] =
      case type
        when :fanout
          @channel.fanout(name)
        when :direct
          @channel.direct(name)
        else
          # Report the offending +type+ argument itself so the caller sees
          # which value was rejected.
          raise "unknown exchange type: #{type}"
      end
end
queue(desc, exchange='events', options={}) click to toggle source
# File lib/slnky/transport.rb, line 89
# Returns the queue for +desc+ bound to +exchange+, creating it on first use.
#
# desc     - name fragment; the queue is named "service.<desc>.<exchange>".
# exchange - key of a previously created exchange in @exchanges
#            (default 'events').
# options  - queue options, merged over the default of durable: true. A
#            :routing_key entry is removed from the queue options and
#            passed to the binding instead. The caller's hash is not
#            modified.
#
# Queues are cached in @queues by name, so later calls with the same name
# return the cached queue and ignore +options+.
#
# Raises RuntimeError when there is no channel, or when +exchange+ has not
# been created via #exchange.
def queue(desc, exchange='events', options={})
  raise 'attempting to create queue without channel' unless @channel
  name = "service.#{desc}.#{exchange}"
  @queues[name] ||= begin
    # Fail before declaring anything: binding to a missing exchange would
    # otherwise pass nil to bind and cache the resulting queue.
    target = @exchanges[exchange]
    raise "attempting to bind queue to unknown exchange: #{exchange}" unless target

    options = {
        durable: true
    }.merge(options)
    routing = options.delete(:routing_key)
    bindoptions = routing ? {routing_key: routing} : {}
    @channel.queue(name, options).bind(target, bindoptions)
  end
end
start!(service) { |self| ... } click to toggle source
# File lib/slnky/transport.rb, line 31
# Connects to the broker at @url and wires up the transport.
#
# Inside the AMQP.start block this, in order:
#   1. stores the connection and opens a channel on it, raising a
#      RuntimeError with the broker's reply text on channel-level errors;
#   2. traps INT and TERM so either one calls #stop!;
#   3. creates the 'events' and 'logs' fanout exchanges and the 'response'
#      direct exchange;
#   4. calls the given block (if any) with this transport;
#   5. when +service+ is a Slnky::Service::Base, subscribes it to its
#      'events' queue and registers its periodic timers.
#
# service - object to run; anything that is not a Slnky::Service::Base
#           skips step 5.
# block   - optional; receives self after the exchanges exist.
def start!(service, &block)
  AMQP.start(@url) do |conn|
    @connection = conn
    @channel = AMQP::Channel.new(@connection)
    @channel.on_error do |_channel, close_frame|
      raise "Channel-level exception: #{close_frame.reply_text}"
    end

    # Shut down through #stop! on interrupt or termination.
    { "INT" => 'Interrupted', "TERM" => 'Terminated' }.each do |signal, reason|
      Signal.trap(signal) { stop!(reason) }
    end

    # Exchanges that exist before any caller-supplied code runs.
    [['events', :fanout], ['logs', :fanout], ['response', :direct]].each do |desc, type|
      exchange(desc, type)
    end

    block.call(self) if block

    next unless service.is_a?(Slnky::Service::Base)

    # Each parsed event is dispatched to the handler methods the service's
    # subscriber yields for that event name.
    events = queue(service.name, 'events')
    events.subscribe do |raw|
      message = Slnky::Message.parse(raw)
      service.subscriber.for(message.name) do |_name, handler|
        service.send(handler.to_sym, message.name, message.payload)
      end
    end

    service.timers.each do |interval, handler|
      EventMachine.add_periodic_timer(interval) { service.send(handler.to_sym) }
    end
  end
end
stop!(msg=nil) click to toggle source
# File lib/slnky/transport.rb, line 65
# Closes the connection and shuts the process down.
#
# msg - optional reason; when given, a timestamped "stopping" line is
#       printed first.
#
# Does nothing (returns nil) when no connection has been stored. Otherwise
# closes the connection and, from the close callback, stops EventMachine
# and exits from its stop callback.
def stop!(msg=nil)
  if @connection
    puts "#{Time.now}: stopping (#{msg})" if msg
    @connection.close do
      EventMachine.stop { exit }
    end
  end
end