class Eventbox::EventLoop

@private

This class manages the calls to event scope methods and procs comparable to an event loop. It doesn't use an explicit event loop, but uses the calling thread to process the event.

All methods prefixed with “_” requires @mutex acquired to be called.

Public Class Methods

new(threadpool, guard_time) click to toggle source
# File lib/eventbox/event_loop.rb, line 11
def initialize(threadpool, guard_time)
  @threadpool = threadpool
  @shutdown = false
  @guard_time = guard_time
  _init_variables
end

Public Instance Methods

_external_object_call(object, method, name, args, kwargs, arg_block, cbresult, source_event_loop, call_context) click to toggle source
# File lib/eventbox/event_loop.rb, line 403
def _external_object_call(object, method, name, args, kwargs, arg_block, cbresult, source_event_loop, call_context)
  result_wrapper = ArgumentWrapper.build(cbresult, name) if cbresult
  args = Sanitizer.sanitize_values(args, self, source_event_loop)
  kwargs = Sanitizer.sanitize_kwargs(kwargs, self, source_event_loop)
  arg_block = Sanitizer.sanitize_value(arg_block, self, source_event_loop)
  cb = ExternalObjectCall.new(object, method, args, kwargs, arg_block, cbresult, result_wrapper)

  if call_context
    # explicit call_context given
    if call_context.__answer_queue__.closed?
      raise InvalidAccess, "#{cb.objtype} #{"defined by `#{name}' " if name}was called with a call context that already returned"
    end
    call_context.__answer_queue__ << cb
  elsif @latest_answer_queue
    # proc called by a sync or yield call/proc context
    @latest_answer_queue << cb
  else
    raise InvalidAccess, "#{cb.objtype} #{"defined by `#{name}' " if name}was called by `#{@latest_call_name}', which must a sync_call, yield_call, sync_proc or yield_proc"
  end

  nil
end
_init_variables() click to toggle source
# File lib/eventbox/event_loop.rb, line 31
def _init_variables
  @running_actions = []
  @running_actions_for_gc = []
  @mutex = Mutex.new
  @guard_time_proc = case @guard_time
    when NilClass
      nil
    when Numeric
      @guard_time and proc do |dt, name|
        if dt > @guard_time
          ecaller = caller.find{|t| !(t=~/lib\/eventbox(\/|\.rb:)/) }
          warn "guard time exceeded: #{"%2.3f" % dt} sec (limit is #{@guard_time}) in `#{name}' called from `#{ecaller}' - please move blocking tasks to actions"
        end
      end
    when Proc
      @guard_time
    else
      raise ArgumentError, "guard_time should be Numeric, Proc or nil"
  end
end
_latest_call_context() click to toggle source
# File lib/eventbox/event_loop.rb, line 143
def _latest_call_context
  if @latest_answer_queue
    ctx = BlockingExternalCallContext.new
    ctx.__answer_queue__ = @latest_answer_queue
  end
  ctx
end
_update_action_threads_for_gc() click to toggle source

Make a copy of the thread list for use in shutdown. The copy is replaced per an atomic operation, so that it can be read lock-free in shutdown.

# File lib/eventbox/event_loop.rb, line 104
def _update_action_threads_for_gc
  @running_actions_for_gc = @running_actions.dup
end
async_call(box, name, args, kwargs, block, wrapper) click to toggle source
# File lib/eventbox/event_loop.rb, line 160
def async_call(box, name, args, kwargs, block, wrapper)
  with_call_frame(name, nil) do |source_event_loop|
    args, kwargs = wrapper.call(source_event_loop, self, *args, **kwargs) if wrapper
    args = Sanitizer.sanitize_values(args, source_event_loop, self, name)
    kwargs = Sanitizer.sanitize_kwargs(kwargs, source_event_loop, self, name)
    block = Sanitizer.sanitize_value(block, source_event_loop, self, name)
    box.send("__#{name}__", *args, **kwargs, &block)
  end
end
async_proc_call(pr, args, kwargs, arg_block, wrapper) click to toggle source

Anonymous version of async_call

# File lib/eventbox/event_loop.rb, line 194
def async_proc_call(pr, args, kwargs, arg_block, wrapper)
  with_call_frame(AsyncProc, nil) do |source_event_loop|
    args, kwargs = wrapper.call(source_event_loop, self, *args, **kwargs) if wrapper
    args = Sanitizer.sanitize_values(args, source_event_loop, self)
    kwargs = Sanitizer.sanitize_kwargs(kwargs, source_event_loop, self)
    arg_block = Sanitizer.sanitize_value(arg_block, source_event_loop, self)
    pr.yield(*args, **kwargs, &arg_block)
  end
end
callback_loop(answer_queue, source_event_loop, name) click to toggle source
# File lib/eventbox/event_loop.rb, line 330
def callback_loop(answer_queue, source_event_loop, name)
  loop do
    rets = answer_queue.deq
    case rets
    when ExternalObjectCall
      cbres = rets.object.send(rets.method, *rets.args, **rets.kwargs, &rets.arg_block)

      if rets.cbresult
        external_call_result(rets.cbresult, cbres, answer_queue, rets.result_wrapper)
      end
    when WrappedException
      close_answer_queue(answer_queue, name)
      raise(*rets.exc)
    else
      close_answer_queue(answer_queue, name)
      return rets
    end
  end
end
event_scope?() click to toggle source

Is the caller running within the event scope context?

# File lib/eventbox/event_loop.rb, line 109
def event_scope?
  @mutex.owned?
end
external_call_result(cbresult, res, answer_queue, wrapper) click to toggle source

Called when an external object call finished

# File lib/eventbox/event_loop.rb, line 230
def external_call_result(cbresult, res, answer_queue, wrapper)
  with_call_frame(ExternalObject, answer_queue) do |source_event_loop|
    res, _ = wrapper.call(source_event_loop, self, res) if wrapper
    res = Sanitizer.sanitize_value(res, source_event_loop, self)
    cbresult.yield(*res)
  end
end
inspect() click to toggle source
# File lib/eventbox/event_loop.rb, line 74
def inspect
  "#<#{self.class}:#{self.object_id} @threadpool=#{@threadpool.inspect}, @shutdown=#{@shutdown.inspect}, @guard_time=#{@guard_time.inspect}, @running_actions=#{@running_actions.length}>"
end
internal_yield_result(args, name) click to toggle source
# File lib/eventbox/event_loop.rb, line 287
def internal_yield_result(args, name)
  complete = args.last
  unless Proc === complete
    if Proc === name
      raise InvalidAccess, "yield_proc #{name.inspect} must be called with a Proc object in the event scope but got #{complete.class}"
    else
      raise InvalidAccess, "yield_call `#{name}' must be called with a Proc object in the event scope but got #{complete.class}"
    end
  end
  args[-1] = proc do |*cargs, &cblock|
    unless complete
      if Proc === name
        raise MultipleResults, "second result yielded for #{name.inspect} that already returned"
      else
        raise MultipleResults, "second result yielded for method `#{name}' that already returned"
      end
    end
    res = complete.yield(*cargs, &cblock)
    complete = nil
    res
  end
end
marshal_dump() click to toggle source
# File lib/eventbox/event_loop.rb, line 18
def marshal_dump
  raise TypeError, "Eventbox objects can't be serialized within event scope" if event_scope?
  @mutex.synchronize do
    raise TypeError, "Eventbox objects can't be serialized while actions are running" unless @running_actions.empty?
    [@threadpool, @shutdown, @guard_time]
  end
end
marshal_load(array) click to toggle source
# File lib/eventbox/event_loop.rb, line 26
def marshal_load(array)
  @threadpool, @shutdown, @guard_time = array
  _init_variables
end
new_async_proc(name=nil, klass=AsyncProc, &block) click to toggle source
# File lib/eventbox/event_loop.rb, line 238
def new_async_proc(name=nil, klass=AsyncProc, &block)
  raise InvalidAccess, "async_proc outside of the event scope is not allowed" unless event_scope?
  wrapper = ArgumentWrapper.build(block, "async_proc #{name}")
  pr = klass.new do |*args, **kwargs, &arg_block|
    if event_scope?
      # called in the event scope
      block.yield(*args, **kwargs, &arg_block)
    else
      # called externally
      async_proc_call(block, args, kwargs, arg_block, wrapper)
    end
    pr
  end
end
new_sync_proc(name=nil, &block) click to toggle source
# File lib/eventbox/event_loop.rb, line 253
def new_sync_proc(name=nil, &block)
  raise InvalidAccess, "sync_proc outside of the event scope is not allowed" unless event_scope?
  wrapper = ArgumentWrapper.build(block, "sync_proc #{name}")
  SyncProc.new do |*args, **kwargs, &arg_block|
    if event_scope?
      # called in the event scope
      block.yield(*args, **kwargs, &arg_block)
    else
      # called externally
      answer_queue = Queue.new
      sel = sync_proc_call(block, args, kwargs, arg_block, answer_queue, wrapper)
      callback_loop(answer_queue, sel, block)
    end
  end
end
new_yield_proc(name=nil, &block) click to toggle source
# File lib/eventbox/event_loop.rb, line 269
def new_yield_proc(name=nil, &block)
  raise InvalidAccess, "yield_proc outside of the event scope is not allowed" unless event_scope?
  wrapper = ArgumentWrapper.build(block, "yield_proc #{name}")
  YieldProc.new do |*args, **kwargs, &arg_block|
    if event_scope?
      # called in the event scope
      internal_yield_result(args, block)
      block.yield(*args, **kwargs, &arg_block)
      nil
    else
      # called externally
      answer_queue = Queue.new
      sel = yield_proc_call(block, args, kwargs, arg_block, answer_queue, wrapper)
      callback_loop(answer_queue, sel, block)
    end
  end
end
send_shutdown(object_id=nil) click to toggle source

Abort all running action threads.

# File lib/eventbox/event_loop.rb, line 53
    def send_shutdown(object_id=nil)
#       warn "shutdown called for object #{object_id} with #{@running_actions.size} threads #{@running_actions.map(&:object_id).join(",")}"

      # The finalizer doesn't allow suspension per Mutex, so that we access a read-only copy of @running_actions.
      # To avoid race conditions with thread creation, set a flag before the loop.
      @shutdown = true

      # terminate all running action threads
      begin
        @running_actions_for_gc.each(&:abort)
      rescue ThreadError
        # ThreadPool requires to lock a mutex, which fails in trap context.
        # So defer the abort through another thread.
        Thread.new do
          @running_actions_for_gc.each(&:abort)
        end
      end

      nil
    end
shared_object(object) click to toggle source

Mark an object as to be shared instead of copied.

# File lib/eventbox/event_loop.rb, line 372
def shared_object(object)
  if event_scope?
    ObjectRegistry.set_tag(object, self)
  else
    ObjectRegistry.set_tag(object, ExternalSharedObject)
  end
  object
end
shutdown(&completion_block) click to toggle source
# File lib/eventbox/event_loop.rb, line 78
def shutdown(&completion_block)
  send_shutdown
  if event_scope?
    if completion_block
      completion_block = new_async_proc(&completion_block)

      # Thread might not be tagged to a calling event scope
      source_event_loop = Thread.current.thread_variable_get(:__event_loop__)
      Thread.current.thread_variable_set(:__event_loop__, nil)
      begin
        @threadpool.new do
          @running_actions_for_gc.each(&:join)
          completion_block.call
        end
      ensure
        Thread.current.thread_variable_set(:__event_loop__, source_event_loop)
      end
    end
  else
    raise InvalidAccess, "external shutdown call doesn't take a block but blocks until threads have terminated" if completion_block
    @running_actions_for_gc.each(&:join)
  end
end
start_action(meth, name, args, &block) click to toggle source
# File lib/eventbox/event_loop.rb, line 426
def start_action(meth, name, args, &block)
  # Actions might not be tagged to a calling event scope
  source_event_loop = Thread.current.thread_variable_get(:__event_loop__)
  Thread.current.thread_variable_set(:__event_loop__, nil)

  qu = Queue.new

  new_thread = Thread.handle_interrupt(Exception => :never) do
    @threadpool.new do
      ac = nil
      begin
        Thread.handle_interrupt(AbortAction => :on_blocking) do
          if meth.arity == args.length
            meth.call(*args, &block)
          else
            ac ||= qu.deq
            meth.call(*args, ac, &block)
          end
        end
      rescue AbortAction
        ac ||= qu.deq
        ac.terminate
      rescue WeakRef::RefError
        # It can happen that the GC already swept the Eventbox instance, before some instance action is in a blocking state.
        # In this case access to the Eventbox instance raises a RefError.
        # Since it's now impossible to execute the action up to a blocking state, abort the action prematurely.
        raise unless @shutdown
      ensure
        ac ||= qu.deq
        thread_finished(ac)
      end
    end
  end

  a = Action.new(name, new_thread, self)

  # Add to the list of running actions
  synchronize_external do
    @running_actions << a
    _update_action_threads_for_gc
  end

  # Enqueue the action for passing as action parameter
  qu << a

  # @shutdown is set without a lock, so that we need to re-check, if it was set while start_action
  if @shutdown
    a.abort
    a.join
  end

  a
ensure
  Thread.current.thread_variable_set(:__event_loop__, source_event_loop)
end
sync_call(box, name, args, kwargs, block, answer_queue, wrapper) click to toggle source
# File lib/eventbox/event_loop.rb, line 170
def sync_call(box, name, args, kwargs, block, answer_queue, wrapper)
  with_call_frame(name, answer_queue) do |source_event_loop|
    args, kwargs = wrapper.call(source_event_loop, self, *args, **kwargs) if wrapper
    args = Sanitizer.sanitize_values(args, source_event_loop, self, name)
    kwargs = Sanitizer.sanitize_kwargs(kwargs, source_event_loop, self, name)
    block = Sanitizer.sanitize_value(block, source_event_loop, self, name)
    res = box.send("__#{name}__", *args, **kwargs, &block)
    res = Sanitizer.sanitize_value(res, self, source_event_loop)
    answer_queue << res
  end
end
sync_proc_call(pr, args, kwargs, arg_block, answer_queue, wrapper) click to toggle source

Anonymous version of sync_call

# File lib/eventbox/event_loop.rb, line 205
def sync_proc_call(pr, args, kwargs, arg_block, answer_queue, wrapper)
  with_call_frame(SyncProc, answer_queue) do |source_event_loop|
    args, kwargs = wrapper.call(source_event_loop, self, *args, **kwargs) if wrapper
    args = Sanitizer.sanitize_values(args, source_event_loop, self)
    kwargs = Sanitizer.sanitize_kwargs(kwargs, source_event_loop, self)
    arg_block = Sanitizer.sanitize_value(arg_block, source_event_loop, self)
    res = pr.yield(*args, **kwargs, &arg_block)
    res = Sanitizer.sanitize_value(res, self, source_event_loop)
    answer_queue << res
  end
end
synchronize_external() { || ... } click to toggle source
# File lib/eventbox/event_loop.rb, line 113
def synchronize_external
  if event_scope?
    yield
  else
    @mutex.synchronize do
      yield
    end
  end
end
thread_finished(action) click to toggle source
# File lib/eventbox/event_loop.rb, line 386
def thread_finished(action)
  @mutex.synchronize do
    @running_actions.delete(action) or raise(ArgumentError, "unknown action has finished: #{action}")
    _update_action_threads_for_gc
  end
end
with_call_context(ctx) { || ... } click to toggle source
# File lib/eventbox/event_loop.rb, line 151
def with_call_context(ctx)
  orig_context = @latest_answer_queue
  raise ArgumentError, "invalid argument #{ctx.inspect} instead of Eventbox::CallContext" unless CallContext === ctx
  @latest_answer_queue = ctx.__answer_queue__
  yield
ensure
  @latest_answer_queue = orig_context
end
with_call_frame(name, answer_queue) { |source_event_loop| ... } click to toggle source
# File lib/eventbox/event_loop.rb, line 123
def with_call_frame(name, answer_queue)
  source_event_loop = Thread.current.thread_variable_get(:__event_loop__)
  @mutex.lock
  begin
    Thread.current.thread_variable_set(:__event_loop__, self)
    @latest_answer_queue = answer_queue
    @latest_call_name = name
    start_time = Time.now
    yield(source_event_loop)
  ensure
    @latest_answer_queue = nil
    @latest_call_name = nil
    @mutex.unlock
    Thread.current.thread_variable_set(:__event_loop__, source_event_loop)
    diff_time = Time.now - start_time
    @guard_time_proc&.call(diff_time, name)
  end
  source_event_loop
end
yield_call(box, name, args, kwargs, block, answer_queue, wrapper) click to toggle source
# File lib/eventbox/event_loop.rb, line 182
def yield_call(box, name, args, kwargs, block, answer_queue, wrapper)
  with_call_frame(name, answer_queue) do |source_event_loop|
    args << new_completion_proc(answer_queue, name, source_event_loop)
    args, kwargs = wrapper.call(source_event_loop, self, *args, **kwargs) if wrapper
    args = Sanitizer.sanitize_values(args, source_event_loop, self, name)
    kwargs = Sanitizer.sanitize_kwargs(kwargs, source_event_loop, self, name)
    block = Sanitizer.sanitize_value(block, source_event_loop, self, name)
    box.send("__#{name}__", *args, **kwargs, &block)
  end
end
yield_proc_call(pr, args, kwargs, arg_block, answer_queue, wrapper) click to toggle source

Anonymous version of yield_call

# File lib/eventbox/event_loop.rb, line 218
def yield_proc_call(pr, args, kwargs, arg_block, answer_queue, wrapper)
  with_call_frame(YieldProc, answer_queue) do |source_event_loop|
    args << new_completion_proc(answer_queue, pr, source_event_loop)
    args, kwargs = wrapper.call(source_event_loop, self, *args, **kwargs) if wrapper
    args = Sanitizer.sanitize_values(args, source_event_loop, self)
    kwargs = Sanitizer.sanitize_kwargs(kwargs, source_event_loop, self)
    arg_block = Sanitizer.sanitize_value(arg_block, source_event_loop, self)
    pr.yield(*args, **kwargs, &arg_block)
  end
end
(object) click to toggle source

Wrap an object as ExternalObject.

# File lib/eventbox/event_loop.rb, line 382
def (object)
  Sanitizer.wrap_object(object, nil, self, nil)
end

Private Instance Methods

close_answer_queue(answer_queue, name) click to toggle source
# File lib/eventbox/event_loop.rb, line 350
        def close_answer_queue(answer_queue, name)
  answer_queue.close
  unless answer_queue.empty?
    rets = answer_queue.deq
    case rets
    when ExternalObjectCall
      if Proc === name
        raise InvalidAccess, "#{rets.objtype} can't be called through #{name.inspect}, since it already returned"
      else
        raise InvalidAccess, "#{rets.objtype} can't be called through method `#{name}', since it already returned"
      end
    else
      if Proc === name
        raise MultipleResults, "second result yielded for #{name.inspect} that already returned"
      else
        raise MultipleResults, "second result yielded for method `#{name}' that already returned"
      end
    end
  end
end
new_completion_proc(answer_queue, name, source_event_loop) click to toggle source
# File lib/eventbox/event_loop.rb, line 310
        def new_completion_proc(answer_queue, name, source_event_loop)
  pr = new_async_proc(name, CompletionProc) do |*resu|
    unless answer_queue
      # It could happen, that two threads call the CompletionProc simultanously so that nothing is raised here.
      # In this case the failure is caught in callback_loop instead, but in all other cases the failure is raised early here at the caller side.
      if Proc === name
        raise MultipleResults, "second result yielded for #{name.inspect} that already returned"
      else
        raise MultipleResults, "second result yielded for method `#{name}' that already returned"
      end
    end
    resu = Sanitizer.sanitize_values(resu, self, source_event_loop)
    resu = Sanitizer.return_args(resu)
    answer_queue << resu
    answer_queue = nil
  end
  pr.__answer_queue__ = answer_queue
  pr
end