class Fluent::Plugin::ConfluentAvroSchemaRegistry

Public Class Methods

new(registry_url, api_key=nil, api_secret=nil) click to toggle source
# File lib/fluent/plugin/confluent_avro_schema_registry.rb, line 24
# Stores the connection settings for later requests.
#
# registry_url:: base URL of the schema registry, kept as given.
# api_key::      optional credential; nil by default.
# api_secret::   optional credential; nil by default.
def initialize(registry_url, api_key = nil, api_secret = nil)
  @registry_url, @api_key, @api_secret = registry_url, api_key, api_secret
end

Public Instance Methods

get_response(uri) click to toggle source
# File lib/fluent/plugin/confluent_avro_schema_registry.rb, line 52
# Issues an HTTP GET for +uri+ and returns the Net::HTTP response object.
#
# The connection uses TLS when the URI scheme is "https". When both
# @api_key and @api_secret are set, they are sent as HTTP basic auth.
#
# uri:: a parsed URI exposing host, port, scheme and a request path.
#
# Returns the response unchanged; apart from the 401 case below, the
# status code is not inspected here.
#
# Raises ConfluentAvroSchemaRegistryUnauthorizedError when credentials are
# configured and the server answers with 401 Unauthorized. Without
# configured credentials a 401 response is returned like any other.
def get_response(uri)
  authenticated = @api_key && @api_secret
  # request_uri keeps the query string and yields "/" for an empty path,
  # whereas path alone drops the query and is rejected by Net::HTTP::Get
  # when empty. URI objects without request_uri fall back to path.
  target = uri.respond_to?(:request_uri) ? uri.request_uri : uri.path

  response = Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == "https") do |http|
    request = Net::HTTP::Get.new(target)
    request.basic_auth(@api_key, @api_secret) if authenticated
    http.request(request)
  end

  if authenticated && response.is_a?(Net::HTTPUnauthorized)
    raise ConfluentAvroSchemaRegistryUnauthorizedError
  end
  response
end
schema_with_id(schema_id, schema_key) click to toggle source
# File lib/fluent/plugin/confluent_avro_schema_registry.rb, line 41
# Fetches "/schemas/ids/<schema_id>" from the registry via get_response.
#
# schema_id::  identifier interpolated into the request path.
# schema_key:: when nil, the raw response body is returned; otherwise the
#              body is parsed with Yajl and the value under this key is
#              returned.
def schema_with_id(schema_id, schema_key)
  endpoint = URI.join(URI.parse(@registry_url), "/schemas/ids/#{schema_id}")
  payload = get_response(endpoint).body
  return payload if schema_key.nil?

  Yajl.load(payload)[schema_key]
end
subject_version(subject, schema_key, version = "latest") click to toggle source
# File lib/fluent/plugin/confluent_avro_schema_registry.rb, line 30
# Fetches "/subjects/<subject>/versions/<version>" from the registry via
# get_response.
#
# subject::    subject name interpolated into the request path as given.
# schema_key:: when nil, the raw response body is returned; otherwise the
#              body is parsed with Yajl and the value under this key is
#              returned.
# version::    version segment of the path; defaults to "latest".
def subject_version(subject, schema_key, version = "latest")
  endpoint = URI.join(URI.parse(@registry_url), "/subjects/#{subject}/versions/#{version}")
  payload = get_response(endpoint).body
  return payload if schema_key.nil?

  Yajl.load(payload)[schema_key]
end