class Legion::Transport::Message
Public Class Methods
new(**options)
click to toggle source
# File lib/legion/transport/message.rb, line 6 def initialize(**options) @options = options validate end
Public Instance Methods
app_id()
click to toggle source
# File lib/legion/transport/message.rb, line 28 def app_id @options[:app_id] if @options.key? :app_id 'legion' end
channel()
click to toggle source
# File lib/legion/transport/message.rb, line 144 def channel Legion::Transport::Connection.channel end
content_encoding()
click to toggle source
# File lib/legion/transport/message.rb, line 128 def content_encoding 'identity' end
content_type()
click to toggle source
# File lib/legion/transport/message.rb, line 124 def content_type 'application/json' end
correlation_id()
click to toggle source
ID of the message that this message is a reply to
# File lib/legion/transport/message.rb, line 48 def correlation_id nil end
encode_message()
click to toggle source
# File lib/legion/transport/message.rb, line 74 def encode_message message_payload = message message_payload = Legion::JSON.dump(message_payload) unless message_payload.is_a? String if encrypt? encrypted = Legion::Crypt.encrypt(message_payload) headers[:iv] = encrypted[:iv] @options[:content_encoding] = 'encrypted/cs' return encrypted[:enciphered_message] else @options[:content_encoding] = 'identity' end message_payload end
encrypt?()
click to toggle source
# File lib/legion/transport/message.rb, line 94 def encrypt? Legion::Settings[:transport][:messages][:encrypt] && Legion::Settings[:crypt][:cs_encrypt_ready] end
encrypt_message(message, _type = 'cs')
click to toggle source
# File lib/legion/transport/message.rb, line 90 def encrypt_message(message, _type = 'cs') Legion::Crypt.encrypt(message) end
exchange()
click to toggle source
# File lib/legion/transport/message.rb, line 103 def exchange Kernel.const_get(exchange_name) end
exchange_name()
click to toggle source
# File lib/legion/transport/message.rb, line 98 def exchange_name lex = self.class.ancestors.first.to_s.split('::')[2].downcase "Legion::Extensions::#{lex.capitalize}::Transport::Exchanges::#{lex.capitalize}" end
expiration()
click to toggle source
# File lib/legion/transport/message.rb, line 56 def expiration if @options.key? :expiration @options[:expiration] elsif @options.key? :ttl @options[:ttl] elsif Legion::Transport.settings[:messages].key? :expiration Legion::Transport.settings[:messages][:expiration] end end
headers()
click to toggle source
# File lib/legion/transport/message.rb, line 107 def headers @options[:headers] ||= Concurrent::Hash.new %i[task_id relationship_id trigger_namespace_id trigger_function_id parent_id master_id runner_namespace runner_class namespace_id function_id function chain_id debug].each do |header| # rubocop:disable Layout/LineLength next unless @options.key? header @options[:headers][header] = @options[header].to_s end @options[:headers] rescue StandardError => e Legion::Transport.logger.error e.message Legion::Transport.logger.error e.backtrace end
message()
click to toggle source
# File lib/legion/transport/message.rb, line 66 def message @options end
message_id()
click to toggle source
# File lib/legion/transport/message.rb, line 34 def message_id @options[:message_id] || @options[:task_id] end
persistent()
click to toggle source
# File lib/legion/transport/message.rb, line 52 def persistent @options[:persistent] || Legion::Transport.settings[:messages][:persistent] end
priority()
click to toggle source
# File lib/legion/transport/message.rb, line 120 def priority 0 end
publish(options = @options)
click to toggle source
# File lib/legion/transport/message.rb, line 11 def publish(options = @options) # rubocop:disable Metrics/AbcSize raise unless @valid exchange_dest = exchange.respond_to?(:new) ? exchange.new : exchange exchange_dest.publish(encode_message, routing_key: routing_key || '', content_type: options[:content_type] || content_type, content_encoding: options[:content_encoding] || content_encoding, type: options[:type] || type, priority: options[:priority] || priority, expiration: options[:expiration] || expiration, headers: headers, persistent: persistent, message_id: message_id, timestamp: timestamp) end
reply_to()
click to toggle source
# File lib/legion/transport/message.rb, line 43 def reply_to @options[:reply_to] end
routing_key()
click to toggle source
# File lib/legion/transport/message.rb, line 70 def routing_key @options[:routing_key] if @options.key? :routing_key end
timestamp()
click to toggle source
# File lib/legion/transport/message.rb, line 136 def timestamp Time.now.to_i end
type()
click to toggle source
# File lib/legion/transport/message.rb, line 132 def type 'task' end
user_id()
click to toggle source
user_id
Sender's identifier. www.rabbitmq.com/extensions.html#validated-user-id
# File lib/legion/transport/message.rb, line 39 def user_id @options[:user_id] || Legion::Transport.settings[:connection][:user] end
validate()
click to toggle source
# File lib/legion/transport/message.rb, line 140 def validate @valid = true end