class Rinda::TupleSpace
Attributes
bag[R]
read_waiter[R]
take_waiter[R]
Public Class Methods
new(*args)
click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 517 def initialize(*args) orig_initialize(*args) @bag.set_special_bin( :task => TupleBag::DomainTupleBin, :finished => TupleBag::DomainTupleBin, :working => TupleBag::DomainTupleBin, :data => TupleBag::DataTupleBin, :lift => TupleBag::HashTupleBin ) @mutex = Mutex.new end
Also aliased as: orig_initialize
Public Instance Methods
all_tuples(target=:bag)
click to toggle source
Returns all tuples in the space. @param [Symbol] target
tuple type(:all, :bag, :read_waiter, or :take_waiter)
@return [Array]
all tuples
# File lib/pione/patch/rinda-patch.rb, line 558 def all_tuples(target=:bag) case target when :all all_tuples(:bag) + all_tuples(:read_waiter) + all_tuples(:take_waiter) when :bag @mutex.synchronize{@bag.all_tuples}.map{|tuple| tuple.value} when :read_waiter @mutex.synchronize{@read_waiter.all_tuples}.map{|tuple| tuple.value} when :take_waiter @mutex.synchronize{@take_waiter.all_tuples}.map{|tuple| tuple.value} end end
data_size()
click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 591 def data_size @bag.data_size end
finished_size()
click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 587 def finished_size @bag.finished_size end
move(port, tuple, sec=nil)
click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 534 def move(port, tuple, sec=nil) real_move(port, tuple, sec) end
notify(event, tuple, sec=nil)
click to toggle source
@note
mutex version of +notify+
# File lib/pione/patch/rinda-patch.rb, line 573 def notify(event, tuple, sec=nil) template = NotifyTemplateEntry.new(self, event, tuple, sec) @mutex.synchronize {@notify_waiter.push(template)} template end
read(tuple, sec=nil)
click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 538 def read(tuple, sec=nil) lift_tuple(real_read(tuple, sec)) end
read_all(tuple)
click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 542 def read_all(tuple) real_read_all(tuple).map do |res| lift_tuple(res) end end
take_all(tuple, sec=nil)
click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 548 def take_all(tuple, sec=nil) real_take_all(tuple, sec).map {|res| lift_tuple(res)} end
task_size()
click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 579 def task_size @bag.task_size end
working_size()
click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 583 def working_size @bag.working_size end
write(tuple, *args)
click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 529 def write(tuple, *args) tuple.timestamp = Time.now real_write(tuple, *args) end
Private Instance Methods
keep_clean()
click to toggle source
@note
mutex version of +keep_clean+
# File lib/pione/patch/rinda-patch.rb, line 746 def keep_clean @mutex.synchronize{@read_waiter.delete_unless_alive}.each do |e| e.signal end @mutex.synchronize{@take_waiter.delete_unless_alive}.each do |e| e.signal end @mutex.synchronize{@notify_waiter.delete_unless_alive}.each do |e| e.notify(['close']) end @mutex.synchronize{@bag.delete_unless_alive}.each do |e| notify_event('delete', e.value) end end
lift_location(location, history=[])
click to toggle source
Lift the location.
@param location [Location::BasicLocation]
location that lift from
@param history [Array<Location::BasicLocation>]
history of lifted location
@return [Location::BasicLocation or nil]
new location
# File lib/pione/patch/rinda-patch.rb, line 800 def lift_location(location, history=[]) return nil if history.include?(location) template = TemplateEntry.new([:lift, location, nil]) if lift_tuple = @bag.find(template) new_location = lift_tuple[2] return lift_location(new_location, history << location) || new_location end return nil end
lift_tuple(tuple)
click to toggle source
Lift the location of the tuple.
@param tuple [BasicTuple]
tuple
@return [BasicTuple]
lifted tuple
# File lib/pione/patch/rinda-patch.rb, line 780 def lift_tuple(tuple) if Pione::TupleSpace[tuple.first] if pos = Pione::TupleSpace[tuple.first].location_position if new_location = lift_location(tuple[pos]) tuple = tuple.clone tuple[pos] = new_location end end end return tuple end
real_move(port, tuple, sec=nil) { |template| ... }
click to toggle source
@note
mutex version of +move+
# File lib/pione/patch/rinda-patch.rb, line 634 def real_move(port, tuple, sec=nil) template = WaitTemplateEntry.new(self, tuple, sec) yield(template) if block_given? if entry = @mutex.synchronize {@bag.find(template)} port.push(entry.value) if port @mutex.synchronize {@bag.delete(entry)} notify_event('take', entry.value) template.finished = true return entry.value end raise RequestExpiredError if template.expired? begin @mutex.synchronize {@take_waiter.push(template)} start_keeper if template.expires while true raise RequestCanceledError if template.canceled? raise RequestExpiredError if template.expired? if entry = @mutex.synchronize {@bag.find(template)} port.push(entry.value) if port @mutex.synchronize {@bag.delete(entry)} notify_event('take', entry.value) template.finished = true @mutex.synchronize do @take_waiter.delete(template) end return entry.value end Thread.current[:WaitTemplate] = template template.wait Thread.current[:WaitTemplate] = nil end ensure @mutex.synchronize {@take_waiter.delete(template)} end end
real_read(tuple, sec=nil) { |template| ... }
click to toggle source
@note
mutex version of +read+
# File lib/pione/patch/rinda-patch.rb, line 674 def real_read(tuple, sec=nil) template = WaitTemplateEntry.new(self, tuple, sec) yield(template) if block_given? entry = @mutex.synchronize {@bag.find(template)} return entry.value if entry raise RequestExpiredError if template.expired? begin @mutex.synchronize {@read_waiter.push(template)} start_keeper if template.expires template.wait raise RequestCanceledError if template.canceled? raise RequestExpiredError if template.expired? return template.found ensure @mutex.synchronize {@read_waiter.delete(template)} end end
real_read_all(tuple)
click to toggle source
@note
mutex version of +read_all+
# File lib/pione/patch/rinda-patch.rb, line 696 def real_read_all(tuple) template = WaitTemplateEntry.new(self, tuple, nil) entry = @mutex.synchronize {@bag.find_all(template)} entry.collect {|e| e.value} end
real_take_all(tuple, sec=nil) { |template| ... }
click to toggle source
@note
mutex version of +read_all+
# File lib/pione/patch/rinda-patch.rb, line 704 def real_take_all(tuple, sec=nil) template = WaitTemplateEntry.new(self, tuple, sec) yield(template) if block_given? entries = @mutex.synchronize {@bag.find_all(template)} unless entries.empty? entries.each do |entry| # port.push(entry.value) if port @mutex.synchronize {@bag.delete(entry)} end template.finished = true return entries.map {|entry| entry.value} end raise RequestExpiredError if template.expired? begin @mutex.synchronize {@take_waiter.push(template)} start_keeper if template.expires while true raise RequestCanceledError if template.canceled? raise RequestExpiredError if template.expired? entries = @mutex.synchronize {@bag.find_all(template)} unless entries.empty? entries.each do |entry| # port.push(entry.value) if port @mutex.synchronize {@bag.delete(entry)} end template.finished = true @mutex.synchronize {@take_waiter.delete(template)} return entries.map {|entry| entry.value} end Thread.current[:WaitTemplate] = template template.wait Thread.current[:WaitTemplate] = nil end ensure @mutex.synchronize {@take_waiter.delete(template)} end end
real_write(tuple, sec=nil)
click to toggle source
@note
mutex version of +write+
# File lib/pione/patch/rinda-patch.rb, line 599 def real_write(tuple, sec=nil) entry = create_entry(tuple, sec) if entry.expired? # why only read_waiter??? @mutex.synchronize{@read_waiter.find_all_template(entry)}.each do |template| template.read(tuple) end notify_event('write', entry.value) notify_event('delete', entry.value) else # push to bag @mutex.synchronize do @bag.push(entry) end # start keeper start_keeper if entry.expires # send tuple to all matched waiters in read waiter list @mutex.synchronize do @read_waiter.find_all_template(entry).each do |template| template.read(tuple) end end # send tuple to one of matched waiters in take waiter list @mutex.synchronize do if template = @take_waiter.find_template(entry) template.signal end end notify_event('write', entry.value) end entry end
start_keeper()
click to toggle source
@note
mutex version of +start_keeper+
# File lib/pione/patch/rinda-patch.rb, line 763 def start_keeper return if @keeper && @keeper.alive? @keeper = Thread.new do while true sleep(@period) break unless need_keeper? keep_clean end end end