class LogStash::Outputs::Swift::FileRepository

Constants

DEFAULT_STALE_TIME_SECS
DEFAULT_STATE_SWEEPER_INTERVAL_SECS

Public Class Methods

new(tags, encoding, temporary_directory, stale_time = DEFAULT_STALE_TIME_SECS, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS) click to toggle source
# File lib/logstash/outputs/swift/file_repository.rb, line 57
# Builds an empty repository and immediately starts the background sweeper.
#
# tags, encoding, temporary_directory and stale_time are forwarded unchanged
# to FactoryInitializer.new; sweeper_interval is later used as the
# :execution_interval of the sweeper's TimerTask.
def initialize(tags, encoding, temporary_directory,
               stale_time = DEFAULT_STALE_TIME_SECS,
               sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
  @sweeper_interval = sweeper_interval
  @factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time)

  # Factories are keyed by prefix: the path needs to contain the prefix so
  # that the remote structure is kept when Logstash restarts after a crash.
  @prefixed_factories = ConcurrentHashMap.new

  start_stale_sweeper
end

Public Instance Methods

each_files() { |current| ... } click to toggle source
# File lib/logstash/outputs/swift/file_repository.rb, line 75
# Yields the current file of every registered factory, one at a time.
# Each yield happens from inside that entry's with_lock block.
def each_files
  @prefixed_factories.elements.each do |entry|
    entry.with_lock do |factory|
      yield factory.current
    end
  end
end
get_factory(prefix_key) { |factory| ... } click to toggle source

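Returns the file factory for the given prefix key, creating it on first use, and yields it to the block from inside the entry's with_lock call.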

# File lib/logstash/outputs/swift/file_repository.rb, line 82
# Yields the factory registered under prefix_key.
#
# The entry is fetched with computeIfAbsent, so @factory_initializer is asked
# to create it when the map has none for that key. The block runs inside the
# entry's with_lock call, and the value of that call is returned.
def get_factory(prefix_key)
  entry = @prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer)

  entry.with_lock do |factory|
    yield factory
  end
end
get_file(prefix_key) { |current| ... } click to toggle source
# File lib/logstash/outputs/swift/file_repository.rb, line 86
# Yields the current file of the factory registered under prefix_key.
# Lookup and locking are delegated to get_factory; its result is returned.
def get_file(prefix_key)
  get_factory(prefix_key) do |factory|
    yield factory.current
  end
end
keys() click to toggle source
# File lib/logstash/outputs/swift/file_repository.rb, line 71
# Returns the prefix keys currently registered, i.e. the result of calling
# keySet on the prefix-to-factory map.
# NOTE(review): if the map is a java.util.concurrent.ConcurrentHashMap this is
# presumably a live view rather than a snapshot -- confirm before caching it.
def keys
  @prefixed_factories.keySet
end
remove_stale(k, v) click to toggle source
# File lib/logstash/outputs/swift/file_repository.rb, line 98
# Evicts a single entry if it reports itself stale.
#
# k is the prefix key and v the value stored under it. Nothing happens (and
# nil is returned) unless v.stale? is truthy. Otherwise the k/v mapping is
# removed from the map, then v.delete! is called and its result returned.
def remove_stale(k, v)
  return unless v.stale?

  @prefixed_factories.remove(k, v)
  v.delete!
end
shutdown() click to toggle source
# File lib/logstash/outputs/swift/file_repository.rb, line 90
# Stops the repository's background work. The only thing it does is stop the
# stale sweeper; it does not touch the registered factories.
def shutdown
  stop_stale_sweeper
end
size() click to toggle source
# File lib/logstash/outputs/swift/file_repository.rb, line 94
# Returns the number of entries currently held in the prefix-to-factory map.
def size
  @prefixed_factories.size
end
start_stale_sweeper() click to toggle source
# File lib/logstash/outputs/swift/file_repository.rb, line 105
# Creates and starts the periodic task that evicts stale factories.
#
# The task is a Concurrent::TimerTask whose :execution_interval is
# @sweeper_interval. On each run it walks the prefix-to-factory map and hands
# every key/value pair to remove_stale. The task is kept in @stale_sweeper so
# that stop_stale_sweeper can shut it down. Returns the result of execute.
def start_stale_sweeper
  @stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do
    # Name the thread after this plugin (Swift, per the class namespace) so
    # thread dumps are not mislabelled as belonging to the S3 output.
    LogStash::Util.set_thread_name("Swift, Stale factory sweeper")

    @prefixed_factories.forEach { |k, v| remove_stale(k, v) }
  end

  @stale_sweeper.execute
end
stop_stale_sweeper() click to toggle source
# File lib/logstash/outputs/swift/file_repository.rb, line 115
# Shuts down the sweeper task by calling shutdown on @stale_sweeper, the
# Concurrent::TimerTask created in start_stale_sweeper.
def stop_stale_sweeper
  @stale_sweeper.shutdown
end