class Utilrb::EventLoop
Simple event loop which supports timers and defers blocking operations to a thread pool. The results of deferred operations are queued and processed by the event loop thread at the end of each step.
All events must be code blocks, which are executed at the end of each step. There is no support for filtering or event propagation.
To integrate Ruby classes into the event loop easily, {Forwardable#def_event_loop_delegator} can be used.
@example Example for using the EventLoop
  event_loop = EventLoop.new
  event_loop.once do
    puts "called once"
  end
  event_loop.every(1.0) do
    puts "called every second"
  end
  callback = Proc.new do |result|
    puts result
  end
  event_loop.defer({:callback => callback}) do
    sleep 2
    "result from the worker thread #{Thread.current}"
  end
  event_loop.exec
@author Alexander Duda <Alexander.Duda@dfki.de>
Attributes
Underlying thread pool used to defer work.
@return [Utilrb::ThreadPool]
Public Class Methods
# File lib/utilrb/event_loop.rb, line 172
def self.cleanup_backtrace(&block)
  block.call
rescue
  $@.delete_if{|s| %r"#{Regexp.quote(__FILE__)}"o =~ s}
  ::Kernel::raise
end
A new EventLoop
# File lib/utilrb/event_loop.rb, line 185
def initialize
  @mutex = Mutex.new
  @events = Queue.new               # stores all events for the next step
  @timers = Set.new                 # stores all timers
  @every_cylce_events = Set.new     # stores all events which are added to @events each step
  @on_error = {}                    # stores on error callbacks
  @errors = Queue.new               # stores errors which will be re raised at the end of the step
  @number_of_events_to_process = 0  # number of events which are processed in the current step
  @thread_pool = ThreadPool.new
  @thread = Thread.current          # the event loop thread
  @stop = nil
end
Public Instance Methods
Adds an Event to the event loop.
@param [Event] event The event
@param [Boolean] every_step Automatically added for every step
# File lib/utilrb/event_loop.rb, line 642
def add_event(event,every_step = false)
  raise ArgumentError, "cannot add event which is ignored." if event.ignore?
  if every_step
    @mutex.synchronize do
      @every_cylce_events << event
    end
  else
    @events << event
  end
  event
end
Adds a task to the thread pool
@param [ThreadPool::Task] task The task.
# File lib/utilrb/event_loop.rb, line 657
def add_task(task)
  thread_pool << task
end
Adds a timer to the event loop
@param [Timer] timer The timer.
# File lib/utilrb/event_loop.rb, line 631
def add_timer(timer)
  @mutex.synchronize do
    raise "timer #{timer}:#{timer.doc} was already added!" if @timers.include?(timer)
    @timers << timer
  end
end
Integrates a blocking operation call into the EventLoop like {Utilrb::EventLoop#defer}, but has a more suitable syntax for deferring a method call.
  async method(:my_method) do |result,exception|
    if exception
      raise exception
    else
      puts result
    end
  end
@param [#call] work The proc which will be deferred
@yield [result] The callback
@yield [result,exception] The callback
@return [Utilrb::ThreadPool::Task] The thread pool task.
# File lib/utilrb/event_loop.rb, line 213
def async(work,*args,&callback)
  async_with_options(work,Hash.new,*args,&callback)
end
Integrates a blocking operation call like {Utilrb::EventLoop#async}, but automatically re-queues the call once the period has passed and the worker thread has finished the task. This means the call is never re-queued if the task blocks forever, and it is never deferred to more than one worker thread at the same time.
@param [Hash] options The options
@option options [Float] :period The period
@option options [Boolean] :start Starts the timer right away (default = true)
@param [#call] work The proc which will be deferred
@param (see defer)
@option (see defer)
@return [EventLoop::Timer] The timer which re-queues the task.
# File lib/utilrb/event_loop.rb, line 252
def async_every(work,options=Hash.new,*args, &callback)
  options, async_opt = Kernel.filter_options(options,:period,:start => true)
  period = options[:period]
  raise ArgumentError,"No period given" unless period
  task = nil
  every period ,options[:start] do
    if !task
      task = async_with_options(work,async_opt,*args,&callback)
    elsif task.finished?
      add_task task
    end
    task
  end
end
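The re-queue rule described for async_every can be illustrated in isolation. The following is a minimal sketch, not Utilrb code: `SketchJob` and `periodic_tick` are hypothetical names, and the job is finished by hand instead of by a worker thread.

```ruby
# Hypothetical stand-in for a thread pool task; not a Utilrb class.
class SketchJob
  attr_reader :queued_count

  def initialize
    @queued_count = 0
    @finished = false
  end

  def finished?
    @finished
  end

  # Marks the job as handed to a worker.
  def enqueue
    @queued_count += 1
    @finished = false
  end

  # Marks the job as completed by the worker.
  def finish
    @finished = true
  end
end

# Called once per period: queues the job the first time, afterwards only
# when the previous run has finished.
def periodic_tick(job)
  job.enqueue if job.queued_count.zero? || job.finished?
  job
end
```

A job that never finishes is queued exactly once, which mirrors the statement that a task blocking forever is never re-queued.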
(see ThreadPool#backlog)
# File lib/utilrb/event_loop.rb, line 568
def backlog
  thread_pool.backlog
end
Calls the given block in the event loop thread. If the current thread is the event loop thread, the block is executed right away and the result of the block is returned. Otherwise, a handle to the queued Event is returned.
@return [Event,Object]
# File lib/utilrb/event_loop.rb, line 375
def call(&block)
  if thread?
    block.call
  else
    once(&block)
  end
end
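The dispatch rule of call (run inline on the owning thread, queue otherwise) can be sketched without Utilrb. `SketchLoop` below is a hypothetical stand-in that assumes the loop is owned by the thread that created it; it queues the raw block instead of an Event object.

```ruby
# Hypothetical stand-in, not the Utilrb implementation.
class SketchLoop
  attr_reader :pending

  def initialize
    @thread = Thread.current # thread that owns the loop
    @pending = Queue.new     # blocks waiting for the next step
  end

  # Runs the block inline on the owning thread, queues it otherwise.
  def call(&block)
    if Thread.current == @thread
      block.call
    else
      @pending << block
      block # a handle to the queued work, not its result
    end
  end
end

sketch = SketchLoop.new
inline_result = sketch.call { 1 + 1 }                    # executed immediately
queued_handle = Thread.new { sketch.call { 42 } }.value  # queued, not executed
```

Because the return value is either the result or a handle, callers that may run on a foreign thread should not rely on the return value being the result of the block.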
Cancels the given timer if it is running, otherwise it does nothing.
@param [Timer] timer The timer
# File lib/utilrb/event_loop.rb, line 492
def cancel_timer(timer)
  @mutex.synchronize do
    @timers.delete timer
  end
end
Clears all timers, events and errors
# File lib/utilrb/event_loop.rb, line 662
def clear
  thread_pool.clear
  @errors.clear
  @events.clear
  @mutex.synchronize do
    @every_cylce_events.clear
    @timers.clear
  end
end
Clears all errors which occurred during the last step and are not marked as known. If the errors are not cleared, they are re-raised the next time step is called.
# File lib/utilrb/event_loop.rb, line 675
def clear_errors
  @errors.clear
end
Integrates a blocking operation call into the EventLoop by executing it from a different thread. After the call has returned, the given callback is called from the EventLoop thread while it processes its events.
If the callback has an arity of 2, the exception is passed to the callback as the second parameter in case of an error. The error is also passed to the error handlers of the event loop, but it is not re-raised if the error is marked as known.
To override an error, the callback can return :ignore_error or a new error instance in case of an error. The error handlers of the event loop are then either not called at all or called with the new error instance.
@example Ignore an error
  callback = Proc.new do |r,e|
    if e
      :ignore_error
    else
      puts r
    end
  end
  defer({:callback => callback}) do
    raise
  end
@param [Hash] options The options
@option (see ThreadPool::Task#initialize)
@option options [Proc] :callback The callback
@option options [Class] :known_errors Known errors which will be rescued
@option options [Proc] :on_error Callback which is called when an error occurred
@param (see ThreadPool::Task#initialize)
@return [ThreadPool::Task] The thread pool task.
# File lib/utilrb/event_loop.rb, line 302
def defer(options=Hash.new,*args,&block)
  options, task_options = Kernel.filter_options(options,{:callback => nil,:known_errors => [],:on_error => nil})
  callback = options[:callback]
  error_callback = options[:on_error]
  known_errors = Array(options[:known_errors])

  task = Utilrb::ThreadPool::Task.new(task_options,*args,&block)
  # ensures that user callback is called from main thread and not from worker threads
  if callback
    task.callback do |result,exception|
      once do
        if callback.arity == 1
          callback.call result if !exception
        else
          e = callback.call result,exception
          # check if the error was overwritten in the
          # case of an error
          exception = if exception
                        if e.is_a?(Symbol) && e == :ignore_error
                          nil
                        elsif e.is_a? Exception
                          e
                        else
                          exception
                        end
                      end
        end
        if exception
          error_callback.call(exception) if error_callback
          raises = !known_errors.any? {|error| exception.is_a?(error)}
          handle_error(exception,raises)
        end
      end
    end
  else
    task.callback do |result,exception|
      if exception
        raises = !known_errors.find {|error| exception.is_a?(error)}
        once do
          error_callback.call(exception) if error_callback
          handle_error(exception,raises)
        end
      end
    end
  end
  @mutex.synchronize do
    @thread_pool << task
  end
  task
end
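The override rule for two-argument callbacks can be read as a small decision function. The helper below is a sketch with a hypothetical name (`resolve_exception` is not part of Utilrb); it only restates the branch shown in the listing above.

```ruby
# Hypothetical helper mirroring the override rule of defer.
# original:        the exception raised by the deferred block, or nil
# callback_result: whatever the user callback returned
def resolve_exception(original, callback_result)
  return nil unless original                      # no error, nothing to override
  return nil if callback_result == :ignore_error  # error is dropped
  return callback_result if callback_result.is_a?(Exception) # error is replaced
  original                                        # any other value keeps the error
end
```

Only a non-nil outcome reaches the :on_error callback and the registered error handlers.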
Returns true if events are queued.
@return [Boolean]
# File lib/utilrb/event_loop.rb, line 386
def events?
  !@events.empty? || !@errors.empty?
end
Adds a timer to the event loop which will execute the given code block with the given period from the event loop thread.
@param [Float] period The period of the timer in seconds
@param [Boolean] start Starts the timer right away.
@yield The code block.
@return [Utilrb::EventLoop::Timer]
# File lib/utilrb/event_loop.rb, line 398
def every(period,start=true,&block)
  timer = Timer.new(self,period,&block)
  timer.start if start # adds itself to the event loop
  timer
end
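How a periodic timer decides that it is due can be sketched with an injected clock. `SketchTimer` is a hypothetical stand-in: the real Utilrb::EventLoop::Timer has more state (start, stop, single shot), and whether it compares with `>=` or `>` is an assumption here.

```ruby
# Hypothetical stand-in for a periodic timer; not the Utilrb class.
class SketchTimer
  def initialize(period, now, &block)
    @period = period   # seconds between two calls
    @last_call = now   # reference point for the next timeout
    @block = block
  end

  # True once a whole period has elapsed since the last call.
  def timeout?(now)
    now - @last_call >= @period
  end

  # Runs the block and restarts the period from the given time.
  def call(now)
    @last_call = now
    @block.call
  end
end
```

Passing the time in explicitly, as step does with its time argument, keeps all timers of one step on the same clock reading.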
Executes the given block every step from the event loop thread.
@return [Event] The event
# File lib/utilrb/event_loop.rb, line 407
def every_step(&block)
  add_event Event.new(block),true
end
Starts the event loop with the given period. If a code block is given, it is executed at the end of each step. This method blocks until stop is called.
@param [Float] period The period in seconds
@yield The code block
# File lib/utilrb/event_loop.rb, line 516
def exec(period=0.05,&block)
  @stop = false
  reset_timers
  while !@stop
    last_step = Time.now
    step(last_step,&block)
    diff = (Time.now-last_step).to_f
    sleep(period-diff) if diff < period && !@stop
  end
end
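The pacing used by exec sleeps only for the part of the period that the step did not consume. A minimal sketch of that arithmetic, using hypothetical helper names (`remaining_sleep`, `run_paced`) that do not exist in Utilrb:

```ruby
# How long to sleep after a step that took `elapsed` seconds so that
# steps start roughly every `period` seconds.
def remaining_sleep(period, elapsed)
  elapsed < period ? period - elapsed : 0.0
end

# Runs `count` paced steps and returns how many ran.
def run_paced(count, period = 0.01)
  steps = 0
  count.times do
    started = Time.now
    yield if block_given?
    steps += 1
    sleep(remaining_sleep(period, Time.now - started))
  end
  steps
end
```

A step that takes longer than the period is followed by the next step immediately, so the period is a lower bound on the step interval and not a guarantee.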
# File lib/utilrb/event_loop.rb, line 679
def handle_error(error,save = true)
  call do
    on_error = @mutex.synchronize do
      @on_error.find_all{|key,e| error.is_a? key}.map(&:last).flatten
    end
    on_error.each do |handler|
      handler.call error
    end
    @errors << error if save == true
  end
end
Errors caught during event loop callbacks are forwarded to registered code blocks. The code block is called from the event loop thread.
@param [Class] error_class The error class the block should be called for
@yield [exception] The code block
# File lib/utilrb/event_loop.rb, line 417
def on_error(error_class,&block)
  @mutex.synchronize do
    @on_error[error_class] ||= []
    @on_error[error_class] << block
  end
end
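Handlers are stored per error class and selected with is_a?, so a handler registered for a superclass also receives subclass errors. The registry below is a sketch with a hypothetical class name (`SketchErrorHandlers`); it leaves out the mutex and the event loop thread dispatch.

```ruby
# Hypothetical registry mirroring the class-keyed lookup; not Utilrb code.
class SketchErrorHandlers
  def initialize
    @handlers = Hash.new { |hash, klass| hash[klass] = [] }
  end

  # Registers a block for the given error class.
  def on_error(error_class, &block)
    @handlers[error_class] << block
  end

  # Calls every handler whose class matches the error via is_a? and
  # returns the number of handlers that were called.
  def dispatch(error)
    matching = @handlers.select { |klass, _| error.is_a?(klass) }.values.flatten
    matching.each { |handler| handler.call(error) }
    matching.size
  end
end
```

In this sketch handlers run in registration order of their classes, because a Ruby Hash preserves insertion order.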
Errors caught during event loop callbacks are forwarded to registered code blocks. The code blocks are called from the event loop thread.
@param [Array<Class>] error_classes The error classes the block should be called for
@yield [exception] The code block
# File lib/utilrb/event_loop.rb, line 430
def on_errors(*error_classes,&block)
  error_classes.flatten!
  error_classes.each do |error_class|
    on_error(error_class,&block)
  end
end
Executes the given block in the next step from the event loop thread. Returns a Timer object if a delay is set, otherwise a handle to the Event which was queued.
@yield [] The code block.
@return [Utilrb::EventLoop::Timer,Event]
# File lib/utilrb/event_loop.rb, line 359
def once(delay=nil,&block)
  raise ArgumentError, "no block given" unless block
  if delay && delay > 0
    timer = Timer.new(self,delay,true,&block)
    timer.start(timer.period, false)
  else
    add_event(Event.new(block))
  end
end
# File lib/utilrb/event_loop.rb, line 577
def reraise_error(error)
  raise error, error.message, (error.backtrace || []) + caller(1)
end
Resets all timers so that they do not fire before their whole period has passed, counting from the given point in time.
@param [Time] time The time
# File lib/utilrb/event_loop.rb, line 502
def reset_timers(time = Time.now)
  @mutex.synchronize do
    @timers.each do |timer|
      timer.reset time
    end
  end
end
Shuts down the thread pool
# File lib/utilrb/event_loop.rb, line 573
def shutdown()
  thread_pool.shutdown()
end
Handles all current events and timers. If a code block is given it will be executed at the end.
@param [Time] time The time the step is executed for.
@yield The code block
# File lib/utilrb/event_loop.rb, line 586
def step(time = Time.now,&block)
  validate_thread
  reraise_error(@errors.shift) if !@errors.empty?

  # copy all work otherwise it would not be allowed to
  # call any event loop functions from a timer
  timers,call = @mutex.synchronize do
    @every_cylce_events.delete_if(&:ignore?)
    @every_cylce_events.each do |event|
      add_event event
    end

    # check all timers
    temp_timers = @timers.find_all do |timer|
      timer.timeout?(time)
    end
    # delete single shot timers which elapsed
    @timers -= temp_timers.find_all(&:single_shot?)
    [temp_timers,block]
  end

  # handle all current events but not the ones which are added during processing.
  # Step is called recursively if wait_for is used inside an event code block.
  # To make sure that all events and timers are processed in the right order,
  # @number_of_events_to_process and a second timeout check are used.
  @number_of_events_to_process = [@events.size,@number_of_events_to_process].max
  while @number_of_events_to_process > 0
    event = @events.pop
    @number_of_events_to_process -= 1
    handle_errors{event.call} unless event.ignore?
  end
  timers.each do |timer|
    next if timer.stopped?
    handle_errors{timer.call(time)} if timer.timeout?(time)
  end
  handle_errors{call.call} if call
  reraise_error(@errors.shift) if !@errors.empty?

  # allow thread pool to take over
  Thread.pass
end
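A step handles the events that are queued when it starts and leaves events added during processing for the next step. The core of that idea, snapshotting the queue size before popping, can be sketched as follows; `SketchStepper` is a hypothetical class that omits timers, error handling, and the recursion guard.

```ruby
# Hypothetical stand-in showing the "snapshot the queue size" idea.
class SketchStepper
  def initialize
    @events = Queue.new
  end

  # Queues a block for the next step.
  def once(&block)
    @events << block
  end

  # Processes only the events that were queued when the step began and
  # returns how many were processed.
  def step
    to_process = @events.size
    processed = 0
    while to_process > 0
      @events.pop.call
      to_process -= 1
      processed += 1
    end
    processed
  end
end
```

Without the snapshot, an event that re-queues itself would keep a single step running forever.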
Steps with the given period until all worker threads are waiting for work.
@param [Float] period The period
@param [Float] max_time The maximum time in seconds to keep stepping
@param (see step)
# File lib/utilrb/event_loop.rb, line 555
def steps(period = 0.05,max_time=1.0,&block)
  start = Time.now
  begin
    last_step = Time.now
    step(last_step,&block)
    time = Time.now
    break if max_time && max_time <= (time-start).to_f
    diff = (time-last_step).to_f
    sleep(period-diff) if diff < period && !@stop
  end while (thread_pool.process? || events?)
end
Stops the EventLoop after [#exec] was called.
# File lib/utilrb/event_loop.rb, line 528
def stop
  @stop = true
end
(see ThreadPool#sync)
# File lib/utilrb/event_loop.rb, line 226
def sync(sync_key,*args,&block)
  thread_pool.sync(sync_key,*args,&block)
end
(see ThreadPool#sync_timeout)
# File lib/utilrb/event_loop.rb, line 231
def sync_timeout(sync_key,timeout,*args,&block)
  thread_pool.sync_timeout(sync_key,timeout,*args,&block)
end
Returns true if the current thread is the event loop thread.
@return [Boolean]
# File lib/utilrb/event_loop.rb, line 449
def thread?
  @mutex.synchronize do
    if Thread.current == @thread
      true
    else
      false
    end
  end
end
Returns true if the given timer is running.
@param [Timer] timer The timer.
@return [Boolean]
# File lib/utilrb/event_loop.rb, line 473
def timer?(timer)
  @mutex.synchronize do
    @timers.include? timer
  end
end
Returns all currently running timers.
@return [Array<Timer>]
# File lib/utilrb/event_loop.rb, line 482
def timers
  @mutex.synchronize do
    @timers.dup
  end
end
Raises if the current thread is not the event loop thread (by default the one the event loop was started from).
@raise [RuntimeError]
# File lib/utilrb/event_loop.rb, line 441
def validate_thread
  raise "current thread is not the event loop thread" if !thread?
end
Steps with the given period until the given block returns true.
@param [Float] period The period
@param [Float] timeout The timeout in seconds
@yieldreturn [Boolean]
# File lib/utilrb/event_loop.rb, line 538
def wait_for(period=0.05,timeout=nil,&block)
  start = Time.now
  old_stop = @stop
  exec period do
    stop if block.call
    if timeout && timeout <= (Time.now-start).to_f
      raise RuntimeError,"Timeout during wait_for"
    end
  end
  @stop = old_stop
end
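The polling pattern behind wait_for (check a condition once per period, give up after a timeout) can be sketched as a standalone helper. `wait_until` is a hypothetical name; unlike wait_for it only sleeps between checks and does not process events or timers.

```ruby
# Polls the block every `period` seconds until it returns a truthy
# value. Raises RuntimeError if `timeout` seconds pass first; a nil
# timeout waits indefinitely.
def wait_until(period = 0.01, timeout = nil)
  start = Time.now
  until yield
    if timeout && timeout <= (Time.now - start).to_f
      raise RuntimeError, "Timeout during wait_until"
    end
    sleep(period)
  end
  true
end
```

As in wait_for, the timeout is only checked between two evaluations of the block, so a block that itself blocks is not interrupted.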
Private Instance Methods
Calls the given block and rescues all errors which can be handled by the added error handlers. If an error cannot be handled, it is stored and re-raised after all events and timers are processed. If more than one unhandled error occurred, the remaining errors are kept until the next call to step and re-raised one at a time until all errors are processed.
@info This method must be called from the event loop thread, otherwise all error handlers would be called from the wrong thread.
@yield The code block.
@see error_handler
# File lib/utilrb/event_loop.rb, line 703
def handle_errors(&block)
  block.call
rescue Exception => e
  handle_error(e,true)
end