class Fluent::Plugin::ConfluentAvroSchemaRegistry
Public Class Methods
new(registry_url, api_key=nil, api_secret=nil)
click to toggle source
# File lib/fluent/plugin/confluent_avro_schema_registry.rb, line 24 def initialize(registry_url, api_key=nil, api_secret=nil) @registry_url = registry_url @api_key = api_key @api_secret = api_secret end
Public Instance Methods
get_response(uri)
click to toggle source
# File lib/fluent/plugin/confluent_avro_schema_registry.rb, line 52 def get_response(uri) response = Net::HTTP.start(uri.host, uri.port, :use_ssl => (uri.scheme == "https")) do |http| request = Net::HTTP::Get.new(uri.path) if @api_key and @api_secret request.basic_auth(@api_key, @api_secret) end http.request(request) end if @api_key and @api_secret if response.is_a?(Net::HTTPUnauthorized) raise ConfluentAvroSchemaRegistryUnauthorizedError end end response end
schema_with_id(schema_id, schema_key)
click to toggle source
# File lib/fluent/plugin/confluent_avro_schema_registry.rb, line 41 def schema_with_id(schema_id, schema_key) registry_uri = URI.parse(@registry_url) registry_uri_with_ids = URI.join(registry_uri, "/schemas/ids/#{schema_id}") response = get_response(registry_uri_with_ids) if schema_key.nil? response.body else Yajl.load(response.body)[schema_key] end end
subject_version(subject, schema_key, version = "latest")
click to toggle source
# File lib/fluent/plugin/confluent_avro_schema_registry.rb, line 30 def subject_version(subject, schema_key, version = "latest") registry_uri = URI.parse(@registry_url) registry_uri_with_versions = URI.join(registry_uri, "/subjects/#{subject}/versions/#{version}") response = get_response(registry_uri_with_versions) if schema_key.nil? response.body else Yajl.load(response.body)[schema_key] end end