class Hawkei::Processor::Worker

Constants

FLUSH_INTERVAL_SECONDS
FLUSH_MESSAGE
SHUTDOWN_MESSAGE

Public Class Methods

new(queue, state) click to toggle source
# File lib/hawkei/processor/worker.rb, line 10
# Build a worker around the shared message queue and the run-state flag.
#
# Starts with an empty batch and an empty list of in-flight promises, then
# launches a timer that pushes FLUSH_MESSAGE onto the queue every
# FLUSH_INTERVAL_SECONDS.
#
# @param queue [#<<, #pop] queue the worker consumes messages from
# @param state [#true?] flag read by `thread_active?`
def initialize(queue, state)
  @state = state
  @queue = queue

  @promises = Concurrent::Array.new
  @batch = Batch.new

  @timer = Concurrent::TimerTask.new(execution_interval: FLUSH_INTERVAL_SECONDS) do
    @queue << FLUSH_MESSAGE
  end
  @timer.execute
end

Public Instance Methods

run() click to toggle source
# File lib/hawkei/processor/worker.rb, line 21
# Worker main loop.
#
# While `thread_active?` holds, pops the next message from the queue
# (blocking until one is available) and hands it to `add_message_to_batch`.
# Once the flag drops, runs `shutdown_worker` and returns its result.
def run
  add_message_to_batch(@queue.pop) while thread_active?

  shutdown_worker
end

Private Instance Methods

add_message_to_batch(message) click to toggle source

Add the message to a batch

  • Add message to a batch if it's not a flush message

  • Flush the batch if a flush message is received or the batch is full

# File lib/hawkei/processor/worker.rb, line 38
# Route one queue message.
#
# A FLUSH_MESSAGE is never stored: it only triggers `flush`. Any other
# message is appended to the current batch, which is flushed as soon as it
# reports being full.
#
# @param message [Object] queue item, or FLUSH_MESSAGE
# @return [Object, nil] the result of `flush` when one was triggered, else nil
def add_message_to_batch(message)
  if message == FLUSH_MESSAGE
    flush
  else
    @batch << message
    flush if @batch.full?
  end
end
flush() click to toggle source

rubocop:enable Lint/HandleExceptions

# File lib/hawkei/processor/worker.rb, line 69
# Send the current batch and start a fresh one.
#
# Does nothing (and returns nil) when the current batch is empty. Otherwise
# the batch is handed to `send_batch` and replaced by a new Batch.
#
# @return [Batch, nil] the new empty batch, or nil when nothing was sent
def flush
  outgoing = @batch

  unless outgoing.empty?
    send_batch(outgoing)
    @batch = Batch.new
  end
end
logger() click to toggle source
# File lib/hawkei/processor/worker.rb, line 107
# Logger taken from the global Hawkei configuration.
#
# @return [Object] whatever `Hawkei.configurations.logger` returns
def logger
  configurations = Hawkei.configurations
  configurations.logger
end
send_batch(batch) click to toggle source

rubocop:disable Metrics/AbcSize

# File lib/hawkei/processor/worker.rb, line 77
# Send a batch asynchronously and track the request in @promises.
#
# On success the promise is removed from @promises. On failure the batch's
# retry counter is updated, then:
#
# * a Hawkei::RequestError with HTTP status 401 or 404 is logged and dropped;
# * otherwise, while the worker is active and the batch can still be retried,
#   a new attempt is scheduled after `batch.next_retry`;
# * otherwise the batch is dropped.
#
# In every case the settled promise is removed from @promises, so only
# requests that are still in flight remain tracked.
#
# @param batch [Batch] batch to send; must respond to `messages`,
#   `update_retry`, `can_retry?` and `next_retry`
def send_batch(batch)
  promise =
    Concurrent::Promise
    .new { Hawkei::Batch.create(Message.base.merge(data: batch.messages)) }
    .on_success { @promises.delete(promise) }
    .rescue do |error|
      batch.update_retry

      # No `return` here: this block runs after send_batch has returned (on an
      # executor thread), where `return` would raise LocalJumpError.
      if error.is_a?(Hawkei::RequestError) && [401, 404].include?(error.http_status)
        logger.error(error.message)
      elsif thread_active? && batch.can_retry?
        # The retry registers its own promise when it runs.
        Concurrent::ScheduledTask.new(batch.next_retry) { send_batch(batch) }.execute
      end

      @promises.delete(promise)
    end

  # Register before executing: a request that settles immediately must find
  # itself in @promises, otherwise its delete is a no-op and it lingers forever.
  @promises << promise
  promise.execute
rescue Concurrent::RejectedExecutionError
  # The promise never started, so there is nothing to wait for at shutdown.
  @promises.delete(promise)
  logger.error('Impossible to start a thread in closing application')
end
shutdown_worker() click to toggle source

Shutdown the worker

  • Close the timer

  • Retrieve the last messages

  • Wait for all the requests to be completed

rubocop:disable Lint/HandleExceptions

# File lib/hawkei/processor/worker.rb, line 51
# Shut the worker down without losing queued messages.
#
# 1. Stops the periodic flush timer.
# 2. Drains whatever is still in the queue into the batch, using non-blocking
#    pops so the call cannot hang on an empty queue.
# 3. Flushes the last batch.
# 4. Waits for every request that is still in flight.
def shutdown_worker
  @timer.shutdown

  begin
    until (message = @queue.pop(true)).nil?
      add_message_to_batch(message)
    end
  rescue ThreadError
    # Expected end of the drain: the non-blocking pop raises once the queue is
    # empty (assuming @queue behaves like Thread::Queue).
  end

  flush

  # Completion callbacks delete promises from @promises while we wait. Walking
  # the live array would shift elements under the iterator and skip pending
  # promises, so iterate over a snapshot instead.
  @promises.dup.each do |promise|
    promise.wait if promise && !promise.fulfilled?
  end
end
thread_active?() click to toggle source

rubocop:enable Metrics/AbcSize

# File lib/hawkei/processor/worker.rb, line 103
# Whether the worker should keep running.
#
# Delegates to `@state.true?`. `run` keeps consuming the queue while this
# holds, and `send_batch` only schedules retries while it holds.
# NOTE(review): @state is presumably a Concurrent::AtomicBoolean shared with
# the owner of this worker — confirm against the caller.
def thread_active?
  @state.true?
end