class Gruf::SynchronizedClient

Ensures that we only have one active call to a given endpoint with a given set of params. This can be useful to mitigate thundering herds.

Attributes

unsynchronized_methods[R]

Public Class Methods

new(service:, options: {}, client_options: {}) click to toggle source

Initialize the client and setup the stub

@param [Module] service The namespace of the client Stub that is desired to load @param [Hash] options A hash of options for the client @option options [Array] :unsynchronized_methods A list of methods (as symbols) that

should be excluded from synchronization

@option options [Integer] :internal_cache_expiry The length of time to keep results

around for other threads to fetch (in seconds)

@param [Hash] client_options A hash of options to pass to the gRPC client stub

Calls superclass method Gruf::Client::new
# File lib/gruf/synchronized_client.rb, line 39
def initialize(service:, options: {}, client_options: {})
  @unsynchronized_methods = options.delete(:unsynchronized_methods) { [] }
  @expiry = options.delete(:internal_cache_expiry) { Gruf.synchronized_client_internal_cache_expiry }
  @locks = Concurrent::Map.new
  @results = Concurrent::Map.new
  super
end

Public Instance Methods

call(request_method, params = {}, metadata = {}, opts = {}, &block) click to toggle source

Call the client's method with given params. If another call is already active for the same endpoint and the same params, block until the active call is complete. When unblocked, callers will get a copy of the original result.

@param [String|Symbol] request_method The method that is being requested on the service @param [Hash] params (Optional) A hash of parameters that will be inserted into the gRPC request

message that is required for the given above call

@param [Hash] metadata (Optional) A hash of metadata key/values that are transported with the client request @param [Hash] opts (Optional) A hash of options to send to the gRPC request_response method @return [Gruf::Response] The response from the server @raise [Gruf::Client::Error|GRPC::BadStatus] If an error occurs, an exception will be raised according to the error type that was returned

Calls superclass method Gruf::Client#call
# File lib/gruf/synchronized_client.rb, line 60
def call(request_method, params = {}, metadata = {}, opts = {}, &block)
  # Allow for bypassing extra behavior for selected methods
  return super if unsynchronized_methods.include?(request_method.to_sym)

  # Generate a unique key based on the method and params
  key = "#{request_method}.#{params.hash}"

  # Create a lock for this call if we haven't seen it already, then acquire it
  lock = @locks.compute_if_absent(key) { Mutex.new }
  lock.synchronize do
    # Return value from results cache if it exists. This occurs for callers that were
    # waiting on the lock while the first caller was making the actual grpc call.
    response = @results.get(lock)
    if response
      Gruf.logger.debug "Returning cached result for #{key}:#{lock.inspect}"
      next response
    end

    # Make the grpc call and record response for other callers that are blocked
    # on the same lock
    response = super
    @results.put(lock, response)

    # Schedule a task to come in later and clean out result to prevent memory bloat
    Concurrent::ScheduledTask.new(@expiry, args: [@results, lock]) { |h, k| h.delete(k) }.execute

    # Remove the lock from the map. The next caller to come through with the
    # same params will create a new lock and start the process over again.
    # Anyone who was waiting on this call will be using a local reference
    # to the same lock as us, and will fetch the result from the cache.
    @locks.delete(key)

    # Return response
    response
  end
end