class Wayfarer::Processor
Runs jobs.
Attributes
job[R]
Public Class Methods
new(job, frontier, dispatcher)
click to toggle source
# File lib/wayfarer/processor.rb, line 21 def initialize(job, frontier, dispatcher) @job = job @frontier = frontier @dispatcher = dispatcher @halted = Concurrent::AtomicBoolean.new(false) end
Public Instance Methods
halt!()
click to toggle source
Sets a halt flag.
# File lib/wayfarer/processor.rb, line 35 def halt! @halted.make_true end
halted?()
click to toggle source
Whether processing is done. @return [true, false]
# File lib/wayfarer/processor.rb, line 30 def halted? @halted.value end
run(*_uris)
click to toggle source
Runs the job. @param [*Array<URI>, *Array<String>] uris
# File lib/wayfarer/processor.rb, line 41 def run(*_uris) notify_observers!(FirstCycle.new(@frontier)) while @halted.false? && @frontier.cycle current_uris = @frontier.current_uris queue = current_uris.inject(Queue.new, :push) notify_observers!(NewCycle.new(current_uris.count)) @threads = Array.new(config.connection_count) do Thread.new do begin loop do uri = queue.pop(true) break if uri.nil? || @halted.true? handle_dispatch_result(@dispatcher.dispatch(@job, uri)) end rescue ThreadError notify_observers!(CycleFinished.new) end end end @threads.each(&:join) notify_observers!(AboutToCycle.new(@frontier.staged_uris.count)) end ensure halt! @frontier.free @dispatcher.adapter_pool.free end
Private Instance Methods
handle_dispatch_result(result)
click to toggle source
# File lib/wayfarer/processor.rb, line 76 def handle_dispatch_result(result) case result when Dispatcher::Mismatch then handle_mismatch(result) when Dispatcher::Halt then handle_halt(result) when Dispatcher::Stage then handle_stage(result) when Dispatcher::Error then handle_error(result) end end
handle_error(error)
click to toggle source
# File lib/wayfarer/processor.rb, line 99 def handle_error(error) notify_observers!(UnhandledError.new(error.exception)) end
handle_halt(halt)
click to toggle source
# File lib/wayfarer/processor.rb, line 89 def handle_halt(halt) notify_observers!(HaltInitiated.new(halt.action, halt.uri)) halt! end
handle_mismatch(mismatch)
click to toggle source
# File lib/wayfarer/processor.rb, line 85 def handle_mismatch(mismatch) notify_observers!(MismatchedURI.new(mismatch.uri)) end
handle_stage(stage)
click to toggle source
# File lib/wayfarer/processor.rb, line 94 def handle_stage(stage) notify_observers!(StagingURIs.new(stage.uris.count)) @frontier.stage(*stage.uris) unless halted? end