class Blender::Driver::Serf
Attributes
concurrency[R]
filter_by[R]
filter_tag[R]
Public Class Methods
new(config = {})
click to toggle source
Calls superclass method
# File lib/blender/drivers/serf.rb, line 29
# Builds the driver from an options hash.
#
# Keys consumed here (everything else is handed to the superclass):
#   :filter_by   - stored as @filter_by; falls back to :host
#   :concurrency - stored as @concurrency; falls back to 1
#   :filter_tag  - only consumed when filter_by is :tag, and then mandatory
#
# The caller's hash is not mutated; a shallow copy is worked on instead.
#
# Raises ArgumentError when filter_by is :tag and no :filter_tag is given.
def initialize(config = {})
  remaining = config.dup
  @filter_by = remaining.delete(:filter_by) || :host
  @concurrency = remaining.delete(:concurrency) || 1

  if @filter_by == :tag
    @filter_tag = remaining.delete(:filter_tag)
    unless @filter_tag
      raise ArgumentError, 'Must specify filter_tag when filter_by is set to :tag'
    end
  end

  super(remaining)
end
Public Instance Methods
execute(tasks, hosts)
click to toggle source
# File lib/blender/drivers/serf.rb, line 96
# Runs every task against the given hosts, batching the hosts into slices of
# `concurrency` entries and issuing one run_command call per slice.
#
# tasks - enumerable of objects responding to #command and #metadata
# hosts - enumerable of host identifiers (must respond to #each_slice)
#
# Raises ExecutionFailed (with the command's stderr as message) as soon as a
# slice finishes with a non-zero exit status, unless the task's metadata has a
# truthy :ignore_failure entry.
def execute(tasks, hosts)
  Log.debug("Serf query on #{filter_by}s [#{hosts.inspect}]")
  tasks.each do |task|
    hosts.each_slice(concurrency) do |batch|
      result = run_command(task.command, batch)
      # Metadata is consulted only after a failure, thanks to short-circuiting.
      next if result.exitstatus == 0 || task.metadata[:ignore_failure]

      raise ExecutionFailed, result.stderr
    end
  end
end
exit_status(responses, nodes)
click to toggle source
# File lib/blender/drivers/serf.rb, line 65
# Turns the collected query responses into an ExecOutput.
#
# responses - collection of responses gathered for the query
# nodes     - collection of nodes the query was addressed to
#
# With filter_by :host the number of responses must equal the number of
# nodes; otherwise an ExecOutput with status -1 and an explanatory stderr is
# returned. With :tag or :none any set of responses counts as success.
#
# Raises ArgumentError for any other filter_by value.
def exit_status(responses, nodes)
  mode = filter_by
  unless %i[host tag none].include?(mode)
    raise ArgumentError, "Unknown filter_by option: #{mode}"
  end

  if mode == :host && responses.size != nodes.size
    return ExecOutput.new(-1, '', "Insufficient number of responses. Expected:#{nodes.size}, Got:#{responses.size}")
  end

  ExecOutput.new(0, responses.inspect, '')
end
query_opts(command, nodes)
click to toggle source
# File lib/blender/drivers/serf.rb, line 80
# Builds the argument list for a Serf query as [query, payload, options].
#
# command - object responding to #timeout, #query and #payload
# nodes   - array of targets; its meaning depends on filter_by:
#             :host - node names, passed through as :FilterNodes
#             :tag  - exactly one value, matched against filter_tag via :FilterTags
#             :none - must be exactly ['localhost']; no filter is added
#
# The :Timeout option is (command.timeout || 15) multiplied by 1e9 and is
# always an Integer (presumably seconds converted to nanoseconds for the Serf
# RPC - confirm against the Serf query documentation).
#
# Raises RuntimeError when the nodes do not satisfy the :tag / :none
# constraints, and ArgumentError for an unknown filter_by value.
def query_opts(command, nodes)
  # Round the whole product, not just the multiplier, so that a fractional
  # timeout such as 0.5 still produces an Integer rather than a Float.
  opts = { Timeout: ((command.timeout || 15) * 1_000_000_000).round }

  case filter_by
  when :host
    opts[:FilterNodes] = nodes
  when :tag
    raise 'filter by :tag only supports single tag' unless nodes.size == 1
    opts[:FilterTags] = { filter_tag => nodes.first }
  when :none
    raise 'filter by :none only supported with localhost' unless nodes == ['localhost']
  else
    raise ArgumentError, "Unknown filter_by option: #{filter_by}"
  end

  [command.query, command.payload, opts]
end
run_command(command, nodes)
click to toggle source
# File lib/blender/drivers/serf.rb, line 53
# Issues the Serf query for one batch of nodes and reports the outcome.
#
# command - query command; if command.process is truthy it is called with the
#           collected responses before the exit status is computed
# nodes   - the batch of nodes the query is addressed to
#
# Returns the ExecOutput produced by exit_status. Any StandardError raised
# while querying, processing or evaluating the responses is converted into an
# ExecOutput with status -1 and the exception message as stderr, so this
# method does not propagate such errors to the caller.
def run_command(command, nodes)
  replies = serf_query(command, nodes)
  command.process.call(replies) if command.process
  exit_status(replies, nodes)
rescue StandardError => error
  ExecOutput.new(-1, '', error.message)
end
serf_query(command, host)
click to toggle source
# File lib/blender/drivers/serf.rb, line 40
# Sends a Serf query over a Serfx connection and collects every event that is
# yielded back.
#
# command - object responding to #query and #payload (and whatever query_opts
#           reads from it)
# host    - the target(s) handed to query_opts to build the filter options
#
# Each received event is appended to the result and its #inspect form is
# written to stdout as it arrives.
#
# Returns the Array of events in arrival order.
def serf_query(command, host)
  responses = []
  # Log the targets actually passed in; @current_host is not assigned
  # anywhere in this class, so interpolating it printed nothing useful.
  Log.debug("Invoking serf query '#{command.query}' with payload '#{command.payload}' against #{host.inspect}")
  Log.debug("Serf RPC address #{config[:host]}:#{config[:port]}")
  Serfx.connect(config) do |conn|
    conn.query(*query_opts(command, host)) do |event|
      responses << event
      stdout.puts event.inspect
    end
  end
  responses
end