class Tsuga::Service::Clusterer
Constants
- PROXIMITY_RATIO
- RUN_SANITY_CHECK
- Tile
- VERBOSE
Attributes
_adapter[R]
_queue[R]
_source[R]
Public Class Methods
new(source: nil, adapter: nil)
# File lib/tsuga/service/clusterer.rb, line 15
def initialize(source: nil, adapter: nil)
  @_source  = source
  @_adapter = adapter
  @_queue   = WriteQueue.new(adapter: adapter)
end
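
A minimal construction sketch (MyRecord and MyClusterAdapter are hypothetical stand-ins for a real record scope and Tsuga adapter; the constructor only stores the two collaborators and wraps the adapter in a WriteQueue):

  # Hypothetical names -- substitute your own record source and adapter.
  clusterer = Tsuga::Service::Clusterer.new(source: MyRecord, adapter: MyClusterAdapter)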
Public Instance Methods
run()
# File lib/tsuga/service/clusterer.rb, line 21
def run
  # delete all clusters
  _adapter.delete_all

  # create lowest-level clusters
  _source.find_each do |record|
    _queue.push _adapter.build_from(Tsuga::MAX_DEPTH, record)
  end
  _queue.flush

  # for all depths N from 18 to 3
  (Tsuga::MAX_DEPTH-1).downto(Tsuga::MIN_DEPTH) do |depth|
    progress.log "depth #{depth}" if VERBOSE
    progress.title = "#{depth}.0" if VERBOSE

    # create clusters at this level from children
    # TODO: use a save queue, only run saves if > 100 clusters to write
    cluster_ids = Set.new
    _adapter.at_depth(depth+1).find_each do |child|
      _queue.push _adapter.build_from(depth, child)
    end
    _queue.flush
    cluster_ids = MutableSet.new(_adapter.at_depth(depth).collect_ids)

    if cluster_ids.empty?
      progress.log "nothing to cluster" if VERBOSE
      break
    end

    # TODO: group points to cluster by tile, and run on tiles in parallel.
    progress.title = "#{depth}.1" if VERBOSE
    progress.log "started with #{cluster_ids.length} clusters" if VERBOSE
    progress.set_phase(depth, 1, cluster_ids.length) if VERBOSE

    while cluster_ids.any?
      progress.set_progress(cluster_ids.length) if VERBOSE
      cluster = _adapter.find_by_id(cluster_ids.first)
      raise 'internal error: cluster was already removed' if cluster.nil?

      tile = Tile.including(cluster, depth: depth)
      clusters = _adapter.in_tile(*tile.neighbours).to_a
      processed_cluster_ids = clusters.collect(&:id)

      # clusters we aggregate in this loop iteration
      # they are _not_ the same as what we pass to the aggregator,
      # just those inside the fence
      fenced_cluster_ids = _adapter.in_tile(tile).collect_ids
      raise RuntimeError, 'no cluster in fence' if fenced_cluster_ids.empty?

      Aggregator.new(clusters:clusters, ratio:PROXIMITY_RATIO, fence:tile).tap do |aggregator|
        aggregator.run

        if VERBOSE
          progress.log("aggregator: %4d left, %2d processed, %2d in fence, %2d updated, %2d dropped" % [
            cluster_ids.length, processed_cluster_ids.length, fenced_cluster_ids.length,
            aggregator.updated_clusters.length, aggregator.dropped_clusters.length])
          if aggregator.updated_clusters.any?
            progress.log("updated: #{aggregator.updated_clusters.collect(&:id).join(', ')}")
          end
          if aggregator.dropped_clusters.any?
            progress.log("dropped: #{aggregator.dropped_clusters.collect(&:id).join(', ')}")
          end
        end

        cluster_ids.remove! fenced_cluster_ids
        # updated clusters may need to be reprocessed (they might have fallen close enough to tile edges)
        # TODO: as further optimisation, do not mark for reprocessing clusters that are still inside the fence
        cluster_ids.merge! aggregator.updated_clusters.collect(&:id)
        # destroyed clusters may include some on the outer fringe of the fence tile
        cluster_ids.remove! aggregator.dropped_clusters.collect(&:id)

        aggregator.dropped_clusters.each(&:destroy)
        _adapter.mass_update(aggregator.updated_clusters)
      end

      if RUN_SANITY_CHECK
        # sanity check: all <cluster_ids> should exist
        not_removed = cluster_ids - _adapter.at_depth(depth).collect_ids
        if not_removed.any?
          raise "cluster_ids contains IDs of deleted clusters: #{not_removed.to_a.join(', ')}"
        end

        # sanity check: sum of weights should match that of lower level
        deeper_weight = _adapter.at_depth(depth+1).sum(:weight)
        this_weight   = _adapter.at_depth(depth).sum(:weight)
        if deeper_weight != this_weight
          raise "mismatch between weight at this depth (#{this_weight}) and deeper level (#{deeper_weight})"
        end
      end
    end

    # set parent_id in the whole tree
    # this is made slightly more complicated by #find_each's scoping
    progress.title = "#{depth}.2" if VERBOSE
    child_mappings = {}
    _adapter.at_depth(depth).find_each do |cluster|
      cluster.children_ids.each do |child_id|
        child_mappings[child_id] = cluster.id
      end
    end
    child_mappings.each_pair do |child_id, parent_id|
      cluster = _adapter.find_by_id(child_id)
      cluster.parent_id = parent_id
      _queue.push cluster
    end
    _queue.flush
  end

  progress.finish if VERBOSE
end
Private Instance Methods
_build_clusters(tile)
Returns the IDs of the child records used and the clusters built from them.
# File lib/tsuga/service/clusterer.rb, line 245
def _build_clusters(tile)
  used_ids = []
  clusters = []
  _adapter.in_tile(*tile.children).find_each do |child|
    cluster = _adapter.build_from(tile.depth, child)
    clusters << cluster
    used_ids << child.id
  end
  return [used_ids, clusters]
end
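
A call sketch for the return contract (tile is assumed to come from Tile.including, as in run; nothing here persists the clusters):

  used_ids, new_clusters = _build_clusters(tile)
  # used_ids     -- IDs of the child records consumed from the tile's sub-tiles
  # new_clusters -- clusters built at tile.depth, one per child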
progress()
# File lib/tsuga/service/clusterer.rb, line 137
def progress
  @_progressbar ||= ProgressBar.create.extend(SteppedProgressBar)
end