class Traject::SequelWriter

Attributes

db_table[R]

Sequel table/relation object

sequel_db[R]

Sequel db connection object

Public Class Methods

new(argSettings) click to toggle source
# File lib/traject/sequel_writer.rb, line 17
# Build a writer from a traject settings hash.
#
# Settings read here:
# * `sequel_writer.connection_string` OR `sequel_writer.database` -- exactly
#   one is required. The former is handed to Sequel.connect, the latter is
#   used as-is as the db connection object.
# * `sequel_writer.table_name` -- required, the table rows are written to.
# * `sequel_writer.columns` -- optional explicit list of column names.
# * `sequel_writer.thread_pool` -- optional, defaults to 1.
# * `sequel_writer.batch_size` -- optional, defaults to 100.
# * `sequel_writer.after_send_batch` -- optional callable or list of callables.
# * `sequel_writer.internal_delimiter` -- optional, defaults to ",".
#
# Raises ArgumentError when the required settings are missing or when both
# a connection string and a database object are supplied.
def initialize(argSettings)
  @settings = Traject::Indexer::Settings.new(argSettings)

  unless (!! @settings["sequel_writer.connection_string"]) ^ (!! @settings["sequel_writer.database"])
    raise ArgumentError, "Exactly one of either setting `sequel_writer.connection_string` or `sequel_writer.database` is required"
  end
  unless @settings["sequel_writer.table_name"]
    raise ArgumentError, "setting `sequel_writer.table_name` is required"
  end

  # Connection ownership: a database object passed in by the caller stays
  # the caller's responsibility and must not be disconnected by #close; a
  # connection we open ourselves from the connection string is ours to
  # disconnect. (The flag was previously set the other way around.)
  @sequel_db = @settings["sequel_writer.database"]
  @disconnect_on_close = !@sequel_db
  @sequel_db ||= Sequel.connect(@settings["sequel_writer.connection_string"])

  @db_table = @sequel_db[@settings["sequel_writer.table_name"].to_sym]

  # Which keys to send to columns? Can be set explicitly with sequel_writer.columns,
  # or we'll use all non-PK columns introspected from the db schema.
  @column_names = @settings["sequel_writer.columns"]

  unless @column_names
    non_pk_columns = @sequel_db.schema(@db_table.first_source_table).select do |_column, info|
      info[:primary_key] != true
    end
    @column_names = non_pk_columns.map { |pair| pair.first }
  end
  @column_names = @column_names.map { |c| c.to_sym }.freeze

  # How many threads to use for the writer?
  # if our thread pool settings are 0, it'll just create a null threadpool that
  # executes in calling context. Default to 1, for waiting on DB I/O.
  @thread_pool_size = (@settings["sequel_writer.thread_pool"] || 1).to_i

  @batch_size = (@settings["sequel_writer.batch_size"] || 100).to_i

  @batched_queue = Queue.new
  @thread_pool   = Traject::ThreadPool.new(@thread_pool_size)

  @after_send_batch_callbacks = Array(@settings["sequel_writer.after_send_batch"] || [])

  @internal_delimiter = @settings["sequel_writer.internal_delimiter"] || ","
end

Public Instance Methods

after_send_batch(&block) click to toggle source
# File lib/traject/sequel_writer.rb, line 170
# Register a block to be invoked after each batch has been sent.
# Callbacks are stored in registration order; send_batch later calls each
# one with the batch and this writer.
def after_send_batch(&block)
  @after_send_batch_callbacks.push(block)
end
close() click to toggle source
# File lib/traject/sequel_writer.rb, line 80
# Flush whatever is still queued, shut the thread pool down, and re-raise
# any exception the pool collected from its workers. Disconnects the db
# connection afterwards when @disconnect_on_close is set.
def close
  @thread_pool.raise_collected_exception!

  # Finish off whatever's left. Do it in the thread pool for
  # consistency, and to ensure expected order of operations, so
  # it goes to the end of the queue behind any other work.
  batch = Traject::Util.drain_queue(@batched_queue)
  @thread_pool.maybe_in_thread_pool(batch) {|batch_arg| send_batch(batch_arg) }

  # Wait for shutdown, and time it.
  logger.debug "#{self.class.name}: Shutting down thread pool, waiting if needed..."
  elapsed = @thread_pool.shutdown_and_wait
  if elapsed > 60
    # Report the pool size actually in effect. This used to read the
    # unrelated `solr_writer.thread_pool` setting, so the figure was wrong.
    logger.warn "Waited #{elapsed} seconds for all threads, you may want to increase sequel_writer.thread_pool (currently #{@thread_pool_size})"
  end
  logger.debug "#{self.class.name}: Thread pool shutdown complete"

  # check again now that we've waited, there could still be some
  # that didn't show up before.
  @thread_pool.raise_collected_exception!

  @sequel_db.disconnect if @disconnect_on_close
end
hash_to_array(column_names, hash) click to toggle source
# File lib/traject/sequel_writer.rb, line 141
# Build one row: for each column name, in order, look up the value stored
# under the column's string name in `hash` and convert it to a column value.
def hash_to_array(column_names, hash)
  column_names.map { |name| output_value_to_column_value(hash[name.to_s]) }
end
hashes_to_arrays(column_names, list_of_hashes) click to toggle source

Turn an array of hashes into an array of arrays, with each inner array holding one hash's values for column_names, in that order.

# File lib/traject/sequel_writer.rb, line 135
# Convert a list of hashes into a list of rows. Each row is produced by
# hash_to_array, so its values follow the order of `column_names`.
def hashes_to_arrays(column_names, list_of_hashes)
  list_of_hashes.map { |row_hash| hash_to_array(column_names, row_hash) }
end
logger() click to toggle source

Get the logger from the settings, or default to an effectively null logger

# File lib/traject/sequel_writer.rb, line 66
# Return the logger stored under the "logger" setting. When none is set,
# create a Yell logger on STDERR at level "gt.fatal" (effectively a null
# logger), store it in the settings, and return it.
def logger
  configured = @settings["logger"]
  return configured if configured

  @settings["logger"] = Yell.new(STDERR, :level => "gt.fatal") # null logger
end
output_value_to_column_value(v) click to toggle source

Traject context.output_hash values are arrays. Turn them into suitable column values, joining strings if needed.

Single values also accepted, even though not traject standard, they will be passed through unchanged.

# File lib/traject/sequel_writer.rb, line 152
# Reduce a traject output value to a single column value.
#
# * non-Array values pass through unchanged
# * an empty Array becomes nil
# * a one-element Array becomes that element
# * a longer Array whose first element is a String is joined with
#   @internal_delimiter
#
# Raises ArgumentError for a multi-element Array whose first element is
# not a String.
def output_value_to_column_value(v)
  return v unless v.kind_of?(Array)

  case v.length
  when 0 then nil
  when 1 then v.first
  else
    # Not a string? Um, raise for now?
    unless v.first.kind_of?(String)
      raise ArgumentError.new("Traject::SequelWriter, multiple non-String values provided: #{v}")
    end

    v.join(@internal_delimiter)
  end
end
put(context) click to toggle source
# File lib/traject/sequel_writer.rb, line 70
# Queue one context for writing. Once the queue holds at least @batch_size
# entries, drain it and hand the drained batch to the thread pool to be
# sent. Any exception already collected by the pool is raised first.
def put(context)
  @thread_pool.raise_collected_exception!

  @batched_queue.push(context)
  return if @batched_queue.size < @batch_size

  full_batch = Traject::Util.drain_queue(@batched_queue)
  @thread_pool.maybe_in_thread_pool(full_batch) { |contexts| send_batch(contexts) }
end
send_batch(batch) click to toggle source
# File lib/traject/sequel_writer.rb, line 105
# Write a batch of contexts to the table with a single multi-row import.
# If the import raises Sequel::DatabaseError or Sequel::PoolTimeout, log a
# warning and retry each context individually via send_single. Afterwards,
# call every registered after_send_batch callback with the batch and this
# writer.
def send_batch(batch)
  output_hashes = batch.map { |context| context.output_hash }
  rows = hashes_to_arrays(@column_names, output_hashes)

  begin
    db_table.import(@column_names, rows)
  rescue Sequel::DatabaseError, Sequel::PoolTimeout => batch_exception
    # We rescue PoolTimeout too, because we're mysteriously getting those, they are maybe dropped DB connections?
    # Try them each one by one, mostly so we can get a reasonable error message with particular record.
    logger.warn("SequelWriter: error (#{batch_exception}) inserting batch of #{rows.count} starting from system_id #{batch.first.output_hash['system_id']}, retrying individually...")

    batch.each { |context| send_single(context) }
  end

  @after_send_batch_callbacks.each { |callback| callback.call(batch, self) }
end
send_single(context) click to toggle source
# File lib/traject/sequel_writer.rb, line 125
# Insert a single context's row into the table. On Sequel::DatabaseError,
# log the offending output_hash together with the error, then re-raise the
# same exception.
def send_single(context)
  row = hash_to_array(@column_names, context.output_hash)
  db_table.insert(@column_names, row)
rescue Sequel::DatabaseError => insert_error
  logger.error("SequelWriter: Could not insert row: #{context.output_hash}: #{insert_error}")
  raise
end