class Eventbox::EventLoop
@private
This class manages the calls to event scope methods and procs comparable to an event loop. It doesn't use an explicit event loop, but uses the calling thread to process the event.
All methods prefixed with “_” require @mutex to be acquired before they are called.
Public Class Methods
new(threadpool, guard_time)
click to toggle source
# File lib/eventbox/event_loop.rb, line 11
# Set up a new event loop.
#
# threadpool - kept in @threadpool; other methods of this class call +new+ on it with a block.
# guard_time - kept in @guard_time; checked by _init_variables, which accepts nil, a Numeric or a Proc.
def initialize(threadpool, guard_time)
  @threadpool, @guard_time = threadpool, guard_time
  @shutdown = false
  _init_variables
end
Public Instance Methods
_external_object_call(object, method, name, args, kwargs, arg_block, cbresult, source_event_loop, call_context)
click to toggle source
# File lib/eventbox/event_loop.rb, line 403
# Enqueue a call to an external object on an answer queue.
#
# The arguments are passed through Sanitizer and packed into an ExternalObjectCall, which is
# pushed onto the answer queue of +call_context+ when one is given, or onto
# @latest_answer_queue otherwise.
#
# object, method      - receiver and method to invoke
# name                - optional name used in error messages and for the result wrapper
# args, kwargs        - positional and keyword arguments of the call
# arg_block           - block argument of the call
# cbresult            - optional callable receiving the result; an ArgumentWrapper is built for it
# source_event_loop   - passed to Sanitizer as the target scope of the arguments
# call_context        - optional explicit call context providing +__answer_queue__+
#
# Returns nil.
# Raises InvalidAccess if the explicit call context's queue is already closed, or if neither
# a call context nor a current answer queue is available.
def _external_object_call(object, method, name, args, kwargs, arg_block, cbresult, source_event_loop, call_context)
  result_wrapper = ArgumentWrapper.build(cbresult, name) if cbresult
  args = Sanitizer.sanitize_values(args, self, source_event_loop)
  kwargs = Sanitizer.sanitize_kwargs(kwargs, self, source_event_loop)
  arg_block = Sanitizer.sanitize_value(arg_block, self, source_event_loop)
  cb = ExternalObjectCall.new(object, method, args, kwargs, arg_block, cbresult, result_wrapper)

  if call_context
    # explicit call_context given
    if call_context.__answer_queue__.closed?
      raise InvalidAccess, "#{cb.objtype} #{"defined by `#{name}' " if name}was called with a call context that already returned"
    end
    call_context.__answer_queue__ << cb
  elsif @latest_answer_queue
    # proc called by a sync or yield call/proc context
    @latest_answer_queue << cb
  else
    # Fixed wording: the original message read "which must a sync_call, ...".
    raise InvalidAccess, "#{cb.objtype} #{"defined by `#{name}' " if name}was called by `#{@latest_call_name}', which must be a sync_call, yield_call, sync_proc or yield_proc"
  end

  nil
end
_init_variables()
click to toggle source
# File lib/eventbox/event_loop.rb, line 31
# (Re-)create the runtime state that is not part of the marshalled data: the lists of running
# actions, the mutex and the guard time callback derived from @guard_time.
#
# @guard_time may be
#   nil      - no guard time callback
#   Numeric  - a proc is installed that warns when a call took longer than @guard_time
#   Proc     - used as the callback as is
#
# Raises ArgumentError for any other kind of @guard_time.
def _init_variables
  @running_actions = []
  @running_actions_for_gc = []
  @mutex = Mutex.new

  @guard_time_proc =
    if NilClass === @guard_time
      nil
    elsif Numeric === @guard_time
      proc do |dt, name|
        next unless dt > @guard_time

        # First stack frame that is not located in the eventbox library itself
        ecaller = caller.find { |frame| frame !~ /lib\/eventbox(\/|\.rb:)/ }
        warn "guard time exceeded: #{"%2.3f" % dt} sec (limit is #{@guard_time}) in `#{name}' called from `#{ecaller}' - please move blocking tasks to actions"
      end
    elsif Proc === @guard_time
      @guard_time
    else
      raise ArgumentError, "guard_time should be Numeric, Proc or nil"
    end
end
_latest_call_context()
click to toggle source
# File lib/eventbox/event_loop.rb, line 143
# Build a call context object for the call that is currently being processed.
#
# Returns a BlockingExternalCallContext whose +__answer_queue__+ is @latest_answer_queue,
# or nil when no answer queue is currently set.
def _latest_call_context
  return nil unless @latest_answer_queue

  BlockingExternalCallContext.new.tap do |context|
    context.__answer_queue__ = @latest_answer_queue
  end
end
_update_action_threads_for_gc()
click to toggle source
Make a copy of the thread list for use in shutdown. The copy is replaced by an atomic operation, so that it can be read lock-free during shutdown.
# File lib/eventbox/event_loop.rb, line 104
# Publish a fresh snapshot of @running_actions in @running_actions_for_gc.
# The snapshot is swapped in by a single assignment.
#
# Returns the new snapshot array.
def _update_action_threads_for_gc
  snapshot = @running_actions.dup
  @running_actions_for_gc = snapshot
end
async_call(box, name, args, kwargs, block, wrapper)
click to toggle source
# File lib/eventbox/event_loop.rb, line 160
# Run the method "__<name>__" of +box+ inside a call frame of this event loop, without an
# answer queue.
#
# box     - receiver of the "__<name>__" method
# name    - method name, also passed to Sanitizer
# args, kwargs, block - call arguments; sanitized before they are handed to the method
# wrapper - optional callable that may rewrite args and kwargs before sanitizing
#
# Returns the value of with_call_frame.
def async_call(box, name, args, kwargs, block, wrapper)
  with_call_frame(name, nil) do |origin|
    if wrapper
      args, kwargs = wrapper.call(origin, self, *args, **kwargs)
    end

    args   = Sanitizer.sanitize_values(args, origin, self, name)
    kwargs = Sanitizer.sanitize_kwargs(kwargs, origin, self, name)
    block  = Sanitizer.sanitize_value(block, origin, self, name)

    box.send("__#{name}__", *args, **kwargs, &block)
  end
end
async_proc_call(pr, args, kwargs, arg_block, wrapper)
click to toggle source
Anonymous version of async_call
# File lib/eventbox/event_loop.rb, line 194
# Anonymous version of async_call: yield to +pr+ inside a call frame of this event loop,
# without an answer queue.
#
# pr      - the proc to run
# args, kwargs, arg_block - call arguments; sanitized before they are handed to +pr+
# wrapper - optional callable that may rewrite args and kwargs before sanitizing
#
# Returns the value of with_call_frame.
def async_proc_call(pr, args, kwargs, arg_block, wrapper)
  with_call_frame(AsyncProc, nil) do |origin|
    if wrapper
      args, kwargs = wrapper.call(origin, self, *args, **kwargs)
    end

    args      = Sanitizer.sanitize_values(args, origin, self)
    kwargs    = Sanitizer.sanitize_kwargs(kwargs, origin, self)
    arg_block = Sanitizer.sanitize_value(arg_block, origin, self)

    pr.yield(*args, **kwargs, &arg_block)
  end
end
callback_loop(answer_queue, source_event_loop, name)
click to toggle source
# File lib/eventbox/event_loop.rb, line 330
# Process entries of +answer_queue+ until a final result arrives.
#
# ExternalObjectCall entries are executed in the calling thread; if the entry carries a
# +cbresult+, the outcome is reported through external_call_result.
# A WrappedException entry closes the queue and re-raises the wrapped exception.
# Any other entry closes the queue and is returned as the result.
#
# answer_queue      - queue to read from
# source_event_loop - not used by this method
# name              - passed to close_answer_queue for its error messages
def callback_loop(answer_queue, source_event_loop, name)
  loop do
    entry = answer_queue.deq

    if ExternalObjectCall === entry
      outcome = entry.object.send(entry.method, *entry.args, **entry.kwargs, &entry.arg_block)
      next unless entry.cbresult

      external_call_result(entry.cbresult, outcome, answer_queue, entry.result_wrapper)
    elsif WrappedException === entry
      close_answer_queue(answer_queue, name)
      raise(*entry.exc)
    else
      close_answer_queue(answer_queue, name)
      return entry
    end
  end
end
event_scope?()
click to toggle source
Is the caller running within the event scope context?
# File lib/eventbox/event_loop.rb, line 109
# Is the caller running within the event scope context?
#
# Returns true when the calling thread currently owns @mutex.
def event_scope?
  lock = @mutex
  lock.owned?
end
external_call_result(cbresult, res, answer_queue, wrapper)
click to toggle source
Called when an external object call finished
# File lib/eventbox/event_loop.rb, line 230
# Called when an external object call finished: hand its result to +cbresult+ inside a call
# frame of this event loop.
#
# cbresult     - callable that receives the (splatted) result
# res          - the result of the external call
# answer_queue - becomes the answer queue of the call frame
# wrapper      - optional callable that may rewrite the result before sanitizing
#
# Returns the value of with_call_frame.
def external_call_result(cbresult, res, answer_queue, wrapper)
  with_call_frame(ExternalObject, answer_queue) do |origin|
    if wrapper
      res, _ = wrapper.call(origin, self, res)
    end

    res = Sanitizer.sanitize_value(res, origin, self)
    cbresult.yield(*res)
  end
end
inspect()
click to toggle source
# File lib/eventbox/event_loop.rb, line 74 def inspect "#<#{self.class}:#{self.object_id} @threadpool=#{@threadpool.inspect}, @shutdown=#{@shutdown.inspect}, @guard_time=#{@guard_time.inspect}, @running_actions=#{@running_actions.length}>" end
internal_yield_result(args, name)
click to toggle source
# File lib/eventbox/event_loop.rb, line 287
# Replace the trailing completion proc in +args+ by a guard that permits exactly one result.
#
# args - argument list; its last element must be a Proc and is replaced in place
# name - method name, or the Proc object when called for a yield_proc (only used for messages)
#
# Returns the guarding proc that was stored as the last element of +args+.
# Raises InvalidAccess if the last argument is not a Proc.
# The guarding proc raises MultipleResults when it is called again after a call completed.
def internal_yield_result(args, name)
  anonymous = Proc === name
  pending = args.last

  unless Proc === pending
    message =
      if anonymous
        "yield_proc #{name.inspect} must be called with a Proc object in the event scope but got #{pending.class}"
      else
        "yield_call `#{name}' must be called with a Proc object in the event scope but got #{pending.class}"
      end
    raise InvalidAccess, message
  end

  args[-1] = proc do |*cargs, &cblock|
    if pending.nil?
      message =
        if anonymous
          "second result yielded for #{name.inspect} that already returned"
        else
          "second result yielded for method `#{name}' that already returned"
        end
      raise MultipleResults, message
    end

    outcome = pending.yield(*cargs, &cblock)
    # Mark as completed only after the completion proc returned
    pending = nil
    outcome
  end
end
marshal_dump()
click to toggle source
# File lib/eventbox/event_loop.rb, line 18
# Serialization hook: dump the thread pool, the shutdown flag and the guard time.
#
# Returns [@threadpool, @shutdown, @guard_time].
# Raises TypeError when called from within the event scope or while actions are running.
def marshal_dump
  if event_scope?
    raise TypeError, "Eventbox objects can't be serialized within event scope"
  end

  @mutex.synchronize do
    unless @running_actions.empty?
      raise TypeError, "Eventbox objects can't be serialized while actions are running"
    end

    [@threadpool, @shutdown, @guard_time]
  end
end
marshal_load(array)
click to toggle source
# File lib/eventbox/event_loop.rb, line 26
# Deserialization hook: restore the values written by marshal_dump and rebuild the
# remaining runtime state through _init_variables.
#
# array - [threadpool, shutdown, guard_time]
def marshal_load(array)
  threadpool, shutdown, guard_time = array
  @threadpool = threadpool
  @shutdown = shutdown
  @guard_time = guard_time
  _init_variables
end
new_async_proc(name=nil, klass=AsyncProc, &block)
click to toggle source
# File lib/eventbox/event_loop.rb, line 238
# Create a proc object of class +klass+ around +block+.
#
# When the created proc is called within the event scope, +block+ is run directly;
# otherwise the call is routed through async_proc_call. In both cases the created proc
# returns itself.
#
# name  - optional name, used for the argument wrapper description
# klass - class of the created proc object (AsyncProc by default)
#
# Raises InvalidAccess when called outside of the event scope.
def new_async_proc(name=nil, klass=AsyncProc, &block)
  unless event_scope?
    raise InvalidAccess, "async_proc outside of the event scope is not allowed"
  end

  wrapper = ArgumentWrapper.build(block, "async_proc #{name}")
  created = klass.new do |*args, **kwargs, &arg_block|
    # Run directly when called in the event scope, through a call frame when called externally
    event_scope? ? block.yield(*args, **kwargs, &arg_block) : async_proc_call(block, args, kwargs, arg_block, wrapper)
    created
  end
end
new_sync_proc(name=nil, &block)
click to toggle source
# File lib/eventbox/event_loop.rb, line 253
# Create a SyncProc around +block+.
#
# When the SyncProc is called within the event scope, +block+ is run directly and its value
# returned. When called externally, the call goes through sync_proc_call and the caller
# waits in callback_loop for the result.
#
# name - optional name, used for the argument wrapper description
#
# Raises InvalidAccess when called outside of the event scope.
def new_sync_proc(name=nil, &block)
  unless event_scope?
    raise InvalidAccess, "sync_proc outside of the event scope is not allowed"
  end

  wrapper = ArgumentWrapper.build(block, "sync_proc #{name}")
  SyncProc.new do |*args, **kwargs, &arg_block|
    # called in the event scope
    next block.yield(*args, **kwargs, &arg_block) if event_scope?

    # called externally
    answer_queue = Queue.new
    sel = sync_proc_call(block, args, kwargs, arg_block, answer_queue, wrapper)
    callback_loop(answer_queue, sel, block)
  end
end
new_yield_proc(name=nil, &block)
click to toggle source
# File lib/eventbox/event_loop.rb, line 269
# Create a YieldProc around +block+.
#
# When the YieldProc is called within the event scope, the trailing completion proc of the
# arguments is guarded by internal_yield_result, +block+ is run and nil is returned.
# When called externally, the call goes through yield_proc_call and the caller waits in
# callback_loop for the result.
#
# name - optional name, used for the argument wrapper description
#
# Raises InvalidAccess when called outside of the event scope.
def new_yield_proc(name=nil, &block)
  unless event_scope?
    raise InvalidAccess, "yield_proc outside of the event scope is not allowed"
  end

  wrapper = ArgumentWrapper.build(block, "yield_proc #{name}")
  YieldProc.new do |*args, **kwargs, &arg_block|
    unless event_scope?
      # called externally
      answer_queue = Queue.new
      sel = yield_proc_call(block, args, kwargs, arg_block, answer_queue, wrapper)
      next callback_loop(answer_queue, sel, block)
    end

    # called in the event scope
    internal_yield_result(args, block)
    block.yield(*args, **kwargs, &arg_block)
    nil
  end
end
send_shutdown(object_id=nil)
click to toggle source
Abort all running action threads.
# File lib/eventbox/event_loop.rb, line 53 def send_shutdown(object_id=nil) # warn "shutdown called for object #{object_id} with #{@running_actions.size} threads #{@running_actions.map(&:object_id).join(",")}" # The finalizer doesn't allow suspension per Mutex, so that we access a read-only copy of @running_actions. # To avoid race conditions with thread creation, set a flag before the loop. @shutdown = true # terminate all running action threads begin @running_actions_for_gc.each(&:abort) rescue ThreadError # ThreadPool requires to lock a mutex, which fails in trap context. # So defer the abort through another thread. Thread.new do @running_actions_for_gc.each(&:abort) end end nil end
shutdown(&completion_block)
click to toggle source
# File lib/eventbox/event_loop.rb, line 78 def shutdown(&completion_block) send_shutdown if event_scope? if completion_block completion_block = new_async_proc(&completion_block) # Thread might not be tagged to a calling event scope source_event_loop = Thread.current.thread_variable_get(:__event_loop__) Thread.current.thread_variable_set(:__event_loop__, nil) begin @threadpool.new do @running_actions_for_gc.each(&:join) completion_block.call end ensure Thread.current.thread_variable_set(:__event_loop__, source_event_loop) end end else raise InvalidAccess, "external shutdown call doesn't take a block but blocks until threads have terminated" if completion_block @running_actions_for_gc.each(&:join) end end
start_action(meth, name, args, &block)
click to toggle source
# File lib/eventbox/event_loop.rb, line 426 def start_action(meth, name, args, &block) # Actions might not be tagged to a calling event scope source_event_loop = Thread.current.thread_variable_get(:__event_loop__) Thread.current.thread_variable_set(:__event_loop__, nil) qu = Queue.new new_thread = Thread.handle_interrupt(Exception => :never) do @threadpool.new do ac = nil begin Thread.handle_interrupt(AbortAction => :on_blocking) do if meth.arity == args.length meth.call(*args, &block) else ac ||= qu.deq meth.call(*args, ac, &block) end end rescue AbortAction ac ||= qu.deq ac.terminate rescue WeakRef::RefError # It can happen that the GC already swept the Eventbox instance, before some instance action is in a blocking state. # In this case access to the Eventbox instance raises a RefError. # Since it's now impossible to execute the action up to a blocking state, abort the action prematurely. raise unless @shutdown ensure ac ||= qu.deq thread_finished(ac) end end end a = Action.new(name, new_thread, self) # Add to the list of running actions synchronize_external do @running_actions << a _update_action_threads_for_gc end # Enqueue the action for passing as action parameter qu << a # @shutdown is set without a lock, so that we need to re-check, if it was set while start_action if @shutdown a.abort a.join end a ensure Thread.current.thread_variable_set(:__event_loop__, source_event_loop) end
sync_call(box, name, args, kwargs, block, answer_queue, wrapper)
click to toggle source
# File lib/eventbox/event_loop.rb, line 170
# Run the method "__<name>__" of +box+ inside a call frame of this event loop and push its
# sanitized return value onto +answer_queue+.
#
# box     - receiver of the "__<name>__" method
# name    - method name, also passed to Sanitizer for the arguments
# args, kwargs, block - call arguments; sanitized before they are handed to the method
# answer_queue - receives the result and is the answer queue of the call frame
# wrapper - optional callable that may rewrite args and kwargs before sanitizing
#
# Returns the value of with_call_frame.
def sync_call(box, name, args, kwargs, block, answer_queue, wrapper)
  with_call_frame(name, answer_queue) do |origin|
    if wrapper
      args, kwargs = wrapper.call(origin, self, *args, **kwargs)
    end

    args   = Sanitizer.sanitize_values(args, origin, self, name)
    kwargs = Sanitizer.sanitize_kwargs(kwargs, origin, self, name)
    block  = Sanitizer.sanitize_value(block, origin, self, name)

    outcome = box.send("__#{name}__", *args, **kwargs, &block)
    # The result travels in the opposite direction: from this event loop back to the origin
    answer_queue << Sanitizer.sanitize_value(outcome, self, origin)
  end
end
sync_proc_call(pr, args, kwargs, arg_block, answer_queue, wrapper)
click to toggle source
Anonymous version of sync_call
# File lib/eventbox/event_loop.rb, line 205
# Anonymous version of sync_call: yield to +pr+ inside a call frame of this event loop and
# push its sanitized return value onto +answer_queue+.
#
# pr      - the proc to run
# args, kwargs, arg_block - call arguments; sanitized before they are handed to +pr+
# answer_queue - receives the result and is the answer queue of the call frame
# wrapper - optional callable that may rewrite args and kwargs before sanitizing
#
# Returns the value of with_call_frame.
def sync_proc_call(pr, args, kwargs, arg_block, answer_queue, wrapper)
  with_call_frame(SyncProc, answer_queue) do |origin|
    if wrapper
      args, kwargs = wrapper.call(origin, self, *args, **kwargs)
    end

    args      = Sanitizer.sanitize_values(args, origin, self)
    kwargs    = Sanitizer.sanitize_kwargs(kwargs, origin, self)
    arg_block = Sanitizer.sanitize_value(arg_block, origin, self)

    outcome = pr.yield(*args, **kwargs, &arg_block)
    # The result travels in the opposite direction: from this event loop back to the origin
    answer_queue << Sanitizer.sanitize_value(outcome, self, origin)
  end
end
synchronize_external() { || ... }
click to toggle source
# File lib/eventbox/event_loop.rb, line 113
# Run the given block while @mutex is held.
#
# If the caller is already within the event scope, the block is run directly;
# otherwise @mutex is acquired for the duration of the block.
#
# Returns the value of the block.
def synchronize_external
  return yield if event_scope?

  @mutex.synchronize { yield }
end
thread_finished(action)
click to toggle source
# File lib/eventbox/event_loop.rb, line 386
# Remove a finished action from the list of running actions and refresh the lock-free
# snapshot. Runs under @mutex.
#
# action - the action to remove
#
# Raises ArgumentError if +action+ is not in @running_actions.
def thread_finished(action)
  @mutex.synchronize do
    removed = @running_actions.delete(action)
    raise(ArgumentError, "unknown action has finished: #{action}") unless removed

    _update_action_threads_for_gc
  end
end
with_call_context(ctx) { || ... }
click to toggle source
# File lib/eventbox/event_loop.rb, line 151
# Run the given block with the answer queue of +ctx+ installed as @latest_answer_queue.
# The previous answer queue is restored afterwards, also when the block raises.
#
# ctx - a CallContext providing +__answer_queue__+
#
# Returns the value of the block.
# Raises ArgumentError if +ctx+ is not a CallContext.
def with_call_context(ctx)
  saved_queue = @latest_answer_queue
  begin
    unless CallContext === ctx
      raise ArgumentError, "invalid argument #{ctx.inspect} instead of Eventbox::CallContext"
    end

    @latest_answer_queue = ctx.__answer_queue__
    yield
  ensure
    @latest_answer_queue = saved_queue
  end
end
with_call_frame(name, answer_queue) { |source_event_loop| ... }
click to toggle source
# File lib/eventbox/event_loop.rb, line 123
# Run the given block inside the event scope of this event loop.
#
# Acquires @mutex, tags the calling thread with this event loop through the thread variable
# :__event_loop__ and records the answer queue and call name for the duration of the block.
# Afterwards everything is reset, the mutex is released, the previous thread tag is restored
# and the guard time callback (if any) is called with the elapsed time and +name+.
#
# name         - name of the call, stored in @latest_call_name and passed to the guard callback
# answer_queue - stored in @latest_answer_queue while the block runs (may be nil)
#
# Yields the event loop the calling thread was tagged with before (nil if untagged).
# Returns that previous event loop.
def with_call_frame(name, answer_queue)
  source_event_loop = Thread.current.thread_variable_get(:__event_loop__)
  @mutex.lock
  begin
    Thread.current.thread_variable_set(:__event_loop__, self)
    @latest_answer_queue = answer_queue
    @latest_call_name = name
    # Use the monotonic clock: the wall clock can be adjusted while the block runs, which
    # would yield negative or inflated durations and therefore bogus guard time reports.
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield(source_event_loop)
  ensure
    @latest_answer_queue = nil
    @latest_call_name = nil
    @mutex.unlock
    Thread.current.thread_variable_set(:__event_loop__, source_event_loop)
    # The guard callback runs after the mutex was released
    diff_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
    @guard_time_proc&.call(diff_time, name)
  end
  source_event_loop
end
yield_call(box, name, args, kwargs, block, answer_queue, wrapper)
click to toggle source
# File lib/eventbox/event_loop.rb, line 182
# Run the method "__<name>__" of +box+ inside a call frame of this event loop. A completion
# proc bound to +answer_queue+ is appended to +args+ (in place), so that the method can
# deliver its result later.
#
# box     - receiver of the "__<name>__" method
# name    - method name, also passed to Sanitizer
# args, kwargs, block - call arguments; sanitized before they are handed to the method
# answer_queue - queue the completion proc writes to; answer queue of the call frame
# wrapper - optional callable that may rewrite args and kwargs before sanitizing
#
# Returns the value of with_call_frame.
def yield_call(box, name, args, kwargs, block, answer_queue, wrapper)
  with_call_frame(name, answer_queue) do |origin|
    completion = new_completion_proc(answer_queue, name, origin)
    args << completion

    if wrapper
      args, kwargs = wrapper.call(origin, self, *args, **kwargs)
    end

    args   = Sanitizer.sanitize_values(args, origin, self, name)
    kwargs = Sanitizer.sanitize_kwargs(kwargs, origin, self, name)
    block  = Sanitizer.sanitize_value(block, origin, self, name)

    box.send("__#{name}__", *args, **kwargs, &block)
  end
end
yield_proc_call(pr, args, kwargs, arg_block, answer_queue, wrapper)
click to toggle source
Anonymous version of yield_call
# File lib/eventbox/event_loop.rb, line 218
# Anonymous version of yield_call: yield to +pr+ inside a call frame of this event loop.
# A completion proc bound to +answer_queue+ is appended to +args+ (in place), so that +pr+
# can deliver its result later.
#
# pr      - the proc to run
# args, kwargs, arg_block - call arguments; sanitized before they are handed to +pr+
# answer_queue - queue the completion proc writes to; answer queue of the call frame
# wrapper - optional callable that may rewrite args and kwargs before sanitizing
#
# Returns the value of with_call_frame.
def yield_proc_call(pr, args, kwargs, arg_block, answer_queue, wrapper)
  with_call_frame(YieldProc, answer_queue) do |origin|
    completion = new_completion_proc(answer_queue, pr, origin)
    args << completion

    if wrapper
      args, kwargs = wrapper.call(origin, self, *args, **kwargs)
    end

    args      = Sanitizer.sanitize_values(args, origin, self)
    kwargs    = Sanitizer.sanitize_kwargs(kwargs, origin, self)
    arg_block = Sanitizer.sanitize_value(arg_block, origin, self)

    pr.yield(*args, **kwargs, &arg_block)
  end
end
€(object)
click to toggle source
Wrap an object as ExternalObject.
# File lib/eventbox/event_loop.rb, line 382
# Wrap an object as ExternalObject.
#
# Delegates to Sanitizer.wrap_object with this event loop as the third argument and nil
# for the second and fourth.
#
# Returns whatever Sanitizer.wrap_object returns.
def €(object)
  wrapped = Sanitizer.wrap_object(object, nil, self, nil)
  wrapped
end
Private Instance Methods
close_answer_queue(answer_queue, name)
click to toggle source
# File lib/eventbox/event_loop.rb, line 350
# Close +answer_queue+ and complain about anything that is still queued.
#
# answer_queue - the queue to close
# name         - method name, or the Proc object for anonymous calls (only used for messages)
#
# Returns nil when the queue was empty.
# Raises InvalidAccess if an ExternalObjectCall is still queued and MultipleResults for any
# other leftover entry.
def close_answer_queue(answer_queue, name)
  answer_queue.close
  return if answer_queue.empty?

  leftover = answer_queue.deq
  anonymous = Proc === name

  if ExternalObjectCall === leftover
    message =
      if anonymous
        "#{leftover.objtype} can't be called through #{name.inspect}, since it already returned"
      else
        "#{leftover.objtype} can't be called through method `#{name}', since it already returned"
      end
    raise InvalidAccess, message
  end

  message =
    if anonymous
      "second result yielded for #{name.inspect} that already returned"
    else
      "second result yielded for method `#{name}' that already returned"
    end
  raise MultipleResults, message
end
new_completion_proc(answer_queue, name, source_event_loop)
click to toggle source
# File lib/eventbox/event_loop.rb, line 310
# Create the CompletionProc that delivers the result of a yield call.
#
# When called, the proc sanitizes its arguments and pushes them onto +answer_queue+.
# A further call raises MultipleResults.
#
# answer_queue      - queue that receives the result; also stored as +__answer_queue__+ of the proc
# name              - method name, or the Proc object for anonymous calls (only used for messages)
# source_event_loop - passed to Sanitizer as the target scope of the result
#
# Returns the CompletionProc.
def new_completion_proc(answer_queue, name, source_event_loop)
  completion = new_async_proc(name, CompletionProc) do |*results|
    unless answer_queue
      # It could happen, that two threads call the CompletionProc simultaneously so that nothing is raised here.
      # In this case the failure is caught in callback_loop instead, but in all other cases the failure is raised early here at the caller side.
      message =
        if Proc === name
          "second result yielded for #{name.inspect} that already returned"
        else
          "second result yielded for method `#{name}' that already returned"
        end
      raise MultipleResults, message
    end

    results = Sanitizer.sanitize_values(results, self, source_event_loop)
    results = Sanitizer.return_args(results)
    answer_queue << results
    # Forget the queue, so that a second call is detected above
    answer_queue = nil
  end

  # Assigned before the proc can run, so the proc still sees the original queue here
  completion.__answer_queue__ = answer_queue
  completion
end