class Qpid::Proton::Container

An AMQP container manages a set of {Listener}s and {Connection}s which contain {#Sender} and {#Receiver} links to transfer messages. Usually, each AMQP client or server process has a single container for all of its connections and links.

One or more threads can call {#run}. Events generated by all the listeners and connections are dispatched in the {#run} threads.

Attributes

auto_stop[RW]

Auto-stop flag.

True (the default) means that the container will stop automatically, as if {#stop} had been called, when the last listener or connection closes.

False means {#run} will not return unless {#stop} is called.

@return [Bool] auto-stop state
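
The two settings can be read as a single condition. The sketch below is a stand-alone model of the check made by `check_stop_lh` in the source listing; `should_stop?` and its keyword names are hypothetical and not part of the library.

```ruby
# Hypothetical model of the stop decision (not library code).
# active:    number of listeners/connections still open
# auto_stop: the auto_stop flag
# stopped:   whether #stop has already been called
def should_stop?(active:, auto_stop:, stopped:)
  active.zero? && (auto_stop || stopped)
end

should_stop?(active: 0, auto_stop: true,  stopped: false)  # => true
should_stop?(active: 0, auto_stop: false, stopped: false)  # => false
should_stop?(active: 2, auto_stop: true,  stopped: true)   # => false, still draining
```

With `auto_stop` set to false, only an explicit {#stop} makes the condition true once the remaining activity has finished.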

handler[R]

@return [MessagingHandler] The container-wide handler

id[R]

@return [String] unique identifier for this container

stopped[RW]

True if the container has been stopped and can no longer be used.

@return [Bool] stopped state

Public Class Methods

new(*args)

Create a new Container.

@overload initialize(id=nil)

@param id [String,Symbol] A unique ID for this container, use random UUID if nil.

@overload initialize(handler=nil, id=nil)

@param id [String,Symbol] A unique ID for this container, use random UUID if nil.
@param handler [MessagingHandler] Optional default handler for connections
 that do not have their own handler (see {#connect} and {#listen})

 *Note*: For multi-threaded code, it is recommended to use a separate
 handler instance for each connection, as a shared handler may be called
 concurrently.
# File lib/core/container.rb, line 108
def initialize(*args)
  case args.size
  when 2 then @handler, @id = args
  when 1 then
    @id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a? Symbol)
    @handler = args[0] unless @id
  when 0 then # No handler; a random UUID is assigned below
  else raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2)"
  end
  # Use an empty messaging adapter to give default behaviour if there's no global handler.
  @adapter = Handler::Adapter.adapt(@handler) || Handler::MessagingAdapter.new(nil)
  @id = (@id || SecureRandom.uuid).freeze

  # Implementation note:
  #
  # - #run threads take work from @work
  # - Each driver and the Container itself is processed by at most one #run thread at a time
  # - The Container thread does IO.select
  # - nil on the @work queue makes a #run thread exit

  @work = Queue.new
  @work << :start << self   # Issue start and start selecting
  @wake = IO.pipe           # Wakes #run thread in IO.select
  @auto_stop = true         # Stop when @active drops to 0

  # Following instance variables protected by lock
  @lock = Mutex.new
  @active = 0               # All active tasks, in @selectable, @work or being processed
  @selectable = Set.new     # Tasks ready to block in IO.select
  @running = 0              # Count of #run threads
  @stopped = false          # #stop called
  @stop_err = nil           # Optional error to pass to tasks, from #stop
end
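
The way a single argument is interpreted can be shown in isolation. This is a minimal sketch rather than the library's code: `split_container_args` is a hypothetical helper that follows the same rule (a String or Symbol is an id, anything else is a handler) and accepts zero arguments, as the documented defaults imply.

```ruby
# Hypothetical helper (not part of qpid_proton) that mirrors the
# argument dispatch described for Container#initialize.
def split_container_args(*args)
  handler = id = nil
  case args.size
  when 0
    # nothing supplied: both stay nil
  when 1
    # A String or Symbol is an id; any other object is taken as the handler.
    id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a?(Symbol))
    handler = args[0] unless id
  when 2
    handler, id = args
  else
    raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2)"
  end
  [handler, id]
end

split_container_args(:broker)   # => [nil, "broker"]
```

One consequence of this rule is that a handler object which itself responds to `to_str` would be taken as an id, so the two-argument form is the unambiguous one.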

Public Instance Methods

connect(url, opts=nil)

Open an AMQP connection.

@param url [String, URI] Open a {TCPSocket} to url.host, url.port. url.scheme must be “amqp” or “amqps”; url.scheme.nil? is treated as “amqp”. url.user and url.password are used as defaults if opts[:user] and opts[:password] are nil.

@option (see Qpid::Proton::Connection#open)

@return [Connection] The new AMQP connection

# File lib/core/container.rb, line 172
def connect(url, opts=nil)
  not_stopped
  url = Qpid::Proton::uri url
  opts ||= {}
  if url.user ||  url.password
    opts[:user] ||= url.user
    opts[:password] ||= url.password
  end
  opts[:ssl_domain] ||= SSLDomain.new(SSLDomain::MODE_CLIENT) if url.scheme == "amqps"
  connect_io(TCPSocket.new(url.host, url.port), opts)
end
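
How URL credentials act as defaults can be sketched with the standard `URI` library alone. `merge_url_credentials` is a hypothetical helper, the host name is made up, and `URI.parse` stands in for `Qpid::Proton::uri`, whose exact behavior is not shown on this page.

```ruby
require 'uri'

# Hypothetical helper: copy user/password from the URL into opts unless
# the caller already supplied them, following the logic in #connect.
def merge_url_credentials(url, opts = nil)
  url = URI.parse(url.to_s)
  opts = (opts || {}).dup          # this sketch avoids mutating the caller's hash
  if url.user || url.password
    opts[:user] ||= url.user
    opts[:password] ||= url.password
  end
  opts
end

from_url = merge_url_credentials("amqp://alice:secret@example.com:5672")
# from_url[:user] is "alice" and from_url[:password] is "secret"

explicit = merge_url_credentials("amqp://alice:secret@example.com:5672", { user: "bob" })
# explicit[:user] stays "bob"; only the missing password is filled in
```

Because `||=` is used, an option set by the caller always wins over the value embedded in the URL.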

connect_io(io, opts=nil)

Open an AMQP protocol connection on an existing {IO} object.

@param io [IO] An existing {IO} object, e.g. a {TCPSocket}

@option (see Qpid::Proton::Connection#open)

# File lib/core/container.rb, line 187
def connect_io(io, opts=nil)
  not_stopped
  cd = connection_driver(io, opts)
  cd.connection.open()
  add(cd)
  cd.connection
end

listen(url, handler=Listener::Handler.new)

Listen for incoming AMQP connections

@param url [String,URI] Listen on host:port of the AMQP URL

@param handler [Listener::Handler] A {Listener::Handler} object that will be called with events for this listener and can generate a new set of options for each accepted connection.

@return [Listener] The AMQP listener.

# File lib/core/container.rb, line 202
def listen(url, handler=Listener::Handler.new)
  not_stopped
  url = Qpid::Proton::uri url
  # TODO aconway 2017-11-01: amqps, SSL
  listen_io(TCPServer.new(url.host, url.port), handler)
end

listen_io(io, handler=Listener::Handler.new)

Listen for incoming AMQP connections on an existing server socket.

@param io A server socket, for example a {TCPServer}

@param handler [Listener::Handler] Handler for events from this listener

# File lib/core/container.rb, line 213
def listen_io(io, handler=Listener::Handler.new)
  not_stopped
  l = ListenTask.new(io, handler, self)
  add(l)
  l
end
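
The IO objects accepted by {#connect_io} and {#listen_io} are ordinary Ruby sockets. As a rough illustration using only the standard `socket` library, the sketch below creates such objects on the loopback interface. The address and the use of port 0 are choices made for this example, and the calls into the container appear only as comments because the sketch does not load the library.

```ruby
require 'socket'

# Port 0 asks the operating system for any free port.
server = TCPServer.new('127.0.0.1', 0)
port = server.addr[1]

# `server` is the kind of object that could be given to Container#listen_io.
# Here the connection is accepted by hand only to show the socket is live.
client = TCPSocket.new('127.0.0.1', port)   # candidate for Container#connect_io
accepted = server.accept

client.write("ping")
client.close
received = accepted.read                    # reads until the client closes
accepted.close
server.close
```

Creating the server socket yourself is one way to learn the chosen port before any client connects, which can be convenient in tests.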

run()

Run the container: wait for IO activity, dispatch events to handlers.

More than one thread can call {#run} concurrently; the container uses all the {#run} threads as a thread pool. Calls to {Handler::MessagingHandler} methods are serialized for each connection or listener, even if the container has multiple threads.

# File lib/core/container.rb, line 227
def run
  @lock.synchronize do
    @running += 1        # Note: ensure clause below will decrement @running
    raise StoppedError if @stopped
  end
  while task = @work.pop
    case task

    when :start
      @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start

    when Container
      r, w = [@wake[0]], []
      @lock.synchronize do
        @selectable.each do |s|
          r << s if s.send :can_read?
          w << s if s.send :can_write?
        end
      end
      r, w = IO.select(r, w)
      selected = Set.new(r).merge(w)
      drain_wake if selected.delete?(@wake[0])
      stop_select = nil
      @lock.synchronize do
        if stop_select = @stopped # close everything
          selected += @selectable
          selected.each { |s| s.close @stop_err }
          @wake.each { |fd| fd.close() }
        end
        @selectable -= selected # Remove selected tasks
      end
      selected.each { |s| @work << s } # Queue up tasks needing #process
      @work << self unless stop_select

    when ConnectionTask then
      task.process
      rearm task

    when ListenTask then
      io, opts = task.process
      add(connection_driver(io, opts, true)) if io
      rearm task
    end
    # TODO aconway 2017-10-26: scheduled tasks, heartbeats
  end
ensure
  @lock.synchronize do
    if (@running -= 1) > 0
      work_wake nil         # Signal the next thread
    else
      @adapter.on_container_stop(self) if @adapter.respond_to? :on_container_stop
    end
  end
end
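
The thread-pool behavior of {#run} can be modeled without the library. The following is a simplified sketch under stated assumptions: lambdas stand in for connection and listener tasks, the pool size of 3 is arbitrary, and the worker count is fixed before the threads start so that the hand-off of the `nil` signal is easy to follow.

```ruby
# Stand-alone model of the #run pool: workers pop tasks from a shared
# Queue, and a nil task tells a worker to exit and pass the signal on.
work = Queue.new
lock = Mutex.new
pool_size = 3
running = pool_size        # counted up front so no worker can miss the signal
done = []

workers = pool_size.times.map do
  Thread.new do
    begin
      while (task = work.pop)            # nil ends the loop
        result = task.call
        lock.synchronize { done << result }
      end
    ensure
      # Like the ensure clause in #run: if other workers remain,
      # re-queue nil so the next one exits as well.
      lock.synchronize { work << nil if (running -= 1) > 0 }
    end
  end
end

5.times { |i| work << -> { i * i } }
work << nil                              # a single nil stops the whole pool
workers.each(&:join)
```

A single `nil` is enough because each exiting worker re-queues it for the next one, which is the role `work_wake nil` plays in the listing above.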

running()

Number of threads in {#run}.

@return [Integer] {#run} thread count

# File lib/core/container.rb, line 163
def running; @lock.synchronize { @running }; end

stop(error=nil)

Stop the container.

Close all listeners and abort all connections without doing AMQP protocol close.

{#stop} returns immediately. Calls to {#run} return when all activity is finished.

The container can no longer be used; any further use of a stopped container raises {StoppedError}. Create a new container if you want to resume activity.

@param error [Condition] Optional transport/listener error condition

# File lib/core/container.rb, line 295
def stop(error=nil)
  @lock.synchronize do
    raise StoppedError if @stopped
    @stopped = true
    @stop_err = Condition.convert(error)
    check_stop_lh
    # NOTE: @stopped =>
    # - no new run threads can join
    # - no more select calls after next wakeup
    # - once @active == 0, all threads will be stopped with nil
  end
  wake
end

Protected Instance Methods

add(task)

All new tasks are added here

# File lib/core/container.rb, line 332
def add task
  @lock.synchronize do
    @active += 1
    task.close @stop_err if @stopped
  end
  work_wake task
end

check_stop_lh()
# File lib/core/container.rb, line 355
def check_stop_lh
  if @active.zero? && (@auto_stop || @stopped)
    @stopped = true
    work_wake nil          # Signal threads to stop
    true
  end
end

connection_driver(io, opts=nil, server=false)
# File lib/core/container.rb, line 324
def connection_driver(io, opts=nil, server=false)
  opts ||= {}
  opts[:container] = self
  opts[:handler] ||= @adapter
  ConnectionTask.new(self, io, opts, server)
end

drain_wake()
# File lib/core/container.rb, line 317
def drain_wake
  begin
    @wake[0].read_nonblock(256) while true
  rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
  end
end

not_stopped()
# File lib/core/container.rb, line 363
def not_stopped; raise StoppedError if @lock.synchronize { @stopped }; end

rearm(task)
# File lib/core/container.rb, line 340
def rearm task
  @lock.synchronize do
    if task.finished?
      @active -= 1
      check_stop_lh
    elsif @stopped
      task.close @stop_err
      work_wake task
    else
      @selectable << task
    end
  end
  wake
end

wake()
# File lib/core/container.rb, line 311
def wake; @wake[1].write_nonblock('x') rescue nil; end

work_wake(task)

Normally if we add work we need to set a wakeup to ensure a single run thread doesn't get stuck in select while there is other work on the queue.

# File lib/core/container.rb, line 315
def work_wake(task) @work << task; wake; end
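
The wakeup mechanism behind `wake` and `drain_wake` is a pipe whose read end is included in the `IO.select` call. The sketch below shows that idea on its own, assuming nothing beyond Ruby's core `IO` class; the 0.1 second delay and the 5 second timeout are arbitrary values chosen for the example.

```ruby
# Stand-alone model of the wake pipe: a byte written to the pipe makes
# IO.select return even when no socket is ready.
rd, wr = IO.pipe

waker = Thread.new do
  sleep 0.1
  wr.write_nonblock('x') rescue nil      # same idea as Container#wake
end

ready, = IO.select([rd], [], [], 5)      # the timeout is only a safety net
woken = !ready.nil? && ready.include?(rd)

# Drain every pending byte so that the next select blocks again.
begin
  loop { rd.read_nonblock(256) }
rescue IO::WaitReadable, EOFError
  # the pipe is empty
end
waker.join
```

Draining matters because a byte left in the pipe would make every later `IO.select` return immediately.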