class Google::Cloud::Bigquery::Table::AsyncInserter
Used to insert multiple rows in batches into a table. See {Google::Cloud::Bigquery::Table#insert_async}.
@example
require "google/cloud/bigquery" bigquery = Google::Cloud::Bigquery.new dataset = bigquery.dataset "my_dataset" table = dataset.table "my_table" inserter = table.insert_async do |result| if result.error? log_error result.error else log_insert "inserted #{result.insert_count} rows " \ "with #{result.error_count} errors" end end rows = [ { "first_name" => "Alice", "age" => 21 }, { "first_name" => "Bob", "age" => 22 } ] inserter.insert rows inserter.stop.wait!
@attr_reader [Integer] max_bytes The maximum size of rows to be collected before the batch is inserted. Default is 10,000,000 (10MB).
@attr_reader [Integer] max_rows The maximum number of rows to be collected before the batch is inserted. Default is 500.
@attr_reader [Numeric] interval The number of seconds to collect rows before the batch is inserted. Default is 10.
@attr_reader [Integer] threads The number of threads used to insert rows. Default is 4.
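A rough sketch of tuning these thresholds (the dataset and table names are placeholders, and it assumes {Google::Cloud::Bigquery::Table#insert_async} forwards these options to the inserter, as the `initialize` signature below suggests; `log_error`/`log_insert` stand in for application logging):

@example
  require "google/cloud/bigquery"

  bigquery = Google::Cloud::Bigquery.new
  table = bigquery.dataset("my_dataset").table "my_table"

  # Flush smaller batches more often: at most 100 rows or 1 MB per batch,
  # waiting no more than 5 seconds, using 2 insert threads.
  inserter = table.insert_async max_rows: 100, max_bytes: 1_000_000,
                                interval: 5, threads: 2 do |result|
    if result.error?
      log_error result.error
    else
      log_insert "inserted #{result.insert_count} rows"
    end
  end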
Attributes
@private Implementation accessors
@private Implementation accessors
Public Class Methods
@private
# File lib/google/cloud/bigquery/table/async_inserter.rb, line 77
def initialize table, skip_invalid: nil, ignore_unknown: nil,
               max_bytes: 10_000_000, max_rows: 500, interval: 10,
               threads: 4, &block
  # init MonitorMixin
  super()

  @table = table
  @skip_invalid = skip_invalid
  @ignore_unknown = ignore_unknown

  @max_bytes = max_bytes
  @max_rows = max_rows
  @interval = interval
  @threads = threads
  @callback = block

  @batch = nil

  @thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @threads

  @cond = new_cond
end
Public Instance Methods
Forces all rows in the current batch to be inserted immediately.
@return [AsyncInserter] returns self so calls can be chained.
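A minimal usage sketch (reusing the placeholder `inserter` and `rows` from the class-level example above):

@example
  inserter.insert rows
  # Don't wait for max_rows, max_bytes, or the interval timer;
  # hand the current batch to the background thread pool right away.
  inserter.flush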
# File lib/google/cloud/bigquery/table/async_inserter.rb, line 219
def flush
  synchronize do
    push_batch_request!
    @cond.signal
  end

  self
end
Adds rows to the async inserter to be inserted. Rows will be collected in batches and inserted together. See {Google::Cloud::Bigquery::Table#insert_async}.
Simple Ruby types are generally accepted per JSON rules, along with the following support for BigQuery's more complex types:
| BigQuery     | Ruby                                 | Notes                                            |
|--------------|--------------------------------------|--------------------------------------------------|
| `NUMERIC`    | `BigDecimal`                         | `BigDecimal` values will be rounded to scale 9.  |
| `BIGNUMERIC` | `String`                             | Pass as `String` to avoid rounding to scale 9.   |
| `DATETIME`   | `DateTime`                           | `DATETIME` does not support time zone.           |
| `DATE`       | `Date`                               |                                                  |
| `GEOGRAPHY`  | `String`                             |                                                  |
| `TIMESTAMP`  | `Time`                               |                                                  |
| `TIME`       | `Google::Cloud::BigQuery::Time`      |                                                  |
| `BYTES`      | `File`, `IO`, `StringIO`, or similar |                                                  |
| `ARRAY`      | `Array`                              | Nested arrays, `nil` values are not supported.   |
| `STRUCT`     | `Hash`                               | Hash keys may be strings or symbols.             |
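As an illustrative sketch only (the column names are invented and assume a table whose schema declares matching types), a single row hash might combine several of these conversions:

@example
  require "google/cloud/bigquery"
  require "bigdecimal"
  require "date"
  require "stringio"

  row = {
    "balance"     => BigDecimal("123.456789123"),         # NUMERIC, rounded to scale 9
    "big_balance" => "123456.000000000123456789",         # BIGNUMERIC sent as a String to avoid rounding
    "updated_at"  => DateTime.new(2021, 6, 1, 12, 30, 0), # DATETIME (no time zone)
    "birthday"    => Date.new(1990, 1, 31),               # DATE
    "logged_at"   => Time.now.utc,                        # TIMESTAMP
    "avatar"      => StringIO.new("raw-bytes"),           # BYTES
    "tags"        => ["a", "b"],                          # ARRAY (no nested arrays or nil values)
    "address"     => { street: "123 Main St" }            # STRUCT (string or symbol keys)
  }

  inserter.insert row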
Because BigQuery's streaming API is designed for high insertion rates, modifications to the underlying table metadata are eventually consistent when interacting with the streaming system. In most cases metadata changes are propagated within minutes, but during this period API responses may reflect the inconsistent state of the table.
@see cloud.google.com/bigquery/streaming-data-into-bigquery
Streaming Data Into BigQuery
@see cloud.google.com/bigquery/troubleshooting-errors#metadata-errors-for-streaming-inserts
BigQuery Troubleshooting: Metadata errors for streaming inserts
@param [Hash, Array<Hash>] rows A hash object or array of hash objects
containing the data. Required. `BigDecimal` values will be rounded to scale 9 to conform with the BigQuery `NUMERIC` data type. To avoid rounding `BIGNUMERIC` type values with scale greater than 9, use `String` instead of `BigDecimal`.
@param [Array<String|Symbol>, Symbol] insert_ids A unique ID for each row. BigQuery uses this property to
detect duplicate insertion requests on a best-effort basis. For more information, see [data consistency](https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency). Optional. If not provided, the client library will assign a UUID to each row before the request is sent. The value `:skip` can be provided to skip the generation of IDs for all rows, or to skip the generation of an ID for a specific row in the array.
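A small usage sketch for `insert_ids` (the ID strings are arbitrary placeholders):

@example
  rows = [
    { "first_name" => "Alice", "age" => 21 },
    { "first_name" => "Bob",   "age" => 22 }
  ]

  # One ID per row, used by BigQuery to de-duplicate retried requests.
  inserter.insert rows, insert_ids: ["alice-0001", "bob-0001"]

  # Or skip client-generated insert IDs entirely.
  inserter.insert rows, insert_ids: :skip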
# File lib/google/cloud/bigquery/table/async_inserter.rb, line 146
def insert rows, insert_ids: nil
  return nil if rows.nil?
  return nil if rows.is_a?(Array) && rows.empty?
  rows, insert_ids = validate_insert_args rows, insert_ids

  synchronize do
    rows.zip(Array(insert_ids)).each do |row, insert_id|
      if @batch.nil?
        @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
        @batch.insert row, insert_id
      else
        unless @batch.try_insert row, insert_id
          push_batch_request!
          @batch = Batch.new max_bytes: @max_bytes, max_rows: @max_rows
          @batch.insert row, insert_id
        end
      end

      @batch_created_at ||= ::Time.now
      @background_thread ||= Thread.new { run_background }

      push_batch_request! if @batch.ready?
    end

    @cond.signal
  end

  true
end
Whether the inserter has been started.
@return [boolean] `true` when started, `false` otherwise.
# File lib/google/cloud/bigquery/table/async_inserter.rb, line 233
def started?
  !stopped?
end
Begins the process of stopping the inserter. Rows already in the queue will be inserted, but no new rows can be added. Use {#wait!} to block until the inserter is fully stopped and all pending rows have been inserted.
@return [AsyncInserter] returns self so calls can be chained.
# File lib/google/cloud/bigquery/table/async_inserter.rb, line 185
def stop
  synchronize do
    break if @stopped

    @stopped = true
    push_batch_request!
    @cond.signal
  end

  self
end
Whether the inserter has been stopped.
@return [boolean] `true` when stopped, `false` otherwise.
# File lib/google/cloud/bigquery/table/async_inserter.rb, line 242
def stopped?
  synchronize { @stopped }
end
Blocks until the inserter is fully stopped, all pending rows have been inserted, and all callbacks have completed. Does not stop the inserter. To stop the inserter, first call {#stop} and then call {#wait!} to block until the inserter is stopped.
@return [AsyncInserter] returns self so calls can be chained.
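A minimal sketch (the 60-second timeout is an arbitrary choice; as the source below shows, it is passed through to the thread pool's `wait_for_termination`):

@example
  inserter.insert rows
  inserter.stop
  # Block until pending batches and callbacks finish, waiting at most
  # 60 seconds for the thread pool to terminate.
  inserter.wait! 60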
# File lib/google/cloud/bigquery/table/async_inserter.rb, line 205
def wait! timeout = nil
  synchronize do
    @thread_pool.shutdown
    @thread_pool.wait_for_termination timeout
  end

  self
end
Protected Instance Methods
# File lib/google/cloud/bigquery/table/async_inserter.rb, line 283
def push_batch_request!
  return unless @batch

  orig_rows = @batch.rows
  json_rows = @batch.json_rows
  insert_ids = @batch.insert_ids

  Concurrent::Future.new executor: @thread_pool do
    raise ArgumentError, "No rows provided" if json_rows.empty?
    insert_resp = @table.service.insert_tabledata_json_rows @table.dataset_id,
                                                            @table.table_id,
                                                            json_rows,
                                                            skip_invalid: @skip_invalid,
                                                            ignore_unknown: @ignore_unknown,
                                                            insert_ids: insert_ids
    result = Result.new InsertResponse.from_gapi(orig_rows, insert_resp)
  rescue StandardError => e
    result = Result.new nil, e
  ensure
    @callback&.call result
  end.execute

  @batch = nil
  @batch_created_at = nil
end
# File lib/google/cloud/bigquery/table/async_inserter.rb, line 261
def run_background
  synchronize do
    until @stopped
      if @batch.nil?
        @cond.wait
        next
      end

      time_since_first_publish = ::Time.now - @batch_created_at
      if time_since_first_publish < @interval
        # still waiting for the interval to insert the batch...
        timeout = @interval - time_since_first_publish
        @cond.wait timeout
      else
        # interval met, insert the batch...
        push_batch_request!
        @cond.wait
      end
    end
  end
end
# File lib/google/cloud/bigquery/table/async_inserter.rb, line 248
def validate_insert_args rows, insert_ids
  rows = [rows] if rows.is_a? Hash
  raise ArgumentError, "No rows provided" if rows.empty?

  insert_ids = Array.new(rows.count) { :skip } if insert_ids == :skip
  insert_ids = Array insert_ids
  if insert_ids.count.positive? && insert_ids.count != rows.count
    raise ArgumentError, "insert_ids must be the same size as rows"
  end

  [rows, insert_ids]
end