class Slnky::Transport::Rabbit
Attributes
channel[R]
exchanges[R]
Public Class Methods
new()
click to toggle source
# Read access to @queues, the hash that #initialize creates empty and that
# #queue fills, keyed by the full "service.<desc>.<exchange>" queue name.
attr_reader :queues
# File lib/slnky/transport.rb, line 18
# Prepares a transport from the RabbitMQ settings found in Slnky.config.
#
# Only the AMQP URL is assembled here; nothing is connected. The channel is
# left nil and the exchange/queue registries start out empty.
def initialize
  @config = Slnky.config
  @host = @config.rabbit.host
  @port = @config.rabbit.port
  @user = @config.rabbit.user
  @pass = @config.rabbit.pass

  # The "user:pass@" prefix is only emitted when a user is configured.
  credentials = if @user
                  "#{@user}:#{@pass}@"
                else
                  ''
                end
  @url = "amqp://#{credentials}#{@host}:#{@port}"

  @channel = nil
  @exchanges = {}
  @queues = {}
end
Public Instance Methods
connected?()
click to toggle source
# File lib/slnky/transport.rb, line 71
# Reports whether a channel has been assigned to this transport.
#
# This only checks that @channel is set; it does not probe the channel itself.
#
# Returns true when @channel is not nil, false otherwise.
def connected?
  # nil? is answered by the object itself rather than by a possibly
  # overridden != / == on the channel.
  !@channel.nil?
end
exchange(desc, type)
click to toggle source
# File lib/slnky/transport.rb, line 75
# Declares the exchange "slnky.<desc>" on the current channel and records it
# in @exchanges under +desc+.
#
# desc - name suffix of the exchange; also the key used in @exchanges
# type - :fanout or :direct
#
# Returns whatever the channel's #fanout / #direct call returns.
# Raises RuntimeError when no channel is set or when +type+ is not supported;
# in both cases @exchanges is left untouched.
def exchange(desc, type)
  raise 'attempting to create exchange without channel' unless @channel

  name = "slnky.#{desc}"
  @exchanges[desc] =
    case type
    when :fanout then @channel.fanout(name)
    when :direct then @channel.direct(name)
    else
      # Interpolate the argument itself so the caller sees which type was rejected.
      raise "unknown exchange type: #{type}"
    end
end
queue(desc, exchange='events', options={})
click to toggle source
# File lib/slnky/transport.rb, line 89
# Returns the queue "service.<desc>.<exchange>", declaring it on the channel
# and binding it to the named exchange the first time it is requested.
# Later calls with the same desc/exchange return the memoized queue and
# ignore +options+.
#
# desc     - service-specific part of the queue name
# exchange - key of a previously declared exchange in @exchanges
# options  - queue options merged over { durable: true }; :routing_key is
#            removed from them and passed to the bind call instead
#
# Raises RuntimeError when no channel is set, or when +exchange+ has not been
# declared (nothing is memoized in that case).
def queue(desc, exchange='events', options={})
  raise 'attempting to create queue without channel' unless @channel

  name = "service.#{desc}.#{exchange}"
  @queues[name] ||= begin
    # Fail with a clear message instead of handing nil to the bind call.
    target = @exchanges.fetch(exchange) do
      raise "attempting to bind queue to undeclared exchange: #{exchange}"
    end

    # merge builds a new hash, so the caller's options are never mutated.
    settings = { durable: true }.merge(options)
    routing = settings.delete(:routing_key)
    bind_options = routing ? { routing_key: routing } : {}

    @channel.queue(name, settings).bind(target, bind_options)
  end
end
start!(service) { |self| ... }
click to toggle source
# File lib/slnky/transport.rb, line 31
# Starts AMQP against @url and, inside the AMQP.start block, wires up the
# transport: stores the connection, opens a channel, installs INT/TERM
# handlers that call #stop!, and declares the 'events', 'logs' and 'response'
# exchanges. The transport is then yielded to the caller's block, if any.
#
# When +service+ is a Slnky::Service::Base, its events queue is subscribed
# (each parsed message is dispatched to the handlers the service's subscriber
# yields for the event name) and one periodic timer is registered per entry
# in service.timers.
#
# service - the service to attach; anything else skips subscription/timers
def start!(service, &block)
  AMQP.start(@url) do |connection|
    @connection = connection
    @channel = AMQP::Channel.new(@connection)
    @channel.on_error do |_ch, close|
      raise "Channel-level exception: #{close.reply_text}"
    end

    # Both signals shut the transport down; only the logged reason differs.
    { 'INT' => 'Interrupted', 'TERM' => 'Terminated' }.each do |signal, reason|
      Signal.trap(signal) { stop!(reason) }
    end

    [['events', :fanout], ['logs', :fanout], ['response', :direct]].each do |desc, type|
      exchange(desc, type)
    end

    yield self if block_given?

    if service.is_a?(Slnky::Service::Base)
      queue(service.name, 'events').subscribe do |raw|
        event = Slnky::Message.parse(raw)
        service.subscriber.for(event.name) do |_name, handler|
          service.send(handler.to_sym, event.name, event.payload)
        end
      end

      service.timers.each do |seconds, handler|
        EventMachine.add_periodic_timer(seconds) { service.send(handler.to_sym) }
      end
    end
  end
end
stop!(msg=nil)
click to toggle source
# File lib/slnky/transport.rb, line 65
# Closes the stored connection, if there is one, and stops EventMachine once
# the close callback fires. Does nothing when no connection has been stored.
#
# msg - optional reason; when given, a timestamped "stopping" line is printed
def stop!(msg=nil)
  if @connection
    puts "#{Time.now}: stopping (#{msg})" if msg
    @connection.close do
      EventMachine.stop { exit }
    end
  end
end