class LogStash::Filters::Jdbc::ReadWriteDatabase

Public Instance Methods

build_db_object(db_object) click to toggle source
# File lib/logstash/filters/jdbc/read_write_database.rb, line 37
# Builds one local lookup-db object (via `db_object.build(@db)`) while holding
# the write lock.
#
# Connection errors (anything in CONNECTION_ERRORS) are logged and swallowed;
# a Sequel::Error is logged and re-raised wrapped in a LoaderJdbcException.
#
# @param db_object an object responding to `build`, `index_columns` and `name`
# @raise [LoaderJdbcException] when building fails with a Sequel::Error
def build_db_object(db_object)
  # Acquire the lock *before* the protected region: if lock() itself fails,
  # the ensure below must not try to unlock a lock this thread never held.
  write_lock = @rwlock.writeLock()
  write_lock.lock()
  begin
    db_object.build(@db)
    if db_object.index_columns.empty?
      logger.warn("local_db_object '#{db_object.name}': `index_columns` is optional but on larger datasets consider adding an index on the lookup column, it will improve performance")
    end
  rescue *CONNECTION_ERRORS => err
    # we do not raise an error when there is a connection error, we hope that the connection works next time
    logger.error("Connection error when initialising lookup db", :db_object => db_object.inspect, :exception => err.message, :backtrace => err.backtrace.take(8))
  rescue ::Sequel::Error => err
    msg = "Exception when initialising lookup db for db object: #{db_object}"
    logger.error(msg, :exception => err.message, :backtrace => err.backtrace.take(8))
    raise wrap_error(LoaderJdbcException, err, msg)
  ensure
    write_lock.unlock()
  end
end
fetch(statement, parameters) click to toggle source
# File lib/logstash/filters/jdbc/read_write_database.rb, line 22
# Runs `statement` with `parameters` against the local lookup db and returns
# all resulting rows, holding the read lock for the duration of the query.
#
# @param statement the statement passed to `@db[...]`
# @param parameters the bind parameters passed alongside the statement
# @return the result of calling `all` on the dataset
def fetch(statement, parameters)
  # Lock outside the protected region so that the ensure never unlocks a
  # read lock this thread failed to acquire.
  read_lock = @rwlock.readLock()
  read_lock.lock()
  begin
    # any exceptions should bubble up because we need to set failure tags etc.
    @db[statement, parameters].all
  ensure
    read_lock.unlock()
  end
end
populate_all(loaders)
Alias for: repopulate_all
post_create(connection_string, driver_class, driver_library, user, password) click to toggle source
# File lib/logstash/filters/jdbc/read_write_database.rb, line 56
# Substitutes the first "____" placeholder in the connection string with
# `unique_db_name`, verifies the resulting connection settings and then
# connects.
#
# @param connection_string [String] connection string containing "____"
# @return whatever `connect` returns
def post_create(connection_string, driver_class, driver_library, user, password)
  verify_connection(
    connection_string.sub("____", unique_db_name),
    driver_class,
    driver_library,
    user,
    password
  )
  connect("Connection error when connecting to lookup db")
end
prepare(statement, parameters) click to toggle source
# File lib/logstash/filters/jdbc/read_write_database.rb, line 30
# Creates a prepared select statement, named by `@id`, from `statement` and
# `parameters` while holding the read lock.
#
# @param statement the statement passed to `@db[...]`
# @param parameters the bind parameters passed alongside the statement
# @return the result of `prepare(:select, @id)` on the dataset
def prepare(statement, parameters)
  # Lock outside the protected region so that the ensure never unlocks a
  # read lock this thread failed to acquire.
  read_lock = @rwlock.readLock()
  read_lock.lock()
  begin
    @db[statement, parameters].prepare(:select, @id)
  ensure
    read_lock.unlock()
  end
end
repopulate_all(loaders) click to toggle source
# File lib/logstash/filters/jdbc/read_write_database.rb, line 6
# Fills the local table of every loader, in order, via `fill_local_table`.
#
# The size-specific branches were dropped: iterating handles zero, one, two
# or many loaders identically, and the method now returns `loaders` in every
# case instead of a value that depended on how many loaders were given.
#
# @param loaders an enumerable collection of loaders
# @return the `loaders` collection
def repopulate_all(loaders)
  loaders.each { |loader| fill_local_table(loader) }
end
Also aliased as: populate_all

Private Instance Methods

fill_local_table(loader) click to toggle source
# File lib/logstash/filters/jdbc/read_write_database.rb, line 64
# Replaces the contents of a loader's local table with freshly fetched
# records, holding the write lock for the whole operation.
#
# Steps: fetch the records from the loader, write them as literal values to
# an import file in the loader's staging directory, bulk-import that file
# with SYSCS_UTIL.SYSCS_IMPORT_TABLE, then delete the file. Nothing is
# written or imported when the loader returns no records.
#
# Connection errors (anything in CONNECTION_ERRORS) are logged and swallowed;
# any other StandardError is logged and re-raised as a LoaderJdbcException.
#
# @param loader an object responding to `id`, `fetch`, `table` and
#   `staging_directory`
# @raise [LoaderJdbcException] on any non-connection failure
def fill_local_table(loader)
  # Acquire the lock *before* the protected region: if lock() itself fails,
  # the ensure below must not try to unlock a lock this thread never held.
  write_lock = @rwlock.writeLock()
  write_lock.lock()
  begin
    start = Time.now.to_f
    records = loader.fetch
    records_size = records.size
    return if records_size.zero?
    logger.info("loader #{loader.id}, fetched #{records_size} records in: #{(Time.now.to_f - start).round(3)} seconds")
    start = Time.now.to_f
    import_file = ::File.join(loader.staging_directory, loader.table.to_s)
    begin
      ::File.open(import_file, "w") do |fd|
        dataset = @db[loader.table]
        records.each do |hash|
          array = hash.values.map {|val| dataset.literal(val) }
          fd.puts(array.join(", "))
        end
        fd.fsync
      end
      logger.info("loader #{loader.id}, saved fetched records to import file in: #{(Time.now.to_f - start).round(3)} seconds")
      start = Time.now.to_f
      import_cmd = "CALL SYSCS_UTIL.SYSCS_IMPORT_TABLE (null,'#{loader.table.upcase}','#{import_file}',null,'''',null,1)"
      @db.execute_ddl(import_cmd)
    ensure
      # Remove the staging file whether or not the write/import succeeded, so
      # a failed run does not leave a (possibly partial) data file behind.
      FileUtils.rm_f(import_file)
    end
    logger.info("loader #{loader.id}, imported all fetched records in: #{(Time.now.to_f - start).round(3)} seconds")
  rescue *CONNECTION_ERRORS => err
    # we do not raise an error when there is a connection error, we hope that the connection works next time
    logger.error("Connection error when filling lookup db from loader #{loader.id}, query results", :exception => err.message, :backtrace => err.backtrace.take(8))
  rescue => err
    # In theory all exceptions in Sequel should be wrapped in Sequel::Error
    # There are cases where exceptions occur in unprotected ensure sections
    msg = "Exception when filling lookup db from loader #{loader.id}, query results, original exception: #{err.class}, original message: #{err.message}"
    logger.error(msg, :backtrace => err.backtrace.take(16))
    raise wrap_error(LoaderJdbcException, err, msg)
  ensure
    write_lock.unlock()
  end
end
post_initialize() click to toggle source
# File lib/logstash/filters/jdbc/read_write_database.rb, line 102
# Runs the superclass hook, then creates the read/write lock stored in
# `@rwlock`.
def post_initialize
  super
  # `true` requests the fair ordering policy of the reentrant read write lock
  fair = true
  @rwlock = java.util.concurrent.locks.ReentrantReadWriteLock.new(fair)
end