class Bosh::Director::AgentClient

Constants

DEFAULT_POLL_INTERVAL
GET_STATE_MAX_RETRIES

get_task should be retried at least once: some long-running tasks (e.g. configure_networks) restart the agent (in the current implementation), so the first get_task message is likely to be lost because the agent was not listening on NATS, whereas a retried message will probably be received once the agent has come back up.

GET_TASK_MAX_RETRIES

in case of timeout errors

PROTOCOL_VERSION

Attributes

id[RW]

Public Class Methods

new(service_name, client_id, options = {}) click to toggle source
# File lib/bosh/director/agent_client.rb, line 35
# Builds a client for one remote service instance.
#
# @param [String] service_name First half of the recipient name used for requests
# @param [String] client_id Second half of the recipient name; also handed to
#   the encryption handler when credentials are supplied
# @param [Hash] options
# @option options [Numeric] :timeout Seconds to wait for a response (default 45)
# @option options [Hash] :retry_methods Message name => retry count (default {})
# @option options [Object] :credentials When present, an encryption handler is
#   created for this client
def initialize(service_name, client_id, options = {})
  @service_name = service_name
  @client_id = client_id
  @timeout = options[:timeout] || 45
  @retry_methods = options[:retry_methods] || {}

  # Collaborators come from the director-wide configuration.
  @nats_rpc = Config.nats_rpc
  @logger = Config.logger

  # @encryption_handler is only assigned when credentials were provided.
  credentials = options[:credentials]
  if credentials
    @encryption_handler = Bosh::Core::EncryptionHandler.new(@client_id, credentials)
  end

  @resource_manager = Api::ResourceManager.new
end
with_vm_credentials_and_agent_id(vm_credentials, agent_id, options = {}) click to toggle source
# File lib/bosh/director/agent_client.rb, line 22
# Builds a client for the 'agent' service with default retry counts for
# get_state and get_task.
#
# @param [Object, nil] vm_credentials Added as :credentials when truthy
# @param [String] agent_id Id of the agent to talk to
# @param [Hash] options Extra options; on key collision these win over the defaults
# @return The object returned by +new+
def self.with_vm_credentials_and_agent_id(vm_credentials, agent_id, options = {})
  retry_policy = {
    get_state: GET_STATE_MAX_RETRIES,
    get_task: GET_TASK_MAX_RETRIES,
  }

  base_options = { retry_methods: retry_policy }
  base_options[:credentials] = vm_credentials if vm_credentials

  # Shallow merge: caller-supplied keys replace the defaults wholesale.
  new('agent', agent_id, base_options.merge(options))
end

Public Instance Methods

apply(*args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 75
# Sends the :apply message through send_message.
#
# @param args Arguments forwarded unchanged to the agent
# @return Whatever send_message returns for this message
def apply(*args)
  send_message(:apply, *args)
end
cancel_task(*args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 59
# Sends the :cancel_task message through send_message.
#
# @param args Arguments forwarded unchanged to the agent
# @return Whatever send_message returns for this message
def cancel_task(*args)
  send_message(:cancel_task, *args)
end
compile_package(*args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 79
# Sends the :compile_package message through send_message.
#
# @param args Arguments forwarded unchanged to the agent
# @return Whatever send_message returns for this message
def compile_package(*args)
  send_message(:compile_package, *args)
end
delete_arp_entries(*args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 103
# Sends the :delete_arp_entries message through fire_and_forget, i.e. without
# going through send_message's task handling.
#
# @param args Arguments forwarded unchanged to the agent
# @return Whatever fire_and_forget returns
def delete_arp_entries(*args)
  fire_and_forget(:delete_arp_entries, *args)
end
drain(*args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 83
# Sends the :drain message through send_cancellable_message, so the agent
# task is cancelled if the director job is cancelled while waiting.
#
# @param args Arguments forwarded unchanged to the agent
# @return Whatever send_cancellable_message returns
def drain(*args)
  send_cancellable_message(:drain, *args)
end
fetch_logs(*args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 87
# Sends the :fetch_logs message through send_message.
#
# @param args Arguments forwarded unchanged to the agent
# @return Whatever send_message returns for this message
def fetch_logs(*args)
  send_message(:fetch_logs, *args)
end
format_exception(exception) click to toggle source

Returns formatted exception information @param [Hash|#to_s] exception Serialized exception @return [String]

# File lib/bosh/director/agent_client.rb, line 233
# Renders a serialized exception as a single newline-separated string.
#
# Anything that is not a Hash is returned via +to_s+. For a Hash the result
# is the "message", then the "backtrace" lines (if present), then the
# contents of the blob referenced by "blobstore_id" (if present).
#
# @param [Hash|#to_s] exception Serialized exception
# @return [String]
def format_exception(exception)
  return exception.to_s unless exception.is_a?(Hash)

  sections = [exception["message"].to_s]

  backtrace = exception["backtrace"]
  sections << Array(backtrace).join("\n") if backtrace

  # Fetching the blob also removes it from the blobstore.
  blob_id = exception["blobstore_id"]
  sections << download_and_delete_blob(blob_id).to_s if blob_id

  sections.join("\n")
end
get_state(*args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 55
# Sends the :get_state message through send_message.
#
# @param args Arguments forwarded unchanged to the agent
# @return Whatever send_message returns for this message
def get_state(*args)
  send_message(:get_state, *args)
end
handle_method(method_name, args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 186
# Sends +method_name+ to the remote client and blocks the calling thread
# until a response arrives or @timeout seconds have elapsed.
#
# The response callback is invoked by the NATS layer; the result hash doubles
# as a monitor so the callback and the waiting caller can synchronize on it.
#
# @param [Symbol, String] method_name Remote method to invoke
# @param [Array] args Arguments for the remote method
# @return [Object] The "value" entry of the response
# @raise [RpcTimeout] If no response arrives within @timeout seconds
# @raise [RpcRemoteException] If the response contains an "exception" entry
def handle_method(method_name, args)
  result = {}
  result.extend(MonitorMixin)

  cond = result.new_cond
  # Use the monotonic clock so wall-clock adjustments (NTP, manual changes)
  # cannot shorten or extend the timeout while we are waiting.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout

  request_id = send_nats_request(method_name, args) do |response|
    if @encryption_handler
      begin
        response = @encryption_handler.decrypt(response["encrypted_data"])
      rescue Bosh::Core::EncryptionHandler::CryptError => e
        # Decryption failed: `response` is still the raw envelope, so flag it
        # as an exception and let the caller raise RpcRemoteException.
        response["exception"] = "CryptError: #{e.inspect} #{e.backtrace}"
      end
      @logger.info("Response: #{response}")
    end

    result.synchronize do
      inject_compile_log(response)
      result.merge!(response)
      cond.signal
    end
  end

  result.synchronize do
    # Loop to tolerate wake-ups that happen before a response was merged in.
    while result.empty?
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      unless remaining > 0
        @nats_rpc.cancel_request(request_id)
        raise RpcTimeout,
          "Timed out sending '#{method_name}' to #{@client_id} after #{@timeout} seconds"
      end
      cond.wait(remaining)
    end
  end

  if result.key?("exception")
    raise RpcRemoteException, format_exception(result["exception"])
  end

  result["value"]
end
list_disk(*args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 63
# Sends the :list_disk message through send_message.
#
# @param args Arguments forwarded unchanged to the agent
# @return Whatever send_message returns for this message
def list_disk(*args)
  send_message(:list_disk, *args)
end
method_missing(method_name, *args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 51
# Treats any method that is not explicitly defined as a remote message:
# the method name and arguments are passed to handle_message_with_retry.
#
# Note that this never falls back to +super+, so calling an undefined method
# results in a remote call rather than a NoMethodError.
#
# @param [Symbol] method_name Name of the missing method / remote message
# @param args Arguments forwarded unchanged
# @return Whatever handle_message_with_retry returns
def method_missing(method_name, *args)
  handle_message_with_retry(method_name, *args)
end
migrate_disk(*args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 91
# Sends the :migrate_disk message through send_message.
#
# @param args Arguments forwarded unchanged to the agent
# @return Whatever send_message returns for this message
def migrate_disk(*args)
  send_message(:migrate_disk, *args)
end
mount_disk(*args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 95
# Sends the :mount_disk message through send_message.
#
# @param args Arguments forwarded unchanged to the agent
# @return Whatever send_message returns for this message
def mount_disk(*args)
  send_message(:mount_disk, *args)
end
prepare(*args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 71
# Sends the :prepare message through send_message.
#
# @param args Arguments forwarded unchanged to the agent
# @return Whatever send_message returns for this message
def prepare(*args)
  send_message(:prepare, *args)
end
run_errand(*args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 135
# Starts the :run_errand task via start_task and returns immediately with
# the converted task response; it does not wait for the task to finish.
#
# @param args Arguments forwarded unchanged to the agent
# @return Whatever start_task returns
def run_errand(*args)
  start_task(:run_errand, *args)
end
run_script(script_name, options) click to toggle source
# File lib/bosh/director/agent_client.rb, line 119
# Sends the :run_script message through send_message, tolerating agents that
# do not know this message.
#
# @param script_name Name of the script to run
# @param options Options forwarded to the agent
# @return Whatever send_message returns, or the logger's return value when
#   an 'unknown message' error was ignored
# @raise [RpcRemoteException] For any remote error other than 'unknown message'
def run_script(script_name, options)
  send_message(:run_script, script_name, options)
rescue RpcRemoteException => error
  # Only the 'unknown message' failure is tolerated; everything else propagates.
  raise unless error.message =~ /unknown message/

  @logger.warn("Ignoring run_script 'unknown message' error from the agent: #{error.inspect}. Received while trying to run: #{script_name}")
end
send_nats_request(method_name, args, &callback) click to toggle source
# File lib/bosh/director/agent_client.rb, line 173
# Builds the request payload and hands it to the NATS RPC layer.
#
# When an encryption handler is configured, the plain request is logged and
# then replaced by an envelope holding the encrypted data and the session id.
#
# @param [Symbol, String] method_name Remote method to invoke
# @param [Array] args Arguments for the remote method
# @param callback Optional block passed on to the NATS layer
# @return Whatever @nats_rpc.send_request returns (used by callers as the request id)
def send_nats_request(method_name, args, &callback)
  payload = { protocol: PROTOCOL_VERSION, method: method_name, arguments: args }

  if @encryption_handler
    @logger.info("Request: #{payload}")
    payload = {
      "encrypted_data" => @encryption_handler.encrypt(payload),
      "session_id" => @encryption_handler.session_id,
    }
  end

  # Recipient is "<service name>.<client id>".
  @nats_rpc.send_request("#{@service_name}.#{@client_id}", payload, &callback)
end
start(*args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 67
# Sends the :start message through send_message.
#
# @param args Arguments forwarded unchanged to the agent
# @return Whatever send_message returns for this message
def start(*args)
  send_message(:start, *args)
end
stop(*args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 131
# Sends the :stop message through send_message.
#
# @param args Arguments forwarded unchanged to the agent
# @return Whatever send_message returns for this message
def stop(*args)
  send_message(:stop, *args)
end
unmount_disk(*args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 99
# Sends the :unmount_disk message through send_message.
#
# @param args Arguments forwarded unchanged to the agent
# @return Whatever send_message returns for this message
def unmount_disk(*args)
  send_message(:unmount_disk, *args)
end
update_settings(certs) click to toggle source
# File lib/bosh/director/agent_client.rb, line 107
# Sends the :update_settings message with the given trusted certificates,
# tolerating agents that do not know this message.
#
# @param certs Value sent to the agent under the "trusted_certs" key
# @return Whatever send_message returns, or the logger's return value when
#   an 'unknown message' error was ignored
# @raise [RpcRemoteException] For any remote error other than 'unknown message'
def update_settings(certs)
  send_message(:update_settings, {"trusted_certs" => certs})
rescue RpcRemoteException => error
  # Only the 'unknown message' failure is tolerated; everything else propagates.
  raise unless error.message =~ /unknown message/

  @logger.warn("Ignoring update_settings 'unknown message' error from the agent: #{error.inspect}")
end
wait_for_task(agent_task_id, &blk) click to toggle source
# File lib/bosh/director/agent_client.rb, line 139
# Polls the status of an agent task until it is no longer 'running'.
#
# Between polls the optional block is called and the thread sleeps for
# DEFAULT_POLL_INTERVAL seconds.
#
# @param agent_task_id Id of the agent task to poll
# @param blk Optional block invoked once per poll iteration before sleeping
# @return The 'value' entry of the final task status
def wait_for_task(agent_task_id, &blk)
  status = get_task_status(agent_task_id)

  until status['state'] != 'running'
    blk.call if blk
    sleep(DEFAULT_POLL_INTERVAL)
    status = get_task_status(agent_task_id)
  end

  status['value']
end
wait_until_ready(deadline = 600) click to toggle source
# File lib/bosh/director/agent_client.rb, line 151
# Pings the remote client until it answers or the deadline passes.
#
# While waiting, the per-request timeout is lowered to one second; the
# previous timeout is restored before returning or raising.
#
# @param [Integer] deadline Maximum number of seconds to keep retrying (default 600)
# @return Whatever +ping+ returns
# @raise [TaskCancelled] If Config.job_cancelled? raises it
# @raise [RpcTimeout] If no ping succeeded before the deadline
# @raise [RpcRemoteException] For remote errors, except 'restarting agent'
#   errors received before the deadline (those are retried)
def wait_until_ready(deadline = 600)
  saved_timeout = @timeout
  @timeout = 1.0
  @deadline = Time.now.to_i + deadline

  begin
    # Cancellation check runs before every ping attempt.
    Config.job_cancelled?
    ping
  rescue TaskCancelled => cancelled
    @logger.debug("Task was cancelled. Stop waiting response from vm")
    raise cancelled
  rescue RpcTimeout
    if @deadline - Time.now.to_i > 0
      retry
    end
    raise RpcTimeout, "Timed out pinging to #{@client_id} after #{deadline} seconds"
  rescue RpcRemoteException => remote_error
    agent_restarting = remote_error.message =~ /^restarting agent/
    retry if agent_restarting && @deadline - Time.now.to_i > 0
    raise remote_error
  ensure
    @timeout = saved_timeout
  end
end

Private Instance Methods

download_and_delete_blob(blob_id) click to toggle source

Downloads blob and ensures it's deleted from the blobstore @param [String] blob_id Blob id @return [String] Blob contents

# File lib/bosh/director/agent_client.rb, line 269
# Fetches a blob through the resource manager and always deletes it
# afterwards, even when fetching raised.
#
# @param [String] blob_id Blob id
# @return [String] Blob contents as returned by the resource manager
def download_and_delete_blob(blob_id)
  begin
    @resource_manager.get_resource(blob_id)
  ensure
    # Runs on success and on failure; its result does not affect the return value.
    @resource_manager.delete_resource(blob_id)
  end
end
fire_and_forget(message_name, *args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 289
# Sends a message without registering a response callback. Any StandardError
# raised while sending is logged as a warning and swallowed.
#
# @param [Symbol, String] message_name Remote method to invoke
# @param args Arguments for the remote method
# @return Whatever send_nats_request returns, or the logger's return value
#   when sending failed
def fire_and_forget(message_name, *args)
  begin
    send_nats_request(message_name, args)
  rescue StandardError => error
    @logger.warn("Ignoring '#{error.message}' error from the agent: #{error.inspect}. Received while trying to run: #{message_name} on client: '#{@client_id}'")
  end
end
get_task_status(agent_task_id) click to toggle source
# File lib/bosh/director/agent_client.rb, line 323
# Fetches the status of an agent task and normalizes it with
# AgentMessageConverter.convert_old_message_to_new.
#
# @param agent_task_id Id of the agent task
# @return The converted get_task response
def get_task_status(agent_task_id)
  raw_status = get_task(agent_task_id)
  AgentMessageConverter.convert_old_message_to_new(raw_status)
end
handle_message_with_retry(message_name, *args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 276
# Calls handle_method, retrying on RpcTimeout as many times as configured
# for +message_name+ in @retry_methods (no retries when not configured).
#
# @param [Symbol] message_name Remote method to invoke
# @param args Arguments for the remote method
# @return Whatever handle_method returns
# @raise [RpcTimeout] Once the retries are used up
def handle_message_with_retry(message_name, *args)
  attempts_left = @retry_methods[message_name] || 0

  begin
    handle_method(message_name, args)
  rescue RpcTimeout
    # Out of retries: re-raise the timeout that was just caught.
    raise unless attempts_left > 0

    attempts_left -= 1
    retry
  end
end
inject_compile_log(response) click to toggle source

The blob is removed from the blobstore as soon as we have fetched it, so if there is a crash before its contents are injected into the response and logged, there is a chance that the compile log is lost.

# File lib/bosh/director/agent_client.rb, line 257
# If the response carries a compile log blob id at
# response["value"]["result"]["compile_log_id"], downloads (and deletes) that
# blob and stores its contents under "compile_log" next to the id.
# Responses of any other shape are left untouched.
#
# @param [Hash] response Agent response; modified in place
# @return The compile log contents, or nil when nothing was injected
def inject_compile_log(response)
  value = response["value"]
  return unless value.is_a?(Hash)

  result = value["result"]
  return unless result.is_a?(Hash)

  blob_id = result["compile_log_id"]
  return unless blob_id

  result["compile_log"] = download_and_delete_blob(blob_id)
end
send_cancellable_message(method_name, *args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 304
# Starts a remote task and waits for it, checking for job cancellation on
# every poll. If the job is cancelled, the agent task is cancelled as well
# before the cancellation is re-raised.
#
# @param [Symbol] method_name Remote method to invoke
# @param args Arguments for the remote method
# @return The task's 'value' (immediately when the response has no
#   'agent_task_id', otherwise once the task has finished)
# @raise [TaskCancelled] If Config.job_cancelled? raises it while waiting
def send_cancellable_message(method_name, *args)
  task = start_task(method_name, *args)
  task_id = task['agent_task_id']
  # No task id: the response already holds the final value.
  return task['value'] unless task_id

  begin
    wait_for_task(task_id) { Config.job_cancelled? }
  rescue TaskCancelled => cancellation
    cancel_task(task_id)
    raise cancellation
  end
end
send_message(method_name, *args, &blk) click to toggle source
# File lib/bosh/director/agent_client.rb, line 295
# Starts a remote task and, when the response carries an 'agent_task_id',
# waits for that task to finish.
#
# @param [Symbol] method_name Remote method to invoke
# @param args Arguments for the remote method
# @param blk Optional block handed to wait_for_task
# @return The task's 'value' (immediately when the response has no
#   'agent_task_id', otherwise whatever wait_for_task returns)
def send_message(method_name, *args, &blk)
  task = start_task(method_name, *args)
  task_id = task['agent_task_id']
  # No task id: the response already holds the final value.
  return task['value'] unless task_id

  wait_for_task(task_id, &blk)
end
start_task(method_name, *args) click to toggle source
# File lib/bosh/director/agent_client.rb, line 319
# Sends a message (with the configured timeout retries) and normalizes the
# response with AgentMessageConverter.convert_old_message_to_new.
#
# @param [Symbol] method_name Remote method to invoke
# @param args Arguments for the remote method
# @return The converted response
def start_task(method_name, *args)
  raw_response = handle_message_with_retry(method_name, *args)
  AgentMessageConverter.convert_old_message_to_new(raw_response)
end