class Serfx::Connection
This class wraps the low level msgpack data transformation and tcp communication for the RPC session. methods in this module are used to implement the actual RPC commands available via [Commands]
Constants
- COMMANDS
Attributes
Public Class Methods
@param opts [Hash] Specify the RPC connection details @option opts [Symbol] :host ipaddreess of the target serf agent @option opts [Symbol] :port port of target serf agents RPC @option opts [Symbol] :authkey encryption key for RPC communication
# File lib/serfx/connection.rb, line 50 def initialize(opts = {}) @host = opts[:host] || '127.0.0.1' @port = opts[:port] || 7373 @seq = 0 @authkey = opts[:authkey] @requests = {} @responses = {} end
Public Instance Methods
checks if the RPC response header has ‘error` field popular or not raises [RPCError] exception if error string is not empty
@param header [Hash] RPC response header as hash
# File lib/serfx/connection.rb, line 109 def check_rpc_error!(header) fail RPCError, header['Error'] unless header['Error'].nil? || header['Error'].empty? end
read data from tcp socket and pipe it through msgpack unpacker for deserialization
@return [Hash]
# File lib/serfx/connection.rb, line 77 def read_data unpacker.read end
read data from the tcp socket. and convert it to a [Response] object
@param command [String] RPC command name for which response will be read @return [Response]
# File lib/serfx/connection.rb, line 117 def read_response(command) header = read_data check_rpc_error!(header) if COMMANDS[command].include?(:body) body = read_data Response.new(header, body) else Response.new(header) end end
make an RPC request against the serf agent
@param command [String] name of the RPC command @param body [Hash] an optional request body for the RPC command @return [Response]
# File lib/serfx/connection.rb, line 133 def request(command, body = nil) tcp_send(command, body) read_response(command) end
creates a tcp socket if does not exist already, against RPC host/port
@return [TCPSocket]
# File lib/serfx/connection.rb, line 62 def socket @socket ||= TCPSocket.new(host, port) end
takes raw RPC command name and an optional request body and convert them to msgpack encoded data and then send over tcp
@param command [String] RPC command name @param body [Hash] request body of the RPC command
@return [Integer]
# File lib/serfx/connection.rb, line 89 def tcp_send(command, body = nil) @seq += 1 header = { 'Command' => command.to_s.gsub('_', '-'), 'Seq' => seq } Log.info("#{__method__}|Header: #{header.inspect}") buff = MessagePack::Buffer.new buff << header.to_msgpack buff << body.to_msgpack unless body.nil? res = socket.send(buff.to_str, 0) Log.info("#{__method__}|Res: #{res.inspect}") @requests[seq] = { header: header, ack?: false } seq end
creates a MsgPack un-packer object from the tcp socket unless its already present
@return [MessagePack::Unpacker]
# File lib/serfx/connection.rb, line 70 def unpacker @unpacker ||= MessagePack::Unpacker.new(socket) end