class LogStash::Outputs::Charrington

This class is responsible for setting things up, creating the connection, and handling retries. Charrington::Insert is where the insert is attempted. If that fails, it will try to either create a table via Charrington::CreateTable or alter an existing one via Charrington::AlterTable

Constants

STRFTIME_FMT

Public Instance Methods

close() click to toggle source
Calls superclass method
# File lib/logstash/outputs/charrington.rb, line 121
def close
  @stopping.make_true
  @pool.close
  super
end
multi_receive(events) click to toggle source
# File lib/logstash/outputs/charrington.rb, line 99
def multi_receive(events)
  events.each do |event|
    connection = get_connection
    break unless connection

    schema = get_schema(event)

    opts = { connection: connection,
             schema: schema,
             max_retries: @max_flush_exceptions,
             retry_initial_interval: @retry_initial_interval,
             driver: driver,
             transformer: @transformer }
    Charrington::Process.call(connection, event, opts)
  rescue => e
    @logger.error("Unable to process event. Event dropped. #{e.message}")
    next
  ensure
    connection.close unless connection.nil?
  end
end
register() click to toggle source
# File lib/logstash/outputs/charrington.rb, line 90
def register
  @logger.info('JDBC - Starting up')

  load_jar_files!
  @stopping = Concurrent::AtomicBoolean.new(false)

  setup_and_test_pool!
end

Private Instance Methods

driver() click to toggle source
# File lib/logstash/outputs/charrington.rb, line 129
def driver
  case @driver_class
  when /redshift/
    "redshift"
  else
    "postgres"
  end
end
get_connection() click to toggle source
# File lib/logstash/outputs/charrington.rb, line 206
def get_connection
  connection = @pool.getConnection
rescue => e
  log_jdbc_exception(e, true, nil)
  false
end
get_schema(event) click to toggle source
# File lib/logstash/outputs/charrington.rb, line 191
def get_schema(event)
  if !@schema.nil?
    @schema
  else
    case event.to_hash["app_name"]
    when "Web App"
      "dea_webapp"
    when "Mobile App"
      "dea_mobileapp"
    else
      ""
    end
  end
end
load_jar_files!() click to toggle source

Load jar from driver path

# File lib/logstash/outputs/charrington.rb, line 166
def load_jar_files!
  unless @driver_jar_path.nil?
    raise LogStash::ConfigurationError, 'JDBC - Could not find jar file at given path. Check config.' unless File.exist? @driver_jar_path
    require @driver_jar_path
    return
  end

  # Revert original behaviour of loading from vendor directory if no path given
  jarpath = if ENV['LOGSTASH_HOME']
              File.join(ENV['LOGSTASH_HOME'], '/vendor/jar/jdbc/*.jar')
            else
              File.join(File.dirname(__FILE__), '../../../vendor/jar/jdbc/*.jar')
            end

  @logger.trace('JDBC - jarpath', path: jarpath)

  jars = Dir[jarpath]
  raise LogStash::ConfigurationError, 'JDBC - No jars found. Have you read the README?' if jars.empty?

  jars.each do |jar|
    @logger.trace('JDBC - Loaded jar', jar: jar)
    require jar
  end
end
log_jdbc_exception(exception, retrying, event) click to toggle source
# File lib/logstash/outputs/charrington.rb, line 213
def log_jdbc_exception(exception, retrying, event)
  current_exception = exception
  log_text = 'JDBC - Exception. ' + (retrying ? 'Retrying' : 'Not retrying')

  log_method = (retrying ? 'warn' : 'error')

  loop do
    @logger.send(log_method, log_text, :exception => current_exception, :event => event)

    if current_exception.respond_to? 'getNextException'
      current_exception = current_exception.getNextException()
    else
      current_exception = nil
    end

    break if current_exception == nil
  end
end
setup_and_test_pool!() click to toggle source
# File lib/logstash/outputs/charrington.rb, line 138
def setup_and_test_pool!
  @pool = Java::ComZaxxerHikari::HikariDataSource.new
  @pool.setDriverClassName(@driver_class) if @driver_class
  @pool.setUsername(@username) if @username
  @pool.setPassword(@password) if @password
  @pool.setMaximumPoolSize(@max_pool_size)
  @pool.setConnectionTimeout(@connection_timeout)
  @pool.setAutoCommit(@driver_auto_commit)
  @pool.setJdbcUrl(@connection_string)

  validate_connection_timeout = (@connection_timeout / 1000) / 2

  if !@connection_test_query.nil? and @connection_test_query.length > 1
    @pool.setConnectionTestQuery(@connection_test_query)
    @pool.setConnectionInitSql(@connection_test_query)
  end

  return unless @connection_test

  # Test connection
  test_connection = @pool.getConnection
  unless test_connection.isValid(validate_connection_timeout)
    @logger.warn('JDBC - Connection is not reporting as validate. Either connection is invalid, or driver is not getting the appropriate response.')
  end
  test_connection.close
end