class Msgr::Client
Attributes
config[R]
mutex[R]
Public Class Methods
new(config = {})
click to toggle source
# File lib/msgr/client.rb, line 13
# Build a client from a configuration hash.
#
# Built-in defaults (+host+, +vhost+, +max+) are overridden first by the
# values parsed from the optional +:uri+ entry and then by every other
# entry of +config+. The hash passed in by the caller is never modified.
#
# @param config [Hash] connection settings; keys may be strings or symbols
def initialize(config = {})
  # symbolize_keys hands back a new hash, so removing :uri below can neither
  # mutate the caller's hash nor fail when that hash is frozen. It also lets
  # a string 'uri' key be recognised.
  settings = config.symbolize_keys

  @config = { host: '127.0.0.1', vhost: '/', max: 2 }
  @config.merge!(parse(settings.delete(:uri))) if settings[:uri]
  @config.merge!(settings)

  @mutex = ::Mutex.new
  @routes = Routes.new
  @pid ||= ::Process.pid

  log(:debug) { "Created new client on process ##{@pid}..." }
end
Public Instance Methods
connect()
click to toggle source
# File lib/msgr/client.rb, line 68
# Open the connection unless it is already running.
#
# Runs under the client mutex and after the fork check.
def connect
  mutex.synchronize do
    check_process!

    unless connection.running?
      log(:debug) { "Connect to #{uri}..." }
      connection.connect
    end
  end
end
drain()
click to toggle source
Purge all queues known to Msgr, if they exist.
# File lib/msgr/client.rb, line 107
# Purge the queue of every registered route, addressing each queue by the
# route's name.
def drain
  @routes.each { |route| connection.purge_queue(route.name) }
end
publish(payload, opts = {})
click to toggle source
# File lib/msgr/client.rb, line 113
# Publish +payload+ while holding the client mutex.
#
# @param payload [Object] data handed to sync_publish for serialization
# @param opts [Hash] publish options forwarded to sync_publish
def publish(payload, opts = {})
  mutex.synchronize do
    check_process!
    sync_publish(payload, opts)
  end
end
purge(release: false)
click to toggle source
# File lib/msgr/client.rb, line 94
# Ask the connection to purge its queues.
#
# @param release [Boolean] forwarded unchanged to the connection's purge
def purge(release: false)
  mutex.synchronize do
    check_process!
    log(:debug) { "Purge all queues on #{uri}..." }
    connection.purge(release: release)
  end
end
release()
click to toggle source
# File lib/msgr/client.rb, line 126
# Release the connection if it is currently running; otherwise do nothing.
def release
  mutex.synchronize do
    check_process!

    # Ask the connection directly: the public running? takes the same
    # mutex, and Ruby's Mutex is not reentrant, so calling it here would
    # raise ThreadError (recursive locking).
    return unless connection.running?

    connection.release
  end
end
routes()
click to toggle source
# File lib/msgr/client.rb, line 120
# Return the client's routes object, read while holding the client mutex.
def routes
  mutex.synchronize { @routes }
end
running?()
click to toggle source
# File lib/msgr/client.rb, line 48
# Report whether the underlying connection is running.
#
# Takes the client mutex, so it must not be called from code that already
# holds it.
def running?
  mutex.synchronize do
    check_process!
    connection.running?
  end
end
start()
click to toggle source
# File lib/msgr/client.rb, line 55
# Load the routes and bind them on the connection, unless the connection is
# already running.
def start
  mutex.synchronize do
    check_process!

    unless connection.running?
      log(:debug) { "Start on #{uri}..." }

      # An optional routing file from the configuration is added before the
      # routes are reloaded.
      routing_file = config[:routing_file]
      @routes << routing_file if routing_file.present?
      @routes.reload

      connection.bind(@routes)
    end
  end
end
stop(opts = {})
click to toggle source
# File lib/msgr/client.rb, line 79
# Shut the client down: release and close the connection, shut down the
# dispatcher and clear the cached internal state.
#
# @param opts [Hash] when +opts[:delete]+ is truthy the connection's
#   +delete+ is called before closing
def stop(opts = {})
  mutex.synchronize do
    check_process!
    log(:debug) { "Stop on #{uri}..." }

    active = connection
    active.release
    active.delete if opts[:delete]
    active.close

    dispatcher.shutdown
    reset
  end
end
uri()
click to toggle source
# File lib/msgr/client.rb, line 30 def uri @uri = begin uri = ::URI.parse('amqp://localhost') uri.user = CGI.escape(config[:user]) if config.key?(:user) uri.password = '****' if config.key?(:pass) uri.host = config[:host] if config.key?(:host) uri.port = config[:port] if config.key?(:port) uri.scheme = config[:ssl] ? 'amqps' : 'amqp' if config.key?(:vhost) && config[:vhost] != '/' uri.path = "/#{CGI.escape(config[:vhost])}" end uri end end
Private Instance Methods
check_process!()
click to toggle source
# File lib/msgr/client.rb, line 148
# Detect that the client is being used from a different process than the one
# recorded in @pid (i.e. after a fork). When that happens, log a warning,
# clear the cached internal state and record the new PID.
def check_process!
  current_pid = ::Process.pid
  return if current_pid == @pid

  log(:warn) do
    "Fork detected. Reset internal state. (Old PID: #{@pid} / " \
      "New PID: #{current_pid})"
  end

  reset
  @pid = current_pid
end
connection()
click to toggle source
# File lib/msgr/client.rb, line 160
# Return the memoized Connection, creating it from the current uri, config
# and dispatcher on first use.
def connection
  return @connection if @connection

  created = Connection.new(uri, config, dispatcher)
  log(:debug) { 'Created new connection..' }
  @connection = created
end
dispatcher()
click to toggle source
# File lib/msgr/client.rb, line 166
# Return the memoized Dispatcher, creating it from config on first use.
def dispatcher
  return @dispatcher if @dispatcher

  created = Dispatcher.new(config)
  log(:debug) { 'Created new dispatcher..' }
  @dispatcher = created
end
parse(uri)
click to toggle source
# File lib/msgr/client.rb, line 180
# Turn a broker URI string into a configuration hash.
#
# Only components present in the URI produce a key (+:user+, +:pass+,
# +:host+, +:port+, +:vhost+); +:ssl+ is always set and is true only for the
# +amqps+ scheme (case-insensitive). A URI without a scheme yields
# +ssl: false+ instead of raising.
#
# @param uri [String] e.g. "amqps://user:pass@host:5671/vhost"
# @return [Hash{Symbol => Object}]
def parse(uri)
  # Legacy parsing of URI configuration; does not follow usual
  # AMQP vhost encoding but used regular URL path
  parsed = ::URI.parse(uri)

  # scheme is nil for scheme-less input; to_s keeps the checks below from
  # calling String methods on nil. path gets the same defensive guard.
  scheme = parsed.scheme.to_s
  path = parsed.path.to_s

  settings = {}
  settings[:user] = parsed.user if parsed.user
  settings[:pass] = parsed.password if parsed.password
  settings[:host] = parsed.host if parsed.host
  settings[:port] = parsed.port if parsed.port
  settings[:vhost] = path unless path.empty?
  settings[:ssl] = scheme.casecmp('amqps').zero?
  settings
end
reset()
click to toggle source
# File lib/msgr/client.rb, line 172
# Drop every cached collaborator so the next access builds a fresh one.
# Returns nil.
def reset
  @connection = @pool = @channel = @bindings = @dispatcher = nil
end
sync_publish(payload, opts)
click to toggle source
# File lib/msgr/client.rb, line 137
# Serialize +payload+ as JSON and publish it.
#
# +content_type+ defaults to 'application/json' when the caller gave none
# (or a nil/false one). The caller's options hash is not modified.
#
# @param payload [Object] anything JSON.dump can serialize
# @param opts [Hash] publish options forwarded to sync_publish_message
def sync_publish(payload, opts)
  # merge builds a new hash, so the default never leaks into (or fails on a
  # frozen) hash owned by the caller.
  opts = opts.merge(content_type: 'application/json') unless opts[:content_type]

  sync_publish_message(JSON.dump(payload), opts)
end
sync_publish_message(message, opts)
click to toggle source
# File lib/msgr/client.rb, line 142
# Hand an already serialized message to the connection for publishing.
#
# @param message [String] serialized message body
# @param opts [Hash] publish options passed through unchanged
def sync_publish_message(message, opts)
  connection.publish(message, opts)
end