module ManageIQ::Messaging::Stomp::Topic
Private Instance Methods
publish_topic_impl(messages)
click to toggle source
# File lib/manageiq/messaging/stomp/topic.rb, line 15 def publish_topic_impl(messages) messages.each { |message| publish_topic_single(message) } end
publish_topic_single(options)
click to toggle source
# File lib/manageiq/messaging/stomp/topic.rb, line 7 def publish_topic_single(options) address, headers = topic_for_publish(options) headers[:sender] = options[:sender] if options[:sender] headers[:event_type] = options[:event] if options[:event] raw_publish(address, options[:payload], headers) end
subscribe_topic_impl(options) { |received_message| ... }
click to toggle source
# File lib/manageiq/messaging/stomp/topic.rb, line 19 def subscribe_topic_impl(options) queue_name, headers = topic_for_subscribe(options) subscribe(queue_name, headers) do |event| begin ack(event) if auto_ack?(options) sender = event.headers['sender'] event_type = event.headers['event_type'] client_headers = event.headers.except(*internal_header_keys) event_body = decode_body(event.headers, event.body) logger.info("Event received: queue(#{queue_name}), event(#{event_body}), headers(#{event.headers})") yield ManageIQ::Messaging::ReceivedMessage.new(sender, event_type, event_body, client_headers, event, self) logger.info("Event processed") rescue => e logger.error("Event processing error: #{e.message}") logger.error(e.backtrace.join("\n")) raise end end end