module ManageIQ::Messaging::Stomp::Common

Private Instance Methods

internal_header_keys() click to toggle source
# File lib/manageiq/messaging/stomp/common.rb, line 70
def internal_header_keys
  [:"destination-type", :persistent, :expires, :AMQ_SCHEDULED_TIME, :priority, :_AMQ_GROUP_ID]
end
queue_for_publish(options) click to toggle source
# File lib/manageiq/messaging/stomp/common.rb, line 15
def queue_for_publish(options)
  affinity = options[:affinity] || 'none'
  address = "queue/#{options[:service]}.#{affinity}"

  headers = {:"destination-type" => 'ANYCAST', :persistent => true}
  headers.merge!(options[:headers].except(*internal_header_keys)) if options.key?(:headers)

  headers[:expires]            = options[:expires_on].to_i * 1000 if options[:expires_on]
  headers[:AMQ_SCHEDULED_TIME] = options[:deliver_on].to_i * 1000 if options[:deliver_on]
  headers[:priority]           = options[:priority] if options[:priority]
  headers[:_AMQ_GROUP_ID]      = options[:group_name] if options[:group_name]

  [address, headers]
end
queue_for_subscribe(options) click to toggle source
# File lib/manageiq/messaging/stomp/common.rb, line 30
def queue_for_subscribe(options)
  affinity = options[:affinity] || 'none'
  queue_name = "queue/#{options[:service]}.#{affinity}"

  headers = {:"subscription-type" => 'ANYCAST', :ack => 'client'}

  [queue_name, headers]
end
raw_publish(address, body, headers) click to toggle source
# File lib/manageiq/messaging/stomp/common.rb, line 10
def raw_publish(address, body, headers)
  publish(address, encode_body(headers, body), headers)
  logger.info("Published to address(#{address}), msg(#{payload_log(body.inspect)}), headers(#{headers.inspect})")
end
receive_response(service, correlation_ref) { |decode_body(headers, body)| ... } click to toggle source
# File lib/manageiq/messaging/stomp/common.rb, line 74
def receive_response(service, correlation_ref)
  response_options = {
    :service  => "#{service}.response",
    :affinity => correlation_ref
  }
  queue_name, response_headers = queue_for_subscribe(response_options)
  subscribe(queue_name, response_headers) do |msg|
    ack(msg)
    begin
      yield decode_body(msg.headers, msg.body)
    ensure
      unsubscribe(queue_name)
    end
  end
end
send_response(service, correlation_ref, result) click to toggle source
# File lib/manageiq/messaging/stomp/common.rb, line 61
def send_response(service, correlation_ref, result)
  response_options = {
    :service  => "#{service}.response",
    :affinity => correlation_ref
  }
  address, response_headers = queue_for_publish(response_options)
  raw_publish(address, result || '', response_headers.merge(:correlation_id => correlation_ref))
end
topic_for_publish(options) click to toggle source
# File lib/manageiq/messaging/stomp/common.rb, line 39
def topic_for_publish(options)
  address = "topic/#{options[:service]}"

  headers = {:"destination-type" => 'MULTICAST', :persistent => true}
  headers.merge!(options[:headers].except(*internal_header_keys)) if options.key?(:headers)

  headers[:expires]            = options[:expires_on].to_i * 1000 if options[:expires_on]
  headers[:AMQ_SCHEDULED_TIME] = options[:deliver_on].to_i * 1000 if options[:deliver_on]
  headers[:priority]           = options[:priority] if options[:priority]

  [address, headers]
end
topic_for_subscribe(options) click to toggle source
# File lib/manageiq/messaging/stomp/common.rb, line 52
def topic_for_subscribe(options)
  queue_name = "topic/#{options[:service]}"

  headers = {:"subscription-type" => 'MULTICAST', :ack => 'client'}
  headers[:"durable-subscription-name"] = options[:persist_ref] if options[:persist_ref]

  [queue_name, headers]
end