class Async::Channel

Channel is a wrapper around Async::Queue that provides a protocol and handy tools for passing data and exceptions and for signalling that the channel has been closed. It is designed to be used with exactly one publisher and one subscriber.

Public Class Methods

new() click to toggle source
# File lib/async/channel.rb, line 12
# Sets up an open channel: the closed flag starts out false and messages
# are carried by a newly created Async::Queue.
def initialize
  @closed = false
  @queue = Async::Queue.new
end

Public Instance Methods

<<(payload) click to toggle source

Methods for a publisher

# File lib/async/channel.rb, line 22
# Publisher side: enqueues +payload+ wrapped in a [:payload, payload] envelope.
#
# @param payload [Object] the value to hand over to the subscriber
# @return whatever the underlying queue's << returns
# @raise [ChannelClosedError] if the channel has already been marked closed
def <<(payload)
  if @closed
    raise ChannelClosedError, "Cannot send to a closed channel"
  end

  envelope = [:payload, payload]
  @queue << envelope
end
close() click to toggle source
# File lib/async/channel.rb, line 34
# Publisher side: enqueues a [:close] marker and then marks the channel as
# closed. Calling it on an already closed channel does nothing.
#
# @return [true, nil] true when the channel was closed by this call,
#   nil when it was already closed
def close
  unless closed?
    @queue << [:close]
    @closed = true
  end
end
closed?() click to toggle source
# File lib/async/channel.rb, line 17
# Reports whether the channel has been marked closed.
#
# @return [Boolean] the value of the @closed flag, which is false after
#   initialization and set to true by #close
def closed?
  @closed
end
dequeue() click to toggle source

Methods for a subscriber

# File lib/async/channel.rb, line 42
# Subscriber side: receives a single payload.
#
# Delegates to #each and returns the first payload it yields. If #each
# finishes without yielding (for instance when the next message is the
# close marker), the return value of #each is returned instead. Errors
# raised by #each propagate to the caller.
#
# @return [Object] the next payload
def dequeue
  # Returning from inside the block stops the iteration after one payload.
  each { |item| return item } # rubocop:disable Lint/UnreachableLoop
end
each() { |payload| ... } click to toggle source
# File lib/async/channel.rb, line 48
# Subscriber side: iterates over payloads until a close marker is received.
#
# Every queue entry is a [type, payload] pair and is handled by type:
# * :payload   - the payload is yielded to the block;
# * :exception - the carried exception is raised in the caller;
# * :close     - iteration stops.
#
# The closed flag is checked before the queue is read, so calling this on a
# channel that has already been marked closed raises even if messages are
# still queued.
#
# When no block is given, an Enumerator over the same sequence is returned
# instead of failing with LocalJumpError on the first payload. The closed
# check still happens immediately in that case.
#
# @yieldparam payload [Object] each published payload, in queue order
# @return [Enumerator] if no block is given
# @raise [ChannelClosedError] if the channel is already marked closed
# @raise [Exception] any exception that was published through the channel
def each
  raise(ChannelClosedError, "Cannot receive from a closed channel") if closed?
  return enum_for(:each) unless block_given?

  @queue.each do |type, payload|
    case type
    when :exception
      # A hack to preserve the full backtrace: the receiving call stack is
      # placed in front of whatever backtrace the exception already carries.
      payload.set_backtrace(caller + (payload.backtrace || []))
      raise payload
    when :payload
      yield payload
    when :close
      break
    end
  end
end
exception(exception) click to toggle source
# File lib/async/channel.rb, line 28
# Publisher side: enqueues +exception+ wrapped in an [:exception, exception]
# envelope. #each raises the carried exception when it reaches this entry.
#
# @param exception [Exception] the error to hand over to the subscriber
# @return whatever the underlying queue's << returns
# @raise [ChannelClosedError] if the channel has already been marked closed
def exception(exception)
  raise ChannelClosedError, "Cannot send to a closed channel" if closed?

  envelope = [:exception, exception]
  @queue << envelope
end