class Google::Cloud::Bigquery::Table::AsyncInserter

# AsyncInserter

Used to insert multiple rows in batches to a table. See {Google::Cloud::Bigquery::Table#insert_async}.

@example

require "google/cloud/bigquery"

bigquery = Google::Cloud::Bigquery.new
dataset = bigquery.dataset "my_dataset"
table = dataset.table "my_table"
inserter = table.insert_async do |result|
  if result.error?
    log_error result.error
  else
    log_insert "inserted #{result.insert_count} rows " \
      "with #{result.error_count} errors"
  end
end

rows = [
  { "first_name" => "Alice", "age" => 21 },
  { "first_name" => "Bob", "age" => 22 }
]
inserter.insert rows

inserter.stop.wait!

@attr_reader [Integer] max_bytes The maximum size of rows to be
  collected before the batch is inserted. Default is 10,000,000
  (10MB).
@attr_reader [Integer] max_rows The maximum number of rows to be
  collected before the batch is inserted. Default is 500.
@attr_reader [Numeric] interval The number of seconds to collect rows
  before the batch is inserted. Default is 10.
@attr_reader [Integer] threads The number of threads used to insert
  rows. Default is 4.
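
For example, the configured values can be read back through the
corresponding attribute readers:

inserter = table.insert_async max_rows: 100
inserter.max_rows #=> 100
inserter.interval #=> 10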

Attributes

batch[R]
interval[R]
max_bytes[R]
max_rows[R]
table[R]
threads[R]

@private Implementation accessors

Public Class Methods

new(table, skip_invalid: nil, ignore_unknown: nil, max_bytes: 10_000_000, max_rows: 500, interval: 10, threads: 4, &block)

@private

Calls superclass method
# File lib/google/cloud/bigquery/table/async_inserter.rb, line 77
def initialize table, skip_invalid: nil, ignore_unknown: nil, max_bytes: 10_000_000, max_rows: 500,
               interval: 10, threads: 4, &block
  # init MonitorMixin
  super()

  @table = table
  @skip_invalid = skip_invalid
  @ignore_unknown = ignore_unknown

  @max_bytes = max_bytes
  @max_rows = max_rows
  @interval = interval
  @threads = threads
  @callback = block

  @batch = nil

  @thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @threads

  @cond = new_cond
end
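
Since this constructor is private, an inserter with custom batching
settings is obtained through {Google::Cloud::Bigquery::Table#insert_async}.
A minimal sketch (the dataset and table names are placeholders):

require "google/cloud/bigquery"

bigquery = Google::Cloud::Bigquery.new
table = bigquery.dataset("my_dataset").table "my_table"

# Collect at most 100 rows or 1 MB per batch, whichever comes first,
# flush at least every 5 seconds, and insert with 2 threads.
inserter = table.insert_async max_bytes: 1_000_000, max_rows: 100,
                              interval: 5, threads: 2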

Public Instance Methods

flush()

Forces all rows in the current batch to be inserted immediately.

@return [AsyncInserter] returns self so calls can be chained.

# File lib/google/cloud/bigquery/table/async_inserter.rb, line 219
def flush
  synchronize do
    push_batch_request!
    @cond.signal
  end

  self
end
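
A minimal sketch of forcing an immediate insert rather than waiting
for the interval (the row values are placeholders):

inserter.insert [{ "first_name" => "Carol", "age" => 30 }]
inserter.flush # pushes the current batch to the thread pool right away
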
insert(rows, insert_ids: nil)

Adds rows to the async inserter to be inserted. Rows will be collected in batches and inserted together. See {Google::Cloud::Bigquery::Table#insert_async}.

Simple Ruby types are generally accepted per JSON rules, along with the following support for BigQuery's more complex types:

| BigQuery     | Ruby                                 | Notes                                           |
|--------------|--------------------------------------|-------------------------------------------------|
| `NUMERIC`    | `BigDecimal`                         | `BigDecimal` values will be rounded to scale 9. |
| `BIGNUMERIC` | `String`                             | Pass as `String` to avoid rounding to scale 9.  |
| `DATETIME`   | `DateTime`                           | `DATETIME` does not support time zone.          |
| `DATE`       | `Date`                               |                                                 |
| `GEOGRAPHY`  | `String`                             |                                                 |
| `TIMESTAMP`  | `Time`                               |                                                 |
| `TIME`       | `Google::Cloud::BigQuery::Time`      |                                                 |
| `BYTES`      | `File`, `IO`, `StringIO`, or similar |                                                 |
| `ARRAY`      | `Array`                              | Nested arrays, `nil` values are not supported.  |
| `STRUCT`     | `Hash`                               | Hash keys may be strings or symbols.            |
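
As an illustration, a row hash mixing several of these types (the
column names are hypothetical and must match your table's schema):

require "bigdecimal"
require "date"

row = {
  "name"       => "Alice",                      # STRING
  "balance"    => BigDecimal("123.456789012"),  # NUMERIC, rounded to scale 9
  "big_value"  => "123456789.123456789012345",  # BIGNUMERIC passed as String
  "created_at" => Time.now,                     # TIMESTAMP
  "birthday"   => Date.new(1990, 1, 1),         # DATE
  "tags"       => ["a", "b"],                   # ARRAY<STRING>
  "address"    => { "street" => "123 Main St" } # STRUCT
}
inserter.insert row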

Because BigQuery's streaming API is designed for high insertion rates, modifications to the underlying table metadata are eventually consistent when interacting with the streaming system. In most cases metadata changes are propagated within minutes, but during this period API responses may reflect the inconsistent state of the table.

@see https://cloud.google.com/bigquery/streaming-data-into-bigquery
  Streaming Data Into BigQuery
@see https://cloud.google.com/bigquery/troubleshooting-errors#metadata-errors-for-streaming-inserts
  BigQuery Troubleshooting: Metadata errors for streaming inserts

@param [Hash, Array<Hash>] rows A hash object or array of hash objects
  containing the data. Required. `BigDecimal` values will be rounded to
  scale 9 to conform with the BigQuery `NUMERIC` data type. To avoid
  rounding `BIGNUMERIC` type values with scale greater than 9, use `String`
  instead of `BigDecimal`.

@param [Array<String|Symbol>, Symbol] insert_ids A unique ID for each row. BigQuery uses this property to
  detect duplicate insertion requests on a best-effort basis. For more information, see [data
  consistency](https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency). Optional. If
  not provided, the client library will assign a UUID to each row before the request is sent.

  The value `:skip` can be provided to skip the generation of IDs for all rows, or to skip the generation of
  an ID for a specific row in the array.
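
A brief sketch of supplying explicit insert IDs, using `:skip` to omit
the ID for one row (the ID values are placeholders):

rows = [
  { "first_name" => "Alice", "age" => 21 },
  { "first_name" => "Bob", "age" => 22 }
]
# One entry per row; :skip leaves the second row without an insert ID.
inserter.insert rows, insert_ids: ["id-alice", :skip]
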
# File lib/google/cloud/bigquery/table/async_inserter.rb, line 146
def insert rows, insert_ids: nil
  return nil if rows.nil?
  return nil if rows.is_a?(Array) && rows.empty?
  rows, insert_ids = validate_insert_args rows, insert_ids

  synchronize do
    rows.zip(Array(insert_ids)).each do |row, insert_id|
      if @batch.nil?
        @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
        @batch.insert row, insert_id
      else
        unless @batch.try_insert row, insert_id
          push_batch_request!

          @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
          @batch.insert row, insert_id
        end
      end

      @batch_created_at ||= ::Time.now
      @background_thread ||= Thread.new { run_background }

      push_batch_request! if @batch.ready?
    end

    @cond.signal
  end

  true
end
started?()

Whether the inserter has been started.

@return [boolean] `true` when started, `false` otherwise.

# File lib/google/cloud/bigquery/table/async_inserter.rb, line 233
def started?
  !stopped?
end
stop()

Begins the process of stopping the inserter. Rows already in the queue will be inserted, but no new rows can be added. Use {#wait!} to block until the inserter is fully stopped and all pending rows have been inserted.

@return [AsyncInserter] returns self so calls can be chained.

# File lib/google/cloud/bigquery/table/async_inserter.rb, line 185
def stop
  synchronize do
    break if @stopped

    @stopped = true
    push_batch_request!
    @cond.signal
  end

  self
end
stopped?()

Whether the inserter has been stopped.

@return [boolean] `true` when stopped, `false` otherwise.

# File lib/google/cloud/bigquery/table/async_inserter.rb, line 242
def stopped?
  synchronize { @stopped }
end
wait!(timeout = nil)

Blocks until the inserter is fully stopped, all pending rows have been inserted, and all callbacks have completed. Does not stop the inserter. To stop the inserter, first call {#stop} and then call {#wait!} to block until the inserter is stopped.

@return [AsyncInserter] returns self so calls can be chained.

# File lib/google/cloud/bigquery/table/async_inserter.rb, line 205
def wait! timeout = nil
  synchronize do
    @thread_pool.shutdown
    @thread_pool.wait_for_termination timeout
  end

  self
end
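
A minimal sketch of a bounded shutdown; once the timeout elapses the
call returns even if some inserts are still pending:

inserter.stop
inserter.wait! 30 # block for at most 30 seconds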

Protected Instance Methods

push_batch_request!()
# File lib/google/cloud/bigquery/table/async_inserter.rb, line 283
def push_batch_request!
  return unless @batch

  orig_rows = @batch.rows
  json_rows = @batch.json_rows
  insert_ids = @batch.insert_ids
  Concurrent::Future.new executor: @thread_pool do
    raise ArgumentError, "No rows provided" if json_rows.empty?
    insert_resp = @table.service.insert_tabledata_json_rows @table.dataset_id,
                                                            @table.table_id,
                                                            json_rows,
                                                            skip_invalid: @skip_invalid,
                                                            ignore_unknown: @ignore_unknown,
                                                            insert_ids: insert_ids

    result = Result.new InsertResponse.from_gapi(orig_rows, insert_resp)
  rescue StandardError => e
    result = Result.new nil, e
  ensure
    @callback&.call result
  end.execute

  @batch = nil
  @batch_created_at = nil
end
run_background()
# File lib/google/cloud/bigquery/table/async_inserter.rb, line 261
def run_background
  synchronize do
    until @stopped
      if @batch.nil?
        @cond.wait
        next
      end

      time_since_first_publish = ::Time.now - @batch_created_at
      if time_since_first_publish < @interval
        # still waiting for the interval to insert the batch...
        timeout = @interval - time_since_first_publish
        @cond.wait timeout
      else
        # interval met, insert the batch...
        push_batch_request!
        @cond.wait
      end
    end
  end
end
validate_insert_args(rows, insert_ids)
# File lib/google/cloud/bigquery/table/async_inserter.rb, line 248
def validate_insert_args rows, insert_ids
  rows = [rows] if rows.is_a? Hash
  raise ArgumentError, "No rows provided" if rows.empty?

  insert_ids = Array.new(rows.count) { :skip } if insert_ids == :skip
  insert_ids = Array insert_ids
  if insert_ids.count.positive? && insert_ids.count != rows.count
    raise ArgumentError, "insert_ids must be the same size as rows"
  end

  [rows, insert_ids]
end
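
For instance, passing two rows with a single insert ID fails this
validation:

inserter.insert rows, insert_ids: ["only-one-id"]
# raises ArgumentError, "insert_ids must be the same size as rows"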