class LogStash::Outputs::Charrington
This class is responsible for setting things up, creating the connection, and handling retries. Charrington::Insert
is where the insert is attempted. If that fails, it will try to either create a table via Charrington::CreateTable or alter an existing one via Charrington::AlterTable
Constants
- STRFTIME_FMT
Public Instance Methods
close()
click to toggle source
Calls superclass method
# File lib/logstash/outputs/charrington.rb, line 121 def close @stopping.make_true @pool.close super end
multi_receive(events)
click to toggle source
# File lib/logstash/outputs/charrington.rb, line 99 def multi_receive(events) events.each do |event| connection = get_connection break unless connection schema = get_schema(event) opts = { connection: connection, schema: schema, max_retries: @max_flush_exceptions, retry_initial_interval: @retry_initial_interval, driver: driver, transformer: @transformer } Charrington::Process.call(connection, event, opts) rescue => e @logger.error("Unable to process event. Event dropped. #{e.message}") next ensure connection.close unless connection.nil? end end
register()
click to toggle source
# File lib/logstash/outputs/charrington.rb, line 90 def register @logger.info('JDBC - Starting up') load_jar_files! @stopping = Concurrent::AtomicBoolean.new(false) setup_and_test_pool! end
Private Instance Methods
driver()
click to toggle source
# File lib/logstash/outputs/charrington.rb, line 129 def driver case @driver_class when /redshift/ "redshift" else "postgres" end end
get_connection()
click to toggle source
# File lib/logstash/outputs/charrington.rb, line 206 def get_connection connection = @pool.getConnection rescue => e log_jdbc_exception(e, true, nil) false end
get_schema(event)
click to toggle source
# File lib/logstash/outputs/charrington.rb, line 191 def get_schema(event) if !@schema.nil? @schema else case event.to_hash["app_name"] when "Web App" "dea_webapp" when "Mobile App" "dea_mobileapp" else "" end end end
load_jar_files!()
click to toggle source
Load jar from driver path
# File lib/logstash/outputs/charrington.rb, line 166 def load_jar_files! unless @driver_jar_path.nil? raise LogStash::ConfigurationError, 'JDBC - Could not find jar file at given path. Check config.' unless File.exist? @driver_jar_path require @driver_jar_path return end # Revert original behaviour of loading from vendor directory if no path given jarpath = if ENV['LOGSTASH_HOME'] File.join(ENV['LOGSTASH_HOME'], '/vendor/jar/jdbc/*.jar') else File.join(File.dirname(__FILE__), '../../../vendor/jar/jdbc/*.jar') end @logger.trace('JDBC - jarpath', path: jarpath) jars = Dir[jarpath] raise LogStash::ConfigurationError, 'JDBC - No jars found. Have you read the README?' if jars.empty? jars.each do |jar| @logger.trace('JDBC - Loaded jar', jar: jar) require jar end end
log_jdbc_exception(exception, retrying, event)
click to toggle source
# File lib/logstash/outputs/charrington.rb, line 213 def log_jdbc_exception(exception, retrying, event) current_exception = exception log_text = 'JDBC - Exception. ' + (retrying ? 'Retrying' : 'Not retrying') log_method = (retrying ? 'warn' : 'error') loop do @logger.send(log_method, log_text, :exception => current_exception, :event => event) if current_exception.respond_to? 'getNextException' current_exception = current_exception.getNextException() else current_exception = nil end break if current_exception == nil end end
setup_and_test_pool!()
click to toggle source
# File lib/logstash/outputs/charrington.rb, line 138 def setup_and_test_pool! @pool = Java::ComZaxxerHikari::HikariDataSource.new @pool.setDriverClassName(@driver_class) if @driver_class @pool.setUsername(@username) if @username @pool.setPassword(@password) if @password @pool.setMaximumPoolSize(@max_pool_size) @pool.setConnectionTimeout(@connection_timeout) @pool.setAutoCommit(@driver_auto_commit) @pool.setJdbcUrl(@connection_string) validate_connection_timeout = (@connection_timeout / 1000) / 2 if !@connection_test_query.nil? and @connection_test_query.length > 1 @pool.setConnectionTestQuery(@connection_test_query) @pool.setConnectionInitSql(@connection_test_query) end return unless @connection_test # Test connection test_connection = @pool.getConnection unless test_connection.isValid(validate_connection_timeout) @logger.warn('JDBC - Connection is not reporting as validate. Either connection is invalid, or driver is not getting the appropriate response.') end test_connection.close end