class GnListResolver::Resolver
Sends data to GN Resolver
and collects results
Constants
- GRAPHQL
- QUERY
Attributes
stats[R]
Public Class Methods
new(writer, opts)
click to toggle source
# File lib/gn_list_resolver/resolver.rb, line 12 def initialize(writer, opts) instance_vars_from_opts(opts) @processor = GnListResolver::ResultProcessor. new(writer, @stats, @with_classification) @count = 0 @jobs = [] @batch = 1000 @smoothing = 0.05 end
Public Instance Methods
resolve(data) { |stats| ... }
click to toggle source
# File lib/gn_list_resolver/resolver.rb, line 22 def resolve(data) resolution_stats(data.size) @threads.times do batch = data.shift(@batch) add_job(batch) end block_given? ? traverse_jobs(data, &Proc.new) : traverse_jobs(data) wrap_up block_given? ? yield(@stats.stats) : @stats.stats end
Private Instance Methods
add_job(batch)
click to toggle source
# File lib/gn_list_resolver/resolver.rb, line 41 def add_job(batch) job = batch.empty? ? nil : create_job(batch) @jobs << job end
add_jobs(indices, data)
click to toggle source
# File lib/gn_list_resolver/resolver.rb, line 73 def add_jobs(indices, data) indices.each do |i| batch = data.shift(@batch) @jobs[i] = batch.empty? ? nil : create_job(batch) end end
collect_names(batch)
click to toggle source
# File lib/gn_list_resolver/resolver.rb, line 103 def collect_names(batch) batch_data = {} batch.each do |row| id = row[:id].strip batch_data[id] = row[:original] @processor.input[id] = { rank: row[:rank] } end batch_data end
create_job(batch)
click to toggle source
# File lib/gn_list_resolver/resolver.rb, line 90 def create_job(batch) batch_data = collect_names(batch) rb = ResolverJob.new(batch, batch_data, @ds_id) Concurrent::Future.execute { rb.run } end
instance_vars_from_opts(opts)
click to toggle source
# File lib/gn_list_resolver/resolver.rb, line 96 def instance_vars_from_opts(opts) @stats = opts.stats @with_classification = opts.with_classification.freeze @ds_id = opts.data_source_id.freeze @threads = opts.threads end
process_job(job)
click to toggle source
# File lib/gn_list_resolver/resolver.rb, line 80 def process_job(job) if job.fulfilled? results, current_data, stats = job.value update_stats(stats) @processor.process(results, current_data) else GnListResolver.logger.error(job.reason.message) end end
process_results(data)
click to toggle source
# File lib/gn_list_resolver/resolver.rb, line 61 def process_results(data) indices = [] @jobs.each_with_index do |job, i| next if job.nil? || !job.complete? with_log do process_job(job) indices << i end end add_jobs(indices, data) unless indices.empty? end
resolution_stats(records_num)
click to toggle source
# File lib/gn_list_resolver/resolver.rb, line 55 def resolution_stats(records_num) @stats.stats[:total_records] = records_num @stats.stats[:resolution][:start_time] = Time.now @stats.stats[:status] = :resolution end
traverse_jobs(data) { |stats| ... }
click to toggle source
# File lib/gn_list_resolver/resolver.rb, line 46 def traverse_jobs(data) until data.empty? && @jobs.compact.empty? process_results(data) cmd = yield(@stats.stats) if block_given? break if cmd == "STOP" sleep(0.5) end end
update_stats(job_stats)
click to toggle source
rubocop:disable Metrics/AbcSize
# File lib/gn_list_resolver/resolver.rb, line 114 def update_stats(job_stats) s = @stats.stats current_speed = job_stats.stats[:current_speed] * @stats.penalty(@threads) s[:resolution][:completed_records] += job_stats.stats[:resolution][:completed_records] @stats.update_eta(current_speed) s[:resolution][:time_span] = Time.now - s[:resolution][:start_time] end
with_log() { || ... }
click to toggle source
# File lib/gn_list_resolver/resolver.rb, line 125 def with_log yield s = @count + 1 @count += @batch e = [@count, @stats.stats[:total_records]].min eta = @stats.stats[:resolution][:eta].to_i + Time.now.to_i msg = format("Resolve %s-%s/%s records %d rec/s; eta: %s", s, e, @stats.stats[:total_records], @stats.stats[:resolution][:speed].to_i, Time.at(eta)) GnListResolver.log(msg) end
wrap_up()
click to toggle source
# File lib/gn_list_resolver/resolver.rb, line 35 def wrap_up @stats.stats[:resolution][:stop_time] = Time.now @stats.stats[:status] = :finish @processor.writer.close end