class Hawkei::Processor::Worker
Constants
- FLUSH_INTERVAL_SECONDS
- FLUSH_MESSAGE
- SHUTDOWN_MESSAGE
Public Class Methods
new(queue, state)
click to toggle source
# File lib/hawkei/processor/worker.rb, line 10 def initialize(queue, state) @queue = queue @state = state @batch = Batch.new @promises = Concurrent::Array.new @timer = Concurrent::TimerTask.new(execution_interval: FLUSH_INTERVAL_SECONDS) { @queue << FLUSH_MESSAGE } @timer.execute end
Public Instance Methods
run()
click to toggle source
# File lib/hawkei/processor/worker.rb, line 21 def run while thread_active? message = @queue.pop add_message_to_batch(message) end shutdown_worker end
Private Instance Methods
add_message_to_batch(message)
click to toggle source
Add the message to a batch
-
Add message to a batch if it's not a flush message
-
flush batch if flush message is receive or batch is full
# File lib/hawkei/processor/worker.rb, line 38 def add_message_to_batch(message) @batch << message if message != FLUSH_MESSAGE flush if message == FLUSH_MESSAGE || @batch.full? end
flush()
click to toggle source
rubocop:enable Lint/HandleExceptions
# File lib/hawkei/processor/worker.rb, line 69 def flush return if @batch.empty? send_batch(@batch) @batch = Batch.new end
logger()
click to toggle source
# File lib/hawkei/processor/worker.rb, line 107 def logger Hawkei.configurations.logger end
send_batch(batch)
click to toggle source
rubocop:disable Metrics/AbcSize
# File lib/hawkei/processor/worker.rb, line 77 def send_batch(batch) promise = Concurrent::Promise .new { Hawkei::Batch.create(Message.base.merge(data: batch.messages)) } .on_success { @promises.delete(promise) } .rescue do |error| batch.update_retry if error.is_a?(Hawkei::RequestError) && [401, 404].include?(error.http_status) return logger.error(error.message) end if thread_active? && batch.can_retry? Concurrent::ScheduledTask.new(batch.next_retry) { send_batch(batch) }.execute else @promises.delete(promise) end end promise.execute @promises << promise rescue Concurrent::RejectedExecutionError => _ logger.error('Impossible to start a thread in closing application') end
shutdown_worker()
click to toggle source
Shutdown the worker
-
Close the timer
-
Retreive the last messages
-
Wait for all the request to be completed
rubocop:disable Lint/HandleExceptions
# File lib/hawkei/processor/worker.rb, line 51 def shutdown_worker @timer.shutdown begin until (message = @queue.pop(true)).nil? add_message_to_batch(message) end rescue ThreadError => _ end flush @promises.each do |promise| promise.wait if promise && !promise.fulfilled? end end
thread_active?()
click to toggle source
rubocop:enable Metrics/AbcSize
# File lib/hawkei/processor/worker.rb, line 103 def thread_active? @state.true? end