module ManageIQ::Messaging::Stomp::Queue

Private Instance Methods

publish_message_impl(options, &block)
# File lib/manageiq/messaging/stomp/queue.rb, line 7
# Publishes a single message to the destination resolved from +options+.
#
# Option keys read here: +:sender+, +:message+ and +:class_name+ are copied
# into the outgoing headers when present; +:payload+ is the message body (an
# empty string when absent); +:service+ is handed to +receive_response+ when a
# block is given. +options+ itself is passed unchanged to +queue_for_publish+.
#
# When a block is given, the message is tagged with a freshly generated
# +correlation_id+ header and, after publishing, the block is forwarded to
# +receive_response+ together with that id.
#
# @param options [Hash] publishing options (see above)
# @return [nil, Object] +nil+ when no block is given, otherwise whatever
#   +receive_response+ returns
def publish_message_impl(options, &block)
  # Idempotent; kept local so the method does not rely on another file having
  # already loaded the library.
  require 'securerandom'

  address, headers = queue_for_publish(options)
  headers[:sender]       = options[:sender] if options[:sender]
  headers[:message_type] = options[:message] if options[:message]
  headers[:class_name]   = options[:class_name] if options[:class_name]
  # A second-resolution timestamp is identical for every request issued within
  # the same second, so a random UUID is used to keep correlation ids distinct.
  headers[:correlation_id] = SecureRandom.uuid if block_given?

  raw_publish(address, options[:payload] || '', headers)

  return unless block_given?

  receive_response(options[:service], headers[:correlation_id], &block)
end
publish_messages_impl(messages)
# File lib/manageiq/messaging/stomp/queue.rb, line 21
# Publishes every entry of +messages+, in order, by handing each one to
# +publish_message+.
#
# @param messages [#each] collection of per-message option sets
# @return [Object] the value of +messages.each+
def publish_messages_impl(messages)
  messages.each do |message_options|
    publish_message(message_options)
  end
end
subscribe_messages_impl(options) { |messages| ... }
# File lib/manageiq/messaging/stomp/queue.rb, line 25
# Subscribes to the queue resolved from +options+ and passes every incoming
# message to the caller's block.
#
# For each message: it is acknowledged up front when +auto_ack?(options)+ is
# truthy, its body is decoded, and the block receives a one-element Array
# holding a ManageIQ::Messaging::ReceivedMessage. If the message carries a
# 'correlation_id' header, the block's result (its first element when the
# result is an Array) is sent back through +send_response+.
#
# Any StandardError raised while handling a message is logged (message and
# backtrace) and then re-raised.
#
# @param options [Hash] subscription options; +:service+ is read here for
#   responses, the rest is interpreted by +queue_for_subscribe+ / +auto_ack?+
# @yieldparam messages [Array<ManageIQ::Messaging::ReceivedMessage>]
# @return [Object] whatever +subscribe+ returns
def subscribe_messages_impl(options)
  destination, subscribe_headers = queue_for_subscribe(options)

  # STOMP hands over messages individually, so each batch given to the block
  # contains exactly one message.
  subscribe(destination, subscribe_headers) do |msg|
    begin
      ack(msg) if auto_ack?(options)

      sender, message_type = msg.headers.values_at('sender', 'message_type')
      payload              = decode_body(msg.headers, msg.body)
      custom_headers       = msg.headers.except(*internal_header_keys)

      logger.info("Message received: queue(#{destination}), msg(#{payload_log(payload)}), headers(#{msg.headers})")

      received = ManageIQ::Messaging::ReceivedMessage.new(sender, message_type, payload, custom_headers, msg, self)
      outcome  = yield [received]
      logger.info("Message processed")

      # Only messages published as requests carry a correlation id and expect a reply.
      reply_ref = msg.headers['correlation_id']
      if reply_ref
        reply = outcome.kind_of?(Array) ? outcome.first : outcome
        send_response(options[:service], reply_ref, reply)
      end
    rescue StandardError => error
      logger.error("Message processing error: #{error.message}")
      logger.error(error.backtrace.join("\n"))
      raise
    end
  end
end