class SchemaRegistry

Constants

CONTENT_TYPE

Public Class Methods

new( url, schema_context: nil, logger: Logger.new($stdout), proxy: nil, user: nil, password: nil, ssl_ca_file: nil, client_cert: nil, client_key: nil, client_key_pass: nil, client_cert_data: nil, client_key_data: nil, path_prefix: nil, connect_timeout: nil, resolv_resolver: nil ) click to toggle source
# File lib/avro_turf/confluent_schema_registry.rb, line 6
def initialize(
  url,
  schema_context: nil,
  logger: Logger.new($stdout),
  proxy: nil,
  user: nil,
  password: nil,
  ssl_ca_file: nil,
  client_cert: nil,
  client_key: nil,
  client_key_pass: nil,
  client_cert_data: nil,
  client_key_data: nil,
  path_prefix: nil,
  connect_timeout: nil,
  resolv_resolver: nil
)
  @path_prefix = path_prefix
  @schema_context_prefix = schema_context.nil? ? '' : ":.#{schema_context}:"
  @schema_context_options = schema_context.nil? ? {} : {query: {subject: @schema_context_prefix}}
  @logger = logger
  headers = Excon.defaults[:headers].merge(
    "Content-Type" => CONTENT_TYPE
  )
  headers[:proxy] = proxy unless proxy.nil?
  @connection = Excon.new(
    url,
    headers: headers,
    user: user,
    password: password,
    ssl_ca_file: ssl_ca_file,
    client_cert: client_cert,
    client_key: client_key,
    client_key_pass: client_key_pass,
    client_cert_data: client_cert_data,
    client_key_data: client_key_data,
    connect_timeout: connect_timeout,
    resolv_resolver: resolv_resolver
  )
end

Public Instance Methods

check(subject, schema) click to toggle source

Check if a schema exists. Returns nil if not found.

# File lib/avro_turf/confluent_schema_registry.rb, line 84
def check(subject, schema)
  data = post("/subjects/#{@schema_context_prefix}#{subject}",
              expects: [200, 404],
              body: { schema: schema.to_s }.to_json,
              idempotent: true)
  data unless data.has_key?("error_code")
end
compatible?(subject, schema, version = 'latest') click to toggle source

Check if a schema is compatible with the stored version. Returns:

  • true if compatible

  • nil if the subject or version does not exist

  • false if incompatible

docs.confluent.io/3.1.2/schema-registry/docs/api.html#compatibility

# File lib/avro_turf/confluent_schema_registry.rb, line 98
def compatible?(subject, schema, version = 'latest')
  data = post("/compatibility/subjects/#{@schema_context_prefix}#{subject}/versions/#{version}",
              expects: [200, 404], body: { schema: schema.to_s }.to_json, idempotent: true)
  data.fetch('is_compatible', false) unless data.has_key?('error_code')
end
fetch(id) click to toggle source
# File lib/avro_turf/confluent_schema_registry.rb, line 47
def fetch(id)
  @logger.info "Fetching schema with id #{id}"
  data = get("/schemas/ids/#{id}", idempotent: true, **@schema_context_options, )
  data.fetch("schema")
end
global_config() click to toggle source

Get global config

# File lib/avro_turf/confluent_schema_registry.rb, line 105
def global_config
  get("/config", idempotent: true)
end
register(subject, schema) click to toggle source
# File lib/avro_turf/confluent_schema_registry.rb, line 53
def register(subject, schema)
  data = post("/subjects/#{@schema_context_prefix}#{subject}/versions", body: { schema: schema.to_s }.to_json)

  id = data.fetch("id")

  @logger.info "Registered schema for subject `#{@schema_context_prefix}#{subject}`; id = #{id}"

  id
end
schema_subject_versions(schema_id) click to toggle source

Get the subject and version for a schema id

# File lib/avro_turf/confluent_schema_registry.rb, line 79
def schema_subject_versions(schema_id)
  get("/schemas/ids/#{schema_id}/versions", idempotent: true, **@schema_context_options)
end
subject_config(subject) click to toggle source

Get config for subject

# File lib/avro_turf/confluent_schema_registry.rb, line 115
def subject_config(subject)
  get("/config/#{@schema_context_prefix}#{subject}", idempotent: true)
end
subject_version(subject, version = 'latest') click to toggle source

Get a specific version for a subject

# File lib/avro_turf/confluent_schema_registry.rb, line 74
def subject_version(subject, version = 'latest')
  get("/subjects/#{@schema_context_prefix}#{subject}/versions/#{version}", idempotent: true)
end
subject_versions(subject) click to toggle source

List all versions for a subject

# File lib/avro_turf/confluent_schema_registry.rb, line 69
def subject_versions(subject)
  get("/subjects/#{@schema_context_prefix}#{subject}/versions", idempotent: true)
end
subjects() click to toggle source

List all subjects

# File lib/avro_turf/confluent_schema_registry.rb, line 64
def subjects
  get('/subjects', idempotent: true)
end
update_global_config(config) click to toggle source

Update global config

# File lib/avro_turf/confluent_schema_registry.rb, line 110
def update_global_config(config)
  put("/config", body: config.to_json, idempotent: true)
end
update_subject_config(subject, config) click to toggle source

Update config for subject

# File lib/avro_turf/confluent_schema_registry.rb, line 120
def update_subject_config(subject, config)
  put("/config/#{@schema_context_prefix}#{subject}", body: config.to_json, idempotent: true)
end

Private Instance Methods

get(path, **options) click to toggle source
# File lib/avro_turf/confluent_schema_registry.rb, line 126
def get(path, **options)
  request(path, method: :get, **options)
end
post(path, **options) click to toggle source
# File lib/avro_turf/confluent_schema_registry.rb, line 134
def post(path, **options)
  request(path, method: :post, **options)
end
put(path, **options) click to toggle source
# File lib/avro_turf/confluent_schema_registry.rb, line 130
def put(path, **options)
  request(path, method: :put, **options)
end
request(path, **options) click to toggle source
# File lib/avro_turf/confluent_schema_registry.rb, line 138
def request(path, **options)
  options = { expects: 200 }.merge!(options)
  path = File.join(@path_prefix, path) unless @path_prefix.nil?
  response = @connection.request(path: path, **options)
  JSON.parse(response.body)
end