class RxRuby::CurrentThreadScheduler
Represents an object that schedules units of work on the current thread, running them from a trampoline queue.
Public Class Methods
schedule_required?()
click to toggle source
Gets a value that indicates whether the caller must call a schedule method: true when no trampoline queue is currently set.
# File lib/rx_ruby/concurrency/current_thread_scheduler.rb, line 20
#
# Gets a value that indicates whether the caller must call a schedule method.
#
# @return [Boolean] true when the @@thread_local_queue class variable is nil
#   (no queue is currently stored), false otherwise.
def self.schedule_required?
  @@thread_local_queue.nil?
end
Private Class Methods
queue()
click to toggle source
# File lib/rx_ruby/concurrency/current_thread_scheduler.rb, line 54
#
# Reads the queue currently stored in the @@thread_local_queue class variable.
#
# NOTE(review): despite the variable's name, a Ruby class variable is shared
# by every thread in the process - confirm whether per-thread storage was
# intended.
#
# @return [Object, nil] the stored queue, or nil when none is set.
def queue
  @@thread_local_queue
end
queue=(new_queue)
click to toggle source
# File lib/rx_ruby/concurrency/current_thread_scheduler.rb, line 58
#
# Replaces the queue stored in the @@thread_local_queue class variable.
#
# @param new_queue [Object, nil] the queue to store; pass nil to clear it.
# @return [Object, nil] the assigned value.
def queue=(new_queue)
  @@thread_local_queue = new_queue
end
run_trampoline(queue)
click to toggle source
# File lib/rx_ruby/concurrency/current_thread_scheduler.rb, line 62
#
# Drains the given queue, invoking every item that has not been cancelled.
#
# Items are taken with queue.shift until it returns a falsy value. For each
# item that is not cancelled, the method sleeps for the remaining whole
# seconds until the item's due time (only when that is still in the future)
# and then invokes it. Cancellation is checked a second time after the sleep.
#
# @param queue [#shift] source of items; each item must respond to
#   cancelled?, due_time and invoke.
# @return [nil]
def run_trampoline(queue)
  # Parenthesised so the assignment-in-condition is visibly intentional.
  while (item = queue.shift)
    next if item.cancelled?

    wait = item.due_time - Scheduler.now.to_i
    sleep wait if wait > 0
    item.invoke unless item.cancelled?
  end
end
Public Instance Methods
schedule_relative_with_state(state, due_time, action)
click to toggle source
Schedules an action to be executed after due_time.
# File lib/rx_ruby/concurrency/current_thread_scheduler.rb, line 25
#
# Schedules an action to be executed after due_time.
#
# The action is wrapped in a ScheduledItem whose due time is the current
# time in whole seconds plus the normalized due_time. If no queue is stored
# on the class, a new PriorityQueue is created, stored, and drained with
# run_trampoline before this method returns; the stored queue is reset to
# nil afterwards even if the trampoline raises. If a queue is already
# stored, the item is only pushed onto it.
#
# @param state [Object] value handed to ScheduledItem together with the action.
# @param due_time [Numeric] relative delay, passed through Scheduler.normalize.
# @param action [#to_proc] the work to run; must not be nil or false.
# @return [Object] the result of Subscription.create; its block cancels the
#   scheduled item.
# @raise [RuntimeError] with 'action cannot be nil' when action is nil or false.
def schedule_relative_with_state(state, due_time, action)
  raise 'action cannot be nil' unless action

  dt = now.to_i + Scheduler.normalize(due_time)
  si = ScheduledItem.new(self, state, dt, &action)
  local_queue = self.class.queue

  if local_queue
    # A queue is already stored (it is set below for the duration of
    # run_trampoline), so just enqueue the item.
    local_queue.push si
  else
    local_queue = PriorityQueue.new
    local_queue.push si
    self.class.queue = local_queue
    begin
      self.class.run_trampoline local_queue
    ensure
      # Always clear the stored queue, even when an action raises.
      self.class.queue = nil
    end
  end

  Subscription.create { si.cancel }
end