class StompPublisher

Constants

FRAME_READ_SIZE
MAX_FRAME_SIZE
VERSION

Public Class Methods

new(host: "localhost", port: 61613, login: nil, passcode: nil, vhost: host, **socket_args) click to toggle source
# File lib/stomp_publisher.rb, line 12
# Stores the broker connection settings used by later calls.
#
# host        - broker host name (default "localhost").
# port        - broker port (default 61613).
# login       - login sent in the CONNECT frame; required.
# passcode    - passcode sent in the CONNECT frame; required.
# vhost       - value sent as the CONNECT "host" header; defaults to +host+.
# socket_args - any remaining keyword arguments, kept as-is and forwarded to
#               the socket constructor when publishing.
#
# Raises ArgumentError when login or passcode is nil or false.
def initialize(host: "localhost", port: 61613, login: nil, passcode: nil, vhost: host, **socket_args)
  # Validate up front so a failed construction never leaves partial state.
  raise ArgumentError, "missing argument login" unless login
  raise ArgumentError, "missing argument passcode" unless passcode

  @host = host
  @port = port
  @login = login
  @passcode = passcode
  @vhost = vhost
  @socket_args = socket_args
end

Public Instance Methods

publish(queue, message, **properties) click to toggle source
# File lib/stomp_publisher.rb, line 21
# Publishes +message+ to "/queue/<queue>" over a fresh connection.
#
# Opens a new socket to the configured host/port, performs the CONNECT
# handshake, sends the message, and always closes the socket afterwards.
#
# queue      - queue name; interpolated into the destination header.
# message    - frame body to send.
# properties - extra keyword arguments forwarded to #send; a :destination
#              given here is overridden by the computed destination.
#
# Returns whatever #send returns. Errors raised while connecting or sending
# propagate to the caller after the socket has been closed.
def publish(queue, message, **properties)
  socket = TCPTimeout::TCPSocket.new(@host, @port, **@socket_args)
  begin
    connect(socket, @login, @passcode, @vhost)
    # Splat as keywords: #send declares only keyword parameters after the
    # message, so a positional hash is rejected on Ruby 3.
    send(socket, message, **properties, destination: "/queue/#{queue}")
  ensure
    # One socket is opened per call; close it on success and on failure.
    socket.close
  end
end

Protected Instance Methods

connect(socket, login, passcode, vhost) click to toggle source
# File lib/stomp_publisher.rb, line 28
# Performs the CONNECT handshake on +socket+.
#
# Writes a CONNECT frame carrying the login, passcode, host (+vhost+) and
# an "accept-version" of "1.2", then reads a single reply frame.
#
# Returns nil when the reply command is "CONNECTED".
# Raises ConnectionError (carrying the reply frame) for any other command.
def connect(socket, login, passcode, vhost)
  credentials = Header.new(
    login: login, passcode: passcode, host: vhost, "accept-version" => "1.2"
  )
  socket.write(Frame.new("CONNECT", credentials).to_s)

  reply = read_frame(socket)
  return if reply.command == "CONNECTED"

  raise ConnectionError.new("Failed to login", reply)
end
read_frame(socket) click to toggle source
# File lib/stomp_publisher.rb, line 60
# Reads from +socket+ until the accumulated bytes parse as a frame.
#
# Data is pulled with readpartial, asking for at most FRAME_READ_SIZE bytes
# per call, and appended to a buffer that is handed to Frame.parse after
# every read. A Frame::InvalidFrame from the parser triggers another read
# (presumably it signals an incomplete frame -- confirm against Frame.parse).
#
# Returns the value of Frame.parse.
# Raises ConnectionError (with a nil frame) once the buffer exceeds
# MAX_FRAME_SIZE bytes. Errors raised by readpartial are not rescued here.
def read_frame(socket)
  # String.new instead of a "" literal: the buffer is mutated with <<, which
  # would raise FrozenError under `# frozen_string_literal: true`.
  response = String.new
  begin
    response << socket.readpartial(FRAME_READ_SIZE)
    if response.bytesize > MAX_FRAME_SIZE
      raise ConnectionError.new("Frame was larger than the max size of #{MAX_FRAME_SIZE}", nil)
    end
    Frame.parse(response)
  rescue Frame::InvalidFrame
    # Re-enters the begin block with the buffer kept intact. The size check
    # above and any error from readpartial are what end the loop.
    retry
  end
end
send(socket, message, receipt_id: SecureRandom.hex(16), **properties) click to toggle source
# File lib/stomp_publisher.rb, line 44
# Writes a SEND frame to +socket+ and waits for the matching receipt.
#
# socket     - object the frame is written to and the reply is read from.
# message    - frame body.
# receipt_id - value of the "receipt" header; defaults to 16 random bytes
#              rendered as hex.
# properties - remaining keyword arguments, used as frame headers.
#
# Returns receipt_id once a RECEIPT frame with the same "receipt-id" header
# has been read.
# Raises ConnectionError (carrying the reply frame) when the reply is an
# ERROR frame, any other non-RECEIPT frame, or a RECEIPT for a different id.
def send(socket, message, receipt_id: SecureRandom.hex(16), **properties)
  headers = Header.new(properties.merge(receipt: receipt_id))
  socket.write(Frame.new("SEND", headers, message).to_s)

  reply = read_frame(socket)
  command = reply.command
  raise ConnectionError.new("Connection error", reply) if command == "ERROR"
  raise ConnectionError.new("Unexpected response: #{command}", reply) unless command == "RECEIPT"

  echoed_id = reply.header["receipt-id"]
  raise ConnectionError.new("Unexpected receipt id: #{echoed_id}", reply) unless echoed_id == receipt_id

  receipt_id
end