Source code for google.cloud.bigtable.row_data
# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Container for Google Cloud Bigtable Cells and Streaming Row Contents."""
import copy
import grpc
from google.api_core import exceptions
from google.api_core import retry
from google.cloud._helpers import _datetime_from_microseconds
from google.cloud._helpers import _to_bytes
from google.cloud.bigtable_v2.types import bigtable as data_messages_v2_pb2
from google.cloud.bigtable_v2.types import data as data_v2_pb2
_MISSING_COLUMN_FAMILY = "Column family {} is not among the cells stored in this row."
_MISSING_COLUMN = (
    "Column {} is not among the cells stored in this row in the "
    "column family {}."
)
_MISSING_INDEX = (
"Index {!r} is not valid for the cells stored in this row for column {} "
"in the column family {}. There are {} such cells."
)
class Cell(object):
"""Representation of a Google Cloud Bigtable Cell.
:type value: bytes
:param value: The value stored in the cell.
:type timestamp_micros: int
:param timestamp_micros: The timestamp_micros when the cell was stored.
:type labels: list
:param labels: (Optional) List of strings. Labels applied to the cell.
"""
def __init__(self, value, timestamp_micros, labels=None):
self.value = value
self.timestamp_micros = timestamp_micros
self.labels = list(labels) if labels is not None else []
    @classmethod
def from_pb(cls, cell_pb):
"""Create a new cell from a Cell protobuf.
:type cell_pb: :class:`._generated.data_pb2.Cell`
:param cell_pb: The protobuf to convert.
:rtype: :class:`Cell`
:returns: The cell corresponding to the protobuf.
"""
if cell_pb.labels:
return cls(cell_pb.value, cell_pb.timestamp_micros, labels=cell_pb.labels)
else:
return cls(cell_pb.value, cell_pb.timestamp_micros)
@property
def timestamp(self):
return _datetime_from_microseconds(self.timestamp_micros)
def __eq__(self, other):
if not isinstance(other, self.__class__):
return NotImplemented
return (
other.value == self.value
and other.timestamp_micros == self.timestamp_micros
and other.labels == self.labels
)
def __ne__(self, other):
return not self == other
def __repr__(self):
return "<{name} value={value!r} timestamp={timestamp}>".format(
name=self.__class__.__name__, value=self.value, timestamp=self.timestamp
)
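# Illustrative usage sketch (not part of the module): ``Cell`` equality
# compares value, timestamp, and labels, and the ``timestamp`` property
# converts the microsecond count into a ``datetime``. All literals below
# are hypothetical.
#
#     cell = Cell(b"greeting", 1569251532000000, labels=["seen"])
#     assert cell == Cell(b"greeting", 1569251532000000, labels=["seen"])
#     cell.timestamp  # datetime derived via _datetime_from_microseconds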
[docs]class PartialCellData(object):
"""Representation of partial cell in a Google Cloud Bigtable Table.
These are expected to be updated directly from a
:class:`._generated.bigtable_service_messages_pb2.ReadRowsResponse`
:type row_key: bytes
:param row_key: The key for the row holding the (partial) cell.
:type family_name: str
:param family_name: The family name of the (partial) cell.
:type qualifier: bytes
:param qualifier: The column qualifier of the (partial) cell.
:type timestamp_micros: int
    :param timestamp_micros: The timestamp (in microseconds) of the
(partial) cell.
:type labels: list of str
:param labels: labels assigned to the (partial) cell
:type value: bytes
:param value: The (accumulated) value of the (partial) cell.
"""
def __init__(
self, row_key, family_name, qualifier, timestamp_micros, labels=(), value=b""
):
self.row_key = row_key
self.family_name = family_name
self.qualifier = qualifier
self.timestamp_micros = timestamp_micros
self.labels = labels
self.value = value
    def append_value(self, value):
"""Append bytes from a new chunk to value.
:type value: bytes
:param value: bytes to append
"""
self.value += value
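# Illustrative sketch (hypothetical values): a cell whose value arrives split
# across several response chunks is accumulated with ``append_value`` and
# later finalized through ``Cell.from_pb``.
#
#     partial = PartialCellData(b"row-1", "cf1", b"col", 1569251532000000)
#     partial.append_value(b"part-one/")
#     partial.append_value(b"part-two")
#     partial.value  # b"part-one/part-two"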
class PartialRowData(object):
"""Representation of partial row in a Google Cloud Bigtable Table.
These are expected to be updated directly from a
:class:`._generated.bigtable_service_messages_pb2.ReadRowsResponse`
:type row_key: bytes
:param row_key: The key for the row holding the (partial) data.
"""
def __init__(self, row_key):
self._row_key = row_key
self._cells = {}
def __eq__(self, other):
if not isinstance(other, self.__class__):
return NotImplemented
return other._row_key == self._row_key and other._cells == self._cells
def __ne__(self, other):
return not self == other
    def to_dict(self):
"""Convert the cells to a dictionary.
This is intended to be used with HappyBase, so the column family and
        column qualifiers are combined (with ``:``).
:rtype: dict
:returns: Dictionary containing all the data in the cells of this row.
"""
result = {}
for column_family_id, columns in self._cells.items():
for column_qual, cells in columns.items():
key = _to_bytes(column_family_id) + b":" + _to_bytes(column_qual)
result[key] = cells
return result
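    # Illustrative sketch (hypothetical row contents): for family "cf1" and
    # qualifier b"greet", ``to_dict`` produces HappyBase-style combined keys:
    #
    #     row.to_dict()  # {b"cf1:greet": [<Cell value=b'hello' ...>]}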
@property
def cells(self):
"""Property returning all the cells accumulated on this partial row.
For example:
.. literalinclude:: snippets_table.py
:start-after: [START bigtable_api_row_data_cells]
:end-before: [END bigtable_api_row_data_cells]
:dedent: 4
:rtype: dict
:returns: Dictionary of the :class:`Cell` objects accumulated. This
dictionary has two-levels of keys (first for column families
and second for column names/qualifiers within a family). For
a given column, a list of :class:`Cell` objects is stored.
"""
return self._cells
@property
def row_key(self):
"""Getter for the current (partial) row's key.
:rtype: bytes
:returns: The current (partial) row's key.
"""
return self._row_key
    def find_cells(self, column_family_id, column):
"""Get a time series of cells stored on this instance.
For example:
.. literalinclude:: snippets_table.py
:start-after: [START bigtable_api_row_find_cells]
:end-before: [END bigtable_api_row_find_cells]
:dedent: 4
Args:
column_family_id (str): The ID of the column family. Must be of the
form ``[_a-zA-Z0-9][-_.a-zA-Z0-9]*``.
column (bytes): The column within the column family where the cells
are located.
Returns:
List[~google.cloud.bigtable.row_data.Cell]: The cells stored in the
specified column.
Raises:
KeyError: If ``column_family_id`` is not among the cells stored
in this row.
KeyError: If ``column`` is not among the cells stored in this row
for the given ``column_family_id``.
"""
try:
column_family = self._cells[column_family_id]
except KeyError:
raise KeyError(_MISSING_COLUMN_FAMILY.format(column_family_id))
try:
cells = column_family[column]
except KeyError:
raise KeyError(_MISSING_COLUMN.format(column, column_family_id))
return cells
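    # Illustrative sketch (hypothetical identifiers): ``find_cells`` returns
    # every stored version of one column; index 0 is the first cell as
    # returned by the service.
    #
    #     cells = row.find_cells("cf1", b"greet")
    #     latest = cells[0]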
    def cell_value(self, column_family_id, column, index=0):
"""Get a single cell value stored on this instance.
For example:
.. literalinclude:: snippets_table.py
:start-after: [START bigtable_api_row_cell_value]
:end-before: [END bigtable_api_row_cell_value]
:dedent: 4
Args:
column_family_id (str): The ID of the column family. Must be of the
form ``[_a-zA-Z0-9][-_.a-zA-Z0-9]*``.
column (bytes): The column within the column family where the cell
is located.
index (Optional[int]): The offset within the series of values. If
not specified, will return the first cell.
Returns:
~google.cloud.bigtable.row_data.Cell value: The cell value stored
in the specified column and specified index.
Raises:
KeyError: If ``column_family_id`` is not among the cells stored
in this row.
KeyError: If ``column`` is not among the cells stored in this row
for the given ``column_family_id``.
IndexError: If ``index`` cannot be found within the cells stored
in this row for the given ``column_family_id``, ``column``
pair.
"""
cells = self.find_cells(column_family_id, column)
try:
cell = cells[index]
except (TypeError, IndexError):
num_cells = len(cells)
msg = _MISSING_INDEX.format(index, column, column_family_id, num_cells)
raise IndexError(msg)
return cell.value
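    # Illustrative sketch (hypothetical identifiers): ``cell_value`` is
    # shorthand for ``find_cells(...)[index].value`` with clearer errors.
    #
    #     newest = row.cell_value("cf1", b"greet")           # index 0
    #     older = row.cell_value("cf1", b"greet", index=1)   # next version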
    def cell_values(self, column_family_id, column, max_count=None):
"""Get a time series of cells stored on this instance.
For example:
.. literalinclude:: snippets_table.py
:start-after: [START bigtable_api_row_cell_values]
:end-before: [END bigtable_api_row_cell_values]
:dedent: 4
Args:
column_family_id (str): The ID of the column family. Must be of the
form ``[_a-zA-Z0-9][-_.a-zA-Z0-9]*``.
column (bytes): The column within the column family where the cells
are located.
max_count (int): The maximum number of cells to use.
        Returns:
            A generator yielding ``(cell.value, cell.timestamp_micros)``
            tuples, one per cell, up to ``max_count``.
Raises:
KeyError: If ``column_family_id`` is not among the cells stored
in this row.
KeyError: If ``column`` is not among the cells stored in this row
for the given ``column_family_id``.
"""
cells = self.find_cells(column_family_id, column)
if max_count is None:
max_count = len(cells)
for index, cell in enumerate(cells):
if index == max_count:
break
yield cell.value, cell.timestamp_micros
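# Illustrative sketch (hypothetical identifiers): ``cell_values`` yields
# ``(value, timestamp_micros)`` pairs, optionally capped by ``max_count``.
#
#     for value, micros in row.cell_values("cf1", b"greet", max_count=2):
#         print(value, micros)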
class InvalidReadRowsResponse(RuntimeError):
    """Exception raised due to invalid response data from the back-end."""
class InvalidChunk(RuntimeError):
    """Exception raised due to invalid chunk data from the back-end."""
def _retry_read_rows_exception(exc):
if isinstance(exc, grpc.RpcError):
exc = exceptions.from_grpc_error(exc)
return isinstance(exc, (exceptions.ServiceUnavailable, exceptions.DeadlineExceeded))
DEFAULT_RETRY_READ_ROWS = retry.Retry(
predicate=_retry_read_rows_exception,
initial=1.0,
maximum=15.0,
multiplier=2.0,
deadline=60.0, # 60 seconds
)
"""The default retry strategy to be used on retry-able errors.
Used by
:meth:`~google.cloud.bigtable.row_data.PartialRowsData._read_next_response`.
"""
class PartialRowsData(object):
"""Convenience wrapper for consuming a ``ReadRows`` streaming response.
:type read_method: :class:`client._table_data_client.read_rows`
:param read_method: ``ReadRows`` method.
:type request: :class:`data_messages_v2_pb2.ReadRowsRequest`
:param request: The ``ReadRowsRequest`` message used to create a
ReadRowsResponse iterator. If the iterator fails, a new
iterator is created, allowing the scan to continue from
the point just beyond the last successfully read row,
identified by self.last_scanned_row_key. The retry happens
inside of the Retry class, using a predicate for the
expected exceptions during iteration.
:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) Retry delay and deadline arguments. To override,
the default value :attr:`DEFAULT_RETRY_READ_ROWS` can be
used and modified with the
:meth:`~google.api_core.retry.Retry.with_delay` method
or the
:meth:`~google.api_core.retry.Retry.with_deadline` method.
"""
NEW_ROW = "New row" # No cells yet complete for row
ROW_IN_PROGRESS = "Row in progress" # Some cells complete for row
CELL_IN_PROGRESS = "Cell in progress" # Incomplete cell for row
STATE_NEW_ROW = 1
STATE_ROW_IN_PROGRESS = 2
STATE_CELL_IN_PROGRESS = 3
read_states = {
STATE_NEW_ROW: NEW_ROW,
STATE_ROW_IN_PROGRESS: ROW_IN_PROGRESS,
STATE_CELL_IN_PROGRESS: CELL_IN_PROGRESS,
}
def __init__(self, read_method, request, retry=DEFAULT_RETRY_READ_ROWS):
# Counter for rows returned to the user
self._counter = 0
# In-progress row, unset until first response, after commit/reset
self._row = None
# Last complete row, unset until first commit
self._previous_row = None
# In-progress cell, unset until first response, after completion
self._cell = None
# Last complete cell, unset until first completion, after new row
self._previous_cell = None
# May be cached from previous response
self.last_scanned_row_key = None
self.read_method = read_method
self.request = request
self.retry = retry
        # The `timeout` parameter must be somewhat greater than the value
        # contained in `self.retry`, in order to avoid a race-like condition
        # and allow registering the first deadline error before invoking the
        # retry. Otherwise there is a risk of entering an infinite loop that
        # resets the timeout counter just before it is triggered. The
        # increment of 1 second used here is customary but should not be much
        # less than that.
self.response_iterator = read_method(request, timeout=self.retry._deadline + 1)
self.rows = {}
self._state = self.STATE_NEW_ROW
# Flag to stop iteration, for any reason not related to self.retry()
self._cancelled = False
@property
def state(self):
"""State machine state.
:rtype: str
:returns: name of state corresponding to current row / chunk
processing.
"""
return self.read_states[self._state]
    def cancel(self):
"""Cancels the iterator, closing the stream."""
self._cancelled = True
self.response_iterator.cancel()
    def consume_all(self, max_loops=None):
"""Consume the streamed responses until there are no more.
.. warning::
This method will be removed in future releases. Please use this
class as a generator instead.
:type max_loops: int
:param max_loops: (Optional) Maximum number of times to try to consume
an additional ``ReadRowsResponse``. You can use this
to avoid long wait times.
"""
for row in self:
self.rows[row.row_key] = row
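    # Illustrative sketch (hypothetical ``process`` callable): iterating the
    # instance directly is the supported replacement for ``consume_all``.
    #
    #     for row in partial_rows_data:   # one PartialRowData per commit
    #         process(row.row_key, row.to_dict())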
def _create_retry_request(self):
"""Helper for :meth:`__iter__`."""
req_manager = _ReadRowsRequestManager(
self.request, self.last_scanned_row_key, self._counter
)
return req_manager.build_updated_request()
def _on_error(self, exc):
"""Helper for :meth:`__iter__`."""
# restart the read scan from AFTER the last successfully read row
retry_request = self.request
if self.last_scanned_row_key:
retry_request = self._create_retry_request()
self.response_iterator = self.read_method(retry_request)
def _read_next(self):
"""Helper for :meth:`__iter__`."""
return next(self.response_iterator)
def _read_next_response(self):
"""Helper for :meth:`__iter__`."""
return self.retry(self._read_next, on_error=self._on_error)()
    def __iter__(self):
"""Consume the ``ReadRowsResponse`` s from the stream.
Read the rows and yield each to the reader
Parse the response and its chunks into a new/existing row in
:attr:`_rows`. Rows are returned in order by row key.
"""
while not self._cancelled:
try:
response = self._read_next_response()
except StopIteration:
if self.state != self.NEW_ROW:
raise ValueError("The row remains partial / is not committed.")
break
for chunk in response.chunks:
if self._cancelled:
break
self._process_chunk(chunk)
if chunk.commit_row:
self.last_scanned_row_key = self._previous_row.row_key
self._counter += 1
yield self._previous_row
resp_last_key = response.last_scanned_row_key
if resp_last_key and resp_last_key > self.last_scanned_row_key:
self.last_scanned_row_key = resp_last_key
def _process_chunk(self, chunk):
if chunk.reset_row:
self._validate_chunk_reset_row(chunk)
self._row = None
self._cell = self._previous_cell = None
self._state = self.STATE_NEW_ROW
return
self._update_cell(chunk)
if self._row is None:
if (
self._previous_row is not None
and self._cell.row_key <= self._previous_row.row_key
):
raise InvalidChunk()
self._row = PartialRowData(self._cell.row_key)
if chunk.value_size == 0:
self._state = self.STATE_ROW_IN_PROGRESS
self._save_current_cell()
else:
self._state = self.STATE_CELL_IN_PROGRESS
if chunk.commit_row:
if chunk.value_size > 0:
raise InvalidChunk()
self._previous_row = self._row
self._row = None
self._previous_cell = None
self._state = self.STATE_NEW_ROW
def _update_cell(self, chunk):
if self._cell is None:
qualifier = None
if "qualifier" in chunk:
qualifier = chunk.qualifier
family = None
if "family_name" in chunk:
family = chunk.family_name
self._cell = PartialCellData(
chunk.row_key,
family,
qualifier,
chunk.timestamp_micros,
chunk.labels,
chunk.value,
)
self._copy_from_previous(self._cell)
self._validate_cell_data_new_cell()
else:
self._cell.append_value(chunk.value)
def _validate_cell_data_new_cell(self):
cell = self._cell
if not cell.row_key or not cell.family_name or cell.qualifier is None:
raise InvalidChunk()
prev = self._previous_cell
if prev and prev.row_key != cell.row_key:
raise InvalidChunk()
def _validate_chunk_reset_row(self, chunk):
# No reset for new row
_raise_if(self._state == self.STATE_NEW_ROW)
# No reset with other keys
_raise_if(chunk.row_key)
_raise_if("family_name" in chunk)
_raise_if("qualifier" in chunk)
_raise_if(chunk.timestamp_micros)
_raise_if(chunk.labels)
_raise_if(chunk.value_size)
_raise_if(chunk.value)
_raise_if(chunk.commit_row)
def _save_current_cell(self):
"""Helper for :meth:`consume_next`."""
row, cell = self._row, self._cell
family = row._cells.setdefault(cell.family_name, {})
qualified = family.setdefault(cell.qualifier, [])
complete = Cell.from_pb(cell)
qualified.append(complete)
self._cell, self._previous_cell = None, cell
def _copy_from_previous(self, cell):
"""Helper for :meth:`consume_next`."""
previous = self._previous_cell
if previous is not None:
if not cell.row_key:
cell.row_key = previous.row_key
if not cell.family_name:
cell.family_name = previous.family_name
# NOTE: ``cell.qualifier`` **can** be empty string.
if cell.qualifier is None:
cell.qualifier = previous.qualifier
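# Illustrative sketch of the inheritance rules above (hypothetical values):
# a continuation chunk may omit row_key, family_name, and qualifier, which
# are then copied from the previous completed cell.
#
#     previous cell: row_key=b"r1", family_name="cf1", qualifier=b"q1"
#     next chunk:    row_key=b"",   family_name unset, qualifier unset
#     merged cell:   row_key=b"r1", family_name="cf1", qualifier=b"q1"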
class _ReadRowsRequestManager(object):
"""Update the ReadRowsRequest message in case of failures by
filtering the already read keys.
:type message: class:`data_messages_v2_pb2.ReadRowsRequest`
:param message: Original ReadRowsRequest containing all of the parameters
of API call
:type last_scanned_key: bytes
:param last_scanned_key: last successfully scanned key
:type rows_read_so_far: int
    :param rows_read_so_far: total number of rows successfully read so far;
                             used for updating ``rows_limit``
"""
def __init__(self, message, last_scanned_key, rows_read_so_far):
self.message = message
self.last_scanned_key = last_scanned_key
self.rows_read_so_far = rows_read_so_far
def build_updated_request(self):
"""Updates the given message request as per last scanned key"""
r_kwargs = {
"table_name": self.message.table_name,
"filter": self.message.filter,
}
if self.message.rows_limit != 0:
r_kwargs["rows_limit"] = max(
1, self.message.rows_limit - self.rows_read_so_far
)
# if neither RowSet.row_keys nor RowSet.row_ranges currently exist,
# add row_range that starts with last_scanned_key as start_key_open
# to request only rows that have not been returned yet
if "rows" not in self.message:
row_range = data_v2_pb2.RowRange(start_key_open=self.last_scanned_key)
r_kwargs["rows"] = data_v2_pb2.RowSet(row_ranges=[row_range])
else:
row_keys = self._filter_rows_keys()
row_ranges = self._filter_row_ranges()
r_kwargs["rows"] = data_v2_pb2.RowSet(
row_keys=row_keys, row_ranges=row_ranges
)
return data_messages_v2_pb2.ReadRowsRequest(**r_kwargs)
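    # Illustrative sketch (hypothetical numbers): with an original rows_limit
    # of 100 and 40 rows already read, the retry request asks for
    # max(1, 100 - 40) == 60 remaining rows.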
def _filter_rows_keys(self):
""" Helper for :meth:`build_updated_request`"""
return [
row_key
for row_key in self.message.rows.row_keys
if row_key > self.last_scanned_key
]
def _filter_row_ranges(self):
""" Helper for :meth:`build_updated_request`"""
new_row_ranges = []
for row_range in self.message.rows.row_ranges:
            # use the current end_key (open or closed) if it is set;
            # otherwise fall back to the empty string ('').
            # NOTE: an empty end_key means "end of table"
end_key = self._end_key_set(row_range)
# if end_key is already read, skip to the next row_range
if end_key and self._key_already_read(end_key):
continue
            # use the current start_key (open or closed) if it is set;
            # otherwise fall back to the empty string ('')
            # NOTE: an empty start_key means "beginning of table"
start_key = self._start_key_set(row_range)
# if start_key was already read or doesn't exist,
# create a row_range with last_scanned_key as start_key_open
# to be passed to retry request
retry_row_range = row_range
if self._key_already_read(start_key):
retry_row_range = copy.deepcopy(row_range)
retry_row_range.start_key_closed = _to_bytes("")
retry_row_range.start_key_open = self.last_scanned_key
new_row_ranges.append(retry_row_range)
return new_row_ranges
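    # Illustrative sketch (hypothetical keys): with last_scanned_key ==
    # b"key5", a range [b"key1", b"key9") is trimmed to (b"key5", b"key9")
    # for the retry, while a range ending at or before b"key5" is dropped.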
def _key_already_read(self, key):
""" Helper for :meth:`_filter_row_ranges`"""
return key <= self.last_scanned_key
@staticmethod
def _start_key_set(row_range):
""" Helper for :meth:`_filter_row_ranges`"""
return row_range.start_key_open or row_range.start_key_closed
@staticmethod
def _end_key_set(row_range):
""" Helper for :meth:`_filter_row_ranges`"""
return row_range.end_key_open or row_range.end_key_closed
def _raise_if(predicate, *args):
"""Helper for validation methods."""
if predicate:
raise InvalidChunk(*args)