class Turnstile::Collector::Controller

Attributes

actors[RW]
options[RW]
threads[RW]

Public Class Methods

new(*args) click to toggle source
# File lib/turnstile/collector/controller.rb, line 15
def initialize(*args)
  self.options = args.last.is_a?(Hash) ? args.pop : {}

  options.verbose ? Turnstile::Logger.enable : Turnstile::Logger.disable

  wait_for_file(file)

  self.actors = [self.reader, self.flusher]

  Daemonize.daemonize if options[:daemonize]
  STDOUT.sync = true if options[:verbose]
end

Public Instance Methods

config() click to toggle source
# File lib/turnstile/collector/controller.rb, line 67
def config
  @config ||= Turnstile.config
end
file() click to toggle source
# File lib/turnstile/collector/controller.rb, line 49
def file
  options.file
end
flusher() click to toggle source
# File lib/turnstile/collector/controller.rb, line 37
def flusher
  @flusher ||= Flusher.new(**flusher_arguments)
end
queue() click to toggle source
# File lib/turnstile/collector/controller.rb, line 45
def queue
  @queue ||= Queue.new
end
reader() click to toggle source
# File lib/turnstile/collector/controller.rb, line 53
def reader
  opts    = reader_arguments
  matcher = opts.delete(:matcher).to_sym
  @reader ||= if log_reader_class.respond_to?(matcher)
                log_reader_class.send(matcher, file, queue, **opts)
              else
                raise ArgumentError, "Invalid matcher #{matcher}, args #{reader_args}"
              end
end
start() click to toggle source
# File lib/turnstile/collector/controller.rb, line 28
def start
  self.threads = actors.map(&:start)
  threads.map(&:join)
end
stop() click to toggle source
# File lib/turnstile/collector/controller.rb, line 33
def stop
  actors.each(&:shutdown)
end
symbolize(opts) click to toggle source
# File lib/turnstile/collector/controller.rb, line 63
def symbolize(opts)
  Hashie::Extensions::SymbolizeKeys.symbolize_keys(opts.to_h)
end
tracker() click to toggle source
# File lib/turnstile/collector/controller.rb, line 41
def tracker
  @tracker ||= Turnstile::Tracker.new
end

Private Instance Methods

actor_argument_hash() click to toggle source
# File lib/turnstile/collector/controller.rb, line 87
def actor_argument_hash
  options.merge(queue: queue, tracker: tracker)
end
flusher_arguments() click to toggle source
# File lib/turnstile/collector/controller.rb, line 73
def flusher_arguments
  symbolize(actor_argument_hash.merge(sleep_when_idle: config.flush_interval))
end
log_reader_class() click to toggle source
# File lib/turnstile/collector/controller.rb, line 103
def log_reader_class
  Turnstile::Collector::LogReader
end
reader_arguments() click to toggle source
# File lib/turnstile/collector/controller.rb, line 77
def reader_arguments
  reader_args_hash   = actor_argument_hash.merge(sleep_when_idle: config.flush_interval)
  matcher, delimiter = select_matcher

  reader_args_hash.merge!(delimiter: delimiter) if delimiter
  reader_args_hash.merge!(matcher: matcher) if matcher

  symbolize(reader_args_hash)
end
select_matcher() click to toggle source
# File lib/turnstile/collector/controller.rb, line 107
def select_matcher
  matcher   = :default
  delimiter = nil

  if options[:delimiter]
    matcher   = :delimited
    delimiter = options[:delimiter]
  elsif options[:filetype]
    matcher = options[:filetype].to_sym
  end
  return matcher, delimiter
end
wait_for_file(file) click to toggle source
# File lib/turnstile/collector/controller.rb, line 91
def wait_for_file(file)
  sleep_period = 1
  while !::File.exist?(file)
    terr "File #{file.bold.yellow} does not exist, waiting for it to appear..."
    terr 'Press Ctrl-C to abort.' if sleep_period == 1

    sleep sleep_period
    sleep_period *= 1.2
  end
  tdb "Detected file #{file.bold.yellow} now exists, continue..."
end