class AvroTurf::SchemaStore
Public Class Methods
new(path: nil)
click to toggle source
# File lib/avro_turf/schema_store.rb, line 3 def initialize(path: nil) @path = path or raise "Please specify a schema path" @schemas = Hash.new @mutex = Mutex.new end
Public Instance Methods
find(name, namespace = nil)
click to toggle source
Resolves and returns a schema.
schema_name - The String
name of the schema to resolve.
Returns an Avro::Schema.
# File lib/avro_turf/schema_store.rb, line 14 def find(name, namespace = nil) fullname = Avro::Name.make_fullname(name, namespace) # Optimistic non-blocking read from @schemas # No sense to lock the resource when all the schemas already loaded return @schemas[fullname] if @schemas.key?(fullname) # Pessimistic blocking write to @schemas @mutex.synchronize do # Still need to check is the schema already loaded return @schemas[fullname] if @schemas.key?(fullname) load_schema!(fullname, @schemas.dup) end end
load_schemas!()
click to toggle source
Loads all schema definition files in the ‘schemas_dir`.
# File lib/avro_turf/schema_store.rb, line 30 def load_schemas! pattern = [@path, "**", "*.avsc"].join("/") Dir.glob(pattern) do |schema_path| # Remove the path prefix. schema_path.sub!(/^\/?#{@path}\//, "") # Replace `/` with `.` and chop off the file extension. schema_name = File.basename(schema_path.tr("/", "."), ".avsc") # Load and cache the schema. find(schema_name) end end
Protected Instance Methods
build_schema_path(fullname)
click to toggle source
# File lib/avro_turf/schema_store.rb, line 93 def build_schema_path(fullname) *namespace, schema_name = fullname.split(".") schema_path = File.join(@path, *namespace, schema_name + ".avsc") end
load_schema!(fullname, local_schemas_cache = {})
click to toggle source
Loads single schema Such method is not thread-safe, do not call it of from mutex synchronization routine
# File lib/avro_turf/schema_store.rb, line 49 def load_schema!(fullname, local_schemas_cache = {}) schema_path = build_schema_path(fullname) schema_json = JSON.parse(File.read(schema_path)) schema = Avro::Schema.real_parse(schema_json, local_schemas_cache) # Don't cache the parsed schema until after its fullname is validated if schema.respond_to?(:fullname) && schema.fullname != fullname raise AvroTurf::SchemaError, "expected schema `#{schema_path}' to define type `#{fullname}'" end # Cache only this new top-level schema by its fullname. It's critical # not to make every sub-schema resolvable at the top level here because # multiple different avsc files may define the same sub-schema, and # if we share the @schemas cache across all parsing contexts, the Avro # gem will raise an Avro::SchemaParseError when parsing another avsc # file that contains a subschema with the same fullname as one # encountered previously in a different file: # <Avro::SchemaParseError: The name "foo.bar" is already in use.> # Essentially, the only schemas that should be resolvable in @schemas # are those that have their own .avsc files on disk. @schemas[fullname] = schema schema rescue ::Avro::UnknownSchemaError => e # Try to first resolve a referenced schema from disk. # If this is successful, the Avro gem will have mutated the # local_schemas_cache, adding all the new schemas it found. load_schema!(::Avro::Name.make_fullname(e.type_name, e.default_namespace), local_schemas_cache) # Attempt to re-parse the original schema now that the dependency # has been resolved and use the now-updated local_schemas_cache to # pick up where we left off. local_schemas_cache.delete(fullname) # Ensure all sub-schemas are cleaned up to avoid conflicts when re-parsing # schema. local_schemas_cache.each_key do |schema_name| local_schemas_cache.delete(schema_name) unless File.exist?(build_schema_path(schema_name)) end load_schema!(fullname, @schemas.dup) rescue Errno::ENOENT, Errno::ENAMETOOLONG raise AvroTurf::SchemaNotFoundError, "could not find Avro schema at `#{schema_path}'" end