class Legion::Transport::Message

Public Class Methods

new(**options) click to toggle source
# File lib/legion/transport/message.rb, line 6
# Builds a message from arbitrary keyword options.
#
# options - keyword arguments describing the message; they are stored
#           unchanged in @options, which the other methods read from.
#
# Calls #validate immediately after storing the options.
def initialize(**options)
  @options = options
  validate
end

Public Instance Methods

app_id() click to toggle source
# File lib/legion/transport/message.rb, line 28
# Application identifier for this message.
#
# Returns the caller-supplied :app_id option whenever that key is present in
# the options hash (even if its value is nil); otherwise returns the default
# 'legion'.
def app_id
  # fetch actually returns the override; a bare `value if cond` statement
  # followed by the default would discard it and always yield 'legion'.
  @options.fetch(:app_id, 'legion')
end
channel() click to toggle source
# File lib/legion/transport/message.rb, line 144
# Returns whatever Legion::Transport::Connection.channel returns.
# NOTE(review): presumably the shared broker channel of the transport
# connection - confirm against Legion::Transport::Connection.
def channel
  Legion::Transport::Connection.channel
end
content_encoding() click to toggle source
# File lib/legion/transport/message.rb, line 128
# Default content encoding, 'identity'.
#
# #publish only falls back to this value when the options hash it is given
# carries no :content_encoding entry.
def content_encoding
  'identity'
end
content_type() click to toggle source
# File lib/legion/transport/message.rb, line 124
# Default content type, 'application/json'.
#
# #publish only falls back to this value when the options hash it is given
# carries no :content_type entry.
def content_type
  'application/json'
end
correlation_id() click to toggle source

ID of the message that this message is a reply to

# File lib/legion/transport/message.rb, line 48
# Correlation ID: the ID of the message that this message is a reply to.
#
# This implementation always returns nil.
def correlation_id
  nil
end
encode_message() click to toggle source
# File lib/legion/transport/message.rb, line 74
# Turns #message into the payload that is handed to the exchange.
#
# A non-String message is serialised with Legion::JSON.dump first. The
# content encoding that was applied is recorded in
# @options[:content_encoding]: 'identity' for a plain payload,
# 'encrypted/cs' for an encrypted one.
#
# When #encrypt? is truthy the payload is passed through
# Legion::Crypt.encrypt, the returned :iv is stored in the :iv header and the
# :enciphered_message is returned instead of the plain text.
#
# Returns the payload (plain or enciphered).
def encode_message
  raw = message
  body = raw.is_a?(String) ? raw : Legion::JSON.dump(raw)

  unless encrypt?
    @options[:content_encoding] = 'identity'
    return body
  end

  cipher = Legion::Crypt.encrypt(body)
  # The receiver needs the IV to decrypt, so it travels in the headers.
  headers[:iv] = cipher[:iv]
  @options[:content_encoding] = 'encrypted/cs'
  cipher[:enciphered_message]
end
encrypt?() click to toggle source
# File lib/legion/transport/message.rb, line 94
# Whether outgoing messages should be encrypted.
#
# Reads Legion::Settings[:transport][:messages][:encrypt]; if that flag is
# falsy it is returned as is, otherwise the value of
# Legion::Settings[:crypt][:cs_encrypt_ready] is returned.
def encrypt?
  messages_flag = Legion::Settings[:transport][:messages][:encrypt]
  # Short-circuit exactly like `&&`: a falsy flag is returned untouched and
  # the crypt settings are never consulted.
  return messages_flag unless messages_flag

  Legion::Settings[:crypt][:cs_encrypt_ready]
end
encrypt_message(message, _type = 'cs') click to toggle source
# File lib/legion/transport/message.rb, line 90
# Encrypts +message+ by delegating to Legion::Crypt.encrypt.
#
# message - the payload to encrypt.
# _type   - accepted but unused; defaults to 'cs'.
#
# Returns whatever Legion::Crypt.encrypt returns (#encode_message reads the
# :iv and :enciphered_message entries from such a result).
def encrypt_message(message, _type = 'cs')
  Legion::Crypt.encrypt(message)
end
exchange() click to toggle source
# File lib/legion/transport/message.rb, line 103
# Resolves the constant named by #exchange_name.
#
# Returns the constant's value; const_get raises NameError when no such
# constant is defined.
def exchange
  Kernel.const_get(exchange_name)
end
exchange_name() click to toggle source
# File lib/legion/transport/message.rb, line 98
# Derives the fully qualified name of the exchange constant for this message.
#
# The third '::'-separated segment of the first entry in the class's
# ancestor list is lower-cased and then capitalised, and used as both the
# extension namespace and the exchange name.
#
# Returns a String such as
# 'Legion::Extensions::Foo::Transport::Exchanges::Foo'.
def exchange_name
  segments = self.class.ancestors.first.to_s.split('::')
  lex_const = segments[2].downcase.capitalize
  "Legion::Extensions::#{lex_const}::Transport::Exchanges::#{lex_const}"
end
expiration() click to toggle source
# File lib/legion/transport/message.rb, line 56
# Expiration value for the message.
#
# Lookup order: the :expiration option, then the :ttl option, then the
# :expiration entry of Legion::Transport.settings[:messages]. A key that is
# present wins even if its value is nil.
#
# Returns the first value found, or nil when none of the three is configured.
def expiration
  return @options[:expiration] if @options.key?(:expiration)
  return @options[:ttl] if @options.key?(:ttl)

  message_defaults = Legion::Transport.settings[:messages]
  message_defaults[:expiration] if message_defaults.key?(:expiration)
end
headers() click to toggle source
# File lib/legion/transport/message.rb, line 107
# Builds and returns the headers hash for this message.
#
# The hash is kept in @options[:headers] and is created as a
# Concurrent::Hash on first use. Each option named in the list below that is
# present in @options is copied into it, converted with #to_s.
#
# If a StandardError is raised, its message and backtrace are logged via
# Legion::Transport.logger; in that case the return value is whatever the
# last logger call returned, not a headers hash.
def headers
  header_store = (@options[:headers] ||= Concurrent::Hash.new)
  promoted_keys = %i[
    task_id relationship_id trigger_namespace_id trigger_function_id
    parent_id master_id runner_namespace runner_class namespace_id
    function_id function chain_id debug
  ]
  promoted_keys.each do |key|
    header_store[key] = @options[key].to_s if @options.key?(key)
  end
  header_store
rescue StandardError => e
  Legion::Transport.logger.error e.message
  Legion::Transport.logger.error e.backtrace
end
message() click to toggle source
# File lib/legion/transport/message.rb, line 66
# The message body: the complete @options hash.
#
# #encode_message serialises this value with Legion::JSON.dump unless it is
# already a String.
def message
  @options
end
message_id() click to toggle source
# File lib/legion/transport/message.rb, line 34
# Identifier of this message.
#
# Returns the :message_id option when it is truthy, otherwise the :task_id
# option (nil when neither was supplied).
def message_id
  explicit_id = @options[:message_id]
  return explicit_id if explicit_id

  @options[:task_id]
end
persistent() click to toggle source
# File lib/legion/transport/message.rb, line 52
# Whether the message should be published as persistent.
#
# Returns the :persistent option when it was supplied with a non-nil value,
# including an explicit false, otherwise the :persistent entry of
# Legion::Transport.settings[:messages].
def persistent
  explicit = @options[:persistent]
  # Test for nil rather than truthiness so that `persistent: false` can
  # switch persistence off even when the configured default is true.
  return explicit unless explicit.nil?

  Legion::Transport.settings[:messages][:persistent]
end
priority() click to toggle source
# File lib/legion/transport/message.rb, line 120
# Default message priority, 0.
#
# #publish only falls back to this value when the options hash it is given
# carries no :priority entry.
def priority
  0
end
publish(options = @options) click to toggle source
# File lib/legion/transport/message.rb, line 11
# Encodes the message and publishes it to its exchange.
#
# options - Hash of per-publish overrides for :content_type,
#           :content_encoding, :type, :priority and :expiration. Defaults to
#           the options the message was built with.
#
# Returns whatever the exchange's #publish returns.
# Raises RuntimeError when the message has not been marked valid.
def publish(options = @options) # rubocop:disable Metrics/AbcSize
  raise 'message is not valid and cannot be published' unless @valid

  # Resolve the exchange once; it may be a class (instantiate it) or an
  # already usable object.
  exchange_dest = exchange
  exchange_dest = exchange_dest.new if exchange_dest.respond_to?(:new)

  # Encode before reading any properties: encode_message records the content
  # encoding in @options and may add the :iv header.
  payload = encode_message

  # When a separate options hash is passed in, still honour the encoding
  # that encode_message recorded, so an encrypted payload is never labelled
  # with the static default.
  encoding = options[:content_encoding] || @options[:content_encoding] || content_encoding

  exchange_dest.publish(payload,
                        routing_key: routing_key || '',
                        content_type: options[:content_type] || content_type,
                        content_encoding: encoding,
                        type: options[:type] || type,
                        priority: options[:priority] || priority,
                        expiration: options[:expiration] || expiration,
                        headers: headers,
                        persistent: persistent,
                        message_id: message_id,
                        timestamp: timestamp)
end
reply_to() click to toggle source
# File lib/legion/transport/message.rb, line 43
# Returns the :reply_to option, or nil when it was not supplied.
def reply_to
  @options[:reply_to]
end
routing_key() click to toggle source
# File lib/legion/transport/message.rb, line 70
# Routing key for the message.
#
# Returns the :routing_key option, or nil when the key is absent (#publish
# substitutes an empty string in that case).
def routing_key
  @options.fetch(:routing_key, nil)
end
timestamp() click to toggle source
# File lib/legion/transport/message.rb, line 136
# Current time as whole seconds since the Unix epoch, evaluated on every call.
def timestamp
  Time.now.to_i
end
type() click to toggle source
# File lib/legion/transport/message.rb, line 132
# Default message type, 'task'.
#
# #publish only falls back to this value when the options hash it is given
# carries no :type entry.
def type
  'task'
end
user_id() click to toggle source

Sender's identifier (the RabbitMQ validated user-id). See www.rabbitmq.com/extensions.html#validated-user-id

# File lib/legion/transport/message.rb, line 39
# Sender's identifier.
#
# Returns the :user_id option when it is truthy, otherwise the :user entry
# of Legion::Transport.settings[:connection].
def user_id
  supplied = @options[:user_id]
  return supplied if supplied

  Legion::Transport.settings[:connection][:user]
end
validate() click to toggle source
# File lib/legion/transport/message.rb, line 140
# Marks the message as valid by setting @valid to true; no checks are
# performed here.
#
# #publish raises unless @valid is truthy.
def validate
  @valid = true
end