class Google::Gax::Executor

Organizes bundling for an api service that requires it.

Public Class Methods

new(options, timer: Timer.new) click to toggle source

@param options [BundleOptions]configures strategy this instance

uses when executing bundled functions.

@param timer [Timer] the timer is used to handle the functionality of

timing threads.
# File lib/google/gax/bundling.rb, line 259
def initialize(options, timer: Timer.new)
  @options = options
  @tasks = {}
  @timer = timer

  # Use a Monitor in order to have the mutex behave like a reentrant lock.
  @tasks_lock = Monitor.new
end

Public Instance Methods

close() click to toggle source

This function should be called before the main thread exits in order to ensure that all api calls are made.

# File lib/google/gax/bundling.rb, line 362
def close
  @tasks_lock.synchronize do
    @tasks.each do |bundle_id, _|
      run_now(bundle_id)
    end
  end
end
schedule(api_call, bundle_id, bundle_desc, bundling_request) click to toggle source

Schedules bundle_desc of bundling_request as part of bundle id.

@param api_call [Proc] used to make an api call when the task is run. @param bundle_id [String] the id of this bundle. @param bundle_desc [BundleDescriptor] describes the structure of the

bundled call.

@param bundling_request [Object] the request to pass as the arg to

the api_call.

@return [Event] an Event that can be used to wait on the response.

# File lib/google/gax/bundling.rb, line 278
def schedule(api_call, bundle_id, bundle_desc,
             bundling_request)
  bundle = bundle_for(api_call, bundle_id, bundle_desc,
                      bundling_request)
  elts = bundling_request[bundle_desc.bundled_field.to_s]
  event = bundle.extend(elts)

  count_threshold = @options.element_count_threshold
  if count_threshold > 0 && bundle.element_count >= count_threshold
    run_now(bundle.bundle_id)
  end

  size_threshold = @options.request_byte_threshold
  if size_threshold > 0 && bundle.request_bytesize >= size_threshold
    run_now(bundle.bundle_id)
  end

  # TODO: Implement byte and element count limits.

  event
end

Private Instance Methods

bundle_for(api_call, bundle_id, bundle_desc, bundling_request) click to toggle source

Helper function for schedule.

Given a return the corresponding bundle for a certain bundle id. Create a new bundle if the bundle does not exist yet.

@param api_call [Proc] used to make an api call when the task is run. @param bundle_id [String] the id of this bundle. @param bundle_desc [BundleDescriptor] describes the structure of the

bundled call.

@param bundling_request [Object] the request to pass as the arg to

the api_call.

@return [Task] the bundle containing the api_call.

# File lib/google/gax/bundling.rb, line 312
def bundle_for(api_call, bundle_id, bundle_desc, bundling_request)
  @tasks_lock.synchronize do
    return @tasks[bundle_id] if @tasks.key?(bundle_id)
    bundle = Task.new(api_call, bundle_id, bundle_desc.bundled_field,
                      bundling_request,
                      subresponse_field: bundle_desc.subresponse_field)
    delay_threshold_millis = @options.delay_threshold_millis
    if delay_threshold_millis > 0
      run_later(bundle.bundle_id, delay_threshold_millis)
    end
    @tasks[bundle_id] = bundle
    return bundle
  end
end
run_later(bundle_id, delay_threshold_millis) click to toggle source

Helper function for schedule.

Creates a new thread that will execute the encapsulated api calls after the delay_threshold_millis has elapsed. The thread that is spawned is added to the @threads hash to ensure that the thread will api call is made before the main thread exits.

@param bundle_id [String] the id corresponding to the bundle that

is run.

@param delay_threshold_millis [Numeric] the number of micro-seconds to

wait before running the bundle.
# File lib/google/gax/bundling.rb, line 338
def run_later(bundle_id, delay_threshold_millis)
  Thread.new do
    @timer.run_after(delay_threshold_millis / MILLIS_PER_SECOND) do
      run_now(bundle_id)
    end
  end
end
run_now(bundle_id) click to toggle source

Helper function for schedule.

Immediately runs the bundle corresponding to the bundle id. @param bundle_id [String] the id corresponding to the bundle that

is run.
# File lib/google/gax/bundling.rb, line 351
def run_now(bundle_id)
  @tasks_lock.synchronize do
    if @tasks.key?(bundle_id)
      task = @tasks.delete(bundle_id)
      task.run
    end
  end
end