class Mutest::Parallel::Master
Master
parallel worker
Constants
- MAP
Public Class Methods
call(config)
click to toggle source
Run master
@param [Config] config
@return [Actor::Sender]
# File lib/mutest/parallel/master.rb, line 14 def self.call(config) config.env.spawn do |mailbox| new(config, mailbox).__send__(:run) end end
new(*)
click to toggle source
Initialize object
@return [undefined]
Calls superclass method
# File lib/mutest/parallel/master.rb, line 23 def initialize(*) super @stop = false @workers = 0 @active_jobs = Set.new @index = 0 end
Private Instance Methods
handle(message)
click to toggle source
Handle messages
@param [Actor::Message] message
@return [undefined]
# File lib/mutest/parallel/master.rb, line 66 def handle(message) type = message.type payload = message.payload method = MAP.fetch(type) do raise Actor::ProtocolError, "Unexpected message: #{type.inspect}" end __send__(method, payload) end
handle_ready(sender)
click to toggle source
Handle ready worker
@param [Actor::Sender] sender
@return [undefined]
# File lib/mutest/parallel/master.rb, line 124 def handle_ready(sender) if stop_work? stop_worker(sender) return end sender.call(Actor::Message.new(:job, next_job)) end
handle_result(job_result)
click to toggle source
Handle result
@param [JobResult] job_result
@return [undefined]
# File lib/mutest/parallel/master.rb, line 103 def handle_result(job_result) @active_jobs.delete(job_result.job) sink.result(job_result.payload) end
handle_status(sender)
click to toggle source
Handle status
@param [Actor::Sender] sender
@return [undefined]
# File lib/mutest/parallel/master.rb, line 89 def handle_status(sender) status = Status.new( payload: sink.status, done: sink.stop? || @workers.zero?, active_jobs: @active_jobs.dup.freeze ) sender.call(Actor::Message.new(:status, status)) end
handle_stop(sender)
click to toggle source
Handle stop
@param [Actor::Sender] sender
@return [undefined]
# File lib/mutest/parallel/master.rb, line 113 def handle_stop(sender) @stop = true receive_loop sender.call(Actor::Message.new(:stop)) end
next_job()
click to toggle source
Next job if any
@return [Job]
if next job is available
@return [nil]
# File lib/mutest/parallel/master.rb, line 139 def next_job Job.new( index: @index, payload: source.next ).tap do |job| @index += 1 @active_jobs << job end end
receive_loop()
click to toggle source
Run receive loop
@return [undefined]
# File lib/mutest/parallel/master.rb, line 80 def receive_loop handle(mailbox.receiver.call) until @workers.zero? && @stop end
run()
click to toggle source
Run work loop
rubocop:disable MethodLength
@return [undefined]
# File lib/mutest/parallel/master.rb, line 39 def run config.jobs.times do @workers += 1 config.env.spawn do |worker_mailbox| Worker.run( mailbox: worker_mailbox, processor: config.processor, parent: mailbox.sender ) end end receive_loop end
sink()
click to toggle source
Job
result sink
@return [Sink]
# File lib/mutest/parallel/master.rb, line 176 def sink config.sink end
source()
click to toggle source
Job
source
@return [Source]
# File lib/mutest/parallel/master.rb, line 169 def source config.source end
stop_work?()
click to toggle source
Test
if scheduling stopped
@return [Boolean]
# File lib/mutest/parallel/master.rb, line 162 def stop_work? @stop || !source.next? || sink.stop? end
stop_worker(sender)
click to toggle source
Stop worker
@param [Actor::Sender] sender
@return [undefined]
# File lib/mutest/parallel/master.rb, line 154 def stop_worker(sender) @workers -= 1 sender.call(Actor::Message.new(:stop)) end