class TrRMIte::Consumers::Scheduler
Public Class Methods
default_name()
click to toggle source
# File lib/TrRMIte/consumers/scheduler.rb, line 8
# Default name of the scheduler consumer.
#
# @return [String] always 'EVENT_SCHEDULER'
def self.default_name
  'EVENT_SCHEDULER'
end
new(topic, registry_uri, queue_uri, interval, state_store)
click to toggle source
Calls superclass method
TrRMIte::Consumer::new
# File lib/TrRMIte/consumers/scheduler.rb, line 12
# Sets up the scheduler: delegates naming/topic/registry handling to the
# superclass constructor and initialises the scheduler's own state.
#
# @param topic forwarded unchanged to the superclass constructor
# @param registry_uri forwarded unchanged to the superclass constructor
# @param queue_uri handed to Producer.new by the thread started in #run
# @param interval value passed to sleep between two publishing passes
# @param state_store path of the file used to persist the scheduled events
def initialize(topic, registry_uri, queue_uri, interval, state_store)
  # NOTE(review): `default_name` is resolved on the instance here, while this
  # file only shows a class-level `self.default_name` - presumably the
  # superclass supplies an instance-level reader; confirm.
  super(default_name, topic, registry_uri)

  @queue_uri = queue_uri
  @interval = interval
  @state_store = state_store

  # Guards @scheduled_events, which is touched by more than one thread.
  @semaphore = Mutex.new
  @scheduled_events = []
end
Public Instance Methods
run()
click to toggle source
Calls superclass method
TrRMIte::Consumer#run
# File lib/TrRMIte/consumers/scheduler.rb, line 24
# Starts the background publisher thread, then hands control to the
# superclass implementation of #run. Once the superclass returns - or
# raises - the publisher thread is terminated so it cannot outlive the
# consumer.
#
# @return [Thread] the (terminated) publisher thread
# @raise whatever the superclass #run raises; the thread is stopped first
def run
  @scheduler = Thread.new do
    Thread.current.name = 'TRMT/publisher'
    producer = Producer.new(@queue_uri)

    loop do
      @semaphore.synchronize { publish_overdue_events(producer) }
      sleep(@interval)
    end
  end

  begin
    super
  ensure
    # Terminate while holding the semaphore so the publisher is never killed
    # in the middle of a publishing pass. Doing this in `ensure` keeps the
    # thread from leaking when the superclass loop exits with an exception.
    @semaphore.synchronize { @scheduler.terminate }
  end

  @scheduler
end
Private Instance Methods
finalize()
click to toggle source
# File lib/TrRMIte/consumers/scheduler.rb, line 72
# Intentionally does nothing: the state has already been persisted by the
# time this hook runs, so there is nothing left to flush.
#
# @return [nil]
def finalize
  nil
end
persist_state(_event)
click to toggle source
# File lib/TrRMIte/consumers/scheduler.rb, line 64
# Persists the scheduler state by rewriting the state store. The event
# argument is ignored - the whole list of scheduled events is written.
#
# @param _event unused
# @return whatever #write_state_store returns
def persist_state(_event)
  write_state_store
end
publish_overdue_events(producer)
click to toggle source
# File lib/TrRMIte/consumers/scheduler.rb, line 76
# Publishes every scheduled event whose :publish_at is at or before the
# current wall clock time, removes each one from @scheduled_events as soon
# as it has been published, and rewrites the state store afterwards.
#
# If publishing fails partway through, the events that were already
# published are still persisted as removed before the error is re-raised;
# otherwise the state file would keep listing them and they would be
# published a second time after a restart.
#
# @param producer object responding to #publish_scheduled(event)
# @return the result of #write_state_store, or nil when nothing was overdue
# @raise re-raises any StandardError raised by producer.publish_scheduled
def publish_overdue_events(producer)
  now = TrRMIte.wall_clock_time
  overdue_events = @scheduled_events.select { |event| event[:publish_at] <= now }

  persist_needed = false
  begin
    overdue_events.each do |event|
      producer.publish_scheduled(event)
      @scheduled_events.delete(event)
      persist_needed = true
    end
  rescue StandardError
    # Keep the on-disk state in step with what has actually been published.
    write_state_store if persist_needed
    raise
  end

  write_state_store if persist_needed
end
restore_state(_sequence)
click to toggle source
# File lib/TrRMIte/consumers/scheduler.rb, line 51
# Loads previously persisted events from the state store file and appends
# them to @scheduled_events. Does nothing when the file is missing, blank,
# or contains JSON `null`.
#
# @param _sequence unused
# @return [Array, nil] the updated list of scheduled events, or nil when
#   nothing was restored
def restore_state(_sequence)
  return unless File.exist?(@state_store)

  raw_state = File.read(@state_store)
  return if raw_state.strip.empty?

  parsed = JSON.parse(raw_state, symbolize_names: true)
  return if parsed.nil?

  revived = parsed.map { |attributes| Event.from(attributes) }
  # The event list is shared with the publisher thread, hence the lock.
  @semaphore.synchronize { @scheduled_events += revived }
end
write_state_store()
click to toggle source
# File lib/TrRMIte/consumers/scheduler.rb, line 68
# Serialises @scheduled_events as JSON into the state store file.
#
# The JSON is first written to a staging file next to the state store and
# then renamed over it, so a crash in the middle of the write cannot leave
# a truncated state file behind (which would make the next JSON parse of
# the state store fail). The staging name includes the process id and the
# current thread's object id so concurrent writers do not share a file.
#
# @return [Integer] number of bytes written
def write_state_store
  staging_path = "#{@state_store}.#{Process.pid}.#{Thread.current.object_id}.tmp"
  bytes_written = File.write(staging_path, @scheduled_events.to_json)
  File.rename(staging_path, @state_store)
  bytes_written
ensure
  # Only present here when the write or rename failed; do not leave it behind.
  File.delete(staging_path) if staging_path && File.exist?(staging_path)
end