class PromisePool::Promise

Attributes

callbacks[RW]
condv[RW]
error[RW]
mutex[RW]
resolved[RW]
result[RW]
task[RW]
thread[RW]
timer[RW]
value[RW]

Public Class Methods

backtrace() click to toggle source
# File lib/promise_pool/promise.rb, line 14
# Returns whatever is stored under :promise_pool_backtrace for the current
# thread, or an empty array when nothing is stored there.
def self.backtrace
  stored = Thread.current[:promise_pool_backtrace]
  stored || []
end
claim(value, &callback) click to toggle source
# File lib/promise_pool/promise.rb, line 7
# Builds a new promise and fulfills it with +value+ right away.
# When a block is given it is registered through #then before the promise
# is fulfilled. Returns the promise.
def self.claim value, &callback
  new.tap do |promise|
    promise.then(&callback) if callback
    promise.fulfill(value)
  end
end
new(timer=nil) click to toggle source
# File lib/promise_pool/promise.rb, line 23
# Sets up an unresolved promise: no value, error or result yet, an empty
# callback list, and a fresh mutex / condition variable pair.
# +timer+ is stored as given (nil by default).
def initialize timer=nil
  self.timer     = timer
  self.mutex     = Mutex.new
  self.condv     = ConditionVariable.new
  self.callbacks = []
  self.resolved  = false
  self.result    = nil
  self.error     = nil
  self.value     = nil
end
set_backtrace(e) click to toggle source

should never raise!

# File lib/promise_pool/promise.rb, line 19
# Extends the backtrace of +e+ with the backtrace retained for the current
# thread. When +e+ has no backtrace yet, the frames of this method's
# caller are used as the starting point.
def self.set_backtrace e
  origin = e.backtrace || caller
  e.set_backtrace(origin + backtrace)
end

Public Instance Methods

call() { || ... } click to toggle source
# File lib/promise_pool/promise.rb, line 55
# Runs the given block in the calling thread: records the current thread
# as the working thread, then hands the block to protected_yield.
# Returns self.
def call
  self.thread = Thread.current
  protected_yield do
    yield
  end
  self
end
defer(pool=nil) { || ... } click to toggle source

called in client thread

# File lib/promise_pool/promise.rb, line 34
# Schedules the given block to run asynchronously and returns self.
#
# Without a +pool+ a new thread is started for the block. With a +pool+
# the block is handed to pool.defer while holding the mutex, and the
# returned object is kept as the task. In both cases the backtrace
# collected so far is stored in the executing thread under
# :promise_pool_backtrace before the block runs.
def defer pool=nil
  # retain the backtrace so far; must be captured in this frame
  trace = caller + self.class.backtrace

  unless pool
    self.thread = Thread.new do
      Thread.current[:promise_pool_backtrace] = trace
      protected_yield { yield }
    end
    return self
  end

  mutex.synchronize do
    # arm the timeout first, so a task which is never processed
    # still times out
    timer.on_timeout { cancel_task } if timer
    queued = pool.defer(mutex) do
      Thread.current[:promise_pool_backtrace] = trace
      protected_yield { yield }
      # clear the entry once the job is done in this thread
      Thread.current[:promise_pool_backtrace] = nil
    end
    self.task = queued
  end
  self
end
fulfill(value) click to toggle source

called in requesting thread after the request is done

# File lib/promise_pool/promise.rb, line 83
# Fulfills the promise with +value+ while holding the mutex.
# Returns whatever fulfilling returns.
def fulfill value
  mutex.synchronize do
    fulfilling(value)
  end
end
future() click to toggle source
# File lib/promise_pool/promise.rb, line 61
# Wraps this promise in a new Future.
def future
  promise = self
  Future.new(promise)
end
reject(error) click to toggle source

called in requesting thread if something goes wrong or timed out

# File lib/promise_pool/promise.rb, line 88
# Rejects the promise with +error+ while holding the mutex.
# Returns whatever rejecting returns.
def reject error
  mutex.synchronize do
    rejecting(error)
  end
end
resolved?() click to toggle source
# File lib/promise_pool/promise.rb, line 98
# Reports the current value of the resolved attribute.
def resolved?
  self.resolved
end
started?() click to toggle source
# File lib/promise_pool/promise.rb, line 102
# True when a working thread is known for this promise, false otherwise.
def started?
  working_thread ? true : false
end
then(&action) click to toggle source

Appends an action to the callback chain; the actions are called in order when the promise calls back.

# File lib/promise_pool/promise.rb, line 93
# Appends +action+ to the callback list and returns self so that calls
# can be chained.
#
# A call without a block is ignored: resolve invokes every entry of the
# list with #call, so appending nil would make resolving fail with a
# NoMethodError instead of producing the value.
def then &action
  callbacks << action if action
  self
end
wait() click to toggle source

called in client thread (client.wait)

# File lib/promise_pool/promise.rb, line 66
# Blocks the calling thread until the promise is resolved.
# Returns nil immediately when it is resolved already.
def wait
  return if resolved?

  mutex.synchronize do
    # the condition variable may be signalled for other reasons,
    # so re-check the flag after every wake-up
    condv.wait(mutex) until resolved?
  end
end
yield() click to toggle source

called in client thread (from the future (e.g. body))

# File lib/promise_pool/promise.rb, line 72
# Waits for the promise to resolve, then returns the result.
# A result which is an Exception is raised instead of returned.
def yield
  wait
  outcome = result
  raise outcome if Exception === outcome
  outcome
end

Private Instance Methods

cancel_task() click to toggle source

timeout!

# File lib/promise_pool/promise.rb, line 155
# Timeout handler, run while holding the mutex.
#
# Does nothing when the promise is resolved already. Otherwise, when a
# working thread is known, timer.error is raised in that thread. When no
# thread is known, the task is cancelled and the promise is rejected
# with timer.error.
def cancel_task
  mutex.synchronize do
    next if resolved? # already done, nothing to cancel

    runner = working_thread
    if runner
      # raise the timer's error in the working thread
      runner.raise(timer.error)
    else
      # the task was queued and never started: cancel it and
      # reject the promise with the timer's error
      task.cancel
      rejecting(timer.error)
    end
  end
end
fulfilling(value) click to toggle source
# File lib/promise_pool/promise.rb, line 111
# Stores +value+ and resolves the promise.
# The caller is expected to hold the mutex (should be synchronized).
# Returns whatever resolve returns.
def fulfilling value
  self.value = value
  outcome = resolve
  outcome
end
log_callback_error(err) click to toggle source

log user callback error, should never raise

# File lib/promise_pool/promise.rb, line 175
# Writes a warning describing +err+ and its backtrace.
# Anything raised while doing so is swallowed, unless $DEBUG is set, in
# which case it is raised in the main thread.
def log_callback_error err
  headline = "#{self.class}: ERROR: #{err}"
  origin   = err.backtrace.inspect
  warn "#{headline}\n  from #{origin}"
rescue Exception => e
  Thread.main.raise(e) if $DEBUG
end
protected_yield() { || ... } click to toggle source

Called in a new thread if pool_size == 0; otherwise called from the pool, i.e. in the requesting thread.

# File lib/promise_pool/promise.rb, line 134
# Runs the given block and settles the promise with its outcome.
#
# The block's return value is passed to fulfill; with a timer the block
# runs through timeout_protected_yield. Any exception raised on the way
# gets its backtrace extended and is passed to reject instead, so
# nothing escapes from here.
def protected_yield
  if timer
    outcome = timeout_protected_yield { yield }
  else
    outcome = yield
  end
  fulfill(outcome)
rescue Exception => failure
  self.class.set_backtrace(failure)
  reject(failure)
end
rejecting(error) click to toggle source
# File lib/promise_pool/promise.rb, line 116
# Stores +error+ and resolves the promise.
# The caller is expected to hold the mutex (should be synchronized).
# Returns whatever resolve returns.
def rejecting error
  self.error = error
  outcome = resolve
  outcome
end
resolve() click to toggle source
# File lib/promise_pool/promise.rb, line 121
# Computes the final result and marks the promise as resolved.
# The caller is expected to hold the mutex (should be synchronized).
#
# The callbacks are applied in order, starting from the error if there
# is one and from the value otherwise; each callback receives the
# previous callback's return value. When a callback raises, the
# exception becomes the result and is logged. Whatever happens, the
# promise ends up resolved and waiters are woken up.
def resolve
  outcome = error || value
  callbacks.each { |action| outcome = action.call(outcome) }
  self.result = outcome
rescue Exception => failure
  self.class.set_backtrace(failure)
  self.result = failure
  log_callback_error(failure)
ensure
  self.resolved = true
  condv.broadcast # client or response might be waiting
end
timeout_protected_yield() { || ... } click to toggle source
# File lib/promise_pool/promise.rb, line 146
# Runs the given block with the timeout armed, and always cancels the
# timer afterwards, even when the block raises.
def timeout_protected_yield
  # the timeout might be armed already (set in defer when a pool is used)
  armed = timer.timer
  timer.on_timeout { cancel_task } unless armed
  yield
ensure
  timer.cancel
end
working_thread() click to toggle source
# File lib/promise_pool/promise.rb, line 170
# The thread working on this promise: the promise's own thread when set,
# otherwise the thread of its task (if there is a task).
def working_thread
  own = thread
  return own if own

  task && task.thread
end