class SchemaRegistry::Client

Attributes

endpoint[R]
http_options[R]
password[R]
username[R]

Public Class Methods

connection_options(**config) click to toggle source

Build options hash for net/http based on params provided. Primary for selectivly adding TLS config options for MTLS

# File lib/schema_registry/client.rb, line 61
def self.connection_options(**config)
  options = {}

  unless config[:verify_mode].nil?
    options[:verify_mode] = OpenSSL::SSL.const_get(config[:verify_mode].upcase)
  end

  unless config[:ca_certificate].nil?
    if File.exist?(config[:ca_certificate])
      options[:ca_file] = config[:ca_certificate]
    else
      raise ArgumentError, "ca file not found [#{config[:ca_certificate]}]"
    end
  end

  unless config[:client_key].nil?
    if File.exist?(config[:client_key])
      options[:key] = OpenSSL::PKey::RSA.new(File.read(config[:client_key]))
    else
      raise ArgumentError, "client key file not found [#{config[:client_key]}]"
    end
  end

  unless config[:client_certificate].nil?
    if File.exist?(config[:client_certificate])
      options[:cert] = OpenSSL::X509::Certificate.new(File.read(config[:client_certificate]))
    else
      raise ArgumentError, "client cert file not found [#{config[:client_certificate]}]"
    end
  end
  options
end
new(endpoint, username = nil, password = nil, **http_options) click to toggle source
# File lib/schema_registry/client.rb, line 33
def initialize(endpoint, username = nil, password = nil, **http_options)
  @endpoint = URI(endpoint)
  @username, @password = username, password
  @http_options = http_options
end

Public Instance Methods

default_compatibility_level() click to toggle source
# File lib/schema_registry/client.rb, line 52
def default_compatibility_level
  request(:get, "/config")["compatibilityLevel"]
end
default_compatibility_level=(level) click to toggle source
# File lib/schema_registry/client.rb, line 56
def default_compatibility_level=(level)
  request(:put, "/config", compatibility: level)
end
request(method, path, body = nil) click to toggle source
# File lib/schema_registry/client.rb, line 94
def request(method, path, body = nil)

  # build config for http client
  default_options = {
    use_ssl: endpoint.scheme == 'https'
  }.merge!(@http_options)

  Net::HTTP.start(endpoint.host, endpoint.port, default_options) do |http|
    request_class = case method
      when :get;    Net::HTTP::Get
      when :post;   Net::HTTP::Post
      when :put;    Net::HTTP::Put
      when :delete; Net::HTTP::Delete
      else raise ArgumentError, "Unsupported request method"
    end

    request = request_class.new(@endpoint.path + path)
    request.basic_auth(username, password) if username && password
    request['Accept'] = "application/vnd.schemaregistry.v1+json"
    if body
      request['Content-Type'] = "application/json"
      request.body = JSON.dump(body)
    end

    case response = http.request(request)
    when Net::HTTPSuccess
      begin
        JSON.parse(response.body)
      rescue JSON::ParserError => e
        raise SchemaRegistry::InvalidResponse, "Invalid JSON in response: #{e.message}"
      end

    when Net::HTTPInternalServerError
      raise SchemaRegistry::ServerError, "Schema registy responded with a server error: #{response.code.to_i}"

    when Net::HTTPForbidden
      message = username.nil? ? "Unauthorized" : "User `#{username}` failed to authenticate"
      raise SchemaRegistry::UnauthorizedRequest.new(response.code.to_i, message)

    else
      response_data = begin
        JSON.parse(response.body)
      rescue JSON::ParserError => e
        raise SchemaRegistry::InvalidResponse, "Invalid JSON in response: #{e.message}"
      end

      error_class = RESPONSE_ERROR_CODES[response_data['error_code']] || SchemaRegistry::ResponseError
      raise error_class.new(response_data['error_code'], response_data['message'])
    end
  end
end
schema(id) click to toggle source
# File lib/schema_registry/client.rb, line 39
def schema(id)
  request(:get, "/schemas/ids/#{id}")['schema']
end
subject(name) click to toggle source
# File lib/schema_registry/client.rb, line 48
def subject(name)
  SchemaRegistry::Subject.new(self, name)
end
subjects() click to toggle source
# File lib/schema_registry/client.rb, line 43
def subjects
  data = request(:get, "/subjects")
  data.map { |subject| SchemaRegistry::Subject.new(self, subject) }
end