class PromisePool::Promise
Attributes
callbacks[RW]
condv[RW]
error[RW]
mutex[RW]
resolved[RW]
result[RW]
task[RW]
thread[RW]
timer[RW]
value[RW]
Public Class Methods
backtrace()
click to toggle source
# File lib/promise_pool/promise.rb, line 14
# Returns whatever is stored in the current thread under
# :promise_pool_backtrace, or an empty array when nothing is stored.
def self.backtrace
  stored = Thread.current[:promise_pool_backtrace]
  stored || []
end
claim(value, &callback)
click to toggle source
# File lib/promise_pool/promise.rb, line 7
# Builds a new promise, registers +callback+ on it when one was given,
# fulfills it with +value+ immediately, and returns the promise.
def self.claim value, &callback
  new.tap do |claimed|
    claimed.then(&callback) if callback
    claimed.fulfill(value)
  end
end
new(timer=nil)
click to toggle source
# File lib/promise_pool/promise.rb, line 23
# Sets up an unresolved promise: no value, error or result yet, an empty
# callback list, the optional +timer+, and a fresh mutex and condition
# variable.
def initialize timer=nil
  self.timer     = timer
  self.mutex     = Mutex.new
  self.condv     = ConditionVariable.new
  self.callbacks = []
  self.resolved  = false
  self.result    = nil
  self.error     = nil
  self.value     = nil
end
set_backtrace(e)
click to toggle source
should never raise!
# File lib/promise_pool/promise.rb, line 19
# Extends the backtrace of +e+ with the backtrace retained for the current
# thread. When +e+ has no backtrace yet, the current call stack is used as
# the starting point.
def self.set_backtrace e
  origin = e.backtrace || caller
  e.set_backtrace(origin + backtrace)
end
Public Instance Methods
call() { || ... }
click to toggle source
# File lib/promise_pool/promise.rb, line 55
# Runs the given block in the current thread and returns self.
def call
  # record the current thread as the working thread
  self.thread = Thread.current
  # do the job; protected_yield rescues anything the block raises
  protected_yield do
    yield
  end
  self
end
defer(pool=nil) { || ... }
click to toggle source
called in client thread
# File lib/promise_pool/promise.rb, line 34
# Schedules the given block to run asynchronously and returns self.
#
# With a +pool+, the block is handed to pool.defer (together with the
# promise mutex) and the returned object is stored as the task. Without a
# pool, a new thread is started and stored as the working thread.
#
# In both cases the caller's backtrace is stored in the worker's
# :promise_pool_backtrace thread-local while the block runs.
def defer pool=nil
  backtrace = caller + self.class.backtrace # retain the backtrace so far
  if pool
    mutex.synchronize do
      # still timing it out if the task never processed
      timer.on_timeout{ cancel_task } if timer
      self.task = pool.defer(mutex) do
        begin
          Thread.current[:promise_pool_backtrace] = backtrace
          protected_yield{ yield }
          nil # keep the block's return value as it was before
        ensure
          # always clear the slot, even when an exception escapes
          # protected_yield, so that the next job running on this thread
          # does not see a stale backtrace
          Thread.current[:promise_pool_backtrace] = nil
        end
      end
    end
  else
    self.thread = Thread.new do
      Thread.current[:promise_pool_backtrace] = backtrace
      protected_yield{ yield }
    end
  end
  self
end
fulfill(value)
click to toggle source
called in requesting thread after the request is done
# File lib/promise_pool/promise.rb, line 83
# Resolves the promise with +value+ while holding the promise mutex.
def fulfill value
  mutex.synchronize do
    fulfilling(value)
  end
end
future()
click to toggle source
# File lib/promise_pool/promise.rb, line 61
# Wraps this promise in a new Future.
def future
  Future.new(self)
end
reject(error)
click to toggle source
called in requesting thread if something goes wrong or timed out
# File lib/promise_pool/promise.rb, line 88
# Resolves the promise with +error+ while holding the promise mutex.
def reject error
  mutex.synchronize do
    rejecting(error)
  end
end
resolved?()
click to toggle source
# File lib/promise_pool/promise.rb, line 98
# Returns the current value of the +resolved+ attribute.
def resolved?
  resolved
end
started?()
click to toggle source
# File lib/promise_pool/promise.rb, line 102
# Returns true when a working thread is known for this promise,
# false otherwise.
def started?
  working_thread ? true : false
end
then(&action)
click to toggle source
Appends an action to the callback chain; the actions are called in order when the promise is resolved.
# File lib/promise_pool/promise.rb, line 93
# Adds +action+ to the end of the callback list and returns self so that
# calls can be chained.
def then &action
  callbacks.push(action)
  self
end
wait()
click to toggle source
called in client thread (client.wait)
# File lib/promise_pool/promise.rb, line 66
# Blocks the calling thread until the promise is resolved.
def wait
  return if resolved?

  mutex.synchronize do
    # keep waiting in a loop: this thread might be awakened by some other
    # future before this promise is actually resolved
    condv.wait(mutex) until resolved?
  end
end
yield()
click to toggle source
called in client thread (from the future (e.g. body))
# File lib/promise_pool/promise.rb, line 72
# Waits for the promise to resolve, then returns the result. When the
# result is an Exception it is raised instead of being returned.
def yield
  wait
  raise result if Exception === result

  result
end
Private Instance Methods
cancel_task()
click to toggle source
timeout!
# File lib/promise_pool/promise.rb, line 155
# Timeout handler. Runs under the promise mutex.
def cancel_task
  mutex.synchronize do
    next if resolved? # do nothing if it's already done

    worker = working_thread
    if worker
      # raise the timer's error in the working thread
      worker.raise(timer.error)
    else
      # the task was queued and never started: cancel it and resolve
      # the promise with the timer's error
      task.cancel
      rejecting(timer.error)
    end
  end
end
fulfilling(value)
click to toggle source
# File lib/promise_pool/promise.rb, line 111
# Stores +value+ and resolves the promise.
# The caller is expected to hold the mutex.
def fulfilling value
  self.value = value
  resolve
end
log_callback_error(err)
click to toggle source
log user callback error, should never raise
# File lib/promise_pool/promise.rb, line 175
# Writes a warning describing +err+ and its backtrace. Any exception raised
# while doing so is rescued; it is re-raised in the main thread only when
# $DEBUG is set.
def log_callback_error err
  message = "#{self.class}: ERROR: #{err}\n from #{err.backtrace.inspect}"
  warn message
rescue Exception => failure
  Thread.main.raise(failure) if $DEBUG
end
protected_yield() { || ... }
click to toggle source
called in a new thread if pool_size == 0, otherwise from the pool i.e. requesting thread
# File lib/promise_pool/promise.rb, line 134
# Runs the given block and fulfills the promise with its return value.
# When a timer is present the block runs through timeout_protected_yield.
# Any exception is rescued, gets its backtrace extended, and is used to
# reject the promise.
def protected_yield
  outcome =
    if timer
      timeout_protected_yield { yield }
    else
      yield
    end
  fulfill(outcome)
rescue Exception => failure
  self.class.set_backtrace(failure)
  reject(failure)
end
rejecting(error)
click to toggle source
# File lib/promise_pool/promise.rb, line 116
# Stores +error+ and resolves the promise.
# The caller is expected to hold the mutex.
def rejecting error
  self.error = error
  resolve
end
resolve()
click to toggle source
# File lib/promise_pool/promise.rb, line 121
# Computes the final result by feeding the error (or, when there is none,
# the value) through every callback in order. If a callback raises, the
# exception becomes the result and is logged. In every case the promise is
# marked resolved and all waiters on the condition variable are woken up.
# The caller is expected to hold the mutex.
def resolve
  seed = error || value
  self.result = callbacks.inject(seed) { |acc, callback| callback.call(acc) }
rescue Exception => failure
  self.class.set_backtrace(failure)
  self.result = failure
  log_callback_error(failure)
ensure
  self.resolved = true
  # client or response might be waiting
  condv.broadcast
end
timeout_protected_yield() { || ... }
click to toggle source
# File lib/promise_pool/promise.rb, line 146
# Runs the given block with the timeout handler installed and always
# cancels the timer afterwards.
def timeout_protected_yield
  begin
    # the timeout might already be set for thread_pool (pool_size > 0)
    timer.on_timeout { cancel_task } unless timer.timer
    yield
  ensure
    timer.cancel
  end
end
working_thread()
click to toggle source
# File lib/promise_pool/promise.rb, line 170
# Returns the thread recorded on the promise, falling back to the thread of
# the task when a task is present.
def working_thread
  own = thread
  return own if own

  task && task.thread
end