class Queuel::SQS::Message

Constants

MAX_KNOWN_BYTE_SIZE

Public Instance Methods

delete() click to toggle source
# File lib/queuel/sqs/message.rb, line 11
def delete
  message_object.delete
end
raw_body() click to toggle source
# File lib/queuel/sqs/message.rb, line 7
def raw_body
  @raw_body ||= message_object ? pull_message : push_message
end

Private Instance Methods

generate_key() click to toggle source
# File lib/queuel/sqs/message.rb, line 85
def generate_key
  key = [
    (Time.now.to_f * 10000).to_i,
    SecureRandom.urlsafe_base64,
    Thread.current.object_id
  ].join('-')
  key
end
max_bytesize() click to toggle source
# File lib/queuel/sqs/message.rb, line 45
def max_bytesize
  options[:max_bytesize] || Queuel::SQS::Message::MAX_KNOWN_BYTE_SIZE * 1024
end
pull_message() click to toggle source
# File lib/queuel/sqs/message.rb, line 31
def pull_message
  begin
    decoded_body = decoder.call(message_object.body)
    if decoded_body.key?(:queuel_s3_object)
      s3_transaction(:read, decoded_body[:queuel_s3_object])
    else
      message_object.body
    end
  rescue Queuel::Serialization::Json::SerializationError, TypeError
    raw_body_with_sns_check
  end
end
push_message() click to toggle source
# File lib/queuel/sqs/message.rb, line 21
def push_message
  if encoded_body.bytesize > max_bytesize
    key = generate_key
    s3_transaction(:write, key, encoded_body)
    self.body = { 'queuel_s3_object' => key }
  end
  encoded_body
end
raw_body_with_sns_check() click to toggle source
# File lib/queuel/sqs/message.rb, line 95
def raw_body_with_sns_check
  begin
    message_object.as_sns_message.body
  rescue ::JSON::ParserError, TypeError
    message_object.body
  end
end
s3() click to toggle source
# File lib/queuel/sqs/message.rb, line 50
def s3
  @s3 ||= ::AWS::S3.new(
            :access_key_id => options[:s3_access_key_id],
            :secret_access_key => options[:s3_secret_access_key] )
end
s3_read(bucket, *args) click to toggle source
# File lib/queuel/sqs/message.rb, line 75
def s3_read(bucket, *args)
  bucket.objects[args[0]].read
end
s3_transaction(method, *args) click to toggle source

@method - write or read @args - key and message if writing

# File lib/queuel/sqs/message.rb, line 59
def s3_transaction(method, *args)
  bucket_name = options[:s3_bucket_name]
  raise NoBucketNameSupplied if bucket_name.nil?
  my_bucket = s3.buckets[bucket_name]
  if my_bucket.exists?
    begin
      send("s3_#{method}", my_bucket, *args)
    rescue AWS::S3::Errors::AccessDenied => e
      raise InsufficientPermissions, "Unable to read from bucket: #{e.message}"
    end
  else
    raise BucketDoesNotExistError, "Bucket has either expired or does not exist"
  end
end
s3_write(bucket, *args) click to toggle source
# File lib/queuel/sqs/message.rb, line 80
def s3_write(bucket, *args)
  bucket.objects[args[0]].write(args[1])
end