class Kyklos::Adapters::ShoryukenAdapter

Attributes

queue_url[R]

Public Class Methods

new(*args) click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 30
# Builds an adapter bound to a queue URL.
#
# @param args [Array] positional arguments; only the first one is read and
#   stored as the queue URL, any further arguments are ignored.
def initialize(*args)
  @queue_url = args.first
end

Public Instance Methods

assign_cloudwatchevents(job_id:, rule_name_prefix:, rule:) click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 34
# Updates the queue policy for the given rule and then describes this queue
# as an event target.
#
# @param job_id [Object] job identifier; hashed into the target id and
#   embedded in the target's JSON input.
# @param rule_name_prefix [Object] prefix forwarded to the queue-policy update.
# @param rule [#arn] rule object; only its +arn+ is read.
# @return [Array<Hash>] a one-element list holding +:id+, +:arn+ and +:input+
#   (a JSON string of the form {"job_id": ...}).
def assign_cloudwatchevents(job_id:, rule_name_prefix:, rule:)
  # The policy must be updated before the target description is built.
  assign_queue_policy(job_id, rule_name_prefix, rule.arn)

  target = { id: target_id(job_id), arn: target_arn }
  target[:input] = { job_id: job_id }.to_json
  [target]
end

Private Instance Methods

add_statement(statements, new_sttement) click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 104
# Inserts a policy statement into a statement list, replacing any existing
# statement that carries the same 'Sid'.
#
# @param statements [Array<Hash>, Hash, nil] the policy's current 'Statement'
#   value. An Array is updated in place. A single statement Hash (a policy may
#   hold one statement object instead of a list) is wrapped into a new list,
#   and nil (no 'Statement' key) is treated as an empty list.
# @param new_sttement [Hash] statement to add; matched on its 'Sid' key.
# @return [Array<Hash>] the resulting statement list.
def add_statement(statements, new_sttement)
  # Normalize non-list input so the lookup below never iterates a Hash's
  # key/value pairs or calls a method on nil.
  statements = [statements].compact unless statements.is_a?(Array)

  index = statements.find_index { |statement| statement['Sid'] == new_sttement['Sid'] }
  if index
    statements[index] = new_sttement
  else
    statements.push(new_sttement)
  end
  statements
end
assign_queue_policy(job_id, rule_name_prefix, rule_arn) click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 61
# Adds (or replaces) an 'Allow sqs:SendMessage' statement on the queue policy
# and writes the policy back to the queue.
#
# The statement's 'Sid' is the stringified rule name prefix, its 'Resource' is
# the queue ARN, and it is conditioned on 'aws:SourceArn' matching the pattern
# produced by rule_arn_like.
#
# @param job_id [Object] accepted but not used by this method.
# @param rule_name_prefix [Object] prefix used for the 'Sid' and the ARN pattern.
# @param rule_arn [String] ARN of the rule the pattern is derived from.
# @return [Object] whatever the SQS client's set_queue_attributes returns.
def assign_queue_policy(job_id, rule_name_prefix, rule_arn)
  current_policy = get_queue_policy

  sid = rule_name_prefix.to_s
  queue_arn = target_arn
  source_arn_pattern = rule_arn_like(rule_name_prefix, rule_arn)

  # Key order is kept stable so the serialized policy stays the same.
  grant = {
    'Sid' => sid,
    'Effect' => 'Allow',
    'Principal' => { "AWS" => '*' },
    'Action' => 'sqs:SendMessage',
    'Resource' => queue_arn,
    'Condition' => { 'ArnLike' => { 'aws:SourceArn' => source_arn_pattern } }
  }
  current_policy['Statement'] = add_statement(current_policy['Statement'], grant)

  sqs.set_queue_attributes(
    queue_url: queue_url,
    attributes: { 'Policy' => current_policy.to_json }
  )
end
get_queue_policy() click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 87
# Reads the queue's 'Policy' attribute and returns it as a Hash.
#
# When the queue has no policy (the parsed value is nil or false), a fresh
# policy skeleton with an empty 'Statement' list is returned instead; its
# 'Id' is built from the queue ARN.
#
# @return [Hash] the parsed policy or the default skeleton.
def get_queue_policy
  response = sqs.get_queue_attributes(
    queue_url: queue_url,
    attribute_names: ['Policy']
  )

  # NOTE(review): JSON.load is kept because it yields nil for a nil source,
  # which the fallback below relies on. Consider JSON.parse with an explicit
  # nil check if the policy text cannot be fully trusted.
  JSON.load(response.attributes['Policy']) || {
    'Version' => '2012-10-17',
    'Id' => "#{target_arn}/SQSDefaultPolicy",
    'Statement' => []
  }
end
rule_arn_like(rule_name_prefix, rule_arn) click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 114
# Builds a wildcard ARN pattern that matches every rule whose name starts
# with the given prefix, e.g.
#   "arn:...:rule/kyklos-job1" with prefix "kyklos-" => "arn:...:rule/kyklos-*"
#
# The prefix is looked up only in the trailing rule-name part of the ARN (the
# text after the last ':' or '/'). An earlier occurrence of the same text,
# for instance in the service name or the "rule/" segment, therefore cannot
# truncate the pattern into something far broader than intended.
#
# @param rule_name_prefix [#to_s] rule name prefix; Symbols are accepted.
# @param rule_arn [String] full ARN of one rule carrying that prefix.
# @return [String] the ARN up to the prefix, followed by the prefix and '*'.
#   If the prefix does not occur in the rule name, the prefix and '*' are
#   appended to the complete ARN.
def rule_arn_like(rule_name_prefix, rule_arn)
  prefix = rule_name_prefix.to_s
  name_start = (rule_arn.rindex(%r{[:/]}) || -1) + 1
  prefix_at = rule_arn.index(prefix, name_start)
  head = prefix_at ? rule_arn[0, prefix_at] : rule_arn
  [head, prefix, '*'].join
end
sqs() click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 118
# Returns the SQS client used by this adapter.
#
# The client is created on first use and then reused, so a single policy
# update (which calls this method several times) does not construct a new
# client for every request.
#
# @return [Aws::SQS::Client]
def sqs
  @sqs ||= Aws::SQS::Client.new
end
target_arn() click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 53
# Looks up the ARN of the configured queue by requesting its 'QueueArn'
# attribute from SQS.
#
# @return [String, nil] the value of the 'QueueArn' attribute in the response.
def target_arn
  sqs
    .get_queue_attributes(queue_url: @queue_url, attribute_names: ['QueueArn'])
    .attributes['QueueArn']
end
target_id(job_id) click to toggle source
# File lib/kyklos/adapters/shoryuken_adapter.rb, line 49
# Derives the target id for a job: the hexadecimal MD5 digest of the job id's
# string form.
#
# @param job_id [#to_s] job identifier.
# @return [String] 32-character lowercase hex digest.
def target_id(job_id)
  digest_source = job_id.to_s
  Digest::MD5.hexdigest(digest_source)
end