class StompPublisher
Constants
- FRAME_READ_SIZE
- MAX_FRAME_SIZE
- VERSION
Public Class Methods
new(host: "localhost", port: 61613, login: nil, passcode: nil, vhost: host, **socket_args)
click to toggle source
# File lib/stomp_publisher.rb, line 12
# Stores the broker address and credentials for later use.
#
# +login+ and +passcode+ default to nil in the signature but are required:
# a nil (or false) value for either is rejected.
#
# @param host [String] broker hostname; also the default value of +vhost+
# @param port [Integer] broker port
# @param login [String] login credential (required)
# @param passcode [String] passcode credential (required)
# @param vhost [String] virtual host name; defaults to +host+
# @param socket_args [Hash] any additional keyword arguments, stored as-is
# @raise [ArgumentError] if +login+ or +passcode+ is nil or false
def initialize(host: "localhost", port: 61613, login: nil, passcode: nil, vhost: host, **socket_args)
  # Validate first so a rejected call never assigns partial state.
  raise ArgumentError, "missing argument login" unless login
  raise ArgumentError, "missing argument passcode" unless passcode

  @host = host
  @port = port
  @login = login
  @passcode = passcode
  @vhost = vhost
  @socket_args = socket_args
end
Public Instance Methods
publish(queue, message, **properties)
click to toggle source
# File lib/stomp_publisher.rb, line 21
# Opens a fresh socket, logs in, sends one message to "/queue/<queue>",
# and closes the socket again.
#
# @param queue [String] queue name; interpolated into the "/queue/..." destination
# @param message [Object] payload handed unchanged to +send+
# @param properties [Hash] extra keyword properties forwarded to +send+;
#   a +destination+ given here is overridden by the computed one
# @return the value returned by +send+ (the receipt id)
# @raise any error raised while opening the socket, connecting, or sending
def publish(queue, message, **properties)
  socket = TCPTimeout::TCPSocket.new(@host, @port, **@socket_args)
  connect(socket, @login, @passcode, @vhost)
  # Splat the properties: +send+ takes keywords, and Ruby 3 no longer
  # converts a trailing positional Hash into keyword arguments.
  send(socket, message, **properties, destination: "/queue/#{queue}")
ensure
  # One socket is opened per call, so release it on every exit path.
  # +socket+ is still nil when the constructor itself raised.
  socket.close if socket
end
Protected Instance Methods
connect(socket, login, passcode, vhost)
click to toggle source
# File lib/stomp_publisher.rb, line 28
# Writes a CONNECT frame to +socket+ and checks that the reply is CONNECTED.
#
# @param socket [#write] connection the frame is written to and the reply read from
# @param login [String] value of the +login+ header
# @param passcode [String] value of the +passcode+ header
# @param vhost [String] value of the +host+ header
# @return [nil]
# @raise [ConnectionError] if the reply frame's command is not "CONNECTED"
def connect(socket, login, passcode, vhost)
  connect_header = Header.new(login: login, passcode: passcode, host: vhost, "accept-version" => "1.2")
  socket.write(Frame.new("CONNECT", connect_header).to_s)

  reply = read_frame(socket)
  return if reply.command == "CONNECTED"

  raise ConnectionError.new("Failed to login", reply)
end
read_frame(socket)
click to toggle source
# File lib/stomp_publisher.rb, line 60
# Reads from +socket+ in chunks of up to FRAME_READ_SIZE bytes until the
# accumulated bytes parse as a frame.
#
# @param socket [#readpartial] connection to read from
# @return the frame produced by Frame.parse
# @raise [ConnectionError] once more than MAX_FRAME_SIZE bytes have accumulated
def read_frame(socket)
  buffer = ""
  loop do
    begin
      buffer << socket.readpartial(FRAME_READ_SIZE)
      if buffer.bytesize > MAX_FRAME_SIZE
        raise ConnectionError.new("Frame was larger than the max size of #{MAX_FRAME_SIZE}", nil)
      end
      return Frame.parse(buffer)
    rescue Frame::InvalidFrame
      # The bytes read so far were rejected by the parser; keep them and read another chunk.
      next
    end
  end
end
send(socket, message, receipt_id: SecureRandom.hex(16), **properties)
click to toggle source
# File lib/stomp_publisher.rb, line 44
# Writes a SEND frame carrying +message+ and waits for the matching RECEIPT.
#
# @param socket [#write] connection the frame is written to and the reply read from
# @param message [Object] frame body, passed as the third argument to Frame.new
# @param receipt_id [String] value of the +receipt+ header; random hex by default
# @param properties [Hash] additional headers for the SEND frame
# @return [String] +receipt_id+, once the reply has confirmed it
# @raise [ConnectionError] if the reply is an ERROR frame, is not a RECEIPT
#   frame, or carries a different "receipt-id"
def send(socket, message, receipt_id: SecureRandom.hex(16), **properties)
  send_header = Header.new(properties.merge(receipt: receipt_id))
  socket.write(Frame.new("SEND", send_header, message).to_s)

  reply = read_frame(socket)
  command = reply.command
  raise ConnectionError.new("Connection error", reply) if command == "ERROR"
  raise ConnectionError.new("Unexpected response: #{command}", reply) if command != "RECEIPT"

  # The reply must acknowledge exactly the receipt that was requested.
  echoed_id = reply.header["receipt-id"]
  raise ConnectionError.new("Unexpected receipt id: #{echoed_id}", reply) if echoed_id != receipt_id

  receipt_id
end