class CDMDEXER::ETLWorker

Extracts records from OAI, deletes the records marked for deletion, and sends everything else to the transformation and load workers.

Attributes

batch_size[R]
cdm_endpoint[R]
completed_callback_klass[W]
config[R]
etl_worker_klass[W]
field_mappings[R]
is_recursive[R]
load_worker_klass[W]
oai_endpoint[R]
oai_request_klass[W]
resumption_token[R]
solr_config[R]
transform_worker_klass[W]

Public Instance Methods

completed_callback_klass() click to toggle source

Because Sidekiq serializes params to JSON, dependencies cannot be injected as arguments (normally these would be default params in the constructor). Instead, each dependency has a writer plus a reader that falls back to a default class, so that it may be mocked and tested.

# File lib/cdmdexer/etl_worker.rb, line 64
# Class whose `call!` is invoked with the config once there are no further
# batches to enqueue. Falls back to CDMDEXER::CompletedCallback when no
# replacement has been assigned (a nil or false value also triggers the
# fallback), and remembers the fallback for later calls.
def completed_callback_klass
  return @completed_callback_klass if @completed_callback_klass

  @completed_callback_klass = CDMDEXER::CompletedCallback
end
etl_worker_klass() click to toggle source
# File lib/cdmdexer/etl_worker.rb, line 68
# Worker class that receives `perform_async` for the next OAI batch.
# Falls back to ETLWorker when no replacement has been assigned (a nil or
# false value also triggers the fallback), and remembers the fallback.
def etl_worker_klass
  return @etl_worker_klass if @etl_worker_klass

  @etl_worker_klass = ETLWorker
end
load_worker_klass() click to toggle source
# File lib/cdmdexer/etl_worker.rb, line 76
# Worker class that receives `perform_async` with the ids to delete.
# Falls back to LoadWorker when no replacement has been assigned (a nil or
# false value also triggers the fallback), and remembers the fallback.
def load_worker_klass
  return @load_worker_klass if @load_worker_klass

  @load_worker_klass = LoadWorker
end
oai_request_klass() click to toggle source
# File lib/cdmdexer/etl_worker.rb, line 72
# Class instantiated by `perform` to build the OAI request for this batch.
# Falls back to OaiRequest when no replacement has been assigned (a nil or
# false value also triggers the fallback), and remembers the fallback.
def oai_request_klass
  return @oai_request_klass if @oai_request_klass

  @oai_request_klass = OaiRequest
end
perform(config) click to toggle source
# File lib/cdmdexer/etl_worker.rb, line 29
# Job entry point: reads the settings out of +config+, builds the OAI
# request for this batch, runs the batch, then either enqueues the next
# batch or fires the completed callback.
#
# config - Hash with String keys (Sidekiq stores params as JSON, which is
#          also why dependencies cannot be injected as arguments here).
#          Read with `fetch` and no default: 'solr_config',
#          'cdm_endpoint', 'oai_endpoint'.
#          Read with a default: 'field_mappings' (false),
#          'resumption_token' (nil), 'batch_size' (5, coerced with to_i),
#          'is_recursive' (true), 'set_spec' (nil), 'after_date' (false).
#
# Returns whatever run_next_batch! returns.
def perform(config)
  @config = config

  # Settings fetched without a default.
  @solr_config = config.fetch('solr_config').symbolize_keys
  @cdm_endpoint = config.fetch('cdm_endpoint')
  @oai_endpoint = config.fetch('oai_endpoint')

  # Settings that fall back to a default when absent.
  @field_mappings = config.fetch('field_mappings', false)
  @resumption_token = config.fetch('resumption_token', nil)
  @batch_size = config.fetch('batch_size', 5).to_i
  @is_recursive = config.fetch('is_recursive', true)

  request_options = {
    endpoint_url: oai_endpoint,
    resumption_token: resumption_token,
    set_spec: config.fetch('set_spec', nil),
    # after_date optionally restricts the harvest to records updated after
    # a certain date; the default (false) skips date-based selection.
    # The indexer expects to see only parent records in the OAI responses,
    # so a parent may need a manual update after one of its children
    # changes in order to signal that change to the indexer.
    # Rails example for getting a date: `after_date: 2.weeks.ago`
    after_date: config.fetch('after_date', false)
  }
  @oai_request = oai_request_klass.new(**request_options)

  run_batch!
  run_next_batch!
end
run_next_batch!() click to toggle source

Recurse through OAI batches one at a time

# File lib/cdmdexer/etl_worker.rb, line 85
# Decides what happens once the current batch has been handled.
#
# With a next resumption token and recursion enabled, enqueues another ETL
# job carrying the next config. Otherwise hands the current config to the
# completed callback.
#
# Returns the result of whichever call was made.
def run_next_batch!
  more_batches = next_resumption_token && is_recursive
  return completed_callback_klass.call!(config) unless more_batches

  etl_worker_klass.perform_async(next_config)
end
transform_worker_klass() click to toggle source
# File lib/cdmdexer/etl_worker.rb, line 80
# Worker class that receives `perform_async` for each slice of updatable
# records. Falls back to TransformWorker when no replacement has been
# assigned (a nil or false value also triggers the fallback), and
# remembers the fallback.
def transform_worker_klass
  return @transform_worker_klass if @transform_worker_klass

  @transform_worker_klass = TransformWorker
end

Private Instance Methods

delete_deletables!() click to toggle source
# File lib/cdmdexer/etl_worker.rb, line 118
# Enqueues a load worker job that carries no records to add (the empty
# array), the deletable ids, and the Solr config.
#
# Returns whatever the worker's `perform_async` returns.
def delete_deletables!
  worker = load_worker_klass
  nothing_to_add = []
  ids_to_delete = deletable_ids

  worker.perform_async(nothing_to_add, ids_to_delete, solr_config)
end
next_config() click to toggle source
# File lib/cdmdexer/etl_worker.rb, line 103
# Builds the config for the next batch: a copy of the current config with
# 'resumption_token' set to the next resumption token.
#
# The key is a String on purpose. `perform` reads the config exclusively
# through String keys (e.g. `config.fetch('resumption_token', nil)`), so a
# Symbol key would leave the returned hash holding the new token under a
# key that is never read, alongside any stale 'resumption_token' entry,
# until a JSON round trip happened to collapse the two. A String key keeps
# the hash correct in process and keeps it made of native JSON types.
#
# Returns a new Hash; the current config is not mutated.
def next_config
  config.merge('resumption_token' => next_resumption_token)
end
run_batch!() click to toggle source

Extracts an OAI response, deletes the deletables, then transforms and loads the updatable items.

# File lib/cdmdexer/etl_worker.rb, line 97
# Processes the current OAI batch in two steps, deletions first.
#
# Returns whatever transform_and_load! returns.
def run_batch!
  # Delete records that OAI has marked for deletion
  delete_deletables!
  # Hand the updatable records to the transform workers, in slices
  transform_and_load!
end
transform_and_load!() click to toggle source
# File lib/cdmdexer/etl_worker.rb, line 107
# Splits the updatable records into slices of at most batch_size and
# enqueues one transform worker job per slice. Each job receives the slice
# followed by the Solr config, both endpoints, the field mappings, and the
# batch size.
#
# Returns whatever `each_slice` returns.
def transform_and_load!
  updatables.each_slice(batch_size) do |slice|
    worker = transform_worker_klass
    worker.perform_async(
      slice, solr_config, cdm_endpoint, oai_endpoint, field_mappings, batch_size
    )
  end
end