class Google::Gax::Executor
Organizes bundling for an api service that requires it.
Public Class Methods
@param options [BundleOptions]configures strategy this instance
uses when executing bundled functions.
@param timer [Timer] the timer is used to handle the functionality of
timing threads.
# File lib/google/gax/bundling.rb, line 259 def initialize(options, timer: Timer.new) @options = options @tasks = {} @timer = timer # Use a Monitor in order to have the mutex behave like a reentrant lock. @tasks_lock = Monitor.new end
Public Instance Methods
This function should be called before the main thread exits in order to ensure that all api calls are made.
# File lib/google/gax/bundling.rb, line 362 def close @tasks_lock.synchronize do @tasks.each do |bundle_id, _| run_now(bundle_id) end end end
Schedules bundle_desc of bundling_request as part of bundle id.
@param api_call [Proc] used to make an api call when the task is run. @param bundle_id [String] the id of this bundle. @param bundle_desc [BundleDescriptor] describes the structure of the
bundled call.
@param bundling_request [Object] the request to pass as the arg to
the api_call.
@return [Event] an Event
that can be used to wait on the response.
# File lib/google/gax/bundling.rb, line 278 def schedule(api_call, bundle_id, bundle_desc, bundling_request) bundle = bundle_for(api_call, bundle_id, bundle_desc, bundling_request) elts = bundling_request[bundle_desc.bundled_field.to_s] event = bundle.extend(elts) count_threshold = @options.element_count_threshold if count_threshold > 0 && bundle.element_count >= count_threshold run_now(bundle.bundle_id) end size_threshold = @options.request_byte_threshold if size_threshold > 0 && bundle.request_bytesize >= size_threshold run_now(bundle.bundle_id) end # TODO: Implement byte and element count limits. event end
Private Instance Methods
Helper function for schedule
.
Given a return the corresponding bundle for a certain bundle id. Create a new bundle if the bundle does not exist yet.
@param api_call [Proc] used to make an api call when the task is run. @param bundle_id [String] the id of this bundle. @param bundle_desc [BundleDescriptor] describes the structure of the
bundled call.
@param bundling_request [Object] the request to pass as the arg to
the api_call.
@return [Task] the bundle containing the api_call
.
# File lib/google/gax/bundling.rb, line 312 def bundle_for(api_call, bundle_id, bundle_desc, bundling_request) @tasks_lock.synchronize do return @tasks[bundle_id] if @tasks.key?(bundle_id) bundle = Task.new(api_call, bundle_id, bundle_desc.bundled_field, bundling_request, subresponse_field: bundle_desc.subresponse_field) delay_threshold_millis = @options.delay_threshold_millis if delay_threshold_millis > 0 run_later(bundle.bundle_id, delay_threshold_millis) end @tasks[bundle_id] = bundle return bundle end end
Helper function for schedule
.
Creates a new thread that will execute the encapsulated api calls after the delay_threshold_millis
has elapsed. The thread that is spawned is added to the @threads hash to ensure that the thread will api call is made before the main thread exits.
@param bundle_id [String] the id corresponding to the bundle that
is run.
@param delay_threshold_millis [Numeric] the number of micro-seconds to
wait before running the bundle.
# File lib/google/gax/bundling.rb, line 338 def run_later(bundle_id, delay_threshold_millis) Thread.new do @timer.run_after(delay_threshold_millis / MILLIS_PER_SECOND) do run_now(bundle_id) end end end
Helper function for schedule
.
Immediately runs the bundle corresponding to the bundle id. @param bundle_id [String] the id corresponding to the bundle that
is run.
# File lib/google/gax/bundling.rb, line 351 def run_now(bundle_id) @tasks_lock.synchronize do if @tasks.key?(bundle_id) task = @tasks.delete(bundle_id) task.run end end end