class OCI::Streaming::StreamClient
The API for the Streaming Service.
Attributes
Client used to make HTTP requests. @return [OCI::ApiClient]
Fully qualified endpoint URL @return [String]
The default retry configuration to apply to all operations in this service client. This can be overridden on a per-operation basis. The default retry configuration value is `nil`, which means that an operation will not perform any retries. @return [OCI::Retry::RetryConfig]
Public Class Methods
Creates a new StreamClient. Notes:
If a config is not specified, then the global OCI.config will be used. This client is not thread-safe.
@param [Config] config A Config object.
@param [String] endpoint The fully qualified endpoint URL
@param [OCI::BaseSigner] signer A signer implementation which can be used by this client. If this is not provided then a signer will be constructed via the provided config. One use case of this parameter is instance principals authentication, so that the instance principals signer can be provided to the client.
@param [OCI::ApiClientProxySettings] proxy_settings If your environment requires you to use a proxy server for outgoing HTTP requests, the details for the proxy can be provided in this parameter.
@param [OCI::Retry::RetryConfig] retry_config The retry configuration for this service client. This represents the default retry configuration to apply across all operations. This can be overridden on a per-operation basis. The default retry configuration value is `nil`, which means that an operation will not perform any retries.
# File lib/oci/streaming/stream_client.rb, line 43
def initialize(config: nil, endpoint: nil, signer: nil, proxy_settings: nil, retry_config: nil)
  raise 'A fully qualified endpoint URL must be defined' unless endpoint

  @endpoint = endpoint + '/20180418'

  # If the signer is an InstancePrincipalsSecurityTokenSigner or SecurityTokenSigner and no config was supplied (they are self-sufficient signers)
  # then create a dummy config to pass to the ApiClient constructor. If customers wish to create a client which uses instance principals
  # and has config (either populated programmatically or loaded from a file), they must construct that config themselves and then
  # pass it to this constructor.
  #
  # If there is no signer (or the signer is not an instance principals signer) and no config was supplied, this is not valid
  # so try and load the config from the default file.
  config = OCI::Config.validate_and_build_config_with_signer(config, signer)

  signer = OCI::Signer.config_file_auth_builder(config) if signer.nil?

  @api_client = OCI::ApiClient.new(config, signer, proxy_settings: proxy_settings)
  @retry_config = retry_config
  logger.info "StreamClient endpoint set to '#{@endpoint}'." if logger
end
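The constructor above rejects a missing endpoint and appends the API version segment `/20180418` to whatever URL it receives. The following is a minimal sketch of just that behaviour, written without the SDK so it can be run on its own. The helper name `versioned_endpoint` and the example URL are hypothetical and are not part of the library.

```ruby
# Hypothetical helper (not part of the SDK) that mirrors the endpoint
# handling at the top of StreamClient#initialize.
def versioned_endpoint(endpoint)
  # Same guard as the constructor: a nil endpoint is an error.
  raise 'A fully qualified endpoint URL must be defined' unless endpoint

  # Same concatenation as the constructor: the version segment is appended
  # as-is, so the caller should pass the URL without a trailing slash.
  endpoint + '/20180418'
end

# The URL below is a placeholder, not a real service endpoint.
puts versioned_endpoint('https://streaming.example.com')
```

Because the segment is appended verbatim, an endpoint passed with a trailing slash would yield a double slash in the final URL under this reading of the code.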
Public Instance Methods
Provides a mechanism to manually commit offsets, if not using commit-on-get consumer semantics. This commits offsets associated with the provided cursor, extends the timeout on each of the affected partitions, and returns an updated cursor.
@param [String] stream_id The OCID of the stream.
@param [String] cursor The group-cursor representing the offsets of the group. This cursor is retrieved from the CreateGroupCursor API call.
@param [Hash] opts the optional parameters
@option opts [OCI::Retry::RetryConfig] :retry_config The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by {#retry_config} will be used. If an explicit `nil` value is provided then the operation will not retry.
@option opts [String] :opc_request_id The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.
@return [Response] A Response object with data of type {OCI::Streaming::Models::Cursor Cursor}
@note Click [here](docs.cloud.oracle.com/en-us/iaas/tools/ruby-sdk-examples/latest/streaming/consumer_commit.rb.html) to see an example of how to use the consumer_commit API.
# File lib/oci/streaming/stream_client.rb, line 90
def consumer_commit(stream_id, cursor, opts = {})
  logger.debug 'Calling operation StreamClient#consumer_commit.' if logger

  raise "Missing the required parameter 'stream_id' when calling consumer_commit." if stream_id.nil?
  raise "Missing the required parameter 'cursor' when calling consumer_commit." if cursor.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/commit'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}
  query_params[:cursor] = cursor

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = nil

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#consumer_commit') do
    @api_client.call_api(
      :POST,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::Cursor'
    )
  end
  # rubocop:enable Metrics/BlockLength
end
Allows long-running processes to extend the timeout on partitions reserved by a consumer instance.
@param [String] stream_id The OCID of the stream.
@param [String] cursor The group-cursor representing the offsets of the group. This cursor is retrieved from the CreateGroupCursor API call.
@param [Hash] opts the optional parameters
@option opts [OCI::Retry::RetryConfig] :retry_config The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by {#retry_config} will be used. If an explicit `nil` value is provided then the operation will not retry.
@option opts [String] :opc_request_id The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.
@return [Response] A Response object with data of type {OCI::Streaming::Models::Cursor Cursor}
@note Click [here](docs.cloud.oracle.com/en-us/iaas/tools/ruby-sdk-examples/latest/streaming/consumer_heartbeat.rb.html) to see an example of how to use the consumer_heartbeat API.
# File lib/oci/streaming/stream_client.rb, line 152
def consumer_heartbeat(stream_id, cursor, opts = {})
  logger.debug 'Calling operation StreamClient#consumer_heartbeat.' if logger

  raise "Missing the required parameter 'stream_id' when calling consumer_heartbeat." if stream_id.nil?
  raise "Missing the required parameter 'cursor' when calling consumer_heartbeat." if cursor.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/heartbeat'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}
  query_params[:cursor] = cursor

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = nil

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#consumer_heartbeat') do
    @api_client.call_api(
      :POST,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::Cursor'
    )
  end
  # rubocop:enable Metrics/BlockLength
end
Creates a cursor. Cursors are used to consume a stream, starting from a specific point in the partition and going forward from there. You can create a cursor based on an offset, a time, the trim horizon, or the most recent message in the stream. The trim horizon is the oldest message inside the retention period boundary, so using it effectively lets you consume all messages in the stream. A cursor based on the most recent message allows consumption of only messages that are added to the stream after you create the cursor. Cursors expire five minutes after you receive them from the service.
@param [String] stream_id The OCID of the stream.
@param [OCI::Streaming::Models::CreateCursorDetails] create_cursor_details The information used to create the cursor.
@param [Hash] opts the optional parameters
@option opts [OCI::Retry::RetryConfig] :retry_config The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by {#retry_config} will be used. If an explicit `nil` value is provided then the operation will not retry.
@option opts [String] :opc_request_id The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.
@return [Response] A Response object with data of type {OCI::Streaming::Models::Cursor Cursor}
@note Click [here](docs.cloud.oracle.com/en-us/iaas/tools/ruby-sdk-examples/latest/streaming/create_cursor.rb.html) to see an example of how to use the create_cursor API.
# File lib/oci/streaming/stream_client.rb, line 217
def create_cursor(stream_id, create_cursor_details, opts = {})
  logger.debug 'Calling operation StreamClient#create_cursor.' if logger

  raise "Missing the required parameter 'stream_id' when calling create_cursor." if stream_id.nil?
  raise "Missing the required parameter 'create_cursor_details' when calling create_cursor." if create_cursor_details.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/cursors'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = @api_client.object_to_http_body(create_cursor_details)

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#create_cursor') do
    @api_client.call_api(
      :POST,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::Cursor'
    )
  end
  # rubocop:enable Metrics/BlockLength
end
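The create_cursor description states that cursors expire five minutes after they are received. A caller that holds a cursor across slow work may want to track its age. The following is a rough sketch of one way to do that; `CursorLease` and `CURSOR_TTL_SECONDS` are hypothetical names introduced here and are not part of the SDK.

```ruby
# Five minutes, taken from the create_cursor description.
CURSOR_TTL_SECONDS = 5 * 60

# Hypothetical wrapper (not an SDK type) that pairs a cursor string with the
# local time at which it was received.
CursorLease = Struct.new(:value, :received_at) do
  # True once at least CURSOR_TTL_SECONDS have elapsed since receipt.
  # Subtracting two Time objects yields elapsed seconds as a Float.
  def expired?(now = Time.now)
    now - received_at >= CURSOR_TTL_SECONDS
  end
end

lease = CursorLease.new('example-cursor-value', Time.now)
puts lease.expired? # a freshly received cursor has not expired yet
```

This only measures age on the local clock. Whether other calls refresh a cursor's lifetime is not covered by the text above, so the sketch does not model it.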
Creates a group-cursor.
@param [String] stream_id The OCID of the stream.
@param [OCI::Streaming::Models::CreateGroupCursorDetails] create_group_cursor_details The information used to create the cursor.
@param [Hash] opts the optional parameters
@option opts [OCI::Retry::RetryConfig] :retry_config The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by {#retry_config} will be used. If an explicit `nil` value is provided then the operation will not retry.
@option opts [String] :opc_request_id The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.
@return [Response] A Response object with data of type {OCI::Streaming::Models::Cursor Cursor}
@note Click [here](docs.cloud.oracle.com/en-us/iaas/tools/ruby-sdk-examples/latest/streaming/create_group_cursor.rb.html) to see an example of how to use the create_group_cursor API.
# File lib/oci/streaming/stream_client.rb, line 277
def create_group_cursor(stream_id, create_group_cursor_details, opts = {})
  logger.debug 'Calling operation StreamClient#create_group_cursor.' if logger

  raise "Missing the required parameter 'stream_id' when calling create_group_cursor." if stream_id.nil?
  raise "Missing the required parameter 'create_group_cursor_details' when calling create_group_cursor." if create_group_cursor_details.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/groupCursors'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = @api_client.object_to_http_body(create_group_cursor_details)

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#create_group_cursor') do
    @api_client.call_api(
      :POST,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::Cursor'
    )
  end
  # rubocop:enable Metrics/BlockLength
end
Returns the current state of a consumer group.
@param [String] stream_id The OCID of the stream.
@param [String] group_name The name of the consumer group.
@param [Hash] opts the optional parameters
@option opts [OCI::Retry::RetryConfig] :retry_config The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by {#retry_config} will be used. If an explicit `nil` value is provided then the operation will not retry.
@option opts [String] :opc_request_id The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.
@return [Response] A Response object with data of type {OCI::Streaming::Models::Group Group}
@note Click [here](docs.cloud.oracle.com/en-us/iaas/tools/ruby-sdk-examples/latest/streaming/get_group.rb.html) to see an example of how to use the get_group API.
# File lib/oci/streaming/stream_client.rb, line 337
def get_group(stream_id, group_name, opts = {})
  logger.debug 'Calling operation StreamClient#get_group.' if logger

  raise "Missing the required parameter 'stream_id' when calling get_group." if stream_id.nil?
  raise "Missing the required parameter 'group_name' when calling get_group." if group_name.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)
  raise "Parameter value for 'group_name' must not be blank" if OCI::Internal::Util.blank_string?(group_name)

  path = '/streams/{streamId}/groups/{groupName}'.sub('{streamId}', stream_id.to_s).sub('{groupName}', group_name.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = nil

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#get_group') do
    @api_client.call_api(
      :GET,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::Group'
    )
  end
  # rubocop:enable Metrics/BlockLength
end
Returns messages from the specified stream using the specified cursor as the starting point for consumption. By default, the number of messages returned is undefined, but the service returns as many as possible. To get messages, you must first obtain a cursor using the {#create_cursor create_cursor} operation. In the response, retrieve the value of the 'opc-next-cursor' header to pass as a parameter to get the next batch of messages in the stream.
@param [String] stream_id The OCID of the stream.
@param [String] cursor The cursor used to consume the stream.
@param [Hash] opts the optional parameters
@option opts [OCI::Retry::RetryConfig] :retry_config The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by {#retry_config} will be used. If an explicit `nil` value is provided then the operation will not retry.
@option opts [Integer] :limit The maximum number of messages to return. You can specify any value up to 10000. By default, the service returns as many messages as possible. Consider your average message size to help avoid exceeding throughput on the stream.
@option opts [String] :opc_request_id The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.
@return [Response] A Response object with data of type Array<{OCI::Streaming::Models::Message Message}>
@note Click [here](docs.cloud.oracle.com/en-us/iaas/tools/ruby-sdk-examples/latest/streaming/get_messages.rb.html) to see an example of how to use the get_messages API.
# File lib/oci/streaming/stream_client.rb, line 404
def get_messages(stream_id, cursor, opts = {})
  logger.debug 'Calling operation StreamClient#get_messages.' if logger

  raise "Missing the required parameter 'stream_id' when calling get_messages." if stream_id.nil?
  raise "Missing the required parameter 'cursor' when calling get_messages." if cursor.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/messages'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}
  query_params[:cursor] = cursor
  query_params[:limit] = opts[:limit] if opts[:limit]

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = nil

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#get_messages') do
    @api_client.call_api(
      :GET,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'Array<OCI::Streaming::Models::Message>'
    )
  end
  # rubocop:enable Metrics/BlockLength
end
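The get_messages description says that each response carries an 'opc-next-cursor' header whose value is passed to the next call. The loop below sketches that hand-off against an in-memory stand-in so it runs without the SDK or network access. `FakeResponse`, `FakeStreamClient`, and `drain` are hypothetical names. The sketch also assumes that the response exposes `headers` and `data` readers and that the header key is the lowercase string shown.

```ruby
# Hypothetical stand-in for a response object. The reader names are an
# assumption of this sketch.
FakeResponse = Struct.new(:headers, :data)

# Hypothetical in-memory client with the same call shape as
# StreamClient#get_messages. The cursor here is simply an array index.
class FakeStreamClient
  def initialize(messages)
    @messages = messages
  end

  def get_messages(_stream_id, cursor, opts = {})
    start = cursor.to_i
    limit = opts[:limit] || @messages.size
    batch = @messages[start, limit] || []
    FakeResponse.new({ 'opc-next-cursor' => (start + batch.size).to_s }, batch)
  end
end

# Reads batches, feeding each response's next-cursor header into the next
# call, until an empty batch arrives or max_batches calls have been made.
def drain(client, stream_id, cursor, limit: 100, max_batches: 10)
  collected = []
  max_batches.times do
    response = client.get_messages(stream_id, cursor, limit: limit)
    break if response.data.empty?

    collected.concat(response.data)
    cursor = response.headers['opc-next-cursor']
  end
  collected
end

client = FakeStreamClient.new(%w[a b c d e])
puts drain(client, 'example-stream-id', '0', limit: 2).inspect
```

Stopping on the first empty batch is a simplification for the sketch. On a live stream an empty batch may only mean that no new messages have arrived yet, so a real consumer would normally keep polling.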
@return [Logger] The logger for this client. May be nil.
# File lib/oci/streaming/stream_client.rb, line 66
def logger
  @api_client.config.logger
end
Emits messages to a stream. There's no limit to the number of messages in a request, but the total size of a message or request must be 1 MiB or less. The service calculates the partition ID from the message key and stores messages that share a key on the same partition. If a message does not contain a key or if the key is null, the service generates a message key for you. The partition ID cannot be passed as a parameter.
@param [String] stream_id The OCID of the stream.
@param [OCI::Streaming::Models::PutMessagesDetails] put_messages_details Array of messages to put into the stream.
@param [Hash] opts the optional parameters
@option opts [OCI::Retry::RetryConfig] :retry_config The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by {#retry_config} will be used. If an explicit `nil` value is provided then the operation will not retry.
@option opts [String] :opc_request_id The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.
@return [Response] A Response object with data of type {OCI::Streaming::Models::PutMessagesResult PutMessagesResult}
@note Click [here](docs.cloud.oracle.com/en-us/iaas/tools/ruby-sdk-examples/latest/streaming/put_messages.rb.html) to see an example of how to use the put_messages API.
# File lib/oci/streaming/stream_client.rb, line 469
def put_messages(stream_id, put_messages_details, opts = {})
  logger.debug 'Calling operation StreamClient#put_messages.' if logger

  raise "Missing the required parameter 'stream_id' when calling put_messages." if stream_id.nil?
  raise "Missing the required parameter 'put_messages_details' when calling put_messages." if put_messages_details.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)

  path = '/streams/{streamId}/messages'.sub('{streamId}', stream_id.to_s)
  operation_signing_strategy = :exclude_body

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = @api_client.object_to_http_body(put_messages_details)

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#put_messages') do
    @api_client.call_api(
      :POST,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body,
      return_type: 'OCI::Streaming::Models::PutMessagesResult'
    )
  end
  # rubocop:enable Metrics/BlockLength
end
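The put_messages description caps a message or request at 1 MiB. One way a producer might respect that cap is to split its outgoing messages into batches before calling put_messages once per batch. The sketch below does only the splitting. `batch_by_size` and `MAX_REQUEST_BYTES` are hypothetical names, messages are plain hashes rather than SDK models, and counting the raw key and value bytes is an assumption, since the text above does not say how the service measures size.

```ruby
# 1 MiB, taken from the put_messages description.
MAX_REQUEST_BYTES = 1024 * 1024

# Hypothetical helper (not part of the SDK). Splits messages, given as hashes
# with :key and :value strings, into consecutive batches whose summed key and
# value bytes stay within max_bytes. Message order is preserved.
def batch_by_size(messages, max_bytes: MAX_REQUEST_BYTES)
  batches = [[]]
  used = 0
  messages.each do |message|
    # A nil key counts as zero bytes because nil.to_s is the empty string.
    size = message[:key].to_s.bytesize + message[:value].to_s.bytesize
    raise ArgumentError, "message of #{size} bytes exceeds #{max_bytes}" if size > max_bytes

    # Start a new batch when this message would push the current one over.
    if used + size > max_bytes
      batches << []
      used = 0
    end
    batches.last << message
    used += size
  end
  batches.reject(&:empty?)
end

messages = [{ key: 'k1', value: 'hello' }, { key: nil, value: 'world' }]
puts batch_by_size(messages).length
```

A real request also carries serialization overhead, so a caller relying on this approach would probably want to pass a `max_bytes` somewhat below the documented cap.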
Forcefully changes the current location of a group as a whole, resetting the processing location of all consumers to a particular location in the stream.
@param [String] stream_id The OCID of the stream.
@param [String] group_name The name of the consumer group.
@param [OCI::Streaming::Models::UpdateGroupDetails] update_group_details The information used to modify the group.
@param [Hash] opts the optional parameters
@option opts [OCI::Retry::RetryConfig] :retry_config The retry configuration to apply to this operation. If no key is provided then the service-level retry configuration defined by {#retry_config} will be used. If an explicit `nil` value is provided then the operation will not retry.
@option opts [String] :opc_request_id The unique Oracle-assigned identifier for the request. If you need to contact Oracle about a particular request, please provide the request ID.
@return [Response] A Response object with data of type nil
@note Click [here](docs.cloud.oracle.com/en-us/iaas/tools/ruby-sdk-examples/latest/streaming/update_group.rb.html) to see an example of how to use the update_group API.
# File lib/oci/streaming/stream_client.rb, line 530
def update_group(stream_id, group_name, update_group_details, opts = {})
  logger.debug 'Calling operation StreamClient#update_group.' if logger

  raise "Missing the required parameter 'stream_id' when calling update_group." if stream_id.nil?
  raise "Missing the required parameter 'group_name' when calling update_group." if group_name.nil?
  raise "Missing the required parameter 'update_group_details' when calling update_group." if update_group_details.nil?
  raise "Parameter value for 'stream_id' must not be blank" if OCI::Internal::Util.blank_string?(stream_id)
  raise "Parameter value for 'group_name' must not be blank" if OCI::Internal::Util.blank_string?(group_name)

  path = '/streams/{streamId}/groups/{groupName}'.sub('{streamId}', stream_id.to_s).sub('{groupName}', group_name.to_s)
  operation_signing_strategy = :standard

  # rubocop:disable Style/NegatedIf
  # Query Params
  query_params = {}

  # Header Params
  header_params = {}
  header_params[:accept] = 'application/json'
  header_params[:'content-type'] = 'application/json'
  header_params[:'opc-request-id'] = opts[:opc_request_id] if opts[:opc_request_id]
  # rubocop:enable Style/NegatedIf

  post_body = @api_client.object_to_http_body(update_group_details)

  # rubocop:disable Metrics/BlockLength
  OCI::Retry.make_retrying_call(applicable_retry_config(opts), call_name: 'StreamClient#update_group') do
    @api_client.call_api(
      :PUT,
      path,
      endpoint,
      header_params: header_params,
      query_params: query_params,
      operation_signing_strategy: operation_signing_strategy,
      body: post_body
    )
  end
  # rubocop:enable Metrics/BlockLength
end
Private Instance Methods
# File lib/oci/streaming/stream_client.rb, line 575
def applicable_retry_config(opts = {})
  return @retry_config unless opts.key?(:retry_config)

  opts[:retry_config]
end
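The method above decides by the presence of the `:retry_config` key rather than by the truthiness of its value. That is what lets an explicit `nil` disable retries for a single call while an absent key falls back to the client default. The following standalone sketch reproduces that rule. `RetryResolver` is a hypothetical stand-in for the client, and the symbols take the place of real `OCI::Retry::RetryConfig` objects.

```ruby
# Hypothetical stand-in (not the SDK class) that holds a client-level default
# and resolves the retry configuration for one call.
class RetryResolver
  def initialize(default_retry_config)
    @retry_config = default_retry_config
  end

  # Same rule as applicable_retry_config: Hash#key? distinguishes
  # "key absent" from "key present with a nil value".
  def applicable_retry_config(opts = {})
    return @retry_config unless opts.key?(:retry_config)

    opts[:retry_config]
  end
end

resolver = RetryResolver.new(:service_default)
p resolver.applicable_retry_config                         # key absent: client default
p resolver.applicable_retry_config(retry_config: nil)      # explicit nil: no retries
p resolver.applicable_retry_config(retry_config: :custom)  # per-call override
```

A check such as `opts[:retry_config] || @retry_config` would not behave the same way, because it would treat an explicit `nil` as "use the default".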