class LogStash::Outputs::Qingstor

Constants

CRASH_RECOVERY_THREADPOOL
PERIODIC_CHECK_INTERVAL_IN_SECONDS

Public Instance Methods

clean_temporary_file(file) click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 224
# Callback that logs the path of a temporary file and then deletes it.
#
# @param file [#path, #delete!] the temporary file to discard
# @return whatever `file.delete!` returns
def clean_temporary_file(file)
  doomed_path = file.path
  @logger.info('Callback: removing temporary file', file: doomed_path)
  file.delete!
end
close() click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 196
# Flushes and tears down the output.
#
# Steps, in order:
# 1. stop the periodic check when the rotation policy uses one;
# 2. hand every non-empty file in the repository to upload_file;
# 3. shut down the file repository and stop the uploader;
# 4. stop the crash uploader, but only when @restore is set.
#
# @return the result of stopping the crash uploader, or nil when @restore
#   is not set
def close
  stop_periodic_check if @rotation.needs_periodic?

  @logger.info('uploading current workspace before closing')
  @file_repository.each_files do |pending|
    # Empty files carry nothing worth uploading.
    next unless pending.size > 0

    upload_file(pending)
  end

  @file_repository.shutdown
  @uploader.stop
  @crash_uploader.stop if @restore
end
directory_valid?(path) click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 247
# Checks that `path` can be used as a writable directory, creating it
# (parents included) when it does not exist yet.
#
# @param path [String] directory to check
# @return [Boolean] true when the directory exists (or was created) and is
#   writable; false otherwise, including when any StandardError is raised
#   while checking or creating it
def directory_valid?(path)
  begin
    FileUtils.mkdir_p(path) unless ::File.directory?(path)
    ::File.writable?(path)
  rescue StandardError
    false
  end
end
getbucket() click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 189
# Builds the QingStor SDK config and service from the plugin settings and
# returns the bucket object for @bucket in @region.
#
# Side effects: assigns @qs_config and @qs_service.
#
# @return whatever `QingStor::SDK::Service#bucket` returns
def getbucket
  @qs_config = QingStor::SDK::Config.init(@access_key_id, @secret_access_key)

  # Host and port are only overridden when a host was configured.
  custom_endpoint = !@host.nil?
  @qs_config.update(host: @host, port: @port) if custom_endpoint

  @qs_service = QingStor::SDK::Service.new(@qs_config)
  @qs_service.bucket(@bucket, @region)
end
log_print_config() click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 131
# Logs the settings this output runs with at info level: prefix, tmpdir,
# the rotation policy rendered with `to_s`, tags, encoding and the restore
# flag.
#
# @return whatever `@logger.info` returns
def log_print_config
  rotation_summary = @rotation.to_s

  @logger.info(
    'Run at setting: ',
    prefix: @prefix,
    tmpdir: @tmpdir,
    rotation: rotation_summary,
    tags: @tags,
    encoding: @encoding,
    restore: @restore
  )
end
multi_receive_encoded(events_and_encoded) click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 140
# Writes a batch of encoded events to their temporary files and then asks
# for a rotation check on every prefix touched by the batch.
#
# Each event's prefix is resolved with `event.sprintf(@prefix)`; the encoded
# payload is stripped and written with a single trailing newline.
#
# @param events_and_encoded [Enumerable] pairs of (event, encoded string)
# @return whatever rotate_if_needed returns
# @raise [Errno::ENOSPC] re-raised after logging when a write runs out of
#   disk space
def multi_receive_encoded(events_and_encoded)
  touched_prefixes = Set.new

  events_and_encoded.each do |event, encoded|
    key = event.sprintf(@prefix)
    touched_prefixes.add(key)

    begin
      @file_repository.get_file(key) { |sink| sink.write(encoded.strip + "\n") }
    rescue Errno::ENOSPC
      @logger.error('QingStor: Nospace left in temporary directory',
                    tmpdir: @tmpdir)
      raise
    end
  end

  # Rotation is checked once per prefix after all writes, rather than after
  # every single write (original note: groups IO calls to optimize fstat
  # checks).
  rotate_if_needed(touched_prefixes)
end
register() click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 106
# Sets the output up: validates the settings, creates the file repository,
# rotation policy and uploader, then starts the optional background work.
#
# Order of operations:
# 1. validate @prefix (only when one is set) and the temporary directory;
# 2. build the file repository and the rotation policy;
# 3. build the upload thread pool (1..@upload_workers_count threads, queue
#    bounded by @upload_queue_size, :caller_runs fallback policy);
# 4. resolve and validate the bucket, then create the uploader;
# 5. log the settings, start the periodic check if the rotation policy
#    needs one, and restore leftover files when @restore is set.
#
# @raise [LogStash::ConfigurationError] when directory_valid? rejects @tmpdir
def register
  QingstorValidator.prefix_valid?(@prefix) unless @prefix.nil?

  tmpdir_usable = directory_valid?(@tmpdir)
  unless tmpdir_usable
    message = "Logstash must have the permissions to write to: #{@tmpdir}"
    raise LogStash::ConfigurationError, message
  end

  @file_repository = FileRepository.new(@tags, @encoding, @tmpdir)
  @rotation = RotationPolicy.new(@rotation_strategy, @file_size, @file_time)

  upload_pool = Concurrent::ThreadPoolExecutor.new(
    min_threads: 1,
    max_threads: @upload_workers_count,
    max_queue: @upload_queue_size,
    fallback_policy: :caller_runs
  )

  @qs_bucket = getbucket
  QingstorValidator.bucket_valid?(@qs_bucket)
  @uploader = Uploader.new(@qs_bucket, @logger, upload_pool)

  log_print_config
  start_periodic_check if @rotation.needs_periodic?
  restore_from_crash if @restore
end
restore_from_crash() click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 254
# Picks up files left in @tmpdir (presumably by a previous run that did not
# shut down cleanly) and either uploads or removes them.
#
# Every regular file found recursively under @tmpdir is wrapped in a
# TemporaryFile. Non-empty files get their key prefixed with
# "Restored/<YYYY-MM-DD>/" and are queued on a dedicated crash uploader;
# empty files are deleted right away.
#
# Side effects: assigns @crash_uploader.
#
# @return [Array<String>] the list of regular-file paths that were processed
def restore_from_crash
  @crash_uploader = Uploader.new(@qs_bucket, @logger,
                                 CRASH_RECOVERY_THREADPOOL)

  temp_folder_path = Pathname.new(@tmpdir)
  # The full list is resolved before any file is uploaded or deleted.
  leftovers = Dir.glob(::File.join(@tmpdir, '**/*'))
                 .select { |entry| ::File.file?(entry) }

  leftovers.each do |entry|
    temp_file = TemporaryFile.create_from_existing_file(entry,
                                                        temp_folder_path)

    if temp_file.size > 0
      # Now multipart uploader supports file size up to 500GB
      day_folder = Time.new.strftime('%Y-%m-%d/')
      temp_file.key = 'Restored/' + day_folder + temp_file.key
      @logger.info('Recoving from crash and uploading',
                   file: temp_file.key)
      @crash_uploader.upload_async(
        temp_file,
        on_complete: method(:clean_temporary_file),
        upload_options: upload_options
      )
    elsif temp_file.size.zero?
      @logger.info('Recoving from crash, delete empty files',
                   file: temp_file.path)
      temp_file.delete!
    end
  end
end
rotate_if_needed(prefixs) click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 164
# Rotates the current temporary file of each given prefix when the rotation
# policy says it is due.
#
# For every prefix the matching factory is obtained from the file
# repository; if `@rotation.rotate?` accepts the factory's current file, the
# file is passed to upload_file and the factory is rotated.
#
# @param prefixs [Enumerable] prefix keys to check
# @return whatever `prefixs.each` returns
def rotate_if_needed(prefixs)
  prefixs.each do |prefix_key|
    @file_repository.get_factory(prefix_key) do |factory|
      current_file = factory.current
      next unless @rotation.rotate?(current_file)

      @logger.debug('Rotate file',
                    strategy: current_file.key,
                    path: current_file.path)
      upload_file(current_file)
      factory.rotate!
    end
  end
end
start_periodic_check() click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 229
# Creates and starts a Concurrent::TimerTask that runs rotate_if_needed over
# all keys of the file repository, using
# PERIODIC_CHECK_INTERVAL_IN_SECONDS as its execution interval.
#
# Side effects: assigns @periodic_check.
#
# @return whatever `Concurrent::TimerTask#execute` returns
def start_periodic_check
  @logger.info('Start periodic rotation check')

  # The task body, run by the timer on each tick.
  stale_file_sweep = proc do
    @logger.debug('Periodic check for stale files')

    rotate_if_needed(@file_repository.keys)
  end

  @periodic_check = Concurrent::TimerTask.new(
    execution_interval: PERIODIC_CHECK_INTERVAL_IN_SECONDS,
    &stale_file_sweep
  )

  @periodic_check.execute
end
stop_periodic_check() click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 243
# Shuts down the periodic rotation timer.
#
# Does nothing when no timer was ever started (@periodic_check is nil), so
# calling this before start_periodic_check no longer raises NoMethodError.
#
# @return the result of `@periodic_check.shutdown`, or nil when there is no
#   timer
def stop_periodic_check
  @periodic_check.shutdown unless @periodic_check.nil?
end
upload_file(file) click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 180
# Closes a temporary file and queues it on the uploader.
#
# clean_temporary_file is passed as the :on_complete callback, and the
# current upload_options are logged at debug level and passed along.
#
# @param file [#key, #close] the temporary file to upload
# @return whatever `@uploader.upload_async` returns
def upload_file(file)
  @logger.debug('Add file to uploading queue', key: file.key)
  file.close

  @logger.debug('upload options', upload_options: upload_options)

  completion_hook = method(:clean_temporary_file)
  @uploader.upload_async(file,
                         on_complete: completion_hook,
                         upload_options: upload_options)
end
upload_options() click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 211
# Builds the options hash passed with every upload.
#
# :content_encoding is 'gzip' when @encoding is 'gzip' and nil otherwise.
# The server-side encryption keys are added only when the algorithm is
# 'AES256' and a customer key is set.
#
# @return [Hash] upload options keyed by symbol
def upload_options
  gzip_header = ('gzip' if @encoding == 'gzip')
  options = { content_encoding: gzip_header }

  encrypt = @server_side_encryption_algorithm == 'AES256' && !@customer_key.nil?
  return options unless encrypt

  options[:server_side_encryption_algorithm] = @server_side_encryption_algorithm
  options[:customer_key] = @customer_key
  options
end