class Async::Channel
Channel
is a wrapper around Async::Queue that provides a protocol and handy tools for passing data and exceptions and for signalling that the channel has been closed. It is designed to be used with only one publisher and one subscriber.
Public Class Methods
new()
click to toggle source
# File lib/async/channel.rb, line 12
# Sets up a channel in the open state, backed by a new Async::Queue.
def initialize
  @closed = false
  @queue = Async::Queue.new
end
Public Instance Methods
<<(payload)
click to toggle source
Methods for a publisher
# File lib/async/channel.rb, line 22
# Publishes a payload by pushing a [:payload, payload] message onto the
# underlying queue.
#
# @param payload [Object] the value to hand over to the subscriber
# @raise [ChannelClosedError] if closed? reports the channel as closed
# @return the result of pushing onto the underlying queue
def <<(payload)
  # Ask closed? instead of reading @closed directly, exactly like #exception,
  # #close and #each do, so every entry point shares one definition of "closed".
  raise(ChannelClosedError, "Cannot send to a closed channel") if closed?

  @queue << [:payload, payload]
end
close()
click to toggle source
# File lib/async/channel.rb, line 34
# Closes the channel: pushes a [:close] marker onto the queue and then flags
# the channel as closed. Does nothing when closed? is already truthy.
#
# @return [true, nil] true when this call closed the channel, nil otherwise
def close
  unless closed?
    @queue << [:close]
    @closed = true
  end
end
closed?()
click to toggle source
# File lib/async/channel.rb, line 17
# Reports the current value of the @closed flag.
#
# @return the value stored in @closed
def closed?
  @closed
end
dequeue()
click to toggle source
Methods for a subscriber
# File lib/async/channel.rb, line 42
# Receives a single payload by delegating to #each and returning from the
# method on the very first yielded value. If #each finishes without yielding,
# its own return value is returned instead.
def dequeue
  # Leaving on the first iteration is deliberate, hence the disabled cop.
  each { |first| return first } # rubocop:disable Lint/UnreachableLoop
end
each() { |payload| ... }
click to toggle source
# File lib/async/channel.rb, line 48
# Iterates over the messages of the underlying queue:
#
# * [:payload, value]   - value is yielded to the block
# * [:exception, error] - error is raised in the caller of #each
# * [:close]            - iteration stops
#
# Without a block an Enumerator over the same sequence is returned, so that
# no message is taken from the queue only to fail on the missing block.
#
# @yieldparam payload [Object] each value published with a :payload message
# @return [Enumerator] when no block is given
# @raise [ChannelClosedError] if closed? is truthy when #each is called
# @raise [Exception] whatever exception arrives in an :exception message
def each
  raise(ChannelClosedError, "Cannot receive from a closed channel") if closed?
  return to_enum(:each) unless block_given?

  @queue.each do |type, payload|
    case type
    when :exception
      # A hack to preserve full backtrace: put the current call stack in
      # front of whatever backtrace the exception already carries.
      payload.set_backtrace(caller + (payload.backtrace || []))
      raise payload
    when :payload
      yield payload
    when :close
      break
    end
  end
end
exception(exception)
click to toggle source
# File lib/async/channel.rb, line 28
# Publishes an exception by pushing an [:exception, exception] message onto
# the underlying queue.
#
# @param exception the object to deliver in the :exception message
# @raise [ChannelClosedError] if closed? reports the channel as closed
# @return the result of pushing onto the underlying queue
def exception(exception)
  if closed?
    raise ChannelClosedError, "Cannot send to a closed channel"
  end

  @queue << [:exception, exception]
end