class LogStash::Inputs::Acquia

Constants

VERSION

Public Instance Methods

register()
# File lib/logstash/inputs/acquia.rb, line 23
# Builds the Acquia Cloud client from the configured username and API key,
# then looks up the configured site and keeps it in @acsite.
def register
  credentials = "#{@username}:#{@api_key}"
  @cloud = ::Acquia::Cloud.new(credentials: credentials)
  @acsite = @cloud.site(@site)
end
run(queue)
# File lib/logstash/inputs/acquia.rb, line 28
# Opens one log stream per entry in @environments, then, on every
# Stud.interval tick, turns each log entry yielded by each stream into a
# decorated event and pushes it onto +queue+.
#
# When reading a stream raises Errno::EPIPE, a warning is logged and that
# environment's stream is replaced with a freshly opened one.
def run(queue)
  @streams = {}
  @environments.each { |environment| @streams[environment] = get_stream(environment) }

  Stud.interval(@interval) do
    @streams.each_pair do |environment, log_stream|
      begin
        log_stream.each_log do |entry|
          queue << generate_event(environment, entry).tap { |event| decorate(event) }
        end
      rescue Errno::EPIPE
        @logger.warn("Detected a broken pipe for #{environment} on #{@site}, reconnecting.")
        @streams[environment] = get_stream(environment)
      end
    end
  end
end
stop()
# File lib/logstash/inputs/acquia.rb, line 50
# Logs that the streams are being closed and closes every stream held in
# @streams.
#
# Safe to call before #run has started: @streams is only assigned inside
# #run, so when it is still nil there is nothing to close and the method
# returns after logging.
def stop
  @logger.info 'Closing log streams.'
  return unless @streams

  @streams.each_value(&:close)
end

Private Instance Methods

generate_event(env, log)
# File lib/logstash/inputs/acquia.rb, line 70
# Converts one raw Acquia log entry into a LogStash::Event.
#
# The +log+ hash is modified in place before being wrapped:
# * 'cmd' is dropped;
# * 'acquia' is set to the site name and the environment;
# * 'server' is renamed to 'host' and 'text' to 'message';
# * a trailing request_id="..." duplicate is stripped from the message when
#   the entry already carries a 'request_id' field;
# * 'disp_time' is removed and, when parseable, stored as an ISO 8601
#   '@timestamp' interpreted with a +0000 offset.
#
# @param env environment identifier the entry was read from
# @param log [Hash] raw log entry (string keys)
# @return [LogStash::Event]
def generate_event(env, log)
  # Remove useless API cruft.
  log.delete 'cmd'

  # Save the environment this message is coming from.
  log['acquia'] = {
    'site' => @acsite.name,
    'environment' => env
  }

  # Rename some of Acquia's parameters to more relevant Logstash names.
  log['host'] = log.delete('server')
  log['message'] = log.delete('text')

  # Trim off duplicated request id if Acquia has already provided it
  # separately. Entries without a 'text' field have no message to trim.
  request_id = log['request_id']
  message = log['message']
  if request_id && message
    # The id is data, not a pattern: escape it so characters such as '.',
    # '+' or '(' match literally instead of changing (or breaking) the regex.
    # Anchor with \z rather than $ so only a suffix of the whole message is
    # removed, never the tail of an earlier line in a multi-line message.
    duplicate = /\s+request_id="#{Regexp.escape(request_id.to_s)}"\s*\z/
    log['message'] = message.sub(duplicate, '')
  end

  timestamp = log.delete('disp_time')
  if timestamp
    begin
      log['@timestamp'] = Time.parse(timestamp + ' +0000').iso8601
    rescue ArgumentError
      # Not a valid timestamp. Oh well. Clean up, just in case.
      log.delete '@timestamp'
    end
  end

  LogStash::Event.new(log)
end
get_stream(env)
# File lib/logstash/inputs/acquia.rb, line 58
# Opens and returns a connected log stream for the environment +env+.
#
# Every entry in @types is enabled on the stream, its keepalive duration is
# set to twice @interval, debugging is switched on when @debug is set, and
# the stream is connected before it is handed back.
def get_stream(env)
  @logger.info "Opening log stream for #{env}."

  @acsite.environment(env).logstream.tap do |log_stream|
    @types.each { |log_type| log_stream.enable_type(log_type) }
    log_stream.keepalive_duration = @interval * 2
    log_stream.debug if @debug
    log_stream.connect
  end
end