class Google::Gax::Task

Coordinates the execution of a single bundle.

@!attribute [r] bundle_id

@return [String] the id of this bundle.

@!attribute [r] bundled_field

@return [String] the field used to create the bundled request.

@!attribute [r] subresponse_field

@return [String] tptional field used to demultiplex responses.

Attributes

bundle_id[R]
bundled_field[R]
subresponse_field[R]

Public Class Methods

new(api_call, bundle_id, bundled_field, bundling_request, subresponse_field: nil) click to toggle source

@param api_call [Proc] used to make an api call when the task is run. @param bundle_id [String] the id of this bundle. @param bundled_field [String] the field used to create the

bundled request.

@param bundling_request [Object] the request to pass as the arg to

the api_call.

@param subresponse_field [String] optional field used to demultiplex

responses.
# File lib/google/gax/bundling.rb, line 100
def initialize(api_call,
               bundle_id,
               bundled_field,
               bundling_request,
               subresponse_field: nil)
  @api_call = api_call
  @bundle_id = bundle_id
  @bundled_field = bundled_field
  @bundling_request = bundling_request
  @subresponse_field = subresponse_field
  @inputs = []
  @events = []
end

Public Instance Methods

element_count() click to toggle source

The number of bundled elements in the repeated field. @return [Numeric]

# File lib/google/gax/bundling.rb, line 116
def element_count
  @inputs.reduce(0) { |acc, elem| acc + elem.count }
end
extend(elts) click to toggle source

This adds elements to the tasks.

@param elts [Array<Object>] an array of elements that can be appended

to the tasks bundle_field.

@return [Event] an Event that can be used to wait on the response.

# File lib/google/gax/bundling.rb, line 204
def extend(elts)
  elts = [*elts]
  @inputs.push(elts)
  event = event_for(elts)
  @events.push(event)
  event
end
request_bytesize() click to toggle source

The size of the request in bytes of the bundled field elements. @return [Numeric]

# File lib/google/gax/bundling.rb, line 122
def request_bytesize
  @inputs.reduce(0) do |sum, elts|
    sum + elts.reduce(0) do |inner_sum, elt|
      inner_sum + elt.to_s.bytesize
    end
  end
end
run() click to toggle source

Call the task's api_call.

The task's func will be called with the bundling requests function.

# File lib/google/gax/bundling.rb, line 133
def run
  return if @inputs.count == 0
  request = @bundling_request
  request[@bundled_field].clear
  request[@bundled_field].concat(@inputs.flatten)
  if !@subresponse_field.nil?
    run_with_subresponses(request)
  else
    run_with_no_subresponse(request)
  end
end

Private Instance Methods

canceller_for(elts, event) click to toggle source

Creates a cancellation proc that removes elts.

The returned proc returns true if all elements were successfully removed from @inputs and @events.

@param elts [Array<Object>] an array of elements that can be appended

to the tasks bundle_field.

@param [Event] an Event that can be used to wait on the response. @return [Proc] the canceller that when called removes the elts

and events.
# File lib/google/gax/bundling.rb, line 233
def canceller_for(elts, event)
  proc do
    event_index = @events.find_index(event) || -1
    in_index = @inputs.find_index(elts) || -1
    @events.delete_at(event_index) unless event_index == -1
    @inputs.delete_at(in_index) unless in_index == -1
    if event_index == -1 || in_index == -1
      false
    else
      true
    end
  end
end
event_for(elts) click to toggle source

Creates an Event that is set when the bundle with elts is sent.

@param elts [Array<Object>] an array of elements that can be appended

to the tasks bundle_field.

@return [Event] an Event that can be used to wait on the response.

# File lib/google/gax/bundling.rb, line 217
def event_for(elts)
  event = Event.new
  event.canceller = canceller_for(elts, event)
  event
end
run_with_no_subresponse(request) click to toggle source

Helper for run to run the api call with no subresponses.

@param request [Object] the request to pass as the arg to

the api_call.
# File lib/google/gax/bundling.rb, line 149
def run_with_no_subresponse(request)
  response = @api_call.call(request)
  @events.each do |event|
    event.result = response
  end
rescue GaxError => err
  @events.each do |event|
    event.result = err
  end
ensure
  @inputs.clear
  @events.clear
end
run_with_subresponses(request) click to toggle source

Helper for run to run the api call with subresponses.

@param request [Object] the request to pass as the arg to

the api_call.

@param subresponse_field subresponse_field.

# File lib/google/gax/bundling.rb, line 168
def run_with_subresponses(request)
  response = @api_call.call(request)
  in_sizes_sum = 0
  @inputs.each { |elts| in_sizes_sum += elts.count }
  all_subresponses = response[@subresponse_field.to_s]
  if all_subresponses.count != in_sizes_sum
    # TODO: Implement a logging class to handle this.
    # warn DEMUX_WARNING
    @events.each do |event|
      event.result = response
    end
  else
    start = 0
    @inputs.zip(@events).each do |i, event|
      response_copy = response.dup
      subresponses = all_subresponses[start, i.count]
      response_copy[@subresponse_field].clear
      response_copy[@subresponse_field].concat(subresponses)
      start += i.count
      event.result = response_copy
    end
  end
rescue GaxError => err
  @events.each do |event|
    event.result = err
  end
ensure
  @inputs.clear
  @events.clear
end