class Envoi::Restore::Agent

Constants

DEFAULT_ASPERA_ARGS
DEFAULT_DESTINATION_PATH
DEFAULT_THREAD_LIMIT

Attributes

default_aspera_ascp_args[RW]
default_aspera_ascp_path[RW]

Public Instance Methods

after_initialize() click to toggle source
# File lib/envoi/restore/agent.rb, line 67
def after_initialize
  args = initial_args
  @running = false
  @should_stop = false
  @default_aspera_ascp_path   = args[:default_aspera_ascp_path]
  @default_aspera_args = args[:default_ascp_args] || DEFAULT_ASPERA_ARGS

  @thread_limit = args.fetch(:thread_limit, DEFAULT_THREAD_LIMIT)

  initialize_queue_handler
  initialize_event_handler
end
initialize_event_handler() click to toggle source
# File lib/envoi/restore/agent.rb, line 95
def initialize_event_handler
  @event_handler = GlacierRestoreEventHandler.new(config: sqs_config, logger: logger)
end
initialize_queue_handler() click to toggle source
# File lib/envoi/restore/agent.rb, line 88
def initialize_queue_handler
  @queue_handler = SQSQueueWorker.new(config: sqs_config)
  @queue_handler.poller.before_request do |stats|
    throw :stop_polling if @should_stop
  end
end
run() click to toggle source
# File lib/envoi/restore/agent.rb, line 99
def run
  @running = true
  logger.info { "Running..." }
  @threads = []
  @queue_handler.poll do |messages, stats|
    messages = [ messages ] unless messages.is_a?(Array)
    messages.each do |msg|
      break if @should_stop
      break if @thread_limit && @threads.length >= @thread_limit
      logger.info { "Processing Message #{msg.message_id}... "}
      logger.debug { "Message: #{msg}" }

      message_handler = SQSMessageHandler.new(logger: logger,
                                              queue_handler: @queue_handler.dup,
                                              message: msg.dup,
                                              event_handler: @event_handler.dup)
      t = Thread.new(message_handler) { |mh| mh.process }
      @threads << t
    end
    @threads.keep_if(&:alive?)
    throw :stop_polling if @should_stop
    throw :skip_delete
  end

  @running = false
end
sqs_config() click to toggle source
# File lib/envoi/restore/agent.rb, line 84
def sqs_config
  @sqs_config ||= system_config['sqs'] || { skip_delete: true}
end
stop() click to toggle source
# File lib/envoi/restore/agent.rb, line 126
def stop
  @should_stop = true
end
system_config() click to toggle source
# File lib/envoi/restore/agent.rb, line 80
def system_config
  @system_config ||= config['restore'] || { }
end