class Rbgo::Channel::NonBufferChan
Attributes
close_flag[RW]
deq_cond[RW]
deq_mutex[RW]
enq_cond[RW]
enq_mutex[RW]
have_deq_waiting_flag[RW]
have_enq_waiting_flag[RW]
resource_array[RW]
Public Class Methods
new()
click to toggle source
# File lib/rbgo/select_chan.rb, line 121 def initialize self.enq_mutex = Mutex.new self.deq_mutex = Mutex.new self.enq_cond = ConditionVariable.new self.deq_cond = ConditionVariable.new self.resource_array = [] self.close_flag = false self.have_enq_waiting_flag = false self.have_deq_waiting_flag = false self.ios = Set.new self.register_mutex = Mutex.new end
Public Instance Methods
close()
click to toggle source
# File lib/rbgo/select_chan.rb, line 224 def close deq_mutex.synchronize do self.close_flag = true enq_cond.broadcast deq_cond.broadcast notify self end end
closed?()
click to toggle source
# File lib/rbgo/select_chan.rb, line 234 def closed? close_flag end
pop(nonblock = false)
click to toggle source
# File lib/rbgo/select_chan.rb, line 189 def pop(nonblock = false) resource = nil ok = true if closed? return [nil, false] end if nonblock raise ThreadError.new unless deq_mutex.try_lock else deq_mutex.lock end begin if nonblock raise ThreadError.new unless have_enq_waiting_flag end while resource_array.empty? && !closed? self.have_deq_waiting_flag = true notify enq_cond.wait(deq_mutex) end resource = resource_array.first ok = false if resource_array.empty? resource_array.clear self.have_deq_waiting_flag = false deq_cond.signal ensure deq_mutex.unlock end [resource, ok] end
push(obj, nonblock = false)
click to toggle source
# File lib/rbgo/select_chan.rb, line 135 def push(obj, nonblock = false) if closed? raise ClosedQueueError.new end if nonblock raise ThreadError.new unless enq_mutex.try_lock else enq_mutex.lock end begin if nonblock raise ThreadError.new unless have_deq_waiting_flag end begin if closed? raise ClosedQueueError.new else deq_mutex.synchronize do resource_array[0] = obj enq_cond.signal until resource_array.empty? || closed? self.have_enq_waiting_flag = true begin Thread.new do deq_mutex.synchronize do # no op end notify end rescue Exception => ex Rbgo.logger&.error('Rbgo') { "#{ex.message}\n#{ex.backtrace}" } sleep 1 retry end deq_cond.wait(deq_mutex) end raise ClosedQueueError.new if closed? end end ensure self.have_enq_waiting_flag = false end ensure enq_mutex.unlock end self end