class OCI::Streaming::StreamClient

The API for the Streaming Service.

Attributes

api_client[R]

Client used to make HTTP requests.

@return [OCI::ApiClient]

endpoint[R]

Fully qualified endpoint URL.

@return [String]

retry_config[R]

The default retry configuration to apply to all operations in this service client. This can be overridden on a per-operation basis. The default retry configuration value is `nil`, which means that an operation will not perform any retries.

@return [OCI::Retry::RetryConfig]

Public Class Methods

new(config: nil, endpoint: nil, signer: nil, proxy_settings: nil, retry_config: nil)

Creates a new StreamClient. Notes:

If a config is not specified, then the global OCI.config will be used.

This client is not thread-safe.

@param [Config] config A Config object.

@param [String] endpoint The fully qualified endpoint URL.

@param [OCI::BaseSigner] signer A signer implementation which can be used by this client. If this is not provided then a signer will be constructed via the provided config. One use case of this parameter is instance principals authentication, so that the instance principals signer can be provided to the client.

@param [OCI::ApiClientProxySettings] proxy_settings If your environment requires you to use a proxy server for outgoing HTTP requests, the details for the proxy can be provided in this parameter.

@param [OCI::Retry::RetryConfig] retry_config The retry configuration for this service client. This represents the default retry configuration to apply across all operations. This can be overridden on a per-operation basis. The default retry configuration value is `nil`, which means that an operation will not perform any retries.

# File lib/oci/streaming/stream_client.rb, line 43
def initialize(config: nil, endpoint: nil, signer: nil, proxy_settings: nil, retry_config: nil)
  raise 'A fully qualified endpoint URL must be defined' unless endpoint

  @endpoint = endpoint + '/20180418'

  # If the signer is an InstancePrincipalsSecurityTokenSigner or SecurityTokenSigner and no config was supplied (they are self-sufficient signers)
  # then create a dummy config to pass to the ApiClient constructor. If customers wish to create a client which uses instance principals
  # and has config (either populated programmatically or loaded from a file), they must construct that config themselves and then
  # pass it to this constructor.
  #
  # If there is no signer (or the signer is not an instance principals signer) and no config was supplied, this is not valid
  # so try and load the config from the default file.
  config = OCI::Config.validate_and_build_config_with_signer(config, signer)

  signer = OCI::Signer.config_file_auth_builder(config) if signer.nil?

  @api_client = OCI::ApiClient.new(config, signer, proxy_settings: proxy_settings)
  @retry_config = retry_config
  logger.info "StreamClient endpoint set to '#{@endpoint}'." if logger
end
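
The constructor rejects a missing endpoint and appends the API version path to whatever endpoint it receives. A minimal stand-alone sketch of that behavior follows; `versioned_endpoint` is a hypothetical helper that only mirrors the two lines above and is not part of the SDK, and the host is a placeholder.

```ruby
# Hypothetical helper mirroring the endpoint handling in StreamClient#initialize.
# It is not part of the SDK; it only illustrates the nil check and the suffix.
def versioned_endpoint(endpoint)
  raise 'A fully qualified endpoint URL must be defined' unless endpoint

  endpoint + '/20180418'
end

# The host below is a placeholder, not a real service endpoint.
puts versioned_endpoint('https://streaming.example.invalid')
```

Because the suffix is appended verbatim, an endpoint passed with a trailing slash would produce a double slash in the final URL.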

Public Instance Methods

consumer_commit(stream_id, cursor, opts = {})

Provides a mechanism to manually commit offsets, if not using commit-on-get consumer semantics. This commits offsets associated with the provided cursor, extends the timeout on each of the affected partitions, and returns an updated cursor.

@param [String] stream_id The OCID of the stream.

@param [String] cursor The group-cursor representing the offsets of the group. This cursor is retrieved from the CreateGroupCursor API call.

@param [Hash] opts the optional parameters

@option opts [OCI::Retry::RetryConfig] :retry_config The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by {#retry_config} will be used. If an explicit `nil` value is provided then the operation will not retry.

@option opts [String] :opc_request_id The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.

@return [Response] A Response object with data of type {OCI::Streaming::Models::Cursor Cursor}

@note Click [here](docs.cloud.oracle.com/en-us/iaas/tools/ruby-sdk-examples/latest/streaming/consumer_commit.rb.html) to see an example of how to use consumer_commit API.

# File lib/oci/streaming/stream_client.rb, line 90
def consumer_commit(stream_id, cursor, opts = {})
  logger.debug 'Calling operation StreamClient#consumer_commit.' if logger

  raise "Missing the required parameter 'stream_id' when calling consumer_commit." if stream_id.nil?
  raise "Missing the required parameter 'cursor' when calling consumer_commit." if cursor.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/commit'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}
  query_params[:cursor] = cursor

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = nil

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#consumer_commit') do
    @api_client.call_api(
      :POST,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::Cursor'
    )
  end
  # rubocop:enable Metrics/BlockLength
end
consumer_heartbeat(stream_id, cursor, opts = {})

Allows long-running processes to extend the timeout on partitions reserved by a consumer instance.

@param [String] stream_id The OCID of the stream.

@param [String] cursor The group-cursor representing the offsets of the group. This cursor is retrieved from the CreateGroupCursor API call.

@param [Hash] opts the optional parameters

@option opts [OCI::Retry::RetryConfig] :retry_config The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by {#retry_config} will be used. If an explicit `nil` value is provided then the operation will not retry.

@option opts [String] :opc_request_id The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.

@return [Response] A Response object with data of type {OCI::Streaming::Models::Cursor Cursor}

@note Click [here](docs.cloud.oracle.com/en-us/iaas/tools/ruby-sdk-examples/latest/streaming/consumer_heartbeat.rb.html) to see an example of how to use consumer_heartbeat API.

# File lib/oci/streaming/stream_client.rb, line 152
def consumer_heartbeat(stream_id, cursor, opts = {})
  logger.debug 'Calling operation StreamClient#consumer_heartbeat.' if logger

  raise "Missing the required parameter 'stream_id' when calling consumer_heartbeat." if stream_id.nil?
  raise "Missing the required parameter 'cursor' when calling consumer_heartbeat." if cursor.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/heartbeat'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}
  query_params[:cursor] = cursor

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = nil

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#consumer_heartbeat') do
    @api_client.call_api(
      :POST,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::Cursor'
    )
  end
  # rubocop:enable Metrics/BlockLength
end
create_cursor(stream_id, create_cursor_details, opts = {})

Creates a cursor. Cursors are used to consume a stream, starting from a specific point in the partition and going forward from there. You can create a cursor based on an offset, a time, the trim horizon, or the most recent message in the stream. Because the trim horizon is the oldest message inside the retention period boundary, using it effectively lets you consume all messages in the stream. A cursor based on the most recent message allows consumption of only messages that are added to the stream after you create the cursor. Cursors expire five minutes after you receive them from the service.

@param [String] stream_id The OCID of the stream.

@param [OCI::Streaming::Models::CreateCursorDetails] create_cursor_details The information used to create the cursor.

@param [Hash] opts the optional parameters

@option opts [OCI::Retry::RetryConfig] :retry_config The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by {#retry_config} will be used. If an explicit `nil` value is provided then the operation will not retry.

@option opts [String] :opc_request_id The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.

@return [Response] A Response object with data of type {OCI::Streaming::Models::Cursor Cursor}

@note Click [here](docs.cloud.oracle.com/en-us/iaas/tools/ruby-sdk-examples/latest/streaming/create_cursor.rb.html) to see an example of how to use create_cursor API.

# File lib/oci/streaming/stream_client.rb, line 217
def create_cursor(stream_id, create_cursor_details, opts = {})
  logger.debug 'Calling operation StreamClient#create_cursor.' if logger

  raise "Missing the required parameter 'stream_id' when calling create_cursor." if stream_id.nil?
  raise "Missing the required parameter 'create_cursor_details' when calling create_cursor." if create_cursor_details.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/cursors'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = @api_client.object_to_http_body(create_cursor_details)

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#create_cursor') do
    @api_client.call_api(
      :POST,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::Cursor'
    )
  end
  # rubocop:enable Metrics/BlockLength
end
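
The create_cursor description states that cursors expire five minutes after the service returns them. One way a caller might keep track of that on the client side is sketched below. `TrackedCursor` is a hypothetical wrapper, not part of the SDK, and the 30-second safety margin is an assumption chosen for illustration.

```ruby
# Hypothetical wrapper, not part of the SDK: remembers when a cursor value was
# received so callers can request a fresh one before the five-minute expiry.
class TrackedCursor
  CURSOR_TTL_SECONDS = 5 * 60

  attr_reader :value, :received_at

  def initialize(value, received_at = Time.now)
    @value = value
    @received_at = received_at
  end

  # True once the cursor is at least (TTL - margin) seconds old.
  # The default margin of 30 seconds is an assumption, not a service value.
  def stale?(now = Time.now, margin_seconds = 30)
    now - received_at >= CURSOR_TTL_SECONDS - margin_seconds
  end
end
```

A caller could check `stale?` before reusing a stored cursor and call `create_cursor` again when it returns true.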
create_group_cursor(stream_id, create_group_cursor_details, opts = {})

Creates a group-cursor.

@param [String] stream_id The OCID of the stream.

@param [OCI::Streaming::Models::CreateGroupCursorDetails] create_group_cursor_details The information used to create the cursor.

@param [Hash] opts the optional parameters

@option opts [OCI::Retry::RetryConfig] :retry_config The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by {#retry_config} will be used. If an explicit `nil` value is provided then the operation will not retry.

@option opts [String] :opc_request_id The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.

@return [Response] A Response object with data of type {OCI::Streaming::Models::Cursor Cursor}

@note Click [here](docs.cloud.oracle.com/en-us/iaas/tools/ruby-sdk-examples/latest/streaming/create_group_cursor.rb.html) to see an example of how to use create_group_cursor API.

# File lib/oci/streaming/stream_client.rb, line 277
def create_group_cursor(stream_id, create_group_cursor_details, opts = {})
  logger.debug 'Calling operation StreamClient#create_group_cursor.' if logger

  raise "Missing the required parameter 'stream_id' when calling create_group_cursor." if stream_id.nil?
  raise "Missing the required parameter 'create_group_cursor_details' when calling create_group_cursor." if create_group_cursor_details.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/groupCursors'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = @api_client.object_to_http_body(create_group_cursor_details)

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#create_group_cursor') do
    @api_client.call_api(
      :POST,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::Cursor'
    )
  end
  # rubocop:enable Metrics/BlockLength
end
get_group(stream_id, group_name, opts = {})

Returns the current state of a consumer group.

@param [String] stream_id The OCID of the stream.

@param [String] group_name The name of the consumer group.

@param [Hash] opts the optional parameters

@option opts [OCI::Retry::RetryConfig] :retry_config The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by {#retry_config} will be used. If an explicit `nil` value is provided then the operation will not retry.

@option opts [String] :opc_request_id The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.

@return [Response] A Response object with data of type {OCI::Streaming::Models::Group Group}

@note Click [here](docs.cloud.oracle.com/en-us/iaas/tools/ruby-sdk-examples/latest/streaming/get_group.rb.html) to see an example of how to use get_group API.

# File lib/oci/streaming/stream_client.rb, line 337
def get_group(stream_id, group_name, opts = {})
  logger.debug 'Calling operation StreamClient#get_group.' if logger

  raise "Missing the required parameter 'stream_id' when calling get_group." if stream_id.nil?
  raise "Missing the required parameter 'group_name' when calling get_group." if group_name.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)
  raise "Parameter value for 'group_name' must not be blank" if OCI::Internal::Util.blank_string?(group_name)

  path = '/streams/{streamId}/groups/{groupName}'.sub('{streamId}', stream_id.to_s).sub('{groupName}', group_name.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = nil

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#get_group') do
    @api_client.call_api(
      :GET,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::Group'
    )
  end
  # rubocop:enable Metrics/BlockLength
end
get_messages(stream_id, cursor, opts = {})

Returns messages from the specified stream using the specified cursor as the starting point for consumption. By default, the number of messages returned is undefined, but the service returns as many as possible. To get messages, you must first obtain a cursor using the {#create_cursor create_cursor} operation. In the response, retrieve the value of the 'opc-next-cursor' header to pass as a parameter to get the next batch of messages in the stream.

@param [String] stream_id The OCID of the stream.

@param [String] cursor The cursor used to consume the stream.

@param [Hash] opts the optional parameters

@option opts [OCI::Retry::RetryConfig] :retry_config The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by {#retry_config} will be used. If an explicit `nil` value is provided then the operation will not retry.

@option opts [Integer] :limit The maximum number of messages to return. You can specify any value up to 10000. By default, the service returns as many messages as possible. Consider your average message size to help avoid exceeding throughput on the stream.

@option opts [String] :opc_request_id The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.

@return [Response] A Response object with data of type Array<{OCI::Streaming::Models::Message Message}>

@note Click [here](docs.cloud.oracle.com/en-us/iaas/tools/ruby-sdk-examples/latest/streaming/get_messages.rb.html) to see an example of how to use get_messages API.

# File lib/oci/streaming/stream_client.rb, line 404
def get_messages(stream_id, cursor, opts = {})
  logger.debug 'Calling operation StreamClient#get_messages.' if logger

  raise "Missing the required parameter 'stream_id' when calling get_messages." if stream_id.nil?
  raise "Missing the required parameter 'cursor' when calling get_messages." if cursor.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/messages'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}
  query_params[:cursor] = cursor
  query_params[:limit] = opts[:limit] if opts[:limit]

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = nil

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#get_messages') do
    @api_client.call_api(
      :GET,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'Array<OCI::Streaming::Models::Message>'
    )
  end
  # rubocop:enable Metrics/BlockLength
end
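
The get_messages description says to read the 'opc-next-cursor' response header and pass it to the next call. A minimal sketch of such a loop follows. `drain_stream` is a hypothetical helper, not part of the SDK. It assumes the response exposes `data` and a hash-like `headers` keyed by lowercase header name, and it stops on an empty batch purely to keep the sketch bounded (an empty batch on a live stream may only mean that no new messages have arrived yet).

```ruby
# Hypothetical helper, not part of the SDK: reads up to max_batches batches,
# following the 'opc-next-cursor' header from one call to the next.
# `client` is any object that responds to get_messages like StreamClient does.
def drain_stream(client, stream_id, cursor, max_batches: 10, limit: 100)
  messages = []
  max_batches.times do
    response = client.get_messages(stream_id, cursor, limit: limit)
    messages.concat(response.data)

    # The cursor for the next batch comes from the response headers.
    cursor = response.headers['opc-next-cursor']
    break if cursor.nil? || response.data.empty?
  end
  messages
end
```

Passing the client as a parameter keeps the loop testable with a stub and leaves retry behavior to the client's own retry configuration.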
logger()

@return [Logger] The logger for this client. May be nil.

# File lib/oci/streaming/stream_client.rb, line 66
def logger
  @api_client.config.logger
end
put_messages(stream_id, put_messages_details, opts = {})

Emits messages to a stream. There's no limit to the number of messages in a request, but the total size of a message or request must be 1 MiB or less. The service calculates the partition ID from the message key and stores messages that share a key on the same partition. If a message does not contain a key or if the key is null, the service generates a message key for you. The partition ID cannot be passed as a parameter.

@param [String] stream_id The OCID of the stream.

@param [OCI::Streaming::Models::PutMessagesDetails] put_messages_details Array of messages to put into the stream.

@param [Hash] opts the optional parameters

@option opts [OCI::Retry::RetryConfig] :retry_config The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by {#retry_config} will be used. If an explicit `nil` value is provided then the operation will not retry.

@option opts [String] :opc_request_id The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.

@return [Response] A Response object with data of type {OCI::Streaming::Models::PutMessagesResult PutMessagesResult}

@note Click [here](docs.cloud.oracle.com/en-us/iaas/tools/ruby-sdk-examples/latest/streaming/put_messages.rb.html) to see an example of how to use put_messages API.

# File lib/oci/streaming/stream_client.rb, line 469
def put_messages(stream_id, put_messages_details, opts = {})
  logger.debug 'Calling operation StreamClient#put_messages.' if logger

  raise "Missing the required parameter 'stream_id' when calling put_messages." if stream_id.nil?
  raise "Missing the required parameter 'put_messages_details' when calling put_messages." if put_messages_details.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/messages'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :exclude_body

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = @api_client.object_to_http_body(put_messages_details)

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#put_messages') do
    @api_client.call_api(
      :POST,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::PutMessagesResult'
    )
  end
  # rubocop:enable Metrics/BlockLength
end
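
Since the put_messages description caps a request at 1 MiB, a producer may want to split a large set of messages into several requests. The sketch below groups key/value pairs by raw byte size. `batch_by_size` is a hypothetical helper, not part of the SDK, and counting only key and value bytes is an assumption: the service may measure request size differently (for example after encoding), so a lower budget may be safer in practice.

```ruby
# Hypothetical helper, not part of the SDK: splits [key, value] pairs into
# batches whose combined raw byte size stays within max_bytes.
# The default mirrors the 1 MiB figure in the description; how the service
# actually measures request size is an assumption here.
def batch_by_size(entries, max_bytes: 1024 * 1024)
  batches = [[]]
  used = 0
  entries.each do |key, value|
    size = key.to_s.bytesize + value.to_s.bytesize
    raise ArgumentError, "entry of #{size} bytes exceeds #{max_bytes}" if size > max_bytes

    # Start a new batch when adding this entry would exceed the budget.
    if used + size > max_bytes && !batches.last.empty?
      batches << []
      used = 0
    end
    batches.last << [key, value]
    used += size
  end
  batches.reject(&:empty?)
end
```

Each resulting batch would then be wrapped in a `PutMessagesDetails` object and sent with its own put_messages call.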
update_group(stream_id, group_name, update_group_details, opts = {})

Forcefully changes the current location of a group as a whole, resetting the processing location of all consumers to a particular location in the stream.

@param [String] stream_id The OCID of the stream.

@param [String] group_name The name of the consumer group.

@param [OCI::Streaming::Models::UpdateGroupDetails] update_group_details The information used to modify the group.

@param [Hash] opts the optional parameters

@option opts [OCI::Retry::RetryConfig] :retry_config The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by {#retry_config} will be used. If an explicit `nil` value is provided then the operation will not retry.

@option opts [String] :opc_request_id The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.

@return [Response] A Response object with data of type nil

@note Click [here](docs.cloud.oracle.com/en-us/iaas/tools/ruby-sdk-examples/latest/streaming/update_group.rb.html) to see an example of how to use update_group API.

# File lib/oci/streaming/stream_client.rb, line 530
def update_group(stream_id, group_name, update_group_details, opts = {})
  logger.debug 'Calling operation StreamClient#update_group.' if logger

  raise "Missing the required parameter 'stream_id' when calling update_group." if stream_id.nil?
  raise "Missing the required parameter 'group_name' when calling update_group." if group_name.nil?
  raise "Missing the required parameter 'update_group_details' when calling update_group." if update_group_details.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)
  raise "Parameter value for 'group_name' must not be blank" if OCI::Internal::Util.blank_string?(group_name)

  path = '/streams/{streamId}/groups/{groupName}'.sub('{streamId}', stream_id.to_s).sub('{groupName}', group_name.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = @api_client.object_to_http_body(update_group_details)

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#update_group') do
    @api_client.call_api(
      :PUT,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body
    )
  end
  # rubocop:enable Metrics/BlockLength
end

Private Instance Methods

applicable_retry_config(opts = {})

Returns the retry configuration to use for an operation: the value stored under `:retry_config` in opts when that key is present (even if the value is `nil`), otherwise the client-level {#retry_config}.

# File lib/oci/streaming/stream_client.rb, line 575
def applicable_retry_config(opts = {})
  return @retry_config unless opts.key?(:retry_config)

  opts[:retry_config]
end
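
The distinction between an absent `:retry_config` key and an explicit `nil` is easy to miss. The stand-alone sketch below mirrors the method above so it can be run without the SDK. `pick_retry_config` is a hypothetical name, and the symbols stand in for real `OCI::Retry::RetryConfig` objects.

```ruby
# Stand-alone mirror of applicable_retry_config, for illustration only.
def pick_retry_config(client_default, opts = {})
  return client_default unless opts.key?(:retry_config)

  opts[:retry_config]
end

# Placeholder standing in for a client-level OCI::Retry::RetryConfig.
client_level = :client_level_config

pick_retry_config(client_level)                           # key absent: client default applies
pick_retry_config(client_level, retry_config: nil)        # explicit nil: no retries for this call
pick_retry_config(client_level, retry_config: :per_call)  # explicit value: overrides the default
```

Using `key?` rather than a truthiness check is what lets a caller switch retries off for a single operation while a client-level default is set.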