module Google::Gax
Gax defines Google API extensions.
Constants
- BINDING
Private constants/methods/classes
- DEMUX_WARNING
- END_BINDING
- MILLIS_PER_SECOND
- Segment
- TERMINAL
- VERSION
- VERSION_MATCHER
Regex used by gapic to find version files and directories.
Public Class Methods
Computes a bundle id from the discriminator fields of `obj`.
discriminator_fields may include '.' as a separator, which is used to indicate object traversal. This allows nested fields to be used in the computed bundle id. The return value is an array computed by going through the discriminator fields in order and obtaining the string value (to_s) of each object field (or nested object field). If a discriminator field resolves to nil, the corresponding array entry is nil.
@param obj [Object] an object.
@param discriminator_fields [Array<String>] a list of discriminator fields in the order to be used in the id.
@return [Array<Object>] array of objects computed as described above.
# File lib/google/gax/bundling.rb, line 72
def compute_bundle_id(obj, discriminator_fields)
  result = []
  discriminator_fields.each do |field|
    result.push(str_dotted_access(obj, field))
  end
  result
end
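The traversal described above can be sketched in isolation. This is a minimal illustration, assuming the request is a plain nested Hash with string keys (real requests are protobuf messages); `dotted_access` and `bundle_id` are hypothetical stand-ins, not part of the library.

```ruby
# Hypothetical stand-in for str_dotted_access: walks nested hashes along a
# dotted path and returns the value as a String, or nil if a part is missing.
def dotted_access(obj, name)
  name.split('.').each do |part|
    obj = obj[part]
    break if obj.nil?
  end
  obj.nil? ? nil : obj.to_s
end

# Hypothetical stand-in for compute_bundle_id: one entry per field, in order.
def bundle_id(obj, discriminator_fields)
  discriminator_fields.map { |field| dotted_access(obj, field) }
end

request = { 'topic' => 'projects/p/topics/t', 'options' => { 'priority' => 3 } }
id = bundle_id(request, ['topic', 'options.priority', 'options.missing'])
p id
```

Requests that produce equal id arrays would be candidates for the same bundle.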
Constructs a dictionary mapping method names to CallSettings.
The client_config parameter is parsed from a client configuration JSON file of the form:
{
  "interfaces": {
    "google.fake.v1.ServiceName": {
      "retry_codes": {
        "idempotent": ["UNAVAILABLE", "DEADLINE_EXCEEDED"],
        "non_idempotent": []
      },
      "retry_params": {
        "default": {
          "initial_retry_delay_millis": 100,
          "retry_delay_multiplier": 1.2,
          "max_retry_delay_millis": 1000,
          "initial_rpc_timeout_millis": 2000,
          "rpc_timeout_multiplier": 1.5,
          "max_rpc_timeout_millis": 30000,
          "total_timeout_millis": 45000
        }
      },
      "methods": {
        "CreateFoo": {
          "retry_codes_name": "idempotent",
          "retry_params_name": "default"
        },
        "Publish": {
          "retry_codes_name": "non_idempotent",
          "retry_params_name": "default",
          "bundling": {
            "element_count_threshold": 40,
            "element_count_limit": 200,
            "request_byte_threshold": 90000,
            "request_byte_limit": 100000,
            "delay_threshold_millis": 100
          }
        }
      }
    }
  }
}
@param service_name [String] The fully-qualified name of this service, used as a key into the client config file (in the example above, this value should be 'google.fake.v1.ServiceName').
@param client_config [Hash] A hash parsed from the standard API client config file.
@param config_overrides [Hash] A hash with the same structure as client_config, used to override the settings.
@param retry_names [Hash] A hash mapping the string names used in the standard API client config file to API response status codes.
@param timeout [Numeric] The timeout parameter for all API calls in this dictionary.
@param bundle_descriptors [Hash{String => BundleDescriptor}] A dictionary of method names to BundleDescriptor objects for methods that are bundling-enabled.
@param page_descriptors [Hash{String => PageDescriptor}] A dictionary of method names to PageDescriptor objects for methods that are page streaming-enabled.
@param metadata [Hash] Header params to be passed to the API call.
@param kwargs [Hash] Deprecated; same as metadata. If present, it is merged into metadata.
@param errors [Array<Exception>] Configures the exceptions to wrap with GaxError.
@return [Hash{String => CallSettings}, nil] A hash mapping snake_case method names to CallSettings, or nil if the service is not found in the config.
# File lib/google/gax/settings.rb, line 488
def construct_settings(service_name, client_config, config_overrides,
                       retry_names, timeout, bundle_descriptors: {},
                       page_descriptors: {}, metadata: {}, kwargs: {},
                       errors: [])
  defaults = {}

  metadata.merge!(kwargs) if kwargs.is_a?(Hash) && metadata.is_a?(Hash)

  service_config = client_config.fetch('interfaces', {})[service_name]
  return nil unless service_config

  overrides = config_overrides.fetch('interfaces', {})[service_name] || {}

  service_config['methods'].each_pair do |method_name, method_config|
    snake_name = upper_camel_to_lower_underscore(method_name)

    overriding_method =
      overrides.fetch('methods', {}).fetch(method_name, {})

    bundling_config = method_config.fetch('bundling', nil)
    if overriding_method && overriding_method.key?('bundling')
      bundling_config = overriding_method['bundling']
    end
    bundle_descriptor = bundle_descriptors[snake_name]

    defaults[snake_name] = CallSettings.new(
      timeout: calc_method_timeout(
        timeout, method_config, overriding_method
      ),
      retry_options: merge_retry_options(
        construct_retry(method_config,
                        service_config['retry_codes'],
                        service_config['retry_params'],
                        retry_names),
        construct_retry(overriding_method,
                        overrides['retry_codes'],
                        overrides['retry_params'],
                        retry_names)
      ),
      page_descriptor: page_descriptors[snake_name],
      bundler: construct_bundling(bundling_config, bundle_descriptor),
      bundle_descriptor: bundle_descriptor,
      metadata: metadata,
      errors: errors
    )
  end

  defaults
end
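To make the config lookup concrete, the following sketch resolves the retry code names of each method in a trimmed-down config to numeric status codes. It uses only the Ruby standard library; the `retry_names` mapping is an assumption for illustration (14 and 4 are the gRPC codes for UNAVAILABLE and DEADLINE_EXCEEDED), and it does not build real CallSettings objects.

```ruby
require 'json'

config = JSON.parse(<<~CONFIG)
  {
    "interfaces": {
      "google.fake.v1.ServiceName": {
        "retry_codes": {
          "idempotent": ["UNAVAILABLE", "DEADLINE_EXCEEDED"],
          "non_idempotent": []
        },
        "methods": {
          "CreateFoo": { "retry_codes_name": "idempotent" },
          "Publish": { "retry_codes_name": "non_idempotent" }
        }
      }
    }
  }
CONFIG

# Assumed mapping from status names to numeric codes (illustrative only).
retry_names = { 'UNAVAILABLE' => 14, 'DEADLINE_EXCEEDED' => 4 }

service = config.fetch('interfaces', {})['google.fake.v1.ServiceName']
resolved = service['methods'].each_with_object({}) do |(name, method_config), out|
  code_names = service['retry_codes'].fetch(method_config['retry_codes_name'], [])
  out[name] = code_names.map { |code_name| retry_names[code_name] }
end

# An unknown service name yields nil, mirroring the early return above.
missing = config.fetch('interfaces', {})['google.fake.v1.Other']
p resolved
```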
Converts an rpc call into an API call governed by the settings.
In typical usage, func will be a proc used to make an rpc request. This will most likely be a bound method from a request stub used to make an rpc call.
The result is created by applying a series of function decorators defined in this module to func. settings is used to determine which function decorators to apply.
The result is another proc which for most values of settings has the same signature as the original. Only when settings configures bundling does the signature change.
@param func [Proc] used to make a bare rpc call
@param settings [CallSettings] provides the settings for this call
@param params_extractor [Proc] extracts routing header params from the request
@param exception_transformer [Proc] if an API exception occurs this transformer is given the original exception for custom processing instead of raising the error directly
@return [Proc] a bound method on a request stub used to make an rpc call
@raise [StandardError] if settings has incompatible values, e.g., if bundling and page_streaming are both configured
# File lib/google/gax/api_callable.rb, line 227
def create_api_call(func, settings, params_extractor: nil,
                    exception_transformer: nil)
  api_caller = proc do |api_call, request, _settings, block|
    api_call.call(request, block)
  end

  if settings.page_descriptor
    if settings.bundler?
      raise 'ApiCallable has incompatible settings: ' \
        'bundling and page streaming'
    end
    page_descriptor = settings.page_descriptor
    api_caller = page_streamable(page_descriptor.request_page_token_field,
                                 page_descriptor.response_page_token_field,
                                 page_descriptor.resource_field)
  elsif settings.bundler?
    api_caller = bundleable(settings.bundle_descriptor)
  end

  proc do |request, options = nil, &block|
    this_settings = settings.merge(options)
    if params_extractor
      params = params_extractor.call(request)
      this_settings = with_routing_header(this_settings, params)
    end
    api_call = if this_settings.retry_codes?
                 retryable(func, this_settings.retry_options,
                           this_settings.metadata)
               else
                 add_timeout_arg(func, this_settings.timeout,
                                 this_settings.metadata)
               end
    begin
      api_caller.call(api_call, request, this_settings, block)
    rescue *settings.errors => e
      error_class = Google::Gax.from_error(e)
      error = error_class.new('RPC failed')
      raise error if exception_transformer.nil?
      exception_transformer.call error
    rescue StandardError => error
      raise error if exception_transformer.nil?
      exception_transformer.call error
    end
  end
end
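The decorator idea behind create_api_call can be illustrated with plain procs. The sketch below is not the library's implementation: `bare_rpc`, `with_timeout`, and `with_error_handling` are hypothetical names, and the "RPC" simply echoes its input.

```ruby
# Hypothetical bare RPC: a proc taking a request and a keyword option.
bare_rpc = proc do |request, timeout: nil|
  raise ArgumentError, 'empty request' if request.empty?
  "echo(#{request}) timeout=#{timeout}"
end

# Decorator that fixes the timeout, loosely analogous to add_timeout_arg.
def with_timeout(func, timeout)
  proc { |request| func.call(request, timeout: timeout) }
end

# Decorator that routes failures through an optional transformer, loosely
# analogous to the rescue clauses in create_api_call.
def with_error_handling(func, transformer = nil)
  proc do |request|
    begin
      func.call(request)
    rescue StandardError => e
      raise e if transformer.nil?
      transformer.call(e)
    end
  end
end

api_call = with_error_handling(with_timeout(bare_rpc, 30),
                               ->(e) { "handled: #{e.message}" })
ok = api_call.call('ping')
failed = api_call.call('')
```

Each decorator returns a proc with the same one-argument shape, so they can be stacked in any order the settings call for.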
# File lib/google/gax/errors.rb, line 74
def from_error(error)
  if error.respond_to? :code
    grpc_error_class_for error.code
  else
    GaxError
  end
end
@private Identify the subclass for a gRPC error.
Note: ported from g/github.com/GoogleCloudPlatform/google-cloud-ruby/blob/master/google-cloud-core/lib/google/cloud/errors.rb
# File lib/google/gax/errors.rb, line 138
def self.grpc_error_class_for(grpc_error_code)
  # The gRPC status code 0 is for a successful response.
  # So there is no error subclass for a 0 status code, use current class.
  [GaxError, CanceledError, UnknownError, InvalidArgumentError,
   DeadlineExceededError, NotFoundError, AlreadyExistsError,
   PermissionDeniedError, ResourceExhaustedError, FailedPreconditionError,
   AbortedError, OutOfRangeError, UnimplementedError, InternalError,
   UnavailableError, DataLossError,
   UnauthenticatedError][grpc_error_code] || GaxError
end
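The lookup above indexes an array by the numeric status code and falls back to the base class. A reduced sketch of the same pattern, using hypothetical `Sketch*` classes rather than the real GaxError hierarchy, might look like this:

```ruby
# Hypothetical error hierarchy mirroring the first few classes named above.
class SketchError < StandardError; end
class SketchCanceledError < SketchError; end
class SketchUnknownError < SketchError; end
class SketchInvalidArgumentError < SketchError; end

# Array index = status code. Code 0 (success) maps to the base class.
SKETCH_ERROR_CLASSES = [SketchError, SketchCanceledError, SketchUnknownError,
                        SketchInvalidArgumentError].freeze

# Codes outside the table fall back to the base class.
def sketch_error_class_for(code)
  SKETCH_ERROR_CLASSES[code] || SketchError
end

# Mirrors from_error: objects without a #code method get the base class.
def sketch_from_error(error)
  error.respond_to?(:code) ? sketch_error_class_for(error.code) : SketchError
end
```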
Helper function for compute_bundle_id. Used to retrieve a nested field signified by name where dots in name indicate nested objects.
@param obj [Object] an object.
@param name [String] a name for a field in the object.
@return [String, nil] value of named attribute. Can be nil.
# File lib/google/gax/bundling.rb, line 51
def str_dotted_access(obj, name)
  name.split('.').each do |part|
    obj = obj[part]
    break if obj.nil?
  end
  obj.nil? ? nil : obj.to_s
end
Utility for converting a Ruby Time instance to a Google::Protobuf::Timestamp.
@param time [Time] The Time to be converted.
@return [Google::Protobuf::Timestamp] The converted Google::Protobuf::Timestamp.
# File lib/google/gax/util.rb, line 180
def time_to_timestamp(time)
  Google::Protobuf::Timestamp.new(seconds: time.to_i, nanos: time.nsec)
end
Utility for converting a Google::Protobuf::Timestamp instance to a Ruby Time.
@param timestamp [Google::Protobuf::Timestamp] The timestamp to be converted.
@return [Time] The converted Time.
# File lib/google/gax/util.rb, line 169
def timestamp_to_time(timestamp)
  Time.at(timestamp.nanos * 10**-9 + timestamp.seconds)
end
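The two conversions are inverses of each other. The round trip can be checked without the protobuf gem by substituting a Struct with the same two fields; `SketchTimestamp` and the `sketch_*` helpers are assumptions made for this illustration.

```ruby
# Stand-in for Google::Protobuf::Timestamp so the sketch runs without the
# protobuf gem; it carries the same two fields, seconds and nanos.
SketchTimestamp = Struct.new(:seconds, :nanos)

def sketch_time_to_timestamp(time)
  SketchTimestamp.new(time.to_i, time.nsec)
end

def sketch_timestamp_to_time(timestamp)
  # Rational arithmetic keeps the nanosecond part exact.
  Time.at(timestamp.seconds + Rational(timestamp.nanos, 1_000_000_000))
end

original = Time.at(1_500_000_000 + Rational(123_456_789, 1_000_000_000))
stamp = sketch_time_to_timestamp(original)
round_trip = sketch_timestamp_to_time(stamp)
```

In Ruby, `10**-9` evaluates to a Rational, so the listing above is also exact rather than a floating-point approximation.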
Creates an instance of a protobuf message from a hash that may include nested hashes. `google/protobuf` allows for the instantiation of protobuf messages using hashes but does not allow for nested hashes to instantiate nested submessages.
@param hash [Hash || Class] The hash to be converted into a proto message. If an instance of the proto message class is given, it is returned unchanged.
@param message_class [Class] The corresponding protobuf message class of the given hash.
@return [Object] An instance of the given message class.
# File lib/google/gax/util.rb, line 54
def to_proto(hash, message_class)
  return hash if hash.is_a? message_class

  # Sanity check: input must be a Hash
  unless hash.is_a? Hash
    raise ArgumentError.new(
      "Value #{hash} must be a Hash or a #{message_class.name}"
    )
  end
  hash = coerce_submessages(hash, message_class)
  message_class.new(hash)
end
Private Class Methods
Updates a_func so that it gets called with the timeout as its final arg.
This converts a proc, a_func, into another proc with an additional positional arg.
@param a_func [Proc] a proc to be updated
@param timeout [Numeric] to be added to the original proc as its final positional arg.
@param metadata [Hash] request metadata headers
@return [Proc] the original proc updated to take the timeout arg
# File lib/google/gax/api_callable.rb, line 394
def add_timeout_arg(a_func, timeout, metadata)
  proc do |request, block|
    deadline = Time.now + timeout unless timeout.nil?
    op = a_func.call(request,
                     deadline: deadline,
                     metadata: metadata,
                     return_op: true)
    res = op.execute
    block.call res, op if block
    res
  end
end
Creates a proc that transforms an API call into a bundling call.
It transforms a_func from an API call that receives the requests and returns the response into a proc that receives the same request, and returns a Google::Gax::Bundling::Event.
The returned Event object can be used to obtain the eventual result of the bundled call.
@param a_func [Proc] an API call that supports bundling.
@param desc [BundleDescriptor] describes the bundling that +a_func+ supports.
@param bundler orchestrates bundling.
@return [Proc] A proc that takes the API call's request and returns an Event object.
# File lib/google/gax/api_callable.rb, line 288
def bundleable(desc)
  proc do |api_call, request, settings, block|
    return api_call(request, block) unless settings.bundler
    raise 'Bundling calls cannot accept blocks' if block
    the_id = Google::Gax.compute_bundle_id(
      request,
      desc.request_discriminator_fields
    )
    settings.bundler.schedule(api_call, the_id, desc, request)
  end
end
@private Determine timeout in seconds for the current method.
# File lib/google/gax/settings.rb, line 539
def calc_method_timeout(timeout, method_config, overriding_method)
  timeout_override = method_config['timeout_millis']
  if overriding_method && overriding_method.key?('timeout_millis')
    timeout_override = overriding_method['timeout_millis']
  end
  timeout_override ? timeout_override / 1000 : timeout
end
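The precedence is: override config, then method config, then the default timeout. A standalone sketch with the same logic (the name `sketch_method_timeout` is hypothetical) shows the three cases, including the effect of integer division when the millisecond value is an Integer:

```ruby
# Same precedence as the listing above: override > method config > default.
def sketch_method_timeout(default_timeout, method_config, overriding_method)
  millis = method_config['timeout_millis']
  if overriding_method && overriding_method.key?('timeout_millis')
    millis = overriding_method['timeout_millis']
  end
  # Integer division: an Integer 1500 ms becomes 1 s, not 1.5 s.
  millis ? millis / 1000 : default_timeout
end

from_default = sketch_method_timeout(30, {}, {})
from_method = sketch_method_timeout(30, { 'timeout_millis' => 60_000 }, {})
from_override = sketch_method_timeout(30, { 'timeout_millis' => 60_000 },
                                      { 'timeout_millis' => 1500 })
```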
Coerces the value of a field to be acceptable by the instantiation method of the wrapping message.
@private
@param val [Object] The value to be coerced.
@param field_descriptor [Google::Protobuf::FieldDescriptor] The field descriptor of the value.
@return [Object] The coerced version of the given value.
# File lib/google/gax/util.rb, line 157
def coerce(val, field_descriptor)
  return val unless (val.is_a? Hash) && !(map_field? field_descriptor)
  to_proto(val, field_descriptor.subtype.msgclass)
end
Coerces the values of an array to be acceptable by the instantiation method of the wrapping message.
@private
@param array [Array<Object>] The values to be coerced.
@param field_descriptor [Google::Protobuf::FieldDescriptor] The field descriptor of the values.
@return [Array<Object>] The coerced version of the given values.
# File lib/google/gax/util.rb, line 128
def coerce_array(array, field_descriptor)
  unless array.is_a? Array
    raise ArgumentError.new('Value ' + array.to_s + ' must be an array')
  end
  array.map do |val|
    coerce(val, field_descriptor)
  end
end
Coerces the value of a field to be acceptable by the instantiation method of the wrapping message.
@private
@param val [Object] The value to be coerced.
@param field_descriptor [Google::Protobuf::FieldDescriptor] The field descriptor of the value.
@return [Object] The coerced version of the given value.
# File lib/google/gax/util.rb, line 107
def coerce_submessage(val, field_descriptor)
  if (field_descriptor.label == :repeated) && !(map_field? field_descriptor)
    coerce_array(val, field_descriptor)
  elsif field_descriptor.subtype.msgclass == Google::Protobuf::Timestamp &&
        val.is_a?(Time)
    time_to_timestamp(val)
  else
    coerce(val, field_descriptor)
  end
end
Coerces values of the given hash to be acceptable by the instantiation method provided by `google/protobuf`.
@private
@param hash [Hash] The hash whose nested hashes will be coerced.
@param message_class [Class] The corresponding protobuf message class of the given hash.
@return [Hash] A hash whose nested hashes have been coerced.
# File lib/google/gax/util.rb, line 77
def coerce_submessages(hash, message_class)
  return nil if hash.nil?
  coerced = {}
  message_descriptor = message_class.descriptor
  hash.each do |key, val|
    field_descriptor = message_descriptor.lookup(key.to_s)
    if field_descriptor && field_descriptor.type == :message
      coerced[key] = coerce_submessage(val, field_descriptor)
    elsif field_descriptor && field_descriptor.type == :bytes &&
          (val.is_a?(IO) || val.is_a?(StringIO))
      coerced[key] = val.binmode.read
    else
      # `google/protobuf` should throw an error if no field descriptor is
      # found. Simply pass through.
      coerced[key] = val
    end
  end
  coerced
end
Helper for construct_settings.
@param bundle_config [Hash] A Hash specifying bundle parameters, the value of the 'bundling' field in a method config. (See construct_settings for information on this config.)
@param bundle_descriptor [BundleDescriptor] A BundleDescriptor object describing the structure of bundling for this method. If not set, this method will not bundle.
@return An Executor that configures bundling, or nil if this method should not bundle. Bundling is currently not supported, so the code below always returns nil.
# File lib/google/gax/settings.rb, line 328
def construct_bundling(bundle_config, bundle_descriptor)
  return unless bundle_config && bundle_descriptor
  options = BundleOptions.new
  bundle_config.each_pair do |key, value|
    options[key.intern] = value
  end
  # Bundling is currently not supported.
  # Executor.new(options)
  nil
end
Helper for construct_settings.
@param method_config [Hash] A dictionary representing a single +methods+ entry of the standard API client config file. (See #construct_settings for information on this config.)
@param retry_codes [Hash] A dictionary parsed from the +retry_codes+ entry of the standard API client config file. (See #construct_settings for information on this config.)
@param retry_params [Hash] A dictionary parsed from the +retry_params+ entry of the standard API client config file. (See #construct_settings for information on this config.)
@param retry_names [Hash] A dictionary mapping the string names used in the standard API client config file to API response status codes.
@return [RetryOptions, nil]
# File lib/google/gax/settings.rb, line 354
def construct_retry(method_config, retry_codes, retry_params, retry_names)
  return nil unless method_config
  codes = nil
  if retry_codes && method_config.key?('retry_codes_name')
    retry_codes_name = method_config['retry_codes_name']
    codes = retry_codes.fetch(retry_codes_name, []).map do |name|
      retry_names[name]
    end
  end

  backoff_settings = nil
  if retry_params && method_config.key?('retry_params_name')
    params = retry_params[method_config['retry_params_name']]
    backoff_settings = BackoffSettings.new(
      *params.values_at(*BackoffSettings.members.map(&:to_s))
    )
  end

  RetryOptions.new(codes, backoff_settings)
end
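The `values_at(*members)` idiom above fills a Struct from a string-keyed hash in the Struct's own field order. A standalone sketch follows; `SketchBackoff` is a hypothetical stand-in whose field order is assumed from the retry_params keys in the config example.

```ruby
# Hypothetical stand-in for BackoffSettings; field order is an assumption
# taken from the retry_params keys shown in the config example.
SketchBackoff = Struct.new(
  :initial_retry_delay_millis, :retry_delay_multiplier,
  :max_retry_delay_millis, :initial_rpc_timeout_millis,
  :rpc_timeout_multiplier, :max_rpc_timeout_millis, :total_timeout_millis
)

# Keys deliberately listed out of order: values_at reorders them to match
# the Struct members.
params = {
  'total_timeout_millis' => 45_000,
  'initial_retry_delay_millis' => 100,
  'retry_delay_multiplier' => 1.2,
  'max_retry_delay_millis' => 1000,
  'initial_rpc_timeout_millis' => 2000,
  'rpc_timeout_multiplier' => 1.5,
  'max_rpc_timeout_millis' => 30_000
}

member_keys = SketchBackoff.members.map(&:to_s)
backoff = SketchBackoff.new(*params.values_at(*member_keys))
```

A key missing from the hash would simply leave the corresponding member nil.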
Hack to determine if field_descriptor is for a map.
TODO(geigerj): Remove this once protobuf Ruby supports an official way to determine if a FieldDescriptor represents a map. See: github.com/google/protobuf/issues/3425
# File lib/google/gax/util.rb, line 142
def map_field?(field_descriptor)
  (field_descriptor.label == :repeated) &&
    (field_descriptor.subtype.name.include? '_MapEntry_')
end
Helper for construct_settings.
Takes two retry options, and merges them into a single RetryOptions instance.
@param retry_options [RetryOptions] The base RetryOptions.
@param overrides [RetryOptions] The RetryOptions used for overriding +retry_options+. Values in +overrides+ are used when they are not nil. If +overrides+ itself is nil, the base retry options are ignored and nil is returned.
@return [RetryOptions, nil]
# File lib/google/gax/settings.rb, line 385
def merge_retry_options(retry_options, overrides)
  return nil if overrides.nil?

  if overrides.retry_codes.nil? && overrides.backoff_settings.nil?
    return retry_options
  end

  codes = retry_options.retry_codes
  codes = overrides.retry_codes unless overrides.retry_codes.nil?
  backoff_settings = retry_options.backoff_settings
  unless overrides.backoff_settings.nil?
    backoff_settings = overrides.backoff_settings
  end

  RetryOptions.new(codes, backoff_settings)
end
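The three merge outcomes (nil override, empty override, partial override) can be exercised with a small stand-in. `SketchRetryOptions` and `sketch_merge` are hypothetical names; the symbols used for backoff settings are placeholders.

```ruby
# Hypothetical stand-in for RetryOptions with the same two fields.
SketchRetryOptions = Struct.new(:retry_codes, :backoff_settings)

def sketch_merge(base, overrides)
  # A nil override disables retrying altogether.
  return nil if overrides.nil?
  # An override with nothing set leaves the base untouched.
  return base if overrides.retry_codes.nil? && overrides.backoff_settings.nil?

  SketchRetryOptions.new(
    overrides.retry_codes.nil? ? base.retry_codes : overrides.retry_codes,
    overrides.backoff_settings.nil? ? base.backoff_settings : overrides.backoff_settings
  )
end

base = SketchRetryOptions.new([14, 4], :base_backoff)
disabled = sketch_merge(base, nil)
unchanged = sketch_merge(base, SketchRetryOptions.new(nil, nil))
partial = sketch_merge(base, SketchRetryOptions.new([14], nil))
```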
Creates a proc that yields an iterable to perform page streaming.
@param a_func [Proc] an API call that is page streaming.
@param request_page_token_field [String] The field of the page token in the request.
@param response_page_token_field [String] The field of the next page token in the response.
@param resource_field [String] The field to be streamed.
@param page_token [Object] The page_token for the first page to be streamed, or nil.
@return [Proc] A proc that returns an iterable over the specified field.
# File lib/google/gax/api_callable.rb, line 311
def page_streamable(request_page_token_field,
                    response_page_token_field,
                    resource_field)
  enumerable = PagedEnumerable.new(request_page_token_field,
                                   response_page_token_field,
                                   resource_field)
  enumerable.method(:start)
end
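Page streaming hides the token handshake behind a single iterable. The sketch below shows the general technique with an Enumerator over plain hashes; it is not the PagedEnumerable implementation, and `SKETCH_PAGES`, `fetch_page`, and `each_resource` are hypothetical names.

```ruby
# Hypothetical paged API: each response carries resources and a next token.
SKETCH_PAGES = {
  nil => { 'items' => [1, 2], 'next_page_token' => 'b' },
  'b' => { 'items' => [3], 'next_page_token' => nil }
}.freeze

fetch_page = ->(request) { SKETCH_PAGES[request['page_token']] }

# Lazily walks every page, copying the response token into the next request,
# and yields each resource in turn.
def each_resource(api_call, request, request_token_field,
                  response_token_field, resource_field)
  Enumerator.new do |yielder|
    loop do
      response = api_call.call(request)
      response[resource_field].each { |item| yielder << item }
      token = response[response_token_field]
      break if token.nil? || token.empty?
      request = request.merge(request_token_field => token)
    end
  end
end

items = each_resource(fetch_page, {}, 'page_token',
                      'next_page_token', 'items').to_a
```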
Creates a proc equivalent to a_func, but that retries on certain exceptions.
@param a_func [Proc]
@param retry_options [RetryOptions] Configures the exceptions upon which the proc should retry, and the parameters to the exponential backoff retry algorithm.
@param metadata [Hash] request metadata headers
@return [Proc] A proc that will retry on exception.
# File lib/google/gax/api_callable.rb, line 343
def retryable(a_func, retry_options, metadata)
  delay_mult = retry_options.backoff_settings.retry_delay_multiplier
  max_delay = (retry_options.backoff_settings.max_retry_delay_millis /
               MILLIS_PER_SECOND)
  timeout_mult = retry_options.backoff_settings.rpc_timeout_multiplier
  max_timeout = (retry_options.backoff_settings.max_rpc_timeout_millis /
                 MILLIS_PER_SECOND)
  total_timeout = (retry_options.backoff_settings.total_timeout_millis /
                   MILLIS_PER_SECOND)

  proc do |request, block|
    delay = retry_options.backoff_settings.initial_retry_delay_millis
    timeout = (retry_options.backoff_settings.initial_rpc_timeout_millis /
               MILLIS_PER_SECOND)
    deadline = Time.now + total_timeout
    begin
      op = a_func.call(request,
                       deadline: Time.now + timeout,
                       metadata: metadata,
                       return_op: true)
      res = op.execute
      block.call res, op if block
      res
    rescue => exception
      unless exception.respond_to?(:code) &&
             retry_options.retry_codes.include?(exception.code)
        raise RetryError.new('Exception occurred in retry method that ' \
            'was not classified as transient')
      end
      sleep(rand(delay) / MILLIS_PER_SECOND)
      now = Time.now
      delay = [delay * delay_mult, max_delay].min
      timeout = [timeout * timeout_mult, max_timeout, deadline - now].min
      if now >= deadline
        raise RetryError.new('Retry total timeout exceeded with exception')
      end
      retry
    end
  end
end
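The delay update in the rescue branch is a capped geometric progression. The following sketch computes that progression on its own, with no jitter, sleeping, or RPCs; `backoff_delays` is a hypothetical helper, and an Integer multiplier is used only to keep the arithmetic exact.

```ruby
# Returns the upper bound of each successive retry delay (in milliseconds)
# under exponential backoff with a cap. No jitter or sleeping is involved.
def backoff_delays(initial, multiplier, max_delay, attempts)
  delay = initial
  Array.new(attempts) do
    current = delay
    delay = [delay * multiplier, max_delay].min
    current
  end
end

delays = backoff_delays(100, 2, 1000, 6)
p delays
```

In the listing above the actual sleep is `rand(delay)`, so each value here is a ceiling on the randomized wait rather than the wait itself.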
Port of GRPC::GenericService.underscore that works on frozen strings. Note that this function often is used on strings inside Hashes, which are frozen by default, so the GRPC implementation cannot be used directly.
TODO(geigerj): Consider whether this logic can be factored out into a shared location that both gRPC and GAX can depend on in order to remove the additional dependency on gRPC this introduces.
# File lib/google/gax/settings.rb, line 409
def upper_camel_to_lower_underscore(s)
  s = s.gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
  s = s.gsub(/([a-z\d])([A-Z])/, '\1_\2')
  s = s.tr('-', '_')
  s = s.downcase
  s
end
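Because every step uses a non-mutating String method, the conversion is safe on frozen strings. A chained equivalent (hypothetically named `sketch_underscore`) with a few sample inputs:

```ruby
# Same two substitutions as the listing above, chained. gsub, tr and
# downcase all return new strings, so frozen input is never modified.
def sketch_underscore(s)
  s.gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
   .gsub(/([a-z\d])([A-Z])/, '\1_\2')
   .tr('-', '_')
   .downcase
end

simple = sketch_underscore('CreateFoo'.freeze)
acronym = sketch_underscore('ListHTTPRoutes'.freeze)
dashed = sketch_underscore('get-Iam-Policy'.freeze)
```

The first substitution splits a run of capitals from a following capitalized word (`HTTPRoutes` becomes `HTTP_Routes`); the second inserts an underscore at each lower-to-upper boundary.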
Creates a new CallSettings with the routing metadata from the request header params merged with the given settings.
@param settings [CallSettings] the settings for an API call.
@param params [Hash] the request header params.
@return [CallSettings] a new merged settings.
# File lib/google/gax/api_callable.rb, line 326
def with_routing_header(settings, params)
  routing_header = params.map { |k, v| "#{k}=#{v}" }.join('&')
  options = CallOptions.new(
    metadata: { 'x-goog-request-params' => routing_header }
  )
  settings.merge(options)
end
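The header value is a plain `key=value` list joined with `&`. A short illustration with made-up parameter values, stopping before the CallOptions merge:

```ruby
# Made-up routing params for illustration.
params = { 'name' => 'projects/p', 'zone' => 'us-east1' }

# Same construction as the listing above; values are not URL-escaped here.
routing_header = params.map { |k, v| "#{k}=#{v}" }.join('&')
metadata = { 'x-goog-request-params' => routing_header }
p metadata
```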
Private Instance Methods
Updates a_func
so that it gets called with the timeout as its final arg.
This converts a proc, a_func, into another proc with an additional positional arg.
@param a_func [Proc] a proc to be updated @param timeout [Numeric] to be added to the original proc as it
final positional arg.
@param metadata [Hash] request metadata headers @return [Proc] the original proc updated to the timeout arg
# File lib/google/gax/api_callable.rb, line 394 def add_timeout_arg(a_func, timeout, metadata) proc do |request, block| deadline = Time.now + timeout unless timeout.nil? op = a_func.call(request, deadline: deadline, metadata: metadata, return_op: true) res = op.execute block.call res, op if block res end end
Creates a proc that transforms an API call into a bundling call.
It transform a_func from an API call that receives the requests and returns the response into a proc that receives the same request, and returns a Google::Gax::Bundling::Event
.
The returned Event
object can be used to obtain the eventual result of the bundled call.
@param a_func [Proc] an API call that supports bundling. @param desc [BundleDescriptor] describes the bundling that
+a_func+ supports.
@param bundler orchestrates bundling. @return [Proc] A proc takes the API call's request and returns
an Event object.
# File lib/google/gax/api_callable.rb, line 288 def bundleable(desc) proc do |api_call, request, settings, block| return api_call(request, block) unless settings.bundler raise 'Bundling calls cannot accept blocks' if block the_id = Google::Gax.compute_bundle_id( request, desc.request_discriminator_fields ) settings.bundler.schedule(api_call, the_id, desc, request) end end
@private Determine timeout in seconds for the current method.
# File lib/google/gax/settings.rb, line 539 def calc_method_timeout(timeout, method_config, overriding_method) timeout_override = method_config['timeout_millis'] if overriding_method && overriding_method.key?('timeout_millis') timeout_override = overriding_method['timeout_millis'] end timeout_override ? timeout_override / 1000 : timeout end
Coerces the value of a field to be acceptable by the instantiation method of the wrapping message.
@private
@param val [Object] The value to be coerced. @param field_descriptor [Google::Protobuf::FieldDescriptor] The field
descriptor of the value.
@return [Object] The coerced version of the given value.
# File lib/google/gax/util.rb, line 157 def coerce(val, field_descriptor) return val unless (val.is_a? Hash) && !(map_field? field_descriptor) to_proto(val, field_descriptor.subtype.msgclass) end
Coerces the values of an array to be acceptable by the instantiation method the wrapping message.
@private
@param array [Array<Object>] The values to be coerced. @param field_descriptor [Google::Protobuf::FieldDescriptor] The field
descriptor of the values.
@return [Array<Object>] The coerced version of the given values.
# File lib/google/gax/util.rb, line 128 def coerce_array(array, field_descriptor) unless array.is_a? Array raise ArgumentError.new('Value ' + array.to_s + ' must be an array') end array.map do |val| coerce(val, field_descriptor) end end
Coerces the value of a field to be acceptable by the instantiation method of the wrapping message.
@private
@param val [Object] The value to be coerced. @param field_descriptor [Google::Protobuf::FieldDescriptor] The field
descriptor of the value.
@return [Object] The coerced version of the given value.
# File lib/google/gax/util.rb, line 107 def coerce_submessage(val, field_descriptor) if (field_descriptor.label == :repeated) && !(map_field? field_descriptor) coerce_array(val, field_descriptor) elsif field_descriptor.subtype.msgclass == Google::Protobuf::Timestamp && val.is_a?(Time) time_to_timestamp(val) else coerce(val, field_descriptor) end end
Coerces values of the given hash to be acceptable by the instantiation
method provided by `google/protobuf`
@private
@param hash [Hash] The hash whose nested hashes will be coerced. @param message_class [Class] The corresponding protobuf message class of
the given hash.
@return [Hash] A hash whose nested hashes have been coerced.
# File lib/google/gax/util.rb, line 77 def coerce_submessages(hash, message_class) return nil if hash.nil? coerced = {} message_descriptor = message_class.descriptor hash.each do |key, val| field_descriptor = message_descriptor.lookup(key.to_s) if field_descriptor && field_descriptor.type == :message coerced[key] = coerce_submessage(val, field_descriptor) elsif field_descriptor && field_descriptor.type == :bytes && (val.is_a?(IO) || val.is_a?(StringIO)) coerced[key] = val.binmode.read else # `google/protobuf` should throw an error if no field descriptor is # found. Simply pass through. coerced[key] = val end end coerced end
Computes a bundle id from the discriminator fields of `obj`.
discriminator_fields
may include '.' as a separator, which is used to indicate object traversal. This is meant to allow fields in the computed bundle_id. the return is an array computed by going through the discriminator fields in order and obtaining the str(value) object field (or nested object field) if any discriminator field cannot be found, ValueError is raised.
@param obj [Object] an object. @param discriminator_fields [Array<String>] a list of discriminator
fields in the order to be to be used in the id.
@return [Array<Object>] array of objects computed as described above.
# File lib/google/gax/bundling.rb, line 72 def compute_bundle_id(obj, discriminator_fields) result = [] discriminator_fields.each do |field| result.push(str_dotted_access(obj, field)) end result end
Helper for construct_settings
@param bundle_config A Hash specifying a bundle parameters, the value for
'bundling' field in a method config (See ``construct_settings()`` for information on this config.)
@param bundle_descriptor [BundleDescriptor] A BundleDescriptor
object describing the structure of bundling for this method. If not set, this method will not bundle.
@return An Executor
that configures bundling, or nil if this
method should not bundle.
# File lib/google/gax/settings.rb, line 328 def construct_bundling(bundle_config, bundle_descriptor) return unless bundle_config && bundle_descriptor options = BundleOptions.new bundle_config.each_pair do |key, value| options[key.intern] = value end # Bundling is currently not supported. # Executor.new(options) nil end
Helper for construct_settings
@param method_config [Hash] A dictionary representing a single
+methods+ entry of the standard API client config file. (See #construct_settings for information on this yaml.)
@param retry_codes [Hash] A dictionary parsed from the
+retry_codes_def+ entry of the standard API client config file. (See #construct_settings for information on this yaml.)
@param retry_params [Hash] A dictionary parsed from the
+retry_params+ entry of the standard API client config file. (See #construct_settings for information on this yaml.)
@param retry_names [Hash] A dictionary mapping the string names
used in the standard API client config file to API response status codes.
@return [RetryOptions, nil]
# File lib/google/gax/settings.rb, line 354 def construct_retry(method_config, retry_codes, retry_params, retry_names) return nil unless method_config codes = nil if retry_codes && method_config.key?('retry_codes_name') retry_codes_name = method_config['retry_codes_name'] codes = retry_codes.fetch(retry_codes_name, []).map do |name| retry_names[name] end end backoff_settings = nil if retry_params && method_config.key?('retry_params_name') params = retry_params[method_config['retry_params_name']] backoff_settings = BackoffSettings.new( *params.values_at(*BackoffSettings.members.map(&:to_s)) ) end RetryOptions.new(codes, backoff_settings) end
Constructs a dictionary mapping method names to CallSettings
.
The client_config
parameter is parsed from a client configuration JSON file of the form:
{ "interfaces": { "google.fake.v1.ServiceName": { "retry_codes": { "idempotent": ["UNAVAILABLE", "DEADLINE_EXCEEDED"], "non_idempotent": [] }, "retry_params": { "default": { "initial_retry_delay_millis": 100, "retry_delay_multiplier": 1.2, "max_retry_delay_millis": 1000, "initial_rpc_timeout_millis": 2000, "rpc_timeout_multiplier": 1.5, "max_rpc_timeout_millis": 30000, "total_timeout_millis": 45000 } }, "methods": { "CreateFoo": { "retry_codes_name": "idempotent", "retry_params_name": "default" }, "Publish": { "retry_codes_name": "non_idempotent", "retry_params_name": "default", "bundling": { "element_count_threshold": 40, "element_count_limit": 200, "request_byte_threshold": 90000, "request_byte_limit": 100000, "delay_threshold_millis": 100 } } } } } }
@param service_name [String] The fully-qualified name of this
service, used as a key into the client config file (in the example above, this value should be 'google.fake.v1.ServiceName').
@param client_config [Hash] A hash parsed from the standard
API client config file.
@param config_overrides [Hash] A hash in the same structure of
client_config to override the settings.
@param retry_names [Hash] A hash mapping the string names
used in the standard API client config file to API response status codes.
@param timeout [Numeric] The timeout parameter for all API calls
in this dictionary.
@param bundle_descriptors [Hash{String => BundleDescriptor}]
A dictionary of method names to BundleDescriptor objects for methods that are bundling-enabled.
@param page_descriptors [Hash{String => PageDescriptor}] A
dictionary of method names to PageDescriptor objects for methods that are page streaming-enabled.
@param metadata [Hash]
Header params to be passed to the API call.
@param kwargs [Hash]
Deprecated, same as metadata and if present will be merged with metadata
@param errors [Array<Exception>]
Configures the exceptions to wrap with GaxError.
@return [CallSettings, nil] A CallSettings
, or nil if the
service is not found in the config.
# File lib/google/gax/settings.rb, line 488 def construct_settings(service_name, client_config, config_overrides, retry_names, timeout, bundle_descriptors: {}, page_descriptors: {}, metadata: {}, kwargs: {}, errors: []) defaults = {} metadata.merge!(kwargs) if kwargs.is_a?(Hash) && metadata.is_a?(Hash) service_config = client_config.fetch('interfaces', {})[service_name] return nil unless service_config overrides = config_overrides.fetch('interfaces', {})[service_name] || {} service_config['methods'].each_pair do |method_name, method_config| snake_name = upper_camel_to_lower_underscore(method_name) overriding_method = overrides.fetch('methods', {}).fetch(method_name, {}) bundling_config = method_config.fetch('bundling', nil) if overriding_method && overriding_method.key?('bundling') bundling_config = overriding_method['bundling'] end bundle_descriptor = bundle_descriptors[snake_name] defaults[snake_name] = CallSettings.new( timeout: calc_method_timeout( timeout, method_config, overriding_method ), retry_options: merge_retry_options( construct_retry(method_config, service_config['retry_codes'], service_config['retry_params'], retry_names), construct_retry(overriding_method, overrides['retry_codes'], overrides['retry_params'], retry_names) ), page_descriptor: page_descriptors[snake_name], bundler: construct_bundling(bundling_config, bundle_descriptor), bundle_descriptor: bundle_descriptor, metadata: metadata, errors: errors ) end defaults end
Converts an rpc call into an API call governed by the settings.
In typical usage, func will be a proc used to make an rpc request. This will most likely be a bound method from a request stub used to make an rpc call.
The result is created by applying a series of function decorators defined in this module to func. settings is used to determine which function decorators to apply.
The result is another proc which, for most values of settings, has the same signature as the original. Only when settings configures bundling does the signature change.
@param func [Proc] used to make a bare rpc call
@param settings [CallSettings] provides the settings for this call
@param params_extractor [Proc] extracts routing header params from the
request
@param exception_transformer [Proc] if an API exception occurs, this
transformer is given the original exception for custom processing instead of raising the error directly
@return [Proc] a proc that wraps func with the behavior configured by settings
@raise [StandardError] if settings has incompatible values,
e.g., if bundling and page_streaming are both configured
# File lib/google/gax/api_callable.rb, line 227 def create_api_call(func, settings, params_extractor: nil, exception_transformer: nil) api_caller = proc do |api_call, request, _settings, block| api_call.call(request, block) end if settings.page_descriptor if settings.bundler? raise 'ApiCallable has incompatible settings: ' \ 'bundling and page streaming' end page_descriptor = settings.page_descriptor api_caller = page_streamable(page_descriptor.request_page_token_field, page_descriptor.response_page_token_field, page_descriptor.resource_field) elsif settings.bundler? api_caller = bundleable(settings.bundle_descriptor) end proc do |request, options = nil, &block| this_settings = settings.merge(options) if params_extractor params = params_extractor.call(request) this_settings = with_routing_header(this_settings, params) end api_call = if this_settings.retry_codes? retryable(func, this_settings.retry_options, this_settings.metadata) else add_timeout_arg(func, this_settings.timeout, this_settings.metadata) end begin api_caller.call(api_call, request, this_settings, block) rescue *settings.errors => e error_class = Google::Gax.from_error(e) error = error_class.new('RPC failed') raise error if exception_transformer.nil? exception_transformer.call error rescue StandardError => error raise error if exception_transformer.nil? exception_transformer.call error end end end
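The role of exception_transformer in the listing above can be illustrated without any rpc machinery. The sketch below reproduces only the final `rescue StandardError` branch; `wrap_with_transformer` is a hypothetical stand-in written for this example and omits retries, timeouts, paging, bundling, and the GaxError wrapping.

```ruby
# Hypothetical, simplified stand-in for the last rescue branch of
# create_api_call: re-raise when no transformer is given, otherwise hand
# the error to the transformer.
def wrap_with_transformer(func, exception_transformer: nil)
  proc do |request|
    begin
      func.call(request)
    rescue StandardError => error
      raise error if exception_transformer.nil?
      exception_transformer.call(error)
    end
  end
end

# A fake rpc proc that always fails.
failing_rpc = proc { |_request| raise ArgumentError, 'bad request' }

# Without a transformer the original error propagates to the caller.
plain_call = wrap_with_transformer(failing_rpc)

# With a transformer, the transformer decides what happens. Here it
# returns a description instead of raising.
described_call = wrap_with_transformer(
  failing_rpc,
  exception_transformer: proc { |e| "#{e.class}: #{e.message}" }
)
result = described_call.call(:request)
# result => "ArgumentError: bad request"
```

A transformer may also raise its own exception type, which is one way a client library could map transport errors to its own error classes.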
# File lib/google/gax/errors.rb, line 74 def from_error(error) if error.respond_to? :code grpc_error_class_for error.code else GaxError end end
Hack to determine if field_descriptor is for a map.
TODO(geigerj): Remove this once protobuf Ruby supports an official way to determine if a FieldDescriptor represents a map. See: github.com/google/protobuf/issues/3425
# File lib/google/gax/util.rb, line 142 def map_field?(field_descriptor) (field_descriptor.label == :repeated) && (field_descriptor.subtype.name.include? '_MapEntry_') end
Helper for construct_settings.
Takes two retry options and merges them into a single RetryOptions instance.
@param retry_options [RetryOptions] The base RetryOptions.
@param overrides [RetryOptions] The RetryOptions used for overriding
retry_options. Values in overrides are used when they are not nil. If the entire overrides is nil, the base retry options are ignored and nil is returned.
@return [RetryOptions, nil]
# File lib/google/gax/settings.rb, line 385 def merge_retry_options(retry_options, overrides) return nil if overrides.nil? if overrides.retry_codes.nil? && overrides.backoff_settings.nil? return retry_options end codes = retry_options.retry_codes codes = overrides.retry_codes unless overrides.retry_codes.nil? backoff_settings = retry_options.backoff_settings unless overrides.backoff_settings.nil? backoff_settings = overrides.backoff_settings end RetryOptions.new(codes, backoff_settings) end
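The three merge cases described above can be checked with a small stand-alone sketch. `SketchRetryOptions` is a stand-in Struct assumed for this example (the real RetryOptions class is not shown here), and the backoff values are placeholder symbols.

```ruby
# Stand-in for RetryOptions with only the two fields the merge reads.
SketchRetryOptions = Struct.new(:retry_codes, :backoff_settings)

# Same logic as the listing above, written against the stand-in Struct.
def merge_retry_options(retry_options, overrides)
  return nil if overrides.nil?
  if overrides.retry_codes.nil? && overrides.backoff_settings.nil?
    return retry_options
  end

  codes = retry_options.retry_codes
  codes = overrides.retry_codes unless overrides.retry_codes.nil?
  backoff_settings = retry_options.backoff_settings
  unless overrides.backoff_settings.nil?
    backoff_settings = overrides.backoff_settings
  end
  SketchRetryOptions.new(codes, backoff_settings)
end

base = SketchRetryOptions.new(['UNAVAILABLE'], :default_backoff)

# Case 1: overrides is nil, so retrying is switched off entirely.
disabled = merge_retry_options(base, nil)

# Case 2: overrides carries no values, so the base is returned unchanged.
unchanged = merge_retry_options(base, SketchRetryOptions.new(nil, nil))

# Case 3: only the non-nil fields of overrides replace the base values.
merged = merge_retry_options(
  base, SketchRetryOptions.new(['DEADLINE_EXCEEDED'], nil)
)
# merged.retry_codes      => ["DEADLINE_EXCEEDED"]
# merged.backoff_settings => :default_backoff
```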
Creates a proc that yields an iterable to perform page-streaming.
@param a_func [Proc] an API call that is page streaming.
@param request_page_token_field [String] The field of the page
token in the request.
@param response_page_token_field [String] The field of the next
page token in the response.
@param resource_field [String] The field to be streamed.
@param page_token [Object] The page_token for the first page to be
streamed, or nil.
@return [Proc] A proc that returns an iterable over the specified field.
# File lib/google/gax/api_callable.rb, line 311 def page_streamable(request_page_token_field, response_page_token_field, resource_field) enumerable = PagedEnumerable.new(request_page_token_field, response_page_token_field, resource_field) enumerable.method(:start) end
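The idea behind page streaming can be sketched with a plain Enumerator. This is not the PagedEnumerable implementation, which is not shown here; `stream_resources`, the fake API proc, and the field names are all assumptions made for the illustration.

```ruby
# Fake paged API: three pages keyed by page token. A nil token means the
# first page; an empty next token means there are no more pages.
pages = {
  nil => { 'items' => [1, 2], 'next_page_token' => 'p2' },
  'p2' => { 'items' => [3], 'next_page_token' => 'p3' },
  'p3' => { 'items' => [4, 5], 'next_page_token' => '' }
}
fake_api_call = proc { |request| pages[request['page_token']] }

# Hypothetical helper: lazily walks the pages and yields every element of
# the resource field, copying the response token into the next request.
def stream_resources(api_call, request_token_field, response_token_field,
                     resource_field)
  Enumerator.new do |yielder|
    request = {}
    loop do
      response = api_call.call(request)
      response[resource_field].each { |item| yielder << item }
      token = response[response_token_field]
      break if token.nil? || token.empty?
      request = request.merge(request_token_field => token)
    end
  end
end

stream = stream_resources(fake_api_call, 'page_token', 'next_page_token',
                          'items')
all_items = stream.to_a
# all_items => [1, 2, 3, 4, 5]
```

Because the Enumerator is lazy, a caller that only needs the first few resources, for example `stream.first(2)`, does not trigger requests for later pages.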
Creates a proc equivalent to a_func, but that retries on certain exceptions.
@param a_func [Proc]
@param retry_options [RetryOptions] Configures the exceptions
upon which the proc should retry, and the parameters to the exponential backoff retry algorithm.
@param metadata [Hash] request metadata headers
@return [Proc] A proc that will retry on exception.
# File lib/google/gax/api_callable.rb, line 343 def retryable(a_func, retry_options, metadata) delay_mult = retry_options.backoff_settings.retry_delay_multiplier max_delay = (retry_options.backoff_settings.max_retry_delay_millis / MILLIS_PER_SECOND) timeout_mult = retry_options.backoff_settings.rpc_timeout_multiplier max_timeout = (retry_options.backoff_settings.max_rpc_timeout_millis / MILLIS_PER_SECOND) total_timeout = (retry_options.backoff_settings.total_timeout_millis / MILLIS_PER_SECOND) proc do |request, block| delay = retry_options.backoff_settings.initial_retry_delay_millis timeout = (retry_options.backoff_settings.initial_rpc_timeout_millis / MILLIS_PER_SECOND) deadline = Time.now + total_timeout begin op = a_func.call(request, deadline: Time.now + timeout, metadata: metadata, return_op: true) res = op.execute block.call res, op if block res rescue => exception unless exception.respond_to?(:code) && retry_options.retry_codes.include?(exception.code) raise RetryError.new('Exception occurred in retry method that ' \ 'was not classified as transient') end sleep(rand(delay) / MILLIS_PER_SECOND) now = Time.now delay = [delay * delay_mult, max_delay].min timeout = [timeout * timeout_mult, max_timeout, deadline - now].min if now >= deadline raise RetryError.new('Retry total timeout exceeded with exception') end retry end end end
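The growth of the retry delay under exponential backoff can be seen in a sketch that computes the schedule without sleeping or calling anything. `backoff_delays` is a hypothetical helper; all values are assumed to be in milliseconds, and each value is the upper bound of a delay, since the listing above sleeps for a random duration below the current delay.

```ruby
# Hypothetical helper: returns the upper bounds of the first `attempts`
# retry delays, multiplying by `multiplier` after each attempt and capping
# the result at `max_delay`.
def backoff_delays(initial_delay, multiplier, max_delay, attempts)
  delay = initial_delay
  Array.new(attempts) do
    current = delay
    delay = [delay * multiplier, max_delay].min
    current
  end
end

delays = backoff_delays(100, 2, 1000, 6)
# delays => [100, 200, 400, 800, 1000, 1000]
```

The rpc timeout grows the same way with its own multiplier and maximum, and is additionally limited by the time remaining until the total deadline.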
Helper function for compute_bundle_id. Used to retrieve a nested field signified by name where dots in name indicate nested objects.
@param obj [Object] an object.
@param name [String] a name for a field in the object.
@return [String, nil] value of named attribute. Can be nil.
# File lib/google/gax/bundling.rb, line 51 def str_dotted_access(obj, name) name.split('.').each do |part| obj = obj[part] break if obj.nil? end obj.nil? ? nil : obj.to_s end
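A short sketch of dotted access and bundle id computation on nested hashes. The two methods repeat the listings on this page so the example runs on its own; the sample request hash and its field names are made up for the illustration.

```ruby
# Repeated from the listing above so the sketch is self-contained.
def str_dotted_access(obj, name)
  name.split('.').each do |part|
    obj = obj[part]
    break if obj.nil?
  end
  obj.nil? ? nil : obj.to_s
end

# Repeated from the compute_bundle_id listing on this page.
def compute_bundle_id(obj, discriminator_fields)
  discriminator_fields.map { |field| str_dotted_access(obj, field) }
end

# Made-up request with one level of nesting.
request = {
  'topic' => 'projects/p/topics/t',
  'options' => { 'region' => 'eu', 'priority' => 3 }
}

region = str_dotted_access(request, 'options.region')
# region => "eu"

bundle_id = compute_bundle_id(
  request, ['topic', 'options.priority', 'options.missing.deeper']
)
# bundle_id => ["projects/p/topics/t", "3", nil]
```

Values are converted with to_s, so the integer priority becomes "3", and a path that runs into a missing field produces nil in this code.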
Utility for converting a Ruby Time instance to a Google::Protobuf::Timestamp.
@param time [Time] The Time to be converted.
@return [Google::Protobuf::Timestamp] The converted
Google::Protobuf::Timestamp.
# File lib/google/gax/util.rb, line 180 def time_to_timestamp(time) Google::Protobuf::Timestamp.new(seconds: time.to_i, nanos: time.nsec) end
Utility for converting a Google::Protobuf::Timestamp instance to a Ruby Time.
@param timestamp [Google::Protobuf::Timestamp] The timestamp to be
converted.
@return [Time] The converted Time.
# File lib/google/gax/util.rb, line 169 def timestamp_to_time(timestamp) Time.at(timestamp.nanos * 10**-9 + timestamp.seconds) end
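The two conversions are inverses of each other, which a round trip shows. To keep the sketch runnable without the google-protobuf gem, `SketchTimestamp` is a stand-in Struct assumed for this example; it models only the seconds and nanos fields.

```ruby
# Stand-in for Google::Protobuf::Timestamp (assumption for this sketch).
SketchTimestamp = Struct.new(:seconds, :nanos)

# Same arithmetic as time_to_timestamp, targeting the stand-in Struct.
def time_to_timestamp(time)
  SketchTimestamp.new(time.to_i, time.nsec)
end

# Same arithmetic as timestamp_to_time. In Ruby, 10**-9 is a Rational,
# so the sum is exact and no nanoseconds are lost to float rounding.
def timestamp_to_time(timestamp)
  Time.at(timestamp.nanos * 10**-9 + timestamp.seconds)
end

original = Time.at(1_500_000_000, 123_456_789, :nsec)
timestamp = time_to_timestamp(original)
round_tripped = timestamp_to_time(timestamp)
# timestamp.seconds  => 1500000000
# timestamp.nanos    => 123456789
# round_tripped == original => true
```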
Creates an instance of a protobuf message from a hash that may include nested hashes. `google/protobuf` allows for the instantiation of protobuf messages using hashes but does not allow for nested hashes to instantiate nested submessages.
@param hash [Hash, Object] The hash to be converted into a proto message.
If an instance of the proto message class is given, it is returned unchanged.
@param message_class [Class] The corresponding protobuf message class of
the given hash.
@return [Object] An instance of the given message class.
# File lib/google/gax/util.rb, line 54 def to_proto(hash, message_class) return hash if hash.is_a? message_class # Sanity check: input must be a Hash unless hash.is_a? Hash raise ArgumentError.new( "Value #{hash} must be a Hash or a #{message_class.name}" ) end hash = coerce_submessages(hash, message_class) message_class.new(hash) end
Port of GRPC::GenericService.underscore that works on frozen strings. Note that this function is often used on strings that are Hash keys, which are frozen by default, so the GRPC implementation cannot be used directly.
TODO(geigerj): Consider whether this logic can be factored out into a shared location that both gRPC and GAX can depend on in order to remove the additional dependency on gRPC this introduces.
# File lib/google/gax/settings.rb, line 409 def upper_camel_to_lower_underscore(s) s = s.gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2') s = s.gsub(/([a-z\d])([A-Z])/, '\1_\2') s = s.tr('-', '_') s = s.downcase s end
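A few sample conversions, using a copy of the method above so the sketch runs on its own. The method names passed in are examples chosen for illustration.

```ruby
# Repeated from the listing above. gsub, tr and downcase all return new
# strings, so a frozen input is never mutated.
def upper_camel_to_lower_underscore(s)
  s = s.gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
  s = s.gsub(/([a-z\d])([A-Z])/, '\1_\2')
  s = s.tr('-', '_')
  s.downcase
end

simple = upper_camel_to_lower_underscore('CreateFoo')
# simple => "create_foo"

# A run of capitals is kept together as one word.
acronym = upper_camel_to_lower_underscore('GetIAMPolicy')
# acronym => "get_iam_policy"

# Works on frozen strings and leaves the input untouched.
frozen_name = 'ListTopicSubscriptions'.freeze
from_frozen = upper_camel_to_lower_underscore(frozen_name)
# from_frozen => "list_topic_subscriptions"
```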
Creates a new CallSettings with the routing metadata from the request header params merged with the given settings.
@param settings [CallSettings] the settings for an API call.
@param params [Hash] the request header params.
@return [CallSettings] a new merged settings.
# File lib/google/gax/api_callable.rb, line 326 def with_routing_header(settings, params) routing_header = params.map { |k, v| "#{k}=#{v}" }.join('&') options = CallOptions.new( metadata: { 'x-goog-request-params' => routing_header } ) settings.merge(options) end
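The header value built above is a plain key=value list joined with '&'. The sketch below isolates that step; `routing_header_for` is a hypothetical helper name and the params are made up. It only builds the metadata hash and does not construct CallOptions or CallSettings.

```ruby
# Hypothetical helper: same string-building expression as in
# with_routing_header.
def routing_header_for(params)
  params.map { |k, v| "#{k}=#{v}" }.join('&')
end

params = { 'name' => 'projects/p/topics/t', 'view' => 'FULL' }
metadata = { 'x-goog-request-params' => routing_header_for(params) }
# metadata => {"x-goog-request-params"=>"name=projects/p/topics/t&view=FULL"}
```

As written in the listing, keys and values are interpolated as-is, so this sketch does not apply any escaping either.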