class Kyklos::Adapters::ShoryukenAdapter
Attributes
queue_url[R]
Public Class Methods
new(*args)
click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 30
# Stores the queue URL this adapter operates on.
#
# Only the first positional argument is used (kept as +@queue_url+); any
# additional arguments are accepted and ignored. With no arguments the
# queue URL is +nil+.
def initialize(*args)
  @queue_url = args.first
end
Public Instance Methods
assign_cloudwatchevents(job_id:, rule_name_prefix:, rule:)
click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 34
# Updates the queue policy for the given rule (via assign_queue_policy) and
# then returns the target description for this queue.
#
# job_id::           embedded as JSON in the target's :input and hashed by
#                    target_id to form the target's :id
# rule_name_prefix:: forwarded to assign_queue_policy
# rule::             object responding to +arn+
#
# Returns a one-element Array holding a Hash with the keys :id, :arn and
# :input.
def assign_cloudwatchevents(job_id:, rule_name_prefix:, rule:)
  assign_queue_policy(job_id, rule_name_prefix, rule.arn)

  target = {
    id: target_id(job_id),
    arn: target_arn,
    input: { job_id: job_id }.to_json
  }
  [target]
end
Private Instance Methods
add_statement(statements, new_sttement)
click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 104
# Inserts +new_statement+ into a list of policy statements, replacing any
# existing statement that carries the same 'Sid'.
#
# statements::    Array of statement Hashes. +nil+ is treated as an empty
#                 list and a single statement Hash is wrapped in an Array,
#                 because a policy document may carry its Statement as one
#                 object instead of a list.
# new_statement:: statement Hash; matched against existing ones by 'Sid'
#
# Returns the resulting Array. When +statements+ is already an Array it is
# modified in place and the same object is returned.
def add_statement(statements, new_statement)
  list =
    case statements
    when Array then statements
    when nil then []
    else [statements] # single statement object; Array() would split a Hash into pairs
    end

  index = list.find_index { |statement| statement['Sid'] == new_statement['Sid'] }
  if index
    list[index] = new_statement
  else
    list.push(new_statement)
  end
  list
end
assign_queue_policy(job_id, rule_name_prefix, rule_arn)
click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 61
# Adds (or replaces, keyed by Sid) an 'Allow sqs:SendMessage' statement in
# the queue's policy and writes the policy back to the queue.
#
# job_id::           accepted but not used by this method
# rule_name_prefix:: becomes the statement's 'Sid' (as a String) and is used
#                    to build the 'aws:SourceArn' pattern
# rule_arn::         ARN the source pattern is derived from
#
# Returns whatever +set_queue_attributes+ returns.
def assign_queue_policy(job_id, rule_name_prefix, rule_arn)
  policy = get_queue_policy

  # Evaluated in the same order as the keys appear in the statement.
  sid = rule_name_prefix.to_s
  resource = target_arn
  source_pattern = rule_arn_like(rule_name_prefix, rule_arn)

  statement = {
    'Sid' => sid,
    'Effect' => 'Allow',
    'Principal' => { "AWS" => '*' },
    'Action' => 'sqs:SendMessage',
    'Resource' => resource,
    'Condition' => { 'ArnLike' => { 'aws:SourceArn' => source_pattern } }
  }
  policy['Statement'] = add_statement(policy['Statement'], statement)

  client = sqs
  client.set_queue_attributes(
    queue_url: queue_url,
    attributes: { 'Policy' => policy.to_json }
  )
end
get_queue_policy()
click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 87
# Fetches the queue's 'Policy' attribute and returns it as a Hash.
#
# When the queue has no policy (attribute missing, empty, or JSON null) a
# fresh policy skeleton is returned instead, with an empty 'Statement' list
# and an 'Id' derived from target_arn.
#
# Raises JSON::ParserError if the stored policy is not valid JSON.
def get_queue_policy
  policy_str = sqs.get_queue_attributes(
    queue_url: queue_url,
    attribute_names: ['Policy']
  ).attributes['Policy']

  # JSON.parse (unlike JSON.load) is the API meant for data that is not fully
  # trusted; it rejects nil/empty input, so guard for "no policy" first.
  policy = JSON.parse(policy_str) unless policy_str.nil? || policy_str.empty?

  policy || {
    'Version' => '2012-10-17',
    'Id' => "#{target_arn}/SQSDefaultPolicy",
    'Statement' => []
  }
end
rule_arn_like(rule_name_prefix, rule_arn)
click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 114
# Builds a wildcard ARN pattern: everything in +rule_arn+ that precedes the
# rule name prefix, followed by the prefix and a trailing '*'.
#
# rule_name_prefix:: String or Symbol; converted with +to_s+
# rule_arn::         String ARN
#
# The prefix is looked up in the segment after the last '/' first, so a
# prefix that also occurs earlier in the ARN (for example "rule" or
# "events") does not truncate the pattern too early. If it is not found
# there, the first occurrence anywhere in the ARN is used; if it does not
# occur at all, the whole ARN is kept as the head.
def rule_arn_like(rule_name_prefix, rule_arn)
  prefix = rule_name_prefix.to_s
  name_start = (rule_arn.rindex('/') || -1) + 1

  cut = rule_arn.index(prefix, name_start) || rule_arn.index(prefix)
  head = cut ? rule_arn[0, cut] : rule_arn

  "#{head}#{prefix}*"
end
sqs()
click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 118
# Returns the SQS client used by this adapter.
#
# The client is created on first use and then reused, so the several API
# calls made while assigning a rule do not each construct a new client.
def sqs
  @sqs ||= Aws::SQS::Client.new
end
target_arn()
click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 53
# Returns the 'QueueArn' attribute of the queue identified by +queue_url+.
#
# The value is looked up through the SQS client on first use and cached in
# +@target_arn+, since this method is called several times while assigning
# a single rule. A +nil+ result is not cached and will be looked up again.
def target_arn
  @target_arn ||= sqs.get_queue_attributes(
    queue_url: queue_url,
    attribute_names: ['QueueArn']
  ).attributes['QueueArn']
end
target_id(job_id)
click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 49
# Derives a target id from a job id: the hexadecimal MD5 digest of the
# job id's String form.
def target_id(job_id)
  digest = Digest::MD5.new
  digest.update(job_id.to_s)
  digest.hexdigest
end