class OmniBot::AMQPConsumer
AMQP consumer class
Attributes
db[RW]
handlers[RW]
Public Class Methods
new(config)
click to toggle source
# File lib/omnibot/amqpconsumer.rb, line 15 def initialize(config) @config = config end
Public Instance Methods
amqp_loop()
click to toggle source
# File lib/omnibot/amqpconsumer.rb, line 19 def amqp_loop AMQP.start do |connection| OmniLog::info "Setup amqp gem #{AMQP::VERSION}, AMQP protocol #{AMQ::Protocol::VERSION}..." connection.on_tcp_connection_loss do |conn, _settings| OmniLog::info '[network failure] Trying to reconnect...' conn.reconnect(false, 30) end mq = AMQP::Channel.new(connection) exchange = mq.direct(Helpers::amqp_exchange_name) queue = mq.queue('', exclusive: true).bind(exchange, routing_key: Helpers::amqp_routing_key) OmniLog::info 'Setup omnibot...' @omnibot = JabberBot.new(Jabber::JID::new(@config['omnibotuser']), @config['omnibotpass']) @omnibot.timer_provider = EM @omnibot.set_subscriber Jabber::JID::new(@config['notifyjid']), @config['notifyresource'] @omnibot.connect @handlers.each_with_index do |handler, index| OmniLog::info "Setup handler #{handler}..." handler.timer_provider = EM handler.jabber_messenger { |message| send_message message } handler.startup_pause = index * 10 handler.start end OmniLog::info '==== AMQP is ready ====' queue.subscribe do |msg| message = Marshal.load(Base64.decode64(msg)).force_encoding('UTF-8') send_message message end end end
send_message(message)
click to toggle source
# File lib/omnibot/amqpconsumer.rb, line 9 def send_message(message) @omnibot.add_message [Time.now, message] rescue Object => e OmniLog::error "Sending message error: #{e.message}\ntrace:\n#{Helpers::backtrace e}\nIgnoring..." end
start()
click to toggle source
Main AMQP loop
# File lib/omnibot/amqpconsumer.rb, line 56 def start # exit hook Signal.trap('INT') do OmniLog::info "It's a trap, should exit..." AMQP.stop { EM.stop } end begin exception_cb = proc { |e| OmniLog::error "Cannot connect to AMQP: #{e.message}" } Retryable.retryable(tries: 5, sleep: ->(n) { 3**n }, exception_cb: exception_cb, on: AMQP::TCPConnectionFailed) do amqp_loop end rescue => e OmniLog::error "AMQP/Jabber setup error: #{e.message}\ntrace:\n#{Helpers::backtrace e}\nExiting..." AMQP.stop { EM.stop } end OmniLog::info 'Exited' end