class ChiliLogger::AWS::SqsBroker
class that handles errors when message broker can't be reached, etc…
Public Class Methods
new(custom_config = {})
click to toggle source
# File lib/brokers/sqs_broker.rb, line 9 def initialize(custom_config = {}) custom_config ||= {} @default_secrets = Values::Secrets.new.get_secrets_collection('ChiliLoggerSQSCredentials') @default = ChiliLogger::Values::Default.new @queue_name = custom_config[:queue_name] || @default_secrets['queue_name'] @sqs_config = sqs_config(custom_config) @sqs = Aws::SQS::Client.new(@sqs_config) ENV['AWS_REGION'] = @sqs_config[:region] # Aws::SQS::QueuePoller requires region as an env variable @poller = Aws::SQS::QueuePoller.new(queue_url(@queue_name), skip_delete: true) end
Public Instance Methods
delete_message(msg)
click to toggle source
# File lib/brokers/sqs_broker.rb, line 49 def delete_message(msg) @poller.delete_messages([msg]) end
fetch_messages()
click to toggle source
# File lib/brokers/sqs_broker.rb, line 32 def fetch_messages return if ChiliLogger.instance.deactivated messages = [] remaining_msgs = num_of_msgs_in_queue return messages unless remaining_msgs.positive? @poller.poll do |sqs_msg| messages << { fallback_broker_msg: sqs_msg, log: JSON.parse(sqs_msg.body)['body']['log'] } remaining_msgs -= 1 throw :stop_polling if remaining_msgs <= 0 # prevents infinite loop end messages end
publish(message = @default.sqs_message)
click to toggle source
# File lib/brokers/sqs_broker.rb, line 23 def publish(message = @default.sqs_message) return if ChiliLogger.instance.deactivated message_body = { body: message } queue_message = queue_message(message_body) @sqs.send_message(queue_message) end
Private Instance Methods
num_of_msgs_in_queue()
click to toggle source
# File lib/brokers/sqs_broker.rb, line 68 def num_of_msgs_in_queue sqs_queue_attrs = @sqs.get_queue_attributes(queue_url: queue_url(@queue_name), attribute_names: ['All']) sqs_queue_attrs.attributes['ApproximateNumberOfMessages'].to_i end
queue_message(message_body)
click to toggle source
# File lib/brokers/sqs_broker.rb, line 55 def queue_message(message_body) { queue_url: queue_url(@queue_name), message_body: message_body.to_json, message_group_id: SecureRandom.hex(8).to_s, message_deduplication_id: SecureRandom.hex(8).to_s } end
queue_url(queue_name)
click to toggle source
# File lib/brokers/sqs_broker.rb, line 64 def queue_url(queue_name) @sqs.get_queue_url(queue_name: queue_name).queue_url end
sqs_config(custom_config)
click to toggle source
# File lib/brokers/sqs_broker.rb, line 73 def sqs_config(custom_config) default_keys = %i[region access_key_id secret_access_key] config = {} default_keys.each do |key| config[key] = custom_config[key] || @default_secrets[key.to_s] end config end