class SchemaRegistry::Client
Attributes
endpoint[R]
http_options[R]
password[R]
username[R]
Public Class Methods
connection_options(**config)
click to toggle source
Build options hash for net/http based on params provided. Primary for selectivly adding TLS config options for MTLS
# File lib/schema_registry/client.rb, line 61 def self.connection_options(**config) options = {} unless config[:verify_mode].nil? options[:verify_mode] = OpenSSL::SSL.const_get(config[:verify_mode].upcase) end unless config[:ca_certificate].nil? if File.exist?(config[:ca_certificate]) options[:ca_file] = config[:ca_certificate] else raise ArgumentError, "ca file not found [#{config[:ca_certificate]}]" end end unless config[:client_key].nil? if File.exist?(config[:client_key]) options[:key] = OpenSSL::PKey::RSA.new(File.read(config[:client_key])) else raise ArgumentError, "client key file not found [#{config[:client_key]}]" end end unless config[:client_certificate].nil? if File.exist?(config[:client_certificate]) options[:cert] = OpenSSL::X509::Certificate.new(File.read(config[:client_certificate])) else raise ArgumentError, "client cert file not found [#{config[:client_certificate]}]" end end options end
new(endpoint, username = nil, password = nil, **http_options)
click to toggle source
# File lib/schema_registry/client.rb, line 33 def initialize(endpoint, username = nil, password = nil, **http_options) @endpoint = URI(endpoint) @username, @password = username, password @http_options = http_options end
Public Instance Methods
default_compatibility_level()
click to toggle source
# File lib/schema_registry/client.rb, line 52 def default_compatibility_level request(:get, "/config")["compatibilityLevel"] end
default_compatibility_level=(level)
click to toggle source
# File lib/schema_registry/client.rb, line 56 def default_compatibility_level=(level) request(:put, "/config", compatibility: level) end
request(method, path, body = nil)
click to toggle source
# File lib/schema_registry/client.rb, line 94 def request(method, path, body = nil) # build config for http client default_options = { use_ssl: endpoint.scheme == 'https' }.merge!(@http_options) Net::HTTP.start(endpoint.host, endpoint.port, default_options) do |http| request_class = case method when :get; Net::HTTP::Get when :post; Net::HTTP::Post when :put; Net::HTTP::Put when :delete; Net::HTTP::Delete else raise ArgumentError, "Unsupported request method" end request = request_class.new(@endpoint.path + path) request.basic_auth(username, password) if username && password request['Accept'] = "application/vnd.schemaregistry.v1+json" if body request['Content-Type'] = "application/json" request.body = JSON.dump(body) end case response = http.request(request) when Net::HTTPSuccess begin JSON.parse(response.body) rescue JSON::ParserError => e raise SchemaRegistry::InvalidResponse, "Invalid JSON in response: #{e.message}" end when Net::HTTPInternalServerError raise SchemaRegistry::ServerError, "Schema registy responded with a server error: #{response.code.to_i}" when Net::HTTPForbidden message = username.nil? ? "Unauthorized" : "User `#{username}` failed to authenticate" raise SchemaRegistry::UnauthorizedRequest.new(response.code.to_i, message) else response_data = begin JSON.parse(response.body) rescue JSON::ParserError => e raise SchemaRegistry::InvalidResponse, "Invalid JSON in response: #{e.message}" end error_class = RESPONSE_ERROR_CODES[response_data['error_code']] || SchemaRegistry::ResponseError raise error_class.new(response_data['error_code'], response_data['message']) end end end
schema(id)
click to toggle source
# File lib/schema_registry/client.rb, line 39 def schema(id) request(:get, "/schemas/ids/#{id}")['schema'] end
subject(name)
click to toggle source
# File lib/schema_registry/client.rb, line 48 def subject(name) SchemaRegistry::Subject.new(self, name) end
subjects()
click to toggle source
# File lib/schema_registry/client.rb, line 43 def subjects data = request(:get, "/subjects") data.map { |subject| SchemaRegistry::Subject.new(self, subject) } end