class OmniBot::AMQPConsumer

AMQP consumer class: subscribes to an AMQP queue and forwards each received message to the Jabber bot.

Attributes

db[RW]
handlers[RW]

Public Class Methods

new(config) click to toggle source
# File lib/omnibot/amqpconsumer.rb, line 15
# Creates a consumer bound to the given configuration.
#
# @param config [Hash] settings later read with string keys
#   ('omnibotuser', 'omnibotpass', 'notifyjid', 'notifyresource')
def initialize(config)
  @config = config
  # amqp_loop iterates over @handlers; default to an empty list so a consumer
  # started without any handlers assigned does not fail on nil.
  @handlers = []
end

Public Instance Methods

amqp_loop() click to toggle source
# File lib/omnibot/amqpconsumer.rb, line 19
# Opens the AMQP connection and, inside it, wires everything together:
# binds an exclusive queue to the direct exchange, connects the Jabber bot,
# starts every registered handler and finally subscribes to the queue so that
# each received payload is forwarded through send_message.
#
# Payloads are expected to be Base64-encoded, Marshal-dumped Strings. A payload
# that cannot be decoded is logged and dropped instead of raising from the
# subscription callback.
def amqp_loop
  AMQP.start do |connection|
    OmniLog::info "Setup amqp gem #{AMQP::VERSION}, AMQP protocol #{AMQ::Protocol::VERSION}..."

    connection.on_tcp_connection_loss do |conn, _settings|
      OmniLog::info '[network failure] Trying to reconnect...'
      # NOTE(review): arguments are presumably (force, period in seconds) — confirm against the amqp gem.
      conn.reconnect(false, 30)
    end

    mq = AMQP::Channel.new(connection)
    exchange = mq.direct(Helpers::amqp_exchange_name)
    queue = mq.queue('', exclusive: true).bind(exchange, routing_key: Helpers::amqp_routing_key)

    OmniLog::info 'Setup omnibot...'
    @omnibot = JabberBot.new(Jabber::JID::new(@config['omnibotuser']), @config['omnibotpass'])
    @omnibot.timer_provider = EM
    @omnibot.set_subscriber Jabber::JID::new(@config['notifyjid']), @config['notifyresource']
    @omnibot.connect

    @handlers.each_with_index do |handler, index|
      OmniLog::info "Setup handler #{handler}..."
      handler.timer_provider = EM
      handler.jabber_messenger { |message| send_message message }
      # Each handler gets a startup pause 10 larger than the previous one
      # (unit is defined by the handler — presumably seconds, confirm).
      handler.startup_pause = index * 10
      handler.start
    end

    OmniLog::info '==== AMQP is ready ===='

    queue.subscribe do |msg|
      begin
        # NOTE(review): Marshal.load can instantiate arbitrary objects; this is
        # only safe while every publisher on this exchange is trusted.
        message = Marshal.load(Base64.decode64(msg)).force_encoding('UTF-8')
      rescue StandardError => e
        # Corrupt data or a non-String payload must not escape the callback.
        OmniLog::error "Receiving message error: #{e.message}\ntrace:\n#{Helpers::backtrace e}\nIgnoring..."
        next
      end
      send_message message
    end
  end
end
send_message(message) click to toggle source
# File lib/omnibot/amqpconsumer.rb, line 9
# Hands +message+ to the Jabber bot, paired with the current time.
#
# Failures are logged and ignored so one bad message does not stop the
# consumer. Only StandardError is rescued: the previous `rescue Object`
# matched every exception, including signals and exit requests.
#
# @param message [Object] the message to enqueue on the bot
def send_message(message)
  @omnibot.add_message [Time.now, message]
rescue StandardError => e
  OmniLog::error "Sending message error: #{e.message}\ntrace:\n#{Helpers::backtrace e}\nIgnoring..."
end
start() click to toggle source

Main AMQP loop: installs an INT signal handler, then runs amqp_loop, retrying when the AMQP TCP connection fails.

# File lib/omnibot/amqpconsumer.rb, line 56
# Entry point of the consumer.
#
# Registers an INT signal handler that stops AMQP and then EM, and runs
# amqp_loop under Retryable (5 tries, sleep computed as 3**n, only for
# AMQP::TCPConnectionFailed). Any other StandardError raised during setup is
# logged and followed by the same stop sequence. 'Exited' is logged last.
def start
  # Exit hook: triggered by the INT signal.
  Signal.trap('INT') do
    OmniLog::info "It's a trap, should exit..."
    AMQP.stop { EM.stop }
  end

  begin
    # Invoked by Retryable with the exception it caught.
    report_failure = proc { |error| OmniLog::error "Cannot connect to AMQP: #{error.message}" }
    # Sleep grows as a power of three of the value Retryable passes in.
    backoff = ->(attempt) { 3**attempt }

    Retryable.retryable(tries: 5, sleep: backoff, exception_cb: report_failure, on: AMQP::TCPConnectionFailed) { amqp_loop }
  rescue => error
    OmniLog::error "AMQP/Jabber setup error: #{error.message}\ntrace:\n#{Helpers::backtrace error}\nExiting..."
    AMQP.stop { EM.stop }
  end

  OmniLog::info 'Exited'
end