class Rbgo::Channel::NonBufferChan

NonBufferChan

Attributes

close_flag[RW]
deq_cond[RW]
deq_mutex[RW]
enq_cond[RW]
enq_mutex[RW]
have_deq_waiting_flag[RW]
have_enq_waiting_flag[RW]
resource_array[RW]

Public Class Methods

new() click to toggle source
# File lib/rbgo/select_chan.rb, line 121
# Sets up an open, empty channel: fresh mutexes and condition variables,
# an empty resource slot, and neither side flagged as waiting.
def initialize
  # Synchronisation primitives used by push, pop and close.
  self.enq_mutex = Mutex.new
  self.enq_cond  = ConditionVariable.new
  self.deq_mutex = Mutex.new
  self.deq_cond  = ConditionVariable.new

  # Initial state: nothing pending, not closed, nobody waiting.
  self.resource_array        = []
  self.close_flag            = false
  self.have_deq_waiting_flag = false
  self.have_enq_waiting_flag = false

  # NOTE(review): ios / register_mutex are not among the attributes listed
  # for this class; presumably they come from a mixin that also provides
  # #notify -- confirm.
  self.ios            = Set.new
  self.register_mutex = Mutex.new
end

Public Instance Methods

<<(obj, nonblock = false)
Alias for: push
close() click to toggle source
# File lib/rbgo/select_chan.rb, line 224
# Marks the channel as closed. While holding deq_mutex it sets close_flag,
# wakes every thread waiting on enq_cond and on deq_cond, and calls #notify.
#
# @return [self]
def close
  deq_mutex.synchronize do
    self.close_flag = true
    [enq_cond, deq_cond].each(&:broadcast)
    notify
  end
  self
end
closed?() click to toggle source
# File lib/rbgo/select_chan.rb, line 234
# Returns the current value of close_flag (false after initialize,
# true once #close has run).
def closed?
  self.close_flag
end
deq(nonblock = false)
Alias for: pop
enq(obj, nonblock = false)
Alias for: push
pop(nonblock = false) click to toggle source
# File lib/rbgo/select_chan.rb, line 189
# Receives one value from the channel. Waits on enq_cond until
# resource_array holds a value or the channel is closed.
#
# @param nonblock [Boolean] when true, raise ThreadError instead of waiting
#   if deq_mutex cannot be taken immediately or no sender is flagged as
#   waiting (have_enq_waiting_flag)
# @return [Array] [value, true] when a value was taken, [nil, false] when
#   the channel is closed and nothing was handed over
# @raise [ThreadError] in nonblock mode, under the conditions above
def pop(nonblock = false)
  return [nil, false] if closed?

  if nonblock
    raise ThreadError unless deq_mutex.try_lock
  else
    deq_mutex.lock
  end

  resource = nil
  ok       = true
  waited   = false # true once THIS call has raised have_deq_waiting_flag
  begin
    raise ThreadError if nonblock && !have_enq_waiting_flag

    while resource_array.empty? && !closed?
      self.have_deq_waiting_flag = true
      waited = true
      notify
      enq_cond.wait(deq_mutex)
    end
    resource = resource_array.first
    ok       = false if resource_array.empty?
    resource_array.clear
    self.have_deq_waiting_flag = false
    deq_cond.signal
  ensure
    # If the wait was aborted by an exception, do not leave a stale
    # "receiver waiting" flag behind: a later nonblocking push would trust
    # it and block with nobody to hand the value to. Only undo the flag
    # when this call set it, so a failed nonblocking pop never clears a
    # flag raised by another, still waiting, receiver.
    self.have_deq_waiting_flag = false if waited
    deq_mutex.unlock
  end

  [resource, ok]
end
Also aliased as: deq, shift
push(obj, nonblock = false) click to toggle source
# File lib/rbgo/select_chan.rb, line 135
# Sends +obj+ through the channel: stores it in resource_array, signals
# enq_cond, then waits on deq_cond until the slot has been emptied or the
# channel is closed.
#
# @param obj [Object] value to hand over
# @param nonblock [Boolean] when true, raise ThreadError instead of
#   proceeding if enq_mutex cannot be taken immediately or no receiver is
#   flagged as waiting (have_deq_waiting_flag); once those checks pass the
#   hand-off itself still waits as in the blocking case
# @return [self]
# @raise [ClosedQueueError] if the channel is closed on entry, after
#   enq_mutex is acquired, or by the time the wait for the hand-off ends
# @raise [ThreadError] in nonblock mode, under the conditions above
def push(obj, nonblock = false)
  raise ClosedQueueError if closed?

  if nonblock
    raise ThreadError unless enq_mutex.try_lock
  else
    enq_mutex.lock
  end

  begin
    raise ThreadError if nonblock && !have_deq_waiting_flag

    begin
      raise ClosedQueueError if closed?

      deq_mutex.synchronize do
        resource_array[0] = obj
        enq_cond.signal
        until resource_array.empty? || closed?
          self.have_enq_waiting_flag = true

          begin
            # The helper thread can only take deq_mutex after this thread
            # has released it (deq_cond.wait below does so), and calls
            # notify after that.
            Thread.new do
              deq_mutex.synchronize do
                # no op
              end
              notify
            end
          rescue StandardError => ex
            # Only ordinary failures (e.g. ThreadError when a thread cannot
            # be created) are retried. Interrupt, SystemExit and other
            # non-StandardError exceptions must propagate instead of being
            # swallowed by an endless retry while both mutexes are held.
            Rbgo.logger&.error('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
            sleep 1
            retry
          end

          deq_cond.wait(deq_mutex)
        end
        raise ClosedQueueError if closed?
      end
    ensure
      self.have_enq_waiting_flag = false
    end
  ensure
    enq_mutex.unlock
  end

  self
end
Also aliased as: <<, enq
shift(nonblock = false)
Alias for: pop