class Rinda::TupleSpace

Attributes

bag[R]
read_waiter[R]
take_waiter[R]

Public Class Methods

new(*args) click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 517
def initialize(*args)
  orig_initialize(*args)
  @bag.set_special_bin(
    :task => TupleBag::DomainTupleBin,
    :finished => TupleBag::DomainTupleBin,
    :working => TupleBag::DomainTupleBin,
    :data => TupleBag::DataTupleBin,
    :lift => TupleBag::HashTupleBin
  )
  @mutex = Mutex.new
end
Also aliased as: orig_initialize

Public Instance Methods

all_tuples(target=:bag) click to toggle source

Returns all tuples in the space. @param [Symbol] target

tuple type(:all, :bag, :read_waiter, or :take_waiter)

@return [Array]

all tuples
# File lib/pione/patch/rinda-patch.rb, line 558
def all_tuples(target=:bag)
  case target
  when :all
    all_tuples(:bag) + all_tuples(:read_waiter) + all_tuples(:take_waiter)
  when :bag
    @mutex.synchronize{@bag.all_tuples}.map{|tuple| tuple.value}
  when :read_waiter
    @mutex.synchronize{@read_waiter.all_tuples}.map{|tuple| tuple.value}
  when :take_waiter
    @mutex.synchronize{@take_waiter.all_tuples}.map{|tuple| tuple.value}
  end
end
data_size() click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 591
def data_size
  @bag.data_size
end
finished_size() click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 587
def finished_size
  @bag.finished_size
end
move(port, tuple, sec=nil) click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 534
def move(port, tuple, sec=nil)
  real_move(port, tuple, sec)
end
notify(event, tuple, sec=nil) click to toggle source

@note

mutex version of +notify+
# File lib/pione/patch/rinda-patch.rb, line 573
def notify(event, tuple, sec=nil)
  template = NotifyTemplateEntry.new(self, event, tuple, sec)
  @mutex.synchronize {@notify_waiter.push(template)}
  template
end
orig_initialize(*args)
Alias for: new
read(tuple, sec=nil) click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 538
def read(tuple, sec=nil)
  lift_tuple(real_read(tuple, sec))
end
read_all(tuple) click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 542
def read_all(tuple)
  real_read_all(tuple).map do |res|
    lift_tuple(res)
  end
end
take_all(tuple, sec=nil) click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 548
def take_all(tuple, sec=nil)
  real_take_all(tuple, sec).map {|res| lift_tuple(res)}
end
task_size() click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 579
def task_size
  @bag.task_size
end
working_size() click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 583
def working_size
  @bag.working_size
end
write(tuple, *args) click to toggle source
# File lib/pione/patch/rinda-patch.rb, line 529
def write(tuple, *args)
  tuple.timestamp = Time.now
  real_write(tuple, *args)
end

Private Instance Methods

keep_clean() click to toggle source

@note

mutex version of +keep_clean+
# File lib/pione/patch/rinda-patch.rb, line 746
def keep_clean
  @mutex.synchronize{@read_waiter.delete_unless_alive}.each do |e|
    e.signal
  end
  @mutex.synchronize{@take_waiter.delete_unless_alive}.each do |e|
    e.signal
  end
  @mutex.synchronize{@notify_waiter.delete_unless_alive}.each do |e|
    e.notify(['close'])
  end
  @mutex.synchronize{@bag.delete_unless_alive}.each do |e|
    notify_event('delete', e.value)
  end
end
lift_location(location, history=[]) click to toggle source

Lift the location.

@param location [Location::BasicLocation]

location that lift from

@param history [Array<Location::BasicLocation>]

history of lifted location

@return [Location::BasicLocation or nil]

new location
# File lib/pione/patch/rinda-patch.rb, line 800
def lift_location(location, history=[])
  return nil if history.include?(location)

  template = TemplateEntry.new([:lift, location, nil])
  if lift_tuple = @bag.find(template)
    new_location = lift_tuple[2]
    return lift_location(new_location, history << location) || new_location
  end
  return nil
end
lift_tuple(tuple) click to toggle source

Lift the location of the tuple.

@param tuple [BasicTuple]

tuple

@return [BasicTuple]

lifted tuple
# File lib/pione/patch/rinda-patch.rb, line 780
def lift_tuple(tuple)
  if Pione::TupleSpace[tuple.first]
    if pos = Pione::TupleSpace[tuple.first].location_position
      if new_location = lift_location(tuple[pos])
        tuple = tuple.clone
        tuple[pos] = new_location
      end
    end
  end
  return tuple
end
real_move(port, tuple, sec=nil) { |template| ... } click to toggle source

@note

mutex version of +move+
# File lib/pione/patch/rinda-patch.rb, line 634
def real_move(port, tuple, sec=nil)
  template = WaitTemplateEntry.new(self, tuple, sec)
  yield(template) if block_given?

  if entry = @mutex.synchronize {@bag.find(template)}
    port.push(entry.value) if port
    @mutex.synchronize {@bag.delete(entry)}
    notify_event('take', entry.value)
    template.finished = true
    return entry.value
  end
  raise RequestExpiredError if template.expired?

  begin
    @mutex.synchronize {@take_waiter.push(template)}
    start_keeper if template.expires
    while true
      raise RequestCanceledError if template.canceled?
      raise RequestExpiredError if template.expired?
      if entry = @mutex.synchronize {@bag.find(template)}
        port.push(entry.value) if port
        @mutex.synchronize {@bag.delete(entry)}
        notify_event('take', entry.value)
        template.finished = true
        @mutex.synchronize do
          @take_waiter.delete(template)
        end
        return entry.value
      end
      Thread.current[:WaitTemplate] = template
      template.wait
      Thread.current[:WaitTemplate] = nil
    end
  ensure
    @mutex.synchronize {@take_waiter.delete(template)}
  end
end
real_read(tuple, sec=nil) { |template| ... } click to toggle source

@note

mutex version of +read+
# File lib/pione/patch/rinda-patch.rb, line 674
def real_read(tuple, sec=nil)
  template = WaitTemplateEntry.new(self, tuple, sec)
  yield(template) if block_given?

  entry = @mutex.synchronize {@bag.find(template)}
  return entry.value if entry
  raise RequestExpiredError if template.expired?

  begin
    @mutex.synchronize {@read_waiter.push(template)}
    start_keeper if template.expires
    template.wait
    raise RequestCanceledError if template.canceled?
    raise RequestExpiredError if template.expired?
    return template.found
  ensure
    @mutex.synchronize {@read_waiter.delete(template)}
  end
end
real_read_all(tuple) click to toggle source

@note

mutex version of +read_all+
# File lib/pione/patch/rinda-patch.rb, line 696
def real_read_all(tuple)
  template = WaitTemplateEntry.new(self, tuple, nil)
  entry = @mutex.synchronize {@bag.find_all(template)}
  entry.collect {|e| e.value}
end
real_take_all(tuple, sec=nil) { |template| ... } click to toggle source

@note

mutex version of +read_all+
# File lib/pione/patch/rinda-patch.rb, line 704
def real_take_all(tuple, sec=nil)
  template = WaitTemplateEntry.new(self, tuple, sec)
  yield(template) if block_given?

  entries = @mutex.synchronize {@bag.find_all(template)}
  unless entries.empty?
    entries.each do |entry|
      # port.push(entry.value) if port
      @mutex.synchronize {@bag.delete(entry)}
    end
    template.finished = true
    return entries.map {|entry| entry.value}
  end
  raise RequestExpiredError if template.expired?

  begin
    @mutex.synchronize {@take_waiter.push(template)}
    start_keeper if template.expires
    while true
      raise RequestCanceledError if template.canceled?
      raise RequestExpiredError if template.expired?
      entries = @mutex.synchronize {@bag.find_all(template)}
      unless entries.empty?
        entries.each do |entry|
          # port.push(entry.value) if port
          @mutex.synchronize {@bag.delete(entry)}
        end
        template.finished = true
        @mutex.synchronize {@take_waiter.delete(template)}
        return entries.map {|entry| entry.value}
      end
      Thread.current[:WaitTemplate] = template
      template.wait
      Thread.current[:WaitTemplate] = nil
    end
  ensure
    @mutex.synchronize {@take_waiter.delete(template)}
  end
end
real_write(tuple, sec=nil) click to toggle source

@note

mutex version of +write+
# File lib/pione/patch/rinda-patch.rb, line 599
def real_write(tuple, sec=nil)
  entry = create_entry(tuple, sec)
  if entry.expired?
    # why only read_waiter???
    @mutex.synchronize{@read_waiter.find_all_template(entry)}.each do |template|
      template.read(tuple)
    end
    notify_event('write', entry.value)
    notify_event('delete', entry.value)
  else
    # push to bag
    @mutex.synchronize do
      @bag.push(entry)
    end
    # start keeper
    start_keeper if entry.expires
    # send tuple to all matched waiters in read waiter list
    @mutex.synchronize do
      @read_waiter.find_all_template(entry).each do |template|
        template.read(tuple)
      end
    end
    # send tuple to one of matched waiters in take waiter list
    @mutex.synchronize do
      if template = @take_waiter.find_template(entry)
        template.signal
      end
    end
    notify_event('write', entry.value)
  end
  entry
end
start_keeper() click to toggle source

@note

mutex version of +start_keeper+
# File lib/pione/patch/rinda-patch.rb, line 763
def start_keeper
  return if @keeper && @keeper.alive?
  @keeper = Thread.new do
    while true
      sleep(@period)
      break unless need_keeper?
      keep_clean
    end
  end
end