class LogStash::Inputs::Jdbc

Attributes

database[R]

Public Instance Methods

close() click to toggle source
# File lib/logstash/inputs/jdbc.rb, line 307
# Shuts down the scheduler, if one was created.
#
# Does nothing (and returns nil) when no scheduler is present.
def close
  return unless @scheduler

  @scheduler.shutdown
end
register() click to toggle source
# File lib/logstash/inputs/jdbc.rb, line 235
# Validates the plugin settings and builds the state used later when
# queries are executed (value tracker, statement handler, charset converters).
#
# Raises LogStash::ConfigurationError when:
# * :use_column_value is set but :tracking_column is not,
# * not exactly one of :statement / :statement_filepath is set,
# * prepared statement mode is enabled and its validation reports errors,
# * both :jdbc_password and :jdbc_password_filepath are set.
def register
  @logger = self.logger
  require "rufus/scheduler"
  prepare_jdbc_connection

  # Column-value tracking needs a column to track.
  if @use_column_value && @tracking_column.nil?
    raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.")
  end

  # "Both set" and "both unset" are equally invalid: exactly one is required.
  if @statement.nil? == @statement_filepath.nil?
    raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.")
  end

  @statement = ::File.read(@statement_filepath) if @statement_filepath

  # Prepared statement validation inspects @statement, so it has to run
  # after the statement file (if any) has been read.
  if @use_prepared_statements
    problems = validate_prepared_statement_mode
    unless problems.empty?
      raise(LogStash::ConfigurationError, "Prepared Statement Mode validation errors: " + problems.join(", "))
    end
  end

  set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self))
  set_statement_logger(LogStash::PluginMixins::Jdbc::CheckedCountLogger.new(@logger))

  # Encoding conversion is on as soon as a global or any per-column charset is configured.
  @enable_encoding = !(@charset.nil? && @columns_charset.empty?)

  if @jdbc_password_filepath && @jdbc_password
    raise(LogStash::ConfigurationError, "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time.")
  end

  if @jdbc_password_filepath
    secret = ::File.read(@jdbc_password_filepath).strip
    @jdbc_password = LogStash::Util::Password.new(secret)
  end

  return unless enable_encoding?

  # One converter per distinct charset: every per-column charset plus the global one.
  charsets = @columns_charset.values
  charsets << @charset if @charset

  converter_pairs = charsets.map do |charset|
    charset_converter = LogStash::Util::Charset.new(charset)
    charset_converter.logger = self.logger
    [charset, charset_converter]
  end
  @converters = Hash[converter_pairs]
end
run(queue) click to toggle source
# File lib/logstash/inputs/jdbc.rb, line 293
# Loads the driver, then executes the query either once or on a cron schedule.
#
# queue - the object events are pushed onto (passed through to execute_query).
#
# Without @schedule the query runs a single time and its result is returned.
# With @schedule a Rufus scheduler (max one work thread) is created, the
# query is registered under the cron expression, and the scheduler is joined.
def run(queue)
  load_driver
  return execute_query(queue) unless @schedule

  @scheduler = Rufus::Scheduler.new(max_work_threads: 1)
  @scheduler.cron(@schedule) { execute_query(queue) }
  @scheduler.join
end
set_statement_logger(instance) click to toggle source

Test injection points: these setters let tests inject replacement collaborators.

# File lib/logstash/inputs/jdbc.rb, line 285
# Builds the statement handler for this plugin using the given logger-like
# object and stores it in @statement_handler.
#
# instance - object handed to StatementHandler.build_statement_handler as
#            its second argument (register passes a CheckedCountLogger).
def set_statement_logger(instance)
  handler_factory = LogStash::PluginMixins::Jdbc::StatementHandler
  @statement_handler = handler_factory.build_statement_handler(self, instance)
end
set_value_tracker(instance) click to toggle source
# File lib/logstash/inputs/jdbc.rb, line 289
# Stores the given object as the value tracker for this plugin.
#
# instance - the tracker to use; execute_query later calls #write on it.
#
# Returns the assigned instance.
def set_value_tracker(instance)
  @value_tracker = instance
end
stop() click to toggle source
# File lib/logstash/inputs/jdbc.rb, line 311
# Closes the JDBC connection and then, if a scheduler exists, shuts it
# down by calling shutdown(:wait) on it.
def stop
  close_jdbc_connection
  return unless @scheduler

  @scheduler.shutdown(:wait)
end

Private Instance Methods

convert(column_name, value) click to toggle source

Make sure the encoding is uniform across all fields.

# File lib/logstash/inputs/jdbc.rb, line 354
# Converts a String value using the converter registered for the column's
# charset, falling back to the global @charset.
#
# column_name - key used to look up a per-column charset in @columns_charset.
# value       - the value to convert; non-String values are returned as-is.
#
# Returns the converted string, or the original value when it is not a
# String or when no charset applies.
def convert(column_name, value)
  return value unless value.is_a?(String)

  # A per-column charset takes precedence over the global one.
  effective_charset = @columns_charset[column_name] || @charset
  return value unless effective_charset

  @converters[effective_charset].convert(value)
end
enable_encoding?() click to toggle source
# File lib/logstash/inputs/jdbc.rb, line 349
# Reports whether charset conversion is enabled.
#
# Returns the value of @enable_encoding, which register sets to true when a
# global charset or at least one per-column charset is configured.
def enable_encoding?
  @enable_encoding
end
execute_query(queue) click to toggle source
# File lib/logstash/inputs/jdbc.rb, line 334
# Runs the statement and pushes one decorated event per yielded row onto
# the queue, then calls write on the value tracker.
#
# queue - receives each event via <<.
#
# When encoding is enabled, every row is rebuilt with string keys and each
# value passed through convert before the event is created.
def execute_query(queue)
  execute_statement do |row|
    fields =
      if enable_encoding?
        # Stringify keys and run each value through the charset conversion.
        row.each_with_object({}) do |(key, val), converted|
          converted[key.to_s] = convert(key, val)
        end
      else
        row
      end

    event = targeted_event_factory.new_event(fields)
    decorate(event)
    queue << event
  end

  @value_tracker.write
end
validate_prepared_statement_mode() click to toggle source
# File lib/logstash/inputs/jdbc.rb, line 318
# Checks the settings that prepared statement mode depends on.
#
# Returns an Array of error message Strings; it is empty when everything
# is acceptable. Three things are checked, in this order:
# * a prepared statement name was given,
# * the number of `?` characters in the statement equals the number of
#   bind values,
# * JDBC paging is not enabled (not supported with prepared statements).
def validate_prepared_statement_mode
  # Each entry pairs a "this check failed" flag with its message.
  checks = [
    [
      @prepared_statement_name.empty?,
      "must provide a name for the Prepared Statement, it must be unique for the db session"
    ],
    [
      @statement.count("?") != @prepared_statement_bind_values.size,
      "there is a mismatch between the number of statement `?` placeholders and :prepared_statement_bind_values array setting elements"
    ],
    [
      @jdbc_paging_enabled,
      "JDBC pagination cannot be used at this time"
    ]
  ]

  checks.select(&:first).map(&:last)
end