class Google::Gax::Task
Coordinates the execution of a single bundle.
@!attribute [r] bundle_id
@return [String] the id of this bundle.
@!attribute [r] bundled_field
@return [String] the field used to create the bundled request.
@!attribute [r] subresponse_field
@return [String] optional field used to demultiplex responses.
Attributes
Public Class Methods
@param api_call [Proc] used to make an api call when the task is run.
@param bundle_id [String] the id of this bundle.
@param bundled_field [String] the field used to create the bundled request.
@param bundling_request [Object] the request to pass as the arg to the api_call.
@param subresponse_field [String] optional field used to demultiplex responses.
# File lib/google/gax/bundling.rb, line 100
def initialize(api_call, bundle_id, bundled_field, bundling_request,
               subresponse_field: nil)
  @api_call = api_call
  @bundle_id = bundle_id
  @bundled_field = bundled_field
  @bundling_request = bundling_request
  @subresponse_field = subresponse_field
  @inputs = []
  @events = []
end
Public Instance Methods
The number of bundled elements in the repeated field.
@return [Numeric]
# File lib/google/gax/bundling.rb, line 116
def element_count
  @inputs.reduce(0) { |acc, elem| acc + elem.count }
end
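As a rough illustration of what element_count sums, the sketch below applies the same reduction to a plain nested array. The helper name `count_bundled` and the sample data are hypothetical; the only assumption is that @inputs holds one array per call to extend.

```ruby
# Hypothetical stand-in for Task#element_count, operating on a plain
# array of arrays instead of the task's @inputs.
def count_bundled(inputs)
  inputs.reduce(0) { |acc, elts| acc + elts.count }
end

# Three groups of elements were queued: 2 + 1 + 3 elements.
puts count_bundled([%w[a b], %w[c], %w[d e f]]) # prints 6
```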
Adds elements to the task.
@param elts [Array<Object>] an array of elements to append to the task's bundled_field.
@return [Event] an Event that can be used to wait on the response.
# File lib/google/gax/bundling.rb, line 204
def extend(elts)
  elts = [*elts]
  @inputs.push(elts)
  event = event_for(elts)
  @events.push(event)
  event
end
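The `[*elts]` splat at the top of extend normalizes its argument to an array, so a single element and an array of elements appear to be handled the same way. A minimal sketch of that normalization follows; the helper name `normalize_elts` is hypothetical.

```ruby
# Hypothetical helper isolating the normalization step used by extend.
def normalize_elts(elts)
  [*elts]
end

p normalize_elts('a')       # prints ["a"]
p normalize_elts(%w[a b])   # prints ["a", "b"]
p normalize_elts(nil)       # prints []
```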
The total size, in bytes, of the bundled field elements in the request.
@return [Numeric]
# File lib/google/gax/bundling.rb, line 122
def request_bytesize
  @inputs.reduce(0) do |sum, elts|
    sum + elts.reduce(0) do |inner_sum, elt|
      inner_sum + elt.to_s.bytesize
    end
  end
end
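Because the size is computed from `elt.to_s.bytesize`, it reflects the byte length of each element's string form, which may differ from its serialized size on the wire. The sketch below reproduces the nested reduction on plain data; `bundled_bytesize` is a hypothetical name.

```ruby
# Hypothetical stand-in for Task#request_bytesize on a plain nested array.
def bundled_bytesize(inputs)
  inputs.reduce(0) do |sum, elts|
    sum + elts.reduce(0) { |inner_sum, elt| inner_sum + elt.to_s.bytesize }
  end
end

# "ab" is 2 bytes, "c" is 1 byte, and "\u00e9" is 2 bytes in UTF-8.
puts bundled_bytesize([%w[ab c], ["\u00e9"]]) # prints 5
# Non-string elements are measured through to_s: 123 becomes "123".
puts bundled_bytesize([[123]])                # prints 3
```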
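Calls the task's api_call.
The api_call is invoked with the bundling request after its bundled field has been cleared and refilled with all queued elements. Does nothing if no elements have been added.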
# File lib/google/gax/bundling.rb, line 133
def run
  return if @inputs.count == 0
  request = @bundling_request
  request[@bundled_field].clear
  request[@bundled_field].concat(@inputs.flatten)
  if !@subresponse_field.nil?
    run_with_subresponses(request)
  else
    run_with_no_subresponse(request)
  end
end
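The way run rebuilds the bundled field can be sketched with a Hash standing in for the real request object. This is an assumption made for illustration: the actual request type is not shown here, and `rebuild_bundled_field` and the 'entries' key are hypothetical names.

```ruby
# Hypothetical helper mirroring the first half of Task#run: any existing
# contents of the bundled field are discarded and replaced by the queued
# elements, flattened into a single list.
def rebuild_bundled_field(request, bundled_field, inputs)
  request[bundled_field].clear
  request[bundled_field].concat(inputs.flatten)
  request
end

request = { 'entries' => ['stale'] } # a Hash stands in for the request
rebuild_bundled_field(request, 'entries', [%w[a b], %w[c]])
p request # prints {"entries"=>["a", "b", "c"]}
```

Note that `flatten` without an argument flattens every nesting level, so elements that are themselves arrays would also be flattened.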
Private Instance Methods
Creates a cancellation proc that removes elts.
The returned proc returns true if both elts and event were found and removed from @inputs and @events, and false otherwise.
@param elts [Array<Object>] the array of elements to remove from the task's inputs.
@param event [Event] the Event associated with elts.
@return [Proc] the canceller that, when called, removes the elts and the event.
# File lib/google/gax/bundling.rb, line 233
def canceller_for(elts, event)
  proc do
    event_index = @events.find_index(event) || -1
    in_index = @inputs.find_index(elts) || -1
    @events.delete_at(event_index) unless event_index == -1
    @inputs.delete_at(in_index) unless in_index == -1
    if event_index == -1 || in_index == -1
      false
    else
      true
    end
  end
end
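A minimal sketch of the canceller's behavior, using plain arrays and symbols in place of the task's @inputs, @events, and Event objects. The name `build_canceller` and the sample values are hypothetical.

```ruby
# Hypothetical stand-in for canceller_for: returns a proc that removes
# elts and event from the given arrays and reports whether both were found.
def build_canceller(inputs, events, elts, event)
  proc do
    event_index = events.find_index(event)
    in_index = inputs.find_index(elts)
    events.delete_at(event_index) if event_index
    inputs.delete_at(in_index) if in_index
    !event_index.nil? && !in_index.nil?
  end
end

inputs = [%w[a], %w[b]]
events = %i[first second]
cancel = build_canceller(inputs, events, %w[a], :first)

p cancel.call # prints true, both entries were removed
p inputs      # prints [["b"]]
p cancel.call # prints false, nothing left to remove
```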
Creates an Event that is set when the bundle with elts is sent.
@param elts [Array<Object>] an array of elements to append to the task's bundled_field.
@return [Event] an Event that can be used to wait on the response.
# File lib/google/gax/bundling.rb, line 217
def event_for(elts)
  event = Event.new
  event.canceller = canceller_for(elts, event)
  event
end
Helper for run to run the api call with no subresponses.
@param request [Object] the request to pass as the arg to the api_call.
# File lib/google/gax/bundling.rb, line 149
def run_with_no_subresponse(request)
  response = @api_call.call(request)
  @events.each do |event|
    event.result = response
  end
rescue GaxError => err
  @events.each do |event|
    event.result = err
  end
ensure
  @inputs.clear
  @events.clear
end
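The fan-out pattern used here, in which every waiting event receives either the shared response or the raised error, can be sketched as follows. This is an illustration under stated assumptions: Hashes stand in for Event objects, StandardError stands in for GaxError, and `fan_out` is a hypothetical name.

```ruby
# Hypothetical sketch of the fan-out in run_with_no_subresponse.
# events is an array of Hashes standing in for Event objects.
def fan_out(api_call, request, events)
  response = api_call.call(request)
  events.each { |event| event[:result] = response }
rescue StandardError => err # the original rescues GaxError
  events.each { |event| event[:result] = err }
end

events = [{}, {}]
fan_out(proc { |req| req.upcase }, 'ok', events)
p events.map { |e| e[:result] } # prints ["OK", "OK"]

failed = [{}, {}]
fan_out(proc { |_req| raise 'boom' }, 'ok', failed)
p failed.map { |e| e[:result].message } # prints ["boom", "boom"]
```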
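Helper for run to run the api call with subresponses.
The response is demultiplexed using the task's subresponse_field. If the number of subresponses does not match the number of bundled elements, every event receives the full response instead.
@param request [Object] the request to pass as the arg to the api_call.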
# File lib/google/gax/bundling.rb, line 168
def run_with_subresponses(request)
  response = @api_call.call(request)
  in_sizes_sum = 0
  @inputs.each { |elts| in_sizes_sum += elts.count }
  all_subresponses = response[@subresponse_field.to_s]
  if all_subresponses.count != in_sizes_sum
    # TODO: Implement a logging class to handle this.
    # warn DEMUX_WARNING
    @events.each do |event|
      event.result = response
    end
  else
    start = 0
    @inputs.zip(@events).each do |i, event|
      response_copy = response.dup
      subresponses = all_subresponses[start, i.count]
      response_copy[@subresponse_field].clear
      response_copy[@subresponse_field].concat(subresponses)
      start += i.count
      event.result = response_copy
    end
  end
rescue GaxError => err
  @events.each do |event|
    event.result = err
  end
ensure
  @inputs.clear
  @events.clear
end
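The demultiplexing step slices the flat list of subresponses into one chunk per original call to extend, in order. A minimal sketch on plain arrays follows; `split_subresponses` is a hypothetical name, and returning nil on a count mismatch is an assumption of this sketch (the method above instead hands the whole response to every event).

```ruby
# Hypothetical sketch of the slicing done in run_with_subresponses.
# inputs holds one array per extend call; all_subresponses is the flat
# list returned by the api call.
def split_subresponses(all_subresponses, inputs)
  total = inputs.reduce(0) { |sum, elts| sum + elts.count }
  return nil unless all_subresponses.count == total # cannot demultiplex

  start = 0
  inputs.map do |elts|
    chunk = all_subresponses[start, elts.count]
    start += elts.count
    chunk
  end
end

p split_subresponses(%w[r1 r2 r3], [%w[a b], %w[c]]) # prints [["r1", "r2"], ["r3"]]
p split_subresponses(%w[r1], [%w[a b], %w[c]])       # prints nil
```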