class Msgr::Client

Attributes

config[R]
mutex[R]

Public Class Methods

new(config = {}) click to toggle source
# File lib/msgr/client.rb, line 13
# Build a client from the given configuration.
#
# Defaults (host, vhost, max) are applied first, then the values parsed
# from an optional :uri entry, then all remaining options. Explicitly
# passed options therefore win over values derived from the URI.
#
# The passed hash is not modified.
#
# @param config [Hash] connection options; keys may be strings or symbols
def initialize(config = {})
  @config = {
    host: '127.0.0.1',
    vhost: '/',
    max: 2
  }

  # Work on a symbolized copy: the caller's hash stays untouched and a
  # string-keyed 'uri' entry is recognized as well.
  options = config.symbolize_keys
  uri     = options.delete(:uri)

  @config.merge! parse(uri) if uri
  @config.merge! options

  @mutex  = ::Mutex.new
  @routes = Routes.new
  @pid ||= ::Process.pid

  log(:debug) { "Created new client on process ##{@pid}..." }
end

Public Instance Methods

connect() click to toggle source
# File lib/msgr/client.rb, line 68
# Open the connection unless it is already running.
#
# Runs inside the client mutex, after the fork check.
#
# @return [Object, nil] result of the connection's connect call, or nil
#   when the connection was already running
def connect
  mutex.synchronize do
    check_process!

    unless connection.running?
      log(:debug) { "Connect to #{uri}..." }
      connection.connect
    end
  end
end
drain() click to toggle source

Purge all queues known to Msgr, if they exist.

# File lib/msgr/client.rb, line 107
# Purge all queues known to Msgr, if they exist.
#
# Like the other public operations on the connection, this runs inside
# the client mutex and after the fork check, so a forked process does not
# keep using the connection state of its parent.
def drain
  mutex.synchronize do
    check_process!

    @routes.each do |route|
      connection.purge_queue(route.name)
    end
  end
end
publish(payload, opts = {}) click to toggle source
# File lib/msgr/client.rb, line 113
# Publish a payload through the connection.
#
# Runs inside the client mutex, after the fork check, and hands the
# actual work to #sync_publish.
#
# @param payload [Object] data to publish
# @param opts [Hash] publish options passed on unchanged
def publish(payload, opts = {})
  mutex.synchronize do
    check_process!
    sync_publish(payload, opts)
  end
end
purge(release: false) click to toggle source
# File lib/msgr/client.rb, line 94
# Purge the queues through the connection.
#
# Runs inside the client mutex, after the fork check.
#
# @param release [Boolean] passed on to the connection's purge call
def purge(release: false)
  mutex.synchronize do
    check_process!
    log(:debug) { "Purge all queues on #{uri}..." }
    connection.purge(release: release)
  end
end
release() click to toggle source
# File lib/msgr/client.rb, line 126
# Release the connection if it is currently running.
#
# The running state is read from the connection directly. The public
# #running? locks the client mutex itself, and Ruby's Mutex is not
# reentrant, so calling it from inside this synchronized section raises
# ThreadError (recursive locking).
#
# @return [Object, nil] result of the connection's release call, or nil
#   when the connection is not running
def release
  mutex.synchronize do
    check_process!
    return unless connection.running?

    connection.release
  end
end
routes() click to toggle source
# File lib/msgr/client.rb, line 120
# Return the routes object of this client, read inside the client mutex.
def routes
  mutex.synchronize { @routes }
end
running?() click to toggle source
# File lib/msgr/client.rb, line 48
# Report whether the connection is running.
#
# Locks the client mutex and performs the fork check first, so it must
# not be called from code that already holds that mutex.
def running?
  mutex.synchronize do
    check_process!
    current = connection
    current.running?
  end
end
start() click to toggle source
# File lib/msgr/client.rb, line 55
# Load the routes and bind them on the connection, unless the connection
# is already running.
#
# A configured :routing_file is appended to the routes before they are
# reloaded. Runs inside the client mutex, after the fork check.
#
# @return [Object, nil] result of the connection's bind call, or nil when
#   the connection was already running
def start
  mutex.synchronize do
    check_process!

    unless connection.running?
      log(:debug) { "Start on #{uri}..." }

      routing_file = config[:routing_file]
      @routes << routing_file if routing_file.present?
      @routes.reload
      connection.bind(@routes)
    end
  end
end
stop(opts = {}) click to toggle source
# File lib/msgr/client.rb, line 79
# Shut the client down.
#
# In order: release the connection, delete it when opts[:delete] is set,
# close it, shut down the dispatcher, then clear the internal state.
# Runs inside the client mutex, after the fork check.
#
# @param opts [Hash] only the :delete key is read here
def stop(opts = {})
  mutex.synchronize do
    check_process!
    log(:debug) { "Stop on #{uri}..." }

    active = connection
    active.release
    active.delete if opts[:delete]
    active.close

    dispatcher.shutdown
    reset
  end
end
uri() click to toggle source
# File lib/msgr/client.rb, line 30
def uri
  @uri = begin
    uri = ::URI.parse('amqp://localhost')

    uri.user     = CGI.escape(config[:user]) if config.key?(:user)
    uri.password = '****'                    if config.key?(:pass)
    uri.host     = config[:host]             if config.key?(:host)
    uri.port     = config[:port]             if config.key?(:port)
    uri.scheme   = config[:ssl] ? 'amqps' : 'amqp'

    if config.key?(:vhost) && config[:vhost] != '/'
      uri.path = "/#{CGI.escape(config[:vhost])}"
    end

    uri
  end
end

Private Instance Methods

check_process!() click to toggle source
# File lib/msgr/client.rb, line 148
# Detect that the current process differs from the one recorded in @pid
# (i.e. the client is being used after a fork) and, if so, drop the
# internal state and record the new PID.
#
# Does nothing when the PID is unchanged.
def check_process!
  current_pid = ::Process.pid
  return if current_pid == @pid

  # The block runs before @pid is updated, so it still reports the old PID.
  log(:warn) do
    "Fork detected. Reset internal state. (Old PID: #{@pid} / " \
    "New PID: #{current_pid})"
  end

  reset
  @pid = current_pid
end
connection() click to toggle source
# File lib/msgr/client.rb, line 160
# Return the memoized connection, creating it on first use from the
# display URI, the configuration and the dispatcher.
def connection
  return @connection if @connection

  created = Connection.new(uri, config, dispatcher)
  log(:debug) { 'Created new connection..' }
  @connection = created
end
dispatcher() click to toggle source
# File lib/msgr/client.rb, line 166
# Return the memoized dispatcher, creating it on first use from the
# configuration.
def dispatcher
  return @dispatcher if @dispatcher

  created = Dispatcher.new(config)
  log(:debug) { 'Created new dispatcher..' }
  @dispatcher = created
end
parse(uri) click to toggle source
# File lib/msgr/client.rb, line 180
# Turn a connection URI string into a configuration hash.
#
# Only components present in the URI produce a key, except :ssl, which
# is always set (true only for the amqps scheme, compared
# case-insensitively).
#
# @param uri [String] connection URI
# @return [Hash] subset of :user, :pass, :host, :port, :vhost plus :ssl
def parse(uri)
  # Legacy parsing of URI configuration; does not follow usual
  # AMQP vhost encoding but used regular URL path
  parsed = ::URI.parse(uri)

  settings = {}
  settings[:user]  = parsed.user     if parsed.user
  settings[:pass]  = parsed.password if parsed.password
  settings[:host]  = parsed.host     if parsed.host
  settings[:port]  = parsed.port     if parsed.port
  settings[:vhost] = parsed.path     unless parsed.path.to_s.empty?

  # The scheme is nil for scheme-less input such as "//host/vhost";
  # treat that as a non-SSL connection instead of failing on nil.
  settings[:ssl] = parsed.scheme.to_s.casecmp('amqps').zero?

  settings
end
reset() click to toggle source
# File lib/msgr/client.rb, line 172
# Forget all connection-related state so it is rebuilt lazily on next
# use. Nothing is closed or shut down here; the references are only
# dropped.
def reset
  @connection = @pool = @channel = @bindings = @dispatcher = nil
end
sync_publish(payload, opts) click to toggle source
# File lib/msgr/client.rb, line 137
# Serialize the payload as JSON and publish it.
#
# The content type defaults to 'application/json' when the options carry
# none. The default is applied to a copy, so the caller's hash is never
# modified and a frozen hash is accepted.
#
# @param payload [Object] data passed to JSON.dump
# @param opts [Hash] publish options
def sync_publish(payload, opts)
  content_type = opts[:content_type] || 'application/json'
  sync_publish_message JSON.dump(payload), opts.merge(content_type: content_type)
end
sync_publish_message(message, opts) click to toggle source
# File lib/msgr/client.rb, line 142
# Hand an already serialized message to the connection for publishing.
#
# @param message [String] serialized message body
# @param opts [Hash] publish options passed on unchanged
def sync_publish_message(message, opts)
  connection.publish(message, opts)
end