class AMQP::Client::Properties

Encode/decode AMQP Properties

Attributes

app_id[RW]

User-definable, but often indicates which app generated the message @return [String, nil]

content_encoding[RW]

Content encoding of the body @return [String, nil]

content_type[RW]

Content type of the message body @return [String, nil]

correlation_id[RW]

A correlation id, most often used for RPC communication @return [String, nil]

delivery_mode[RW]

2 for persistent messages; all other values mean the message is transient @return [Integer, nil]

expiration[RW]

How long the message will stay in the queue before it expires (RabbitMQ interprets the value as milliseconds). Integers are converted to strings when encoded. @return [Integer, String, nil]

headers[RW]

Custom headers @return [Hash<String, Object>, nil]

message_id[RW]

@return [String, nil]

priority[RW]

A priority of the message (between 0 and 255) @return [Integer, nil]

reply_to[RW]

Queue to reply RPC responses to @return [String, nil]

timestamp[RW]

User-definable, but often used for the time the message was originally generated. Decoded as a Time; an Integer (seconds since epoch) is also accepted when encoding. @return [Time, Integer, nil]

type[RW]

User-definable, but can indicate what kind of message this is @return [String, nil]

user_id[RW]

User-definable, but can be used to verify that this is the user that published the message @return [String, nil]

Public Class Methods

decode(bytes) click to toggle source

Decode a byte array @return [Properties]

# File lib/amqp/client/properties.rb, line 187
# Decode a byte array into a new Properties object.
#
# The first two bytes are a big-endian 16-bit flag field; each set bit
# (tested from 0x8000 downwards) says that the corresponding property
# follows in the payload, in that same order.
# @param bytes [String] encoded properties, starting with the flag field
# @return [Properties]
def self.decode(bytes)
  props = new
  mask = bytes.unpack1("S>")
  offset = 2 # cursor into bytes, just past the flag field

  # Reads a string prefixed by a one-byte length at the cursor,
  # tags it as UTF-8 and moves the cursor past it.
  take_shortstr = lambda do
    len = bytes.getbyte(offset)
    text = bytes.byteslice(offset + 1, len).force_encoding("utf-8")
    offset += len + 1
    text
  end

  props.content_type = take_shortstr.call unless (mask & 0x8000).zero?
  props.content_encoding = take_shortstr.call unless (mask & 0x4000).zero?

  unless (mask & 0x2000).zero?
    # Headers table: 32-bit big-endian byte length followed by the table itself
    table_size = bytes.byteslice(offset, 4).unpack1("L>")
    props.headers = Table.decode(bytes.byteslice(offset + 4, table_size))
    offset += table_size + 4
  end

  unless (mask & 0x1000).zero?
    props.delivery_mode = bytes.getbyte(offset)
    offset += 1
  end

  unless (mask & 0x0800).zero?
    props.priority = bytes.getbyte(offset)
    offset += 1
  end

  props.correlation_id = take_shortstr.call unless (mask & 0x0400).zero?
  props.reply_to = take_shortstr.call unless (mask & 0x0200).zero?
  props.expiration = take_shortstr.call unless (mask & 0x0100).zero?
  props.message_id = take_shortstr.call unless (mask & 0x0080).zero?

  unless (mask & 0x0040).zero?
    # Timestamp: 64-bit big-endian integer handed to Time.at
    props.timestamp = Time.at(bytes.byteslice(offset, 8).unpack1("Q>"))
    offset += 8
  end

  props.type = take_shortstr.call unless (mask & 0x0020).zero?
  props.user_id = take_shortstr.call unless (mask & 0x0010).zero?
  props.app_id = take_shortstr.call unless (mask & 0x0008).zero?

  props
end
new(content_type: nil, content_encoding: nil, headers: nil, delivery_mode: nil, priority: nil, correlation_id: nil, reply_to: nil, expiration: nil, message_id: nil, timestamp: nil, type: nil, user_id: nil, app_id: nil) click to toggle source
# File lib/amqp/client/properties.rb, line 9
# Create a property set. Every property is optional and defaults to nil;
# each value is stored unchanged in the instance variable of the same name.
# @param content_type [Object, nil]
# @param content_encoding [Object, nil]
# @param headers [Object, nil]
# @param delivery_mode [Object, nil]
# @param priority [Object, nil]
# @param correlation_id [Object, nil]
# @param reply_to [Object, nil]
# @param expiration [Object, nil]
# @param message_id [Object, nil]
# @param timestamp [Object, nil]
# @param type [Object, nil]
# @param user_id [Object, nil]
# @param app_id [Object, nil]
def initialize(content_type: nil, content_encoding: nil, headers: nil, delivery_mode: nil, priority: nil,
               correlation_id: nil, reply_to: nil, expiration: nil, message_id: nil, timestamp: nil,
               type: nil, user_id: nil, app_id: nil)
  # Grouped assignments; targets are assigned left to right, so the
  # instance variables are created in the same order as the parameters.
  @content_type, @content_encoding, @headers = content_type, content_encoding, headers
  @delivery_mode, @priority = delivery_mode, priority
  @correlation_id, @reply_to = correlation_id, reply_to
  @expiration, @message_id, @timestamp = expiration, message_id, timestamp
  @type, @user_id, @app_id = type, user_id, app_id
end

Public Instance Methods

encode() click to toggle source

Encode properties into a byte array @return [String] byte array

# File lib/amqp/client/properties.rb, line 69
# Encode properties into a byte array.
#
# The output starts with a big-endian 16-bit flag field, followed by the
# value of every property that is set, in flag order (bit 15 first).
# Strings are written as a one-byte length plus the bytes ("Ca*"), the
# headers table as a 32-bit big-endian length plus the encoded table, and
# the timestamp as a 64-bit big-endian integer.
#
# Side effect: an Integer expiration is converted to a String and written
# back through the expiration writer before encoding.
# @return [String] byte array
# @raise [ArgumentError] if a set property has an unsupported type, or if
#   delivery_mode is not between 0 and 2
def encode
  flags = 0
  # arr holds the values to pack; slot 0 is a placeholder for the flag
  # field, which is only known once every property has been inspected.
  arr = [flags]
  # fmt accumulates the matching pack directives; start appending after "S>"
  fmt = StringIO.new(String.new("S>", capacity: 35))
  fmt.pos = 2

  if content_type
    content_type.is_a?(String) || raise(ArgumentError, "content_type must be a string")

    flags |= (1 << 15)
    arr << content_type.bytesize << content_type
    fmt << "Ca*"
  end

  if content_encoding
    content_encoding.is_a?(String) || raise(ArgumentError, "content_encoding must be a string")

    flags |= (1 << 14)
    arr << content_encoding.bytesize << content_encoding
    fmt << "Ca*"
  end

  if headers
    headers.is_a?(Hash) || raise(ArgumentError, "headers must be a hash")

    flags |= (1 << 13)
    tbl = Table.encode(headers)
    arr << tbl.bytesize << tbl
    fmt << "L>a*"
  end

  if delivery_mode
    delivery_mode.is_a?(Integer) || raise(ArgumentError, "delivery_mode must be an int")
    delivery_mode.between?(0, 2) || raise(ArgumentError, "delivery_mode must be be between 0 and 2")

    flags |= (1 << 12)
    arr << delivery_mode
    fmt << "C"
  end

  if priority
    priority.is_a?(Integer) || raise(ArgumentError, "priority must be an int")

    flags |= (1 << 11)
    arr << priority
    fmt << "C"
  end

  if correlation_id
    # Validate correlation_id itself; checking another property here would
    # reject every correlation_id.
    correlation_id.is_a?(String) || raise(ArgumentError, "correlation_id must be a string")

    flags |= (1 << 10)
    arr << correlation_id.bytesize << correlation_id
    fmt << "Ca*"
  end

  if reply_to
    reply_to.is_a?(String) || raise(ArgumentError, "reply_to must be a string")

    flags |= (1 << 9)
    arr << reply_to.bytesize << reply_to
    fmt << "Ca*"
  end

  if expiration
    # Expiration travels as a string; accept integers for convenience
    self.expiration = expiration.to_s if expiration.is_a?(Integer)
    expiration.is_a?(String) || raise(ArgumentError, "expiration must be a string or integer")

    flags |= (1 << 8)
    arr << expiration.bytesize << expiration
    fmt << "Ca*"
  end

  if message_id
    message_id.is_a?(String) || raise(ArgumentError, "message_id must be a string")

    flags |= (1 << 7)
    arr << message_id.bytesize << message_id
    fmt << "Ca*"
  end

  if timestamp
    timestamp.is_a?(Integer) || timestamp.is_a?(Time) || raise(ArgumentError, "timestamp must be an Integer or a Time")

    flags |= (1 << 6)
    arr << timestamp.to_i
    fmt << "Q>"
  end

  if type
    type.is_a?(String) || raise(ArgumentError, "type must be a string")

    flags |= (1 << 5)
    arr << type.bytesize << type
    fmt << "Ca*"
  end

  if user_id
    user_id.is_a?(String) || raise(ArgumentError, "user_id must be a string")

    flags |= (1 << 4)
    arr << user_id.bytesize << user_id
    fmt << "Ca*"
  end

  if app_id
    app_id.is_a?(String) || raise(ArgumentError, "app_id must be a string")

    flags |= (1 << 3)
    arr << app_id.bytesize << app_id
    fmt << "Ca*"
  end

  arr[0] = flags
  arr.pack(fmt.string)
end