class CronoTrigger::PollingThread
Public Class Methods
new(model_queue, stop_flag, logger, executor, execution_counter)
click to toggle source
# File lib/crono_trigger/polling_thread.rb, line 3 def initialize(model_queue, stop_flag, logger, executor, execution_counter) @model_queue = model_queue @stop_flag = stop_flag @logger = logger @executor = executor @execution_counter = execution_counter @quiet = Concurrent::AtomicBoolean.new(false) end
Public Instance Methods
alive?()
click to toggle source
# File lib/crono_trigger/polling_thread.rb, line 48 def alive? @thread.alive? end
join()
click to toggle source
# File lib/crono_trigger/polling_thread.rb, line 36 def join @thread.join end
poll(model)
click to toggle source
# File lib/crono_trigger/polling_thread.rb, line 52 def poll(model) @logger.debug "(polling-thread-#{Thread.current.object_id}) Poll #{model}" records = [] overflowed_record_ids = [] begin model.connection_pool.with_connection do records = model.executables_with_lock end records.each do |record| begin @executor.post do @execution_counter.increment begin process_record(record) ensure @execution_counter.decrement end end rescue Concurrent::RejectedExecutionError overflowed_record_ids << record.id end end unlock_overflowed_records(model, overflowed_record_ids) end while overflowed_record_ids.empty? && records.any? end
quiet()
click to toggle source
# File lib/crono_trigger/polling_thread.rb, line 40 def quiet @quiet.make_true end
quiet?()
click to toggle source
# File lib/crono_trigger/polling_thread.rb, line 44 def quiet? @quiet.true? end
run()
click to toggle source
# File lib/crono_trigger/polling_thread.rb, line 12 def run @thread = Thread.start do @logger.info "(polling-thread-#{Thread.current.object_id}) Start polling thread" until @stop_flag.wait_for_set(CronoTrigger.config.polling_interval) next if quiet? CronoTrigger.reloader.call do begin model_name = @model_queue.pop(true) model = model_name.classify.constantize poll(model) rescue ThreadError => e @logger.error(e) unless e.message == "queue empty" rescue => ex @logger.error(ex) CronoTrigger::GlobalExceptionHandler.handle_global_exception(ex) ensure @model_queue << model_name if model_name end end end end end
Private Instance Methods
process_record(record)
click to toggle source
# File lib/crono_trigger/polling_thread.rb, line 82 def process_record(record) record.class.connection_pool.with_connection do @logger.info "(executor-thread-#{Thread.current.object_id}) Execute #{record.class}-#{record.id}" record.do_execute end rescue Exception => ex @logger.error(ex) CronoTrigger::GlobalExceptionHandler.handle_global_exception(ex) end
unlock_overflowed_records(model, overflowed_record_ids)
click to toggle source
# File lib/crono_trigger/polling_thread.rb, line 92 def unlock_overflowed_records(model, overflowed_record_ids) model.connection_pool.with_connection do unless overflowed_record_ids.empty? model.where(id: overflowed_record_ids).crono_trigger_unlock_all! end end rescue ActiveRecord::ConnectionNotEstablished, ActiveRecord::LockWaitTimeout, ActiveRecord::StatementTimeout, ActiveRecord::Deadlocked sleep 1 retry end