module Google::Gax

Gax defines Google API extensions

rubocop:disable Metrics/ModuleLength

Gax defines Google API extensions

Gax defines Google API extensions

Constants

BINDING

Private constants/methods/classes

DEMUX_WARNING
END_BINDING
MILLIS_PER_SECOND
Segment
TERMINAL
VERSION
VERSION_MATCHER

Regex used by gapic to find version files and directories.

Public Class Methods

compute_bundle_id(obj, discriminator_fields) click to toggle source

Computes a bundle id from the discriminator fields of `obj`.

discriminator_fields may include '.' as a separator, which is used to indicate object traversal. This is meant to allow fields in the computed bundle_id. the return is an array computed by going through the discriminator fields in order and obtaining the str(value) object field (or nested object field) if any discriminator field cannot be found, ValueError is raised.

@param obj [Object] an object. @param discriminator_fields [Array<String>] a list of discriminator

fields in the order to be to be used in the id.

@return [Array<Object>] array of objects computed as described above.

# File lib/google/gax/bundling.rb, line 72
def compute_bundle_id(obj, discriminator_fields)
  result = []
  discriminator_fields.each do |field|
    result.push(str_dotted_access(obj, field))
  end
  result
end
construct_settings(service_name, client_config, config_overrides, retry_names, timeout, bundle_descriptors: {}, page_descriptors: {}, metadata: {}, kwargs: {}, errors: []) click to toggle source

Constructs a dictionary mapping method names to CallSettings.

The client_config parameter is parsed from a client configuration JSON file of the form:

{
  "interfaces": {
    "google.fake.v1.ServiceName": {
      "retry_codes": {
        "idempotent": ["UNAVAILABLE", "DEADLINE_EXCEEDED"],
        "non_idempotent": []
      },
      "retry_params": {
        "default": {
          "initial_retry_delay_millis": 100,
          "retry_delay_multiplier": 1.2,
          "max_retry_delay_millis": 1000,
          "initial_rpc_timeout_millis": 2000,
          "rpc_timeout_multiplier": 1.5,
          "max_rpc_timeout_millis": 30000,
          "total_timeout_millis": 45000
        }
      },
      "methods": {
        "CreateFoo": {
          "retry_codes_name": "idempotent",
          "retry_params_name": "default"
        },
        "Publish": {
          "retry_codes_name": "non_idempotent",
          "retry_params_name": "default",
          "bundling": {
            "element_count_threshold": 40,
            "element_count_limit": 200,
            "request_byte_threshold": 90000,
            "request_byte_limit": 100000,
            "delay_threshold_millis": 100
          }
        }
      }
    }
  }
}

@param service_name [String] The fully-qualified name of this

service, used as a key into the client config file (in the
example above, this value should be
'google.fake.v1.ServiceName').

@param client_config [Hash] A hash parsed from the standard

API client config file.

@param config_overrides [Hash] A hash in the same structure of

client_config to override the settings.

@param retry_names [Hash] A hash mapping the string names

used in the standard API client config file to API response
status codes.

@param timeout [Numeric] The timeout parameter for all API calls

in this dictionary.

@param bundle_descriptors [Hash{String => BundleDescriptor}]

A dictionary of method names to BundleDescriptor objects for
methods that are bundling-enabled.

@param page_descriptors [Hash{String => PageDescriptor}] A

dictionary of method names to PageDescriptor objects for
methods that are page streaming-enabled.

@param metadata [Hash]

Header params to be passed to the API call.

@param kwargs [Hash]

Deprecated, same as metadata and if present will be merged with metadata

@param errors [Array<Exception>]

Configures the exceptions to wrap with GaxError.

@return [CallSettings, nil] A CallSettings, or nil if the

service is not found in the config.
# File lib/google/gax/settings.rb, line 488
def construct_settings(service_name, client_config, config_overrides,
                       retry_names, timeout, bundle_descriptors: {},
                       page_descriptors: {}, metadata: {}, kwargs: {},
                       errors: [])
  defaults = {}

  metadata.merge!(kwargs) if kwargs.is_a?(Hash) && metadata.is_a?(Hash)

  service_config = client_config.fetch('interfaces', {})[service_name]
  return nil unless service_config

  overrides = config_overrides.fetch('interfaces', {})[service_name] || {}

  service_config['methods'].each_pair do |method_name, method_config|
    snake_name = upper_camel_to_lower_underscore(method_name)

    overriding_method =
      overrides.fetch('methods', {}).fetch(method_name, {})

    bundling_config = method_config.fetch('bundling', nil)
    if overriding_method && overriding_method.key?('bundling')
      bundling_config = overriding_method['bundling']
    end
    bundle_descriptor = bundle_descriptors[snake_name]

    defaults[snake_name] = CallSettings.new(
      timeout: calc_method_timeout(
        timeout, method_config, overriding_method
      ),
      retry_options: merge_retry_options(
        construct_retry(method_config,
                        service_config['retry_codes'],
                        service_config['retry_params'],
                        retry_names),
        construct_retry(overriding_method,
                        overrides['retry_codes'],
                        overrides['retry_params'],
                        retry_names)
      ),
      page_descriptor: page_descriptors[snake_name],
      bundler: construct_bundling(bundling_config, bundle_descriptor),
      bundle_descriptor: bundle_descriptor,
      metadata: metadata,
      errors: errors
    )
  end

  defaults
end
create_api_call(func, settings, params_extractor: nil, exception_transformer: nil) click to toggle source

Converts an rpc call into an API call governed by the settings.

In typical usage, func will be a proc used to make an rpc request. This will mostly likely be a bound method from a request stub used to make an rpc call.

The result is created by applying a series of function decorators defined in this module to func. settings is used to determine which function decorators to apply.

The result is another proc which for most values of settings has the same signature as the original. Only when settings configures bundling does the signature change.

@param func [Proc] used to make a bare rpc call @param settings [CallSettings] provides the settings for this call @param params_extractor [Proc] extracts routing header params from the

request

@param exception_transformer [Proc] if an API exception occurs this

transformer is given the original exception for custom processing
instead of raising the error directly

@return [Proc] a bound method on a request stub used to make an rpc call @raise [StandardError] if settings has incompatible values,

e.g, if bundling and page_streaming are both configured
# File lib/google/gax/api_callable.rb, line 227
def create_api_call(func, settings, params_extractor: nil,
                    exception_transformer: nil)
  api_caller = proc do |api_call, request, _settings, block|
    api_call.call(request, block)
  end

  if settings.page_descriptor
    if settings.bundler?
      raise 'ApiCallable has incompatible settings: ' \
          'bundling and page streaming'
    end
    page_descriptor = settings.page_descriptor
    api_caller = page_streamable(page_descriptor.request_page_token_field,
                                 page_descriptor.response_page_token_field,
                                 page_descriptor.resource_field)
  elsif settings.bundler?
    api_caller = bundleable(settings.bundle_descriptor)
  end

  proc do |request, options = nil, &block|
    this_settings = settings.merge(options)
    if params_extractor
      params = params_extractor.call(request)
      this_settings = with_routing_header(this_settings, params)
    end
    api_call = if this_settings.retry_codes?
                 retryable(func, this_settings.retry_options,
                           this_settings.metadata)
               else
                 add_timeout_arg(func, this_settings.timeout,
                                 this_settings.metadata)
               end
    begin
      api_caller.call(api_call, request, this_settings, block)
    rescue *settings.errors => e
      error_class = Google::Gax.from_error(e)
      error = error_class.new('RPC failed')
      raise error if exception_transformer.nil?
      exception_transformer.call error
    rescue StandardError => error
      raise error if exception_transformer.nil?
      exception_transformer.call error
    end
  end
end
from_error(error) click to toggle source
# File lib/google/gax/errors.rb, line 74
def from_error(error)
  if error.respond_to? :code
    grpc_error_class_for error.code
  else
    GaxError
  end
end
grpc_error_class_for(grpc_error_code) click to toggle source

@private Identify the subclass for a gRPC error Note: ported from g/github.com/GoogleCloudPlatform/google-cloud-ruby/blob/master/google-cloud-core/lib/google/cloud/errors.rb

# File lib/google/gax/errors.rb, line 138
def self.grpc_error_class_for(grpc_error_code)
  # The gRPC status code 0 is for a successful response.
  # So there is no error subclass for a 0 status code, use current class.
  [GaxError, CanceledError, UnknownError, InvalidArgumentError,
   DeadlineExceededError, NotFoundError, AlreadyExistsError,
   PermissionDeniedError, ResourceExhaustedError, FailedPreconditionError,
   AbortedError, OutOfRangeError, UnimplementedError, InternalError,
   UnavailableError, DataLossError,
   UnauthenticatedError][grpc_error_code] || GaxError
end
str_dotted_access(obj, name) click to toggle source

Helper function for compute_bundle_id. Used to retrieve a nested field signified by name where dots in name indicate nested objects.

@param obj [Object] an object. @param name [String] a name for a field in the object. @return [String, nil] value of named attribute. Can be nil.

# File lib/google/gax/bundling.rb, line 51
def str_dotted_access(obj, name)
  name.split('.').each do |part|
    obj = obj[part]
    break if obj.nil?
  end
  obj.nil? ? nil : obj.to_s
end
time_to_timestamp(time) click to toggle source

Utility for converting a Ruby Time instance to a Google::Protobuf::Timestamp.

@param time [Time] The Time to be converted.

@return [Google::Protobuf::Timestamp] The converted

Google::Protobuf::Timestamp.
# File lib/google/gax/util.rb, line 180
def time_to_timestamp(time)
  Google::Protobuf::Timestamp.new(seconds: time.to_i, nanos: time.nsec)
end
timestamp_to_time(timestamp) click to toggle source

Utility for converting a Google::Protobuf::Timestamp instance to a Ruby time.

@param timestamp [Google::Protobuf::Timestamp] The timestamp to be

converted.

@return [Time] The converted Time.

# File lib/google/gax/util.rb, line 169
def timestamp_to_time(timestamp)
  Time.at(timestamp.nanos * 10**-9 + timestamp.seconds)
end
to_proto(hash, message_class) click to toggle source

Creates an instance of a protobuf message from a hash that may include nested hashes. `google/protobuf` allows for the instantiation of protobuf messages using hashes but does not allow for nested hashes to instantiate nested submessages.

@param hash [Hash || Class] The hash to be converted into a proto message.

If an instance of the proto message class is given, it is returned
unchanged.

@param message_class [Class] The corresponding protobuf message class of

the given hash.

@return [Object] An instance of the given message class.

# File lib/google/gax/util.rb, line 54
def to_proto(hash, message_class)
  return hash if hash.is_a? message_class

  # Sanity check: input must be a Hash
  unless hash.is_a? Hash
    raise ArgumentError.new(
      "Value #{hash} must be a Hash or a #{message_class.name}"
    )
  end
  hash = coerce_submessages(hash, message_class)
  message_class.new(hash)
end

Private Class Methods

add_timeout_arg(a_func, timeout, metadata) click to toggle source

Updates a_func so that it gets called with the timeout as its final arg.

This converts a proc, a_func, into another proc with an additional positional arg.

@param a_func [Proc] a proc to be updated @param timeout [Numeric] to be added to the original proc as it

final positional arg.

@param metadata [Hash] request metadata headers @return [Proc] the original proc updated to the timeout arg

# File lib/google/gax/api_callable.rb, line 394
def add_timeout_arg(a_func, timeout, metadata)
  proc do |request, block|
    deadline = Time.now + timeout unless timeout.nil?
    op = a_func.call(request,
                     deadline: deadline,
                     metadata: metadata,
                     return_op: true)
    res = op.execute
    block.call res, op if block
    res
  end
end
bundleable(desc) click to toggle source

Creates a proc that transforms an API call into a bundling call.

It transform a_func from an API call that receives the requests and returns the response into a proc that receives the same request, and returns a Google::Gax::Bundling::Event.

The returned Event object can be used to obtain the eventual result of the bundled call.

@param a_func [Proc] an API call that supports bundling. @param desc [BundleDescriptor] describes the bundling that

+a_func+ supports.

@param bundler orchestrates bundling. @return [Proc] A proc takes the API call's request and returns

an Event object.
# File lib/google/gax/api_callable.rb, line 288
def bundleable(desc)
  proc do |api_call, request, settings, block|
    return api_call(request, block) unless settings.bundler
    raise 'Bundling calls cannot accept blocks' if block
    the_id = Google::Gax.compute_bundle_id(
      request,
      desc.request_discriminator_fields
    )
    settings.bundler.schedule(api_call, the_id, desc, request)
  end
end
calc_method_timeout(timeout, method_config, overriding_method) click to toggle source

@private Determine timeout in seconds for the current method.

# File lib/google/gax/settings.rb, line 539
def calc_method_timeout(timeout, method_config, overriding_method)
  timeout_override = method_config['timeout_millis']
  if overriding_method && overriding_method.key?('timeout_millis')
    timeout_override = overriding_method['timeout_millis']
  end
  timeout_override ? timeout_override / 1000 : timeout
end
coerce(val, field_descriptor) click to toggle source

Coerces the value of a field to be acceptable by the instantiation method of the wrapping message.

@private

@param val [Object] The value to be coerced. @param field_descriptor [Google::Protobuf::FieldDescriptor] The field

descriptor of the value.

@return [Object] The coerced version of the given value.

# File lib/google/gax/util.rb, line 157
def coerce(val, field_descriptor)
  return val unless (val.is_a? Hash) && !(map_field? field_descriptor)
  to_proto(val, field_descriptor.subtype.msgclass)
end
coerce_array(array, field_descriptor) click to toggle source

Coerces the values of an array to be acceptable by the instantiation method the wrapping message.

@private

@param array [Array<Object>] The values to be coerced. @param field_descriptor [Google::Protobuf::FieldDescriptor] The field

descriptor of the values.

@return [Array<Object>] The coerced version of the given values.

# File lib/google/gax/util.rb, line 128
def coerce_array(array, field_descriptor)
  unless array.is_a? Array
    raise ArgumentError.new('Value ' + array.to_s + ' must be an array')
  end
  array.map do |val|
    coerce(val, field_descriptor)
  end
end
coerce_submessage(val, field_descriptor) click to toggle source

Coerces the value of a field to be acceptable by the instantiation method of the wrapping message.

@private

@param val [Object] The value to be coerced. @param field_descriptor [Google::Protobuf::FieldDescriptor] The field

descriptor of the value.

@return [Object] The coerced version of the given value.

# File lib/google/gax/util.rb, line 107
def coerce_submessage(val, field_descriptor)
  if (field_descriptor.label == :repeated) && !(map_field? field_descriptor)
    coerce_array(val, field_descriptor)
  elsif field_descriptor.subtype.msgclass == Google::Protobuf::Timestamp &&
        val.is_a?(Time)
    time_to_timestamp(val)
  else
    coerce(val, field_descriptor)
  end
end
coerce_submessages(hash, message_class) click to toggle source

Coerces values of the given hash to be acceptable by the instantiation

method provided by `google/protobuf`

@private

@param hash [Hash] The hash whose nested hashes will be coerced. @param message_class [Class] The corresponding protobuf message class of

the given hash.

@return [Hash] A hash whose nested hashes have been coerced.

# File lib/google/gax/util.rb, line 77
def coerce_submessages(hash, message_class)
  return nil if hash.nil?
  coerced = {}
  message_descriptor = message_class.descriptor
  hash.each do |key, val|
    field_descriptor = message_descriptor.lookup(key.to_s)
    if field_descriptor && field_descriptor.type == :message
      coerced[key] = coerce_submessage(val, field_descriptor)
    elsif field_descriptor && field_descriptor.type == :bytes &&
          (val.is_a?(IO) || val.is_a?(StringIO))
      coerced[key] = val.binmode.read
    else
      # `google/protobuf` should throw an error if no field descriptor is
      # found. Simply pass through.
      coerced[key] = val
    end
  end
  coerced
end
construct_bundling(bundle_config, bundle_descriptor) click to toggle source

Helper for construct_settings

@param bundle_config A Hash specifying a bundle parameters, the value for

'bundling' field in a method config (See ``construct_settings()`` for
information on this config.)

@param bundle_descriptor [BundleDescriptor] A BundleDescriptor

object describing the structure of bundling for this
method. If not set, this method will not bundle.

@return An Executor that configures bundling, or nil if this

method should not bundle.
# File lib/google/gax/settings.rb, line 328
def construct_bundling(bundle_config, bundle_descriptor)
  return unless bundle_config && bundle_descriptor
  options = BundleOptions.new
  bundle_config.each_pair do |key, value|
    options[key.intern] = value
  end
  # Bundling is currently not supported.
  # Executor.new(options)
  nil
end
construct_retry(method_config, retry_codes, retry_params, retry_names) click to toggle source

Helper for construct_settings

@param method_config [Hash] A dictionary representing a single

+methods+ entry of the standard API client config file. (See
#construct_settings for information on this yaml.)

@param retry_codes [Hash] A dictionary parsed from the

+retry_codes_def+ entry of the standard API client config
file. (See #construct_settings for information on this yaml.)

@param retry_params [Hash] A dictionary parsed from the

+retry_params+ entry of the standard API client config
file. (See #construct_settings for information on this yaml.)

@param retry_names [Hash] A dictionary mapping the string names

used in the standard API client config file to API response
status codes.

@return [RetryOptions, nil]

# File lib/google/gax/settings.rb, line 354
def construct_retry(method_config, retry_codes, retry_params, retry_names)
  return nil unless method_config
  codes = nil
  if retry_codes && method_config.key?('retry_codes_name')
    retry_codes_name = method_config['retry_codes_name']
    codes = retry_codes.fetch(retry_codes_name, []).map do |name|
      retry_names[name]
    end
  end

  backoff_settings = nil
  if retry_params && method_config.key?('retry_params_name')
    params = retry_params[method_config['retry_params_name']]
    backoff_settings = BackoffSettings.new(
      *params.values_at(*BackoffSettings.members.map(&:to_s))
    )
  end

  RetryOptions.new(codes, backoff_settings)
end
map_field?(field_descriptor) click to toggle source

Hack to determine if field_descriptor is for a map.

TODO(geigerj): Remove this once protobuf Ruby supports an official way to determine if a FieldDescriptor represents a map. See: github.com/google/protobuf/issues/3425

# File lib/google/gax/util.rb, line 142
def map_field?(field_descriptor)
  (field_descriptor.label == :repeated) &&
    (field_descriptor.subtype.name.include? '_MapEntry_')
end
merge_retry_options(retry_options, overrides) click to toggle source

Helper for construct_settings.

Takes two retry options, and merges them into a single RetryOption instance.

@param retry_options [RetryOptions] The base RetryOptions. @param overrides [RetryOptions] The RetryOptions used for overriding

+retry+. Use the values if it is not nil. If entire
+overrides+ is nli, ignore the base retry and return nil.

@return [RetryOptions, nil]

# File lib/google/gax/settings.rb, line 385
def merge_retry_options(retry_options, overrides)
  return nil if overrides.nil?

  if overrides.retry_codes.nil? && overrides.backoff_settings.nil?
    return retry_options
  end

  codes = retry_options.retry_codes
  codes = overrides.retry_codes unless overrides.retry_codes.nil?
  backoff_settings = retry_options.backoff_settings
  unless overrides.backoff_settings.nil?
    backoff_settings = overrides.backoff_settings
  end

  RetryOptions.new(codes, backoff_settings)
end
page_streamable(request_page_token_field, response_page_token_field, resource_field) click to toggle source

Creates a proc that yields an iterable to performs page-streaming.

@param a_func [Proc] an API call that is page streaming. @param request_page_token_field [String] The field of the page

token in the request.

@param response_page_token_field [String] The field of the next

page token in the response.

@param resource_field [String] The field to be streamed. @param page_token [Object] The page_token for the first page to be

streamed, or nil.

@return [Proc] A proc that returns an iterable over the specified field.

# File lib/google/gax/api_callable.rb, line 311
def page_streamable(request_page_token_field,
                    response_page_token_field,
                    resource_field)
  enumerable = PagedEnumerable.new(request_page_token_field,
                                   response_page_token_field,
                                   resource_field)
  enumerable.method(:start)
end
retryable(a_func, retry_options, metadata) click to toggle source

Creates a proc equivalent to a_func, but that retries on certain exceptions.

@param a_func [Proc] @param retry_options [RetryOptions] Configures the exceptions

upon which the proc should retry, and the parameters to the
exponential backoff retry algorithm.

@param metadata [Hash] request metadata headers @return [Proc] A proc that will retry on exception.

# File lib/google/gax/api_callable.rb, line 343
def retryable(a_func, retry_options, metadata)
  delay_mult = retry_options.backoff_settings.retry_delay_multiplier
  max_delay = (retry_options.backoff_settings.max_retry_delay_millis /
               MILLIS_PER_SECOND)
  timeout_mult = retry_options.backoff_settings.rpc_timeout_multiplier
  max_timeout = (retry_options.backoff_settings.max_rpc_timeout_millis /
                 MILLIS_PER_SECOND)
  total_timeout = (retry_options.backoff_settings.total_timeout_millis /
                   MILLIS_PER_SECOND)

  proc do |request, block|
    delay = retry_options.backoff_settings.initial_retry_delay_millis
    timeout = (retry_options.backoff_settings.initial_rpc_timeout_millis /
               MILLIS_PER_SECOND)
    deadline = Time.now + total_timeout
    begin
      op = a_func.call(request,
                       deadline: Time.now + timeout,
                       metadata: metadata,
                       return_op: true)
      res = op.execute
      block.call res, op if block
      res
    rescue => exception
      unless exception.respond_to?(:code) &&
             retry_options.retry_codes.include?(exception.code)
        raise RetryError.new('Exception occurred in retry method that ' \
          'was not classified as transient')
      end
      sleep(rand(delay) / MILLIS_PER_SECOND)
      now = Time.now
      delay = [delay * delay_mult, max_delay].min
      timeout = [timeout * timeout_mult, max_timeout, deadline - now].min
      if now >= deadline
        raise RetryError.new('Retry total timeout exceeded with exception')
      end
      retry
    end
  end
end
upper_camel_to_lower_underscore(s) click to toggle source

Port of GRPC::GenericService.underscore that works on frozen strings. Note that this function often is used on strings inside Hashes, which are frozen by default, so the GRPC implementation cannot be used directly.

TODO(geigerj): Consider whether this logic can be factored out into a shared location that both gRPC and GAX can depend on in order to remove the additional dependency on gRPC this introduces.

# File lib/google/gax/settings.rb, line 409
def upper_camel_to_lower_underscore(s)
  s = s.gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
  s = s.gsub(/([a-z\d])([A-Z])/, '\1_\2')
  s = s.tr('-', '_')
  s = s.downcase
  s
end
with_routing_header(settings, params) click to toggle source

Create a new CallSettings with the routing metadata from the request header params merged with the given settings.

@param settings [CallSettings] the settings for an API call. @param params [Hash] the request header params. @return [CallSettings] a new merged settings.

# File lib/google/gax/api_callable.rb, line 326
def with_routing_header(settings, params)
  routing_header = params.map { |k, v| "#{k}=#{v}" }.join('&')
  options = CallOptions.new(
    metadata: { 'x-goog-request-params' => routing_header }
  )
  settings.merge(options)
end

Private Instance Methods

add_timeout_arg(a_func, timeout, metadata) click to toggle source

Updates a_func so that it gets called with the timeout as its final arg.

This converts a proc, a_func, into another proc with an additional positional arg.

@param a_func [Proc] a proc to be updated @param timeout [Numeric] to be added to the original proc as it

final positional arg.

@param metadata [Hash] request metadata headers @return [Proc] the original proc updated to the timeout arg

# File lib/google/gax/api_callable.rb, line 394
def add_timeout_arg(a_func, timeout, metadata)
  proc do |request, block|
    deadline = Time.now + timeout unless timeout.nil?
    op = a_func.call(request,
                     deadline: deadline,
                     metadata: metadata,
                     return_op: true)
    res = op.execute
    block.call res, op if block
    res
  end
end
bundleable(desc) click to toggle source

Creates a proc that transforms an API call into a bundling call.

It transform a_func from an API call that receives the requests and returns the response into a proc that receives the same request, and returns a Google::Gax::Bundling::Event.

The returned Event object can be used to obtain the eventual result of the bundled call.

@param a_func [Proc] an API call that supports bundling. @param desc [BundleDescriptor] describes the bundling that

+a_func+ supports.

@param bundler orchestrates bundling. @return [Proc] A proc takes the API call's request and returns

an Event object.
# File lib/google/gax/api_callable.rb, line 288
def bundleable(desc)
  proc do |api_call, request, settings, block|
    return api_call(request, block) unless settings.bundler
    raise 'Bundling calls cannot accept blocks' if block
    the_id = Google::Gax.compute_bundle_id(
      request,
      desc.request_discriminator_fields
    )
    settings.bundler.schedule(api_call, the_id, desc, request)
  end
end
calc_method_timeout(timeout, method_config, overriding_method) click to toggle source

@private Determine timeout in seconds for the current method.

# File lib/google/gax/settings.rb, line 539
def calc_method_timeout(timeout, method_config, overriding_method)
  timeout_override = method_config['timeout_millis']
  if overriding_method && overriding_method.key?('timeout_millis')
    timeout_override = overriding_method['timeout_millis']
  end
  timeout_override ? timeout_override / 1000 : timeout
end
coerce(val, field_descriptor) click to toggle source

Coerces the value of a field to be acceptable by the instantiation method of the wrapping message.

@private

@param val [Object] The value to be coerced. @param field_descriptor [Google::Protobuf::FieldDescriptor] The field

descriptor of the value.

@return [Object] The coerced version of the given value.

# File lib/google/gax/util.rb, line 157
def coerce(val, field_descriptor)
  return val unless (val.is_a? Hash) && !(map_field? field_descriptor)
  to_proto(val, field_descriptor.subtype.msgclass)
end
coerce_array(array, field_descriptor) click to toggle source

Coerces the values of an array to be acceptable by the instantiation method the wrapping message.

@private

@param array [Array<Object>] The values to be coerced. @param field_descriptor [Google::Protobuf::FieldDescriptor] The field

descriptor of the values.

@return [Array<Object>] The coerced version of the given values.

# File lib/google/gax/util.rb, line 128
def coerce_array(array, field_descriptor)
  unless array.is_a? Array
    raise ArgumentError.new('Value ' + array.to_s + ' must be an array')
  end
  array.map do |val|
    coerce(val, field_descriptor)
  end
end
coerce_submessage(val, field_descriptor) click to toggle source

Coerces the value of a field to be acceptable by the instantiation method of the wrapping message.

@private

@param val [Object] The value to be coerced. @param field_descriptor [Google::Protobuf::FieldDescriptor] The field

descriptor of the value.

@return [Object] The coerced version of the given value.

# File lib/google/gax/util.rb, line 107
def coerce_submessage(val, field_descriptor)
  if (field_descriptor.label == :repeated) && !(map_field? field_descriptor)
    coerce_array(val, field_descriptor)
  elsif field_descriptor.subtype.msgclass == Google::Protobuf::Timestamp &&
        val.is_a?(Time)
    time_to_timestamp(val)
  else
    coerce(val, field_descriptor)
  end
end
coerce_submessages(hash, message_class) click to toggle source

Coerces values of the given hash to be acceptable by the instantiation

method provided by `google/protobuf`

@private

@param hash [Hash] The hash whose nested hashes will be coerced. @param message_class [Class] The corresponding protobuf message class of

the given hash.

@return [Hash] A hash whose nested hashes have been coerced.

# File lib/google/gax/util.rb, line 77
def coerce_submessages(hash, message_class)
  return nil if hash.nil?
  coerced = {}
  message_descriptor = message_class.descriptor
  hash.each do |key, val|
    field_descriptor = message_descriptor.lookup(key.to_s)
    if field_descriptor && field_descriptor.type == :message
      coerced[key] = coerce_submessage(val, field_descriptor)
    elsif field_descriptor && field_descriptor.type == :bytes &&
          (val.is_a?(IO) || val.is_a?(StringIO))
      coerced[key] = val.binmode.read
    else
      # `google/protobuf` should throw an error if no field descriptor is
      # found. Simply pass through.
      coerced[key] = val
    end
  end
  coerced
end
compute_bundle_id(obj, discriminator_fields) click to toggle source

Computes a bundle id from the discriminator fields of `obj`.

discriminator_fields may include '.' as a separator, which is used to indicate object traversal. This is meant to allow fields in the computed bundle_id. the return is an array computed by going through the discriminator fields in order and obtaining the str(value) object field (or nested object field) if any discriminator field cannot be found, ValueError is raised.

@param obj [Object] an object. @param discriminator_fields [Array<String>] a list of discriminator

fields in the order to be to be used in the id.

@return [Array<Object>] array of objects computed as described above.

# File lib/google/gax/bundling.rb, line 72
def compute_bundle_id(obj, discriminator_fields)
  result = []
  discriminator_fields.each do |field|
    result.push(str_dotted_access(obj, field))
  end
  result
end
construct_bundling(bundle_config, bundle_descriptor) click to toggle source

Helper for construct_settings

@param bundle_config A Hash specifying a bundle parameters, the value for

'bundling' field in a method config (See ``construct_settings()`` for
information on this config.)

@param bundle_descriptor [BundleDescriptor] A BundleDescriptor

object describing the structure of bundling for this
method. If not set, this method will not bundle.

@return An Executor that configures bundling, or nil if this

method should not bundle.
# File lib/google/gax/settings.rb, line 328
def construct_bundling(bundle_config, bundle_descriptor)
  return unless bundle_config && bundle_descriptor
  options = BundleOptions.new
  bundle_config.each_pair do |key, value|
    options[key.intern] = value
  end
  # Bundling is currently not supported.
  # Executor.new(options)
  nil
end
construct_retry(method_config, retry_codes, retry_params, retry_names) click to toggle source

Helper for construct_settings

@param method_config [Hash] A dictionary representing a single

+methods+ entry of the standard API client config file. (See
#construct_settings for information on this yaml.)

@param retry_codes [Hash] A dictionary parsed from the

+retry_codes_def+ entry of the standard API client config
file. (See #construct_settings for information on this yaml.)

@param retry_params [Hash] A dictionary parsed from the

+retry_params+ entry of the standard API client config
file. (See #construct_settings for information on this yaml.)

@param retry_names [Hash] A dictionary mapping the string names

used in the standard API client config file to API response
status codes.

@return [RetryOptions, nil]

# File lib/google/gax/settings.rb, line 354
def construct_retry(method_config, retry_codes, retry_params, retry_names)
  return nil unless method_config
  codes = nil
  if retry_codes && method_config.key?('retry_codes_name')
    retry_codes_name = method_config['retry_codes_name']
    codes = retry_codes.fetch(retry_codes_name, []).map do |name|
      retry_names[name]
    end
  end

  backoff_settings = nil
  if retry_params && method_config.key?('retry_params_name')
    params = retry_params[method_config['retry_params_name']]
    backoff_settings = BackoffSettings.new(
      *params.values_at(*BackoffSettings.members.map(&:to_s))
    )
  end

  RetryOptions.new(codes, backoff_settings)
end
construct_settings(service_name, client_config, config_overrides, retry_names, timeout, bundle_descriptors: {}, page_descriptors: {}, metadata: {}, kwargs: {}, errors: []) click to toggle source

Constructs a dictionary mapping method names to CallSettings.

The client_config parameter is parsed from a client configuration JSON file of the form:

{
  "interfaces": {
    "google.fake.v1.ServiceName": {
      "retry_codes": {
        "idempotent": ["UNAVAILABLE", "DEADLINE_EXCEEDED"],
        "non_idempotent": []
      },
      "retry_params": {
        "default": {
          "initial_retry_delay_millis": 100,
          "retry_delay_multiplier": 1.2,
          "max_retry_delay_millis": 1000,
          "initial_rpc_timeout_millis": 2000,
          "rpc_timeout_multiplier": 1.5,
          "max_rpc_timeout_millis": 30000,
          "total_timeout_millis": 45000
        }
      },
      "methods": {
        "CreateFoo": {
          "retry_codes_name": "idempotent",
          "retry_params_name": "default"
        },
        "Publish": {
          "retry_codes_name": "non_idempotent",
          "retry_params_name": "default",
          "bundling": {
            "element_count_threshold": 40,
            "element_count_limit": 200,
            "request_byte_threshold": 90000,
            "request_byte_limit": 100000,
            "delay_threshold_millis": 100
          }
        }
      }
    }
  }
}

@param service_name [String] The fully-qualified name of this

service, used as a key into the client config file (in the
example above, this value should be
'google.fake.v1.ServiceName').

@param client_config [Hash] A hash parsed from the standard

API client config file.

@param config_overrides [Hash] A hash in the same structure of

client_config to override the settings.

@param retry_names [Hash] A hash mapping the string names

used in the standard API client config file to API response
status codes.

@param timeout [Numeric] The timeout parameter for all API calls

in this dictionary.

@param bundle_descriptors [Hash{String => BundleDescriptor}]

A dictionary of method names to BundleDescriptor objects for
methods that are bundling-enabled.

@param page_descriptors [Hash{String => PageDescriptor}] A

dictionary of method names to PageDescriptor objects for
methods that are page streaming-enabled.

@param metadata [Hash]

Header params to be passed to the API call.

@param kwargs [Hash]

Deprecated, same as metadata and if present will be merged with metadata

@param errors [Array<Exception>]

Configures the exceptions to wrap with GaxError.

@return [CallSettings, nil] A CallSettings, or nil if the

service is not found in the config.
# File lib/google/gax/settings.rb, line 488
def construct_settings(service_name, client_config, config_overrides,
                       retry_names, timeout, bundle_descriptors: {},
                       page_descriptors: {}, metadata: {}, kwargs: {},
                       errors: [])
  defaults = {}

  metadata.merge!(kwargs) if kwargs.is_a?(Hash) && metadata.is_a?(Hash)

  service_config = client_config.fetch('interfaces', {})[service_name]
  return nil unless service_config

  overrides = config_overrides.fetch('interfaces', {})[service_name] || {}

  service_config['methods'].each_pair do |method_name, method_config|
    snake_name = upper_camel_to_lower_underscore(method_name)

    overriding_method =
      overrides.fetch('methods', {}).fetch(method_name, {})

    bundling_config = method_config.fetch('bundling', nil)
    if overriding_method && overriding_method.key?('bundling')
      bundling_config = overriding_method['bundling']
    end
    bundle_descriptor = bundle_descriptors[snake_name]

    defaults[snake_name] = CallSettings.new(
      timeout: calc_method_timeout(
        timeout, method_config, overriding_method
      ),
      retry_options: merge_retry_options(
        construct_retry(method_config,
                        service_config['retry_codes'],
                        service_config['retry_params'],
                        retry_names),
        construct_retry(overriding_method,
                        overrides['retry_codes'],
                        overrides['retry_params'],
                        retry_names)
      ),
      page_descriptor: page_descriptors[snake_name],
      bundler: construct_bundling(bundling_config, bundle_descriptor),
      bundle_descriptor: bundle_descriptor,
      metadata: metadata,
      errors: errors
    )
  end

  defaults
end
create_api_call(func, settings, params_extractor: nil, exception_transformer: nil) click to toggle source

Converts an rpc call into an API call governed by the settings.

In typical usage, func will be a proc used to make an rpc request. This will mostly likely be a bound method from a request stub used to make an rpc call.

The result is created by applying a series of function decorators defined in this module to func. settings is used to determine which function decorators to apply.

The result is another proc which for most values of settings has the same signature as the original. Only when settings configures bundling does the signature change.

@param func [Proc] used to make a bare rpc call @param settings [CallSettings] provides the settings for this call @param params_extractor [Proc] extracts routing header params from the

request

@param exception_transformer [Proc] if an API exception occurs this

transformer is given the original exception for custom processing
instead of raising the error directly

@return [Proc] a bound method on a request stub used to make an rpc call @raise [StandardError] if settings has incompatible values,

e.g, if bundling and page_streaming are both configured
# File lib/google/gax/api_callable.rb, line 227
def create_api_call(func, settings, params_extractor: nil,
                    exception_transformer: nil)
  api_caller = proc do |api_call, request, _settings, block|
    api_call.call(request, block)
  end

  if settings.page_descriptor
    if settings.bundler?
      raise 'ApiCallable has incompatible settings: ' \
          'bundling and page streaming'
    end
    page_descriptor = settings.page_descriptor
    api_caller = page_streamable(page_descriptor.request_page_token_field,
                                 page_descriptor.response_page_token_field,
                                 page_descriptor.resource_field)
  elsif settings.bundler?
    api_caller = bundleable(settings.bundle_descriptor)
  end

  proc do |request, options = nil, &block|
    this_settings = settings.merge(options)
    if params_extractor
      params = params_extractor.call(request)
      this_settings = with_routing_header(this_settings, params)
    end
    api_call = if this_settings.retry_codes?
                 retryable(func, this_settings.retry_options,
                           this_settings.metadata)
               else
                 add_timeout_arg(func, this_settings.timeout,
                                 this_settings.metadata)
               end
    begin
      api_caller.call(api_call, request, this_settings, block)
    rescue *settings.errors => e
      error_class = Google::Gax.from_error(e)
      error = error_class.new('RPC failed')
      raise error if exception_transformer.nil?
      exception_transformer.call error
    rescue StandardError => error
      raise error if exception_transformer.nil?
      exception_transformer.call error
    end
  end
end
from_error(error) click to toggle source
# File lib/google/gax/errors.rb, line 74
def from_error(error)
  if error.respond_to? :code
    grpc_error_class_for error.code
  else
    GaxError
  end
end
map_field?(field_descriptor) click to toggle source

Hack to determine if field_descriptor is for a map.

TODO(geigerj): Remove this once protobuf Ruby supports an official way to determine if a FieldDescriptor represents a map. See: github.com/google/protobuf/issues/3425

# File lib/google/gax/util.rb, line 142
def map_field?(field_descriptor)
  (field_descriptor.label == :repeated) &&
    (field_descriptor.subtype.name.include? '_MapEntry_')
end
merge_retry_options(retry_options, overrides) click to toggle source

Helper for construct_settings.

Takes two retry options, and merges them into a single RetryOption instance.

@param retry_options [RetryOptions] The base RetryOptions. @param overrides [RetryOptions] The RetryOptions used for overriding

+retry+. Use the values if it is not nil. If entire
+overrides+ is nli, ignore the base retry and return nil.

@return [RetryOptions, nil]

# File lib/google/gax/settings.rb, line 385
def merge_retry_options(retry_options, overrides)
  return nil if overrides.nil?

  if overrides.retry_codes.nil? && overrides.backoff_settings.nil?
    return retry_options
  end

  codes = retry_options.retry_codes
  codes = overrides.retry_codes unless overrides.retry_codes.nil?
  backoff_settings = retry_options.backoff_settings
  unless overrides.backoff_settings.nil?
    backoff_settings = overrides.backoff_settings
  end

  RetryOptions.new(codes, backoff_settings)
end
page_streamable(request_page_token_field, response_page_token_field, resource_field) click to toggle source

Creates a proc that yields an iterable to performs page-streaming.

@param a_func [Proc] an API call that is page streaming. @param request_page_token_field [String] The field of the page

token in the request.

@param response_page_token_field [String] The field of the next

page token in the response.

@param resource_field [String] The field to be streamed. @param page_token [Object] The page_token for the first page to be

streamed, or nil.

@return [Proc] A proc that returns an iterable over the specified field.

# File lib/google/gax/api_callable.rb, line 311
def page_streamable(request_page_token_field,
                    response_page_token_field,
                    resource_field)
  enumerable = PagedEnumerable.new(request_page_token_field,
                                   response_page_token_field,
                                   resource_field)
  enumerable.method(:start)
end
retryable(a_func, retry_options, metadata) click to toggle source

Creates a proc equivalent to a_func, but that retries on certain exceptions.

@param a_func [Proc] @param retry_options [RetryOptions] Configures the exceptions

upon which the proc should retry, and the parameters to the
exponential backoff retry algorithm.

@param metadata [Hash] request metadata headers @return [Proc] A proc that will retry on exception.

# File lib/google/gax/api_callable.rb, line 343
def retryable(a_func, retry_options, metadata)
  delay_mult = retry_options.backoff_settings.retry_delay_multiplier
  max_delay = (retry_options.backoff_settings.max_retry_delay_millis /
               MILLIS_PER_SECOND)
  timeout_mult = retry_options.backoff_settings.rpc_timeout_multiplier
  max_timeout = (retry_options.backoff_settings.max_rpc_timeout_millis /
                 MILLIS_PER_SECOND)
  total_timeout = (retry_options.backoff_settings.total_timeout_millis /
                   MILLIS_PER_SECOND)

  proc do |request, block|
    delay = retry_options.backoff_settings.initial_retry_delay_millis
    timeout = (retry_options.backoff_settings.initial_rpc_timeout_millis /
               MILLIS_PER_SECOND)
    deadline = Time.now + total_timeout
    begin
      op = a_func.call(request,
                       deadline: Time.now + timeout,
                       metadata: metadata,
                       return_op: true)
      res = op.execute
      block.call res, op if block
      res
    rescue => exception
      unless exception.respond_to?(:code) &&
             retry_options.retry_codes.include?(exception.code)
        raise RetryError.new('Exception occurred in retry method that ' \
          'was not classified as transient')
      end
      sleep(rand(delay) / MILLIS_PER_SECOND)
      now = Time.now
      delay = [delay * delay_mult, max_delay].min
      timeout = [timeout * timeout_mult, max_timeout, deadline - now].min
      if now >= deadline
        raise RetryError.new('Retry total timeout exceeded with exception')
      end
      retry
    end
  end
end
str_dotted_access(obj, name) click to toggle source

Helper function for compute_bundle_id. Used to retrieve a nested field signified by name where dots in name indicate nested objects.

@param obj [Object] an object. @param name [String] a name for a field in the object. @return [String, nil] value of named attribute. Can be nil.

# File lib/google/gax/bundling.rb, line 51
def str_dotted_access(obj, name)
  name.split('.').each do |part|
    obj = obj[part]
    break if obj.nil?
  end
  obj.nil? ? nil : obj.to_s
end
time_to_timestamp(time) click to toggle source

Utility for converting a Ruby Time instance to a Google::Protobuf::Timestamp.

@param time [Time] The Time to be converted.

@return [Google::Protobuf::Timestamp] The converted

Google::Protobuf::Timestamp.
# File lib/google/gax/util.rb, line 180
def time_to_timestamp(time)
  Google::Protobuf::Timestamp.new(seconds: time.to_i, nanos: time.nsec)
end
timestamp_to_time(timestamp) click to toggle source

Utility for converting a Google::Protobuf::Timestamp instance to a Ruby time.

@param timestamp [Google::Protobuf::Timestamp] The timestamp to be

converted.

@return [Time] The converted Time.

# File lib/google/gax/util.rb, line 169
def timestamp_to_time(timestamp)
  Time.at(timestamp.nanos * 10**-9 + timestamp.seconds)
end
to_proto(hash, message_class) click to toggle source

Creates an instance of a protobuf message from a hash that may include nested hashes. `google/protobuf` allows for the instantiation of protobuf messages using hashes but does not allow for nested hashes to instantiate nested submessages.

@param hash [Hash || Class] The hash to be converted into a proto message.

If an instance of the proto message class is given, it is returned
unchanged.

@param message_class [Class] The corresponding protobuf message class of

the given hash.

@return [Object] An instance of the given message class.

# File lib/google/gax/util.rb, line 54
def to_proto(hash, message_class)
  return hash if hash.is_a? message_class

  # Sanity check: input must be a Hash
  unless hash.is_a? Hash
    raise ArgumentError.new(
      "Value #{hash} must be a Hash or a #{message_class.name}"
    )
  end
  hash = coerce_submessages(hash, message_class)
  message_class.new(hash)
end
upper_camel_to_lower_underscore(s) click to toggle source

Port of GRPC::GenericService.underscore that works on frozen strings. Note that this function often is used on strings inside Hashes, which are frozen by default, so the GRPC implementation cannot be used directly.

TODO(geigerj): Consider whether this logic can be factored out into a shared location that both gRPC and GAX can depend on in order to remove the additional dependency on gRPC this introduces.

# File lib/google/gax/settings.rb, line 409
def upper_camel_to_lower_underscore(s)
  s = s.gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
  s = s.gsub(/([a-z\d])([A-Z])/, '\1_\2')
  s = s.tr('-', '_')
  s = s.downcase
  s
end
with_routing_header(settings, params) click to toggle source

Create a new CallSettings with the routing metadata from the request header params merged with the given settings.

@param settings [CallSettings] the settings for an API call. @param params [Hash] the request header params. @return [CallSettings] a new merged settings.

# File lib/google/gax/api_callable.rb, line 326
def with_routing_header(settings, params)
  routing_header = params.map { |k, v| "#{k}=#{v}" }.join('&')
  options = CallOptions.new(
    metadata: { 'x-goog-request-params' => routing_header }
  )
  settings.merge(options)
end