class CDMDEXER::ETLWorker
Extract records from OAI, delete records marked for deletion, and send everything else to a transformation / load worker.
Attributes
batch_size[R]
cdm_endpoint[R]
completed_callback_klass[W]
config[R]
etl_worker_klass[W]
field_mappings[R]
is_recursive[R]
load_worker_klass[W]
oai_endpoint[R]
oai_request_klass[W]
resumption_token[R]
solr_config[R]
transform_worker_klass[W]
Public Instance Methods
completed_callback_klass()
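Because Sidekiq serializes params to JSON, dependencies cannot be passed in as default params in the constructor, which is where they would normally go. Instead, each dependency has a custom setter so that it may be mocked and tested, and a reader that falls back to the production class.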
# File lib/cdmdexer/etl_worker.rb, line 64
def completed_callback_klass
  @completed_callback_klass ||= CDMDEXER::CompletedCallback
end
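The writer-plus-defaulting-reader pattern described above can be sketched in isolation. This is a minimal illustration, not CDMDEXER code: `RealCallback`, `FakeCallback`, `ExampleWorker`, and `finish` are hypothetical names invented for the example.

```ruby
# Hypothetical stand-ins; none of these classes exist in CDMDEXER.
class RealCallback
  def self.call!(config)
    "real callback for #{config['set_spec']}"
  end
end

class FakeCallback
  def self.call!(config)
    "fake callback for #{config['set_spec']}"
  end
end

class ExampleWorker
  # The writer lets a test inject a replacement class.
  attr_writer :completed_callback_klass

  # The reader falls back to the production class when nothing was injected.
  def completed_callback_klass
    @completed_callback_klass ||= RealCallback
  end

  def finish(config)
    completed_callback_klass.call!(config)
  end
end

example_worker = ExampleWorker.new
default_result = example_worker.finish({ 'set_spec' => 'demo' })

# A test would swap in the fake before exercising the worker.
example_worker.completed_callback_klass = FakeCallback
injected_result = example_worker.finish({ 'set_spec' => 'demo' })
```

The same shape applies to the other `*_klass` readers on this page: production code never calls the writer, so the default class is used.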
etl_worker_klass()
# File lib/cdmdexer/etl_worker.rb, line 68
def etl_worker_klass
  @etl_worker_klass ||= ETLWorker
end
load_worker_klass()
# File lib/cdmdexer/etl_worker.rb, line 76
def load_worker_klass
  @load_worker_klass ||= LoadWorker
end
oai_request_klass()
# File lib/cdmdexer/etl_worker.rb, line 72
def oai_request_klass
  @oai_request_klass ||= OaiRequest
end
perform(config)
# File lib/cdmdexer/etl_worker.rb, line 29
def perform(config)
  # Sidekiq stores params in JSON, so we can't inject dependencies. This
  # results in the long set of arguments that follows. Otherwise, we'd
  # simply inject the OAI request and extractor objects
  @config           = config
  @solr_config      = config.fetch('solr_config').symbolize_keys
  @cdm_endpoint     = config.fetch('cdm_endpoint')
  @oai_endpoint     = config.fetch('oai_endpoint')
  @field_mappings   = config.fetch('field_mappings', false)
  @resumption_token = config.fetch('resumption_token', nil)
  @batch_size       = config.fetch('batch_size', 5).to_i
  @is_recursive     = config.fetch('is_recursive', true)
  after_date        = config.fetch('after_date', false)

  @oai_request = oai_request_klass.new(
    endpoint_url: oai_endpoint,
    resumption_token: resumption_token,
    set_spec: config.fetch('set_spec', nil),
    # Optionally only select records that have been updated after a
    # certain date. You may need to manually update a parent record
    # after updating a child in order to signify to the indexer that
    # some record in the parent's children has been updated. This indexer
    # expects to only see parent records in the OAI responses.
    # The default here is to skip indexing based on date.
    # Rails example for getting a date: `after_date: 2.weeks.ago`
    after_date: after_date
  )

  run_batch!
  run_next_batch!
end
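How `perform` resolves required and optional keys can be sketched with a plain hash. The URLs and values below are placeholders rather than real endpoints, and `transform_keys(&:to_sym)` stands in for ActiveSupport's `symbolize_keys` so the sketch runs without Rails.

```ruby
# Hypothetical config; every URL and value here is a placeholder.
config = {
  'solr_config'  => { 'url' => 'http://localhost:8983/solr/example' },
  'cdm_endpoint' => 'https://example.org/dmwebservices/index.php',
  'oai_endpoint' => 'https://example.org/oai/oai.php',
  'batch_size'   => '10'
}

# Plain-Ruby equivalent of ActiveSupport's symbolize_keys (top level only).
solr_config = config.fetch('solr_config').transform_keys(&:to_sym)

# Optional keys fall back to the same defaults that perform uses.
batch_size       = config.fetch('batch_size', 5).to_i
is_recursive     = config.fetch('is_recursive', true)
field_mappings   = config.fetch('field_mappings', false)
resumption_token = config.fetch('resumption_token', nil)

# A required key that is absent raises KeyError instead of returning nil.
missing_key_error =
  begin
    {}.fetch('oai_endpoint')
  rescue KeyError => e
    e.class
  end
```

Only `solr_config`, `cdm_endpoint`, and `oai_endpoint` are fetched without a default, so those are the keys a caller must always supply.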
run_next_batch!()
Recurse through OAI batches one at a time
# File lib/cdmdexer/etl_worker.rb, line 85
def run_next_batch!
  if next_resumption_token && is_recursive
    etl_worker_klass.perform_async(next_config)
  else
    completed_callback_klass.call!(config)
  end
end
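The token-driven recursion can be sketched without Sidekiq or an OAI server. Everything below is an assumption made for illustration: `PAGES` fakes a paged OAI source, `SketchWorker` is hypothetical, and a direct method call stands in for `perform_async`.

```ruby
# Hypothetical paged source: resumption token => [records, next token].
PAGES = {
  nil    => [%w[rec1 rec2], 'tok1'],
  'tok1' => [%w[rec3 rec4], 'tok2'],
  'tok2' => [%w[rec5], nil]
}.freeze

class SketchWorker
  attr_reader :seen, :completed

  def initialize
    @seen = []
    @completed = false
  end

  def perform(config)
    records, next_token = PAGES.fetch(config['resumption_token'])
    @seen.concat(records)

    if next_token && config.fetch('is_recursive', true)
      # The real worker enqueues a new job; this sketch calls itself directly.
      perform(config.merge('resumption_token' => next_token))
    else
      # Stands in for completed_callback_klass.call!(config).
      @completed = true
    end
  end
end

sketch = SketchWorker.new
sketch.perform({ 'resumption_token' => nil })
```

With `'is_recursive' => false` in the config, the same sketch processes only the first page and then runs the completion branch.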
transform_worker_klass()
# File lib/cdmdexer/etl_worker.rb, line 80
def transform_worker_klass
  @transform_worker_klass ||= TransformWorker
end
Private Instance Methods
delete_deletables!()
# File lib/cdmdexer/etl_worker.rb, line 118
def delete_deletables!
  load_worker_klass.perform_async([], deletable_ids, solr_config)
end
next_config()
# File lib/cdmdexer/etl_worker.rb, line 103
def next_config
  config.merge(resumption_token: next_resumption_token)
end
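`next_config` merges with a symbol key, while `perform` reads string keys. A JSON round trip, used here as an approximation of what happens to job arguments between enqueue and perform, shows why string keys are what the worker sees. The hash contents are illustrative, and whether a given Sidekiq version accepts symbol keys at enqueue time is version dependent and not something this page states.

```ruby
require 'json'

original = { resumption_token: 'abc123', batch_size: 5 }

# Serialize and parse, as a stand-in for the queue's JSON storage.
round_tripped = JSON.parse(JSON.generate(original))

# Symbol keys come back as strings, so fetch must use string keys.
token = round_tripped.fetch('resumption_token')
```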
run_batch!()
Extract an OAI response, delete the deletables, then transform and load the updatable items.
# File lib/cdmdexer/etl_worker.rb, line 97
def run_batch!
  # Delete records that OAI has marked for deletion
  delete_deletables!
  transform_and_load!
end
transform_and_load!()
# File lib/cdmdexer/etl_worker.rb, line 107
def transform_and_load!
  updatables.each_slice(batch_size) do |records|
    transform_worker_klass.perform_async(records,
                                         solr_config,
                                         cdm_endpoint,
                                         oai_endpoint,
                                         field_mappings,
                                         batch_size)
  end
end
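The batching in `transform_and_load!` relies on `Enumerable#each_slice`. A small sketch with made-up record ids shows how one OAI response is split into transform jobs; the `enqueued` array is a hypothetical stand-in for the calls to `perform_async`.

```ruby
# Made-up record ids standing in for the updatables from one OAI response.
updatables = %w[id1 id2 id3 id4 id5 id6 id7]
batch_size = 3

# Each slice would become one transform job in the real worker.
enqueued = []
updatables.each_slice(batch_size) { |records| enqueued << records }
```

Seven records with a batch size of three yield three jobs, the last holding the single leftover record.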