class Blender::Driver::Serf

Attributes

concurrency[R]
filter_by[R]
filter_tag[R]

Public Class Methods

new(config = {}) click to toggle source
Calls superclass method
# File lib/blender/drivers/serf.rb, line 29
# Builds the driver from a configuration hash.
#
# The keys :filter_by, :concurrency and (only when filtering by tag)
# :filter_tag are removed from a copy of +config+; whatever remains is
# handed to the superclass initializer. The caller's hash is not modified.
#
# @param config [Hash] driver options
#   - :filter_by   defaults to :host when absent or falsy
#   - :concurrency defaults to 1 when absent or falsy
#   - :filter_tag  required when :filter_by is :tag
# @raise [ArgumentError] when :filter_by is :tag and no :filter_tag is given
def initialize(config = {})
  options = config.dup
  @filter_by = options.delete(:filter_by) || :host
  @concurrency = options.delete(:concurrency) || 1
  if @filter_by == :tag
    @filter_tag = options.delete(:filter_tag)
    unless @filter_tag
      raise ArgumentError, 'Must specify filter_tag when filter_by is set to :tag'
    end
  end
  super(options)
end

Public Instance Methods

execute(tasks, hosts) click to toggle source
# File lib/blender/drivers/serf.rb, line 96
# Runs every task against the given hosts, in batches of +concurrency+.
#
# Each batch is passed to +run_command+ together with the task's command.
# A batch whose result has a non-zero exit status aborts the run unless the
# task's metadata sets :ignore_failure.
#
# @param tasks [Enumerable] objects responding to +command+ and +metadata+
# @param hosts [Enumerable] targets; sliced into groups of +concurrency+
# @raise [ExecutionFailed] with the command's stderr on an unignored failure
def execute(tasks, hosts)
  Log.debug("Serf query on #{filter_by}s [#{hosts.inspect}]")
  tasks.each do |task|
    hosts.each_slice(concurrency) do |batch|
      result = run_command(task.command, batch)
      next if result.exitstatus == 0
      next if task.metadata[:ignore_failure]

      raise ExecutionFailed, result.stderr
    end
  end
end
exit_status(responses, nodes) click to toggle source
# File lib/blender/drivers/serf.rb, line 65
# Translates the collected query responses into an ExecOutput.
#
# When filtering by :host, one response per node is required; a count
# mismatch produces a -1 status with an explanatory stderr message.
# When filtering by :tag or :none, any set of responses counts as success.
#
# @param responses [#size, #inspect] responses gathered from the query
# @param nodes [#size] the nodes the query was addressed to
# @return [ExecOutput] status 0 with the inspected responses as stdout,
#   or status -1 with an error message as stderr
# @raise [ArgumentError] when +filter_by+ is not :host, :tag or :none
def exit_status(responses, nodes)
  mode = filter_by
  if mode == :host && responses.size != nodes.size
    return ExecOutput.new(-1, '', "Insufficient number of responses. Expected:#{nodes.size}, Got:#{responses.size}")
  end
  unless [:host, :tag, :none].include?(mode)
    raise ArgumentError, "Unknown filter_by option: #{mode}"
  end

  ExecOutput.new(0, responses.inspect, '')
end
query_opts(command, nodes) click to toggle source
# File lib/blender/drivers/serf.rb, line 80
# Builds the argument list (name, payload, options) for a Serf query.
#
# The :Timeout option is the command's timeout (15 when unset) multiplied
# by 1e9 and rounded to an Integer, so a fractional timeout such as 1.5
# no longer leaks a Float into the options hash.
# NOTE(review): presumably seconds -> nanoseconds as expected by the Serf
# RPC query Timeout field; confirm against the Serf RPC documentation.
#
# Filtering options depend on +filter_by+:
# - :host adds FilterNodes with the given nodes
# - :tag  adds FilterTags mapping +filter_tag+ to the single given value
# - :none adds no filter and only accepts ['localhost']
#
# @param command [#timeout, #query, #payload] the query description
# @param nodes [Array] node names, or a single tag value for :tag
# @return [Array] [query name, payload, options hash]
# @raise [RuntimeError] when :tag gets other than one value, or :none gets
#   anything but ['localhost']
# @raise [ArgumentError] when +filter_by+ is not :host, :tag or :none
def query_opts(command, nodes)
  # Multiply first, then convert: `x*1e9.to_i` only converted the constant.
  timeout = ((command.timeout || 15) * 1_000_000_000).round
  opts = { Timeout: timeout }
  case filter_by
  when :host
    opts[:FilterNodes] = nodes
  when :tag
    raise 'filter by :tag only supports single tag' unless nodes.size == 1

    opts[:FilterTags] = { filter_tag => nodes.first }
  when :none
    raise 'filter by :none only supported with localhost' unless nodes == ['localhost']
  else
    raise ArgumentError, "Unknown filter_by option: #{filter_by}"
  end
  [command.query, command.payload, opts]
end
run_command(command, nodes) click to toggle source
# File lib/blender/drivers/serf.rb, line 53
# Runs one command against a batch of nodes and reports the outcome.
#
# The responses from +serf_query+ are passed to the command's +process+
# callable when one is set, then converted to an ExecOutput by
# +exit_status+. Any StandardError raised along the way is converted into
# a failed ExecOutput (status -1) carrying the error message as stderr,
# instead of propagating.
#
# @param command [#process] the command to run
# @param nodes [Array] the batch of nodes to query
# @return [ExecOutput]
def run_command(command, nodes)
  replies = serf_query(command, nodes)
  command.process.call(replies) if command.process
  exit_status(replies, nodes)
rescue StandardError => failure
  ExecOutput.new(-1, '', failure.message)
end
serf_query(command, host) click to toggle source
# File lib/blender/drivers/serf.rb, line 40
# Issues the command as a Serf query and collects every response event.
#
# Opens a connection via Serfx.connect(config), sends the query built by
# +query_opts+, and for each event yielded appends it to the result and
# prints its +inspect+ form to +stdout+.
#
# @param command [#query, #payload] the query to send
# @param host [Array] forwarded unchanged to +query_opts+ as its node list
# @return [Array] the events in the order they were received
def serf_query(command, host)
  Log.debug("Invoking serf query '#{command.query}' with payload '#{command.payload}' against #{@current_host}")
  Log.debug("Serf RPC address #{config[:host]}:#{config[:port]}")
  collected = []
  Serfx.connect(config) do |client|
    client.query(*query_opts(command, host)) do |reply|
      collected.push(reply)
      stdout.puts(reply.inspect)
    end
  end
  collected
end