class RFlow::Message::Data

Proxies most method calls to {data_object}, which can be serialized to Avro using the schema. Extensions should use the extended hook to apply any changes that must take effect as soon as the extension is applied.
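As a rough illustration of the extended hook mentioned above, the sketch below uses a hypothetical stand-in class (SketchData) and a hypothetical extension module (IntegerExtension); neither name comes from RFlow, and the real class also carries the Avro schema. It only shows how Ruby runs the hook at the moment of extension.

```ruby
# Hypothetical stand-in for RFlow::Message::Data, reduced to the one attribute
# the hook needs. The real class also parses and stores an Avro schema.
class SketchData
  attr_accessor :data_object
end

# Hypothetical extension module. Ruby calls self.extended once, right after
# obj.extend(IntegerExtension), so defaults can be applied immediately.
module IntegerExtension
  def self.extended(base)
    base.data_object ||= 0
  end

  # Convenience accessors layered over the raw data object.
  def value
    data_object
  end

  def value=(new_value)
    self.data_object = new_value
  end
end

data = SketchData.new
data.extend(IntegerExtension)
data.value        # 0, set by the hook
data.value = 42
data.data_object  # 42
```

Because the hook runs inside extend, the object already has a usable default before any caller touches it.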

Attributes

data_object[RW]

The data object for the message. @return [Object]

schema[R]

The parsed Avro schema that the data follows. @return [::Avro::Schema]

schema_string[R]

The string form of the schema the data follows. @return [String]

serialization_type[R]

Serialization type. Currently, always avro. @return [String]

Public Class Methods

new(schema_string, serialization_type = 'avro', serialized_data = nil)
# File lib/rflow/message.rb, line 192
def initialize(schema_string, serialization_type = 'avro', serialized_data = nil)
  raise ArgumentError, 'Only Avro serialization_type supported at the moment' unless serialization_type.to_s == 'avro'

  @schema_string = schema_string
  @serialization_type = serialization_type.to_s

  begin
    @schema = ::Avro::Schema.parse(schema_string)
    @writer = ::Avro::IO::DatumWriter.new(@schema)
  rescue StandardError => e
    raise ArgumentError, "Invalid schema '#{@schema_string}': #{e.class}: #{e.message}"
  end

  if serialized_data
    serialized_data.force_encoding 'BINARY'
    @data_object = RFlow::Avro.decode(::Avro::IO::DatumReader.new(schema, schema), serialized_data)
  end
end
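One detail of the constructor worth noting is that force_encoding relabels the string passed as serialized_data in place. The sketch below uses only core String methods; the variable names and the sample bytes are illustrative assumptions, not part of RFlow.

```ruby
# Stand-in for bytes received off the wire; the content is arbitrary.
payload = String.new("caf\u00e9")   # mutable UTF-8 string, 5 bytes
payload.encoding                    # Encoding::UTF_8

# force_encoding relabels the receiver in place without changing its bytes.
payload.force_encoding('BINARY')
payload.encoding                    # Encoding::ASCII_8BIT (BINARY is an alias)
payload.bytesize                    # 5

# On a frozen string the same call raises FrozenError, so a caller holding a
# frozen or shared buffer may prefer to pass a copy instead.
frozen = "caf\u00e9".freeze
copy = frozen.dup
copy.force_encoding('BINARY')
frozen.encoding                     # still Encoding::UTF_8
```

Passing a dup keeps the caller's own string untouched while still giving the constructor a binary-labelled buffer.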

Public Instance Methods

method_missing(method_sym, *args, &block)

Proxies method calls down to the underlying {data_object}, which is typically a Hash. Extensions are expected to provide any additional functionality, so this fallback should only be reached when no extension method applies. @return [void]

# File lib/rflow/message.rb, line 227
def method_missing(method_sym, *args, &block)
  @data_object.send(method_sym, *args, &block)
end
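The proxying pattern above can be tried in isolation. The following is a minimal sketch with a hypothetical class name (ProxySketch); the respond_to? guard and the respond_to_missing? method are additions assumed here for illustration and are not part of the listed source.

```ruby
# Hypothetical stand-in for RFlow::Message::Data; only the proxying is modelled.
class ProxySketch
  attr_accessor :data_object

  def initialize(data_object)
    @data_object = data_object
  end

  # Forward unknown calls to the wrapped object, as the method above does,
  # but fall back to NoMethodError when the target cannot handle them.
  def method_missing(method_sym, *args, &block)
    if @data_object.respond_to?(method_sym)
      @data_object.public_send(method_sym, *args, &block)
    else
      super
    end
  end

  # Assumption: not in the listed source. Keeps respond_to? consistent with
  # what method_missing will actually accept.
  def respond_to_missing?(method_sym, include_private = false)
    @data_object.respond_to?(method_sym) || super
  end
end

message = ProxySketch.new({ 'id' => 7 })
message['id']                 # 7, forwarded to Hash#[]
message.key?('id')            # true
message.respond_to?(:fetch)   # true
```

Pairing method_missing with respond_to_missing? is a common Ruby convention; whether it suits RFlow's extension model is a design decision for the library.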

to_avro()

Encodes the data object to its Avro-serialized form using the schema's writer. @return [String]

# File lib/rflow/message.rb, line 219
def to_avro
  RFlow::Avro.encode @writer, @data_object
end

valid?()

Is the data object valid per the Avro schema? @return [Boolean]

# File lib/rflow/message.rb, line 213
def valid?
  ::Avro::Schema.validate @schema, @data_object
end