class RFlow::Message
A message passed between components in the RFlow framework.
Attributes
data: The message's data payload, wrapped in a Data object that also carries its schema and serialization type. @return [Data]
data_type_name: The data type name of the message. @return [String]
properties: The message's properties. @return [Hash]
provenance: The message's provenance information. @return [Array<ProcessingEvent>]
Public Class Methods
encode(message) (private)

```ruby
# File lib/rflow/message.rb, line 42
def encode(message)
  RFlow::Avro.encode(message_writer, message)
end
```
from_avro(bytes) (private)

Takes an Avro serialization of a message and returns a new Message object. Assumes the org.rflow.Message Avro schema.

```ruby
# File lib/rflow/message.rb, line 47
def from_avro(bytes)
  message = RFlow::Avro.decode(message_reader, bytes)
  Message.new(message['data_type_name'], message['provenance'],
              message['properties'], message['data_serialization_type'],
              message['data_schema'], message['data'])
end
```
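The field names that to_avro writes are the ones from_avro reads back, and from_avro hands them to Message.new positionally. The sketch below illustrates that contract with the standard-library JSON module standing in for Avro (an assumption made so the example runs without the avro gem); `encode_fields`, `decode_fields`, `MESSAGE_FIELDS`, and the `Example::Type` name are hypothetical and not part of RFlow.

```ruby
require 'json'

# Field names shared by to_avro (writer side) and from_avro (reader side),
# listed in the order Message.new takes its positional arguments.
MESSAGE_FIELDS = %w[data_type_name provenance properties
                    data_serialization_type data_schema data].freeze

# Hypothetical stand-in for Message.encode: JSON instead of Avro.
def encode_fields(fields)
  JSON.generate(fields)
end

# Hypothetical stand-in for from_avro: decode, then pull the fields out
# in constructor order.
def decode_fields(bytes)
  message = JSON.parse(bytes)
  MESSAGE_FIELDS.map { |name| message[name] }
end

encoded = encode_fields('data_type_name' => 'Example::Type',
                        'provenance' => [],
                        'properties' => { 'source' => 'test' },
                        'data_serialization_type' => 'avro',
                        'data_schema' => '{"type":"string"}',
                        'data' => 'payload')
args = decode_fields(encoded)
```

In the real class the resulting array would be the argument list of Message.new; renaming a key on only one side would silently yield nil for that argument.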
message_reader (private)

```ruby
# File lib/rflow/message.rb, line 38
def message_reader
  @message_reader ||= ::Avro::IO::DatumReader.new(schema, schema)
end
```
message_writer (private)

```ruby
# File lib/rflow/message.rb, line 40
def message_writer
  @message_writer ||= ::Avro::IO::DatumWriter.new(schema)
end
```
new(data_type_name, provenance = [], properties = {}, serialization_type = 'avro', schema = nil, serialized_data = nil)

When creating a new message as a transformation of an existing message, copy the provenance and properties of the original message into the new one so that downstream components can use these fields. Raises ArgumentError if no schema is registered for the data type and serialization type, or if a passed schema differs from the registered one.

```ruby
# File lib/rflow/message.rb, line 75
def initialize(data_type_name, provenance = [], properties = {}, serialization_type = 'avro', schema = nil, serialized_data = nil)
  @data_type_name = data_type_name.to_s

  # Turn the provenance array of Hashes into an array of
  # ProcessingEvents for easier access and time validation.
  # TODO: do this lazily so as not to create/destroy objects that are
  # never used
  @provenance = (provenance || []).map do |event|
    if event.is_a? ProcessingEvent
      event
    else
      ProcessingEvent.new(event['component_instance_uuid'], event['started_at'],
                          event['completed_at'], event['context'])
    end
  end

  @properties = properties || {}

  # TODO: Make this better. This check is technically
  # unnecessary, as we are able to completely deserialize the
  # message without needing to resort to the registered schema.
  registered_schema = RFlow::Configuration.available_data_types[@data_type_name][serialization_type.to_s]
  unless registered_schema
    raise ArgumentError, "Data type '#{@data_type_name}' with serialization_type '#{serialization_type}' not found"
  end

  # TODO: think about registering the schemas automatically if not
  # found in Configuration
  if schema && (registered_schema != schema)
    raise ArgumentError, "Passed schema ('#{schema}') does not equal registered schema ('#{registered_schema}') for data type '#{@data_type_name}' with serialization_type '#{serialization_type}'"
  end

  @data = Data.new(registered_schema, serialization_type.to_s, serialized_data)

  # Get the extensions and apply them to the data object to add capability
  RFlow::Configuration.available_data_extensions[@data_type_name].each do |e|
    RFlow.logger.debug "Extending '#{data_type_name}' with extension '#{e}'"
    @data.extend e
  end
end
```
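A condensed, runnable sketch of the steps initialize performs: normalize provenance Hashes into event objects, look up the registered schema for the (data type, serialization type) pair, reject a mismatched schema, and extend the data object with the registered extension modules. `AVAILABLE_DATA_TYPES`, `AVAILABLE_DATA_EXTENSIONS`, `ProcessingEventSketch`, `DataSketch`, `UppercaseExtension`, and `MessageSketch` are hypothetical stand-ins for RFlow::Configuration, ProcessingEvent, Data, and Message. The last lines show the recommended pattern of copying provenance and properties into a derived message.

```ruby
# Hypothetical stand-ins for RFlow::Configuration's registries.
AVAILABLE_DATA_TYPES = Hash.new { |h, k| h[k] = {} }
AVAILABLE_DATA_EXTENSIONS = Hash.new { |h, k| h[k] = [] }

ProcessingEventSketch = Struct.new(:component_instance_uuid, :started_at,
                                   :completed_at, :context)
DataSketch = Struct.new(:schema, :serialization_type, :serialized_data)

# Hypothetical data extension module, mixed into the data object.
module UppercaseExtension
  def shout
    serialized_data.to_s.upcase
  end
end

class MessageSketch
  attr_reader :data_type_name, :provenance, :properties, :data

  def initialize(data_type_name, provenance = [], properties = {},
                 serialization_type = 'avro', schema = nil, serialized_data = nil)
    @data_type_name = data_type_name.to_s
    # Accept either existing event objects or Hashes with string keys.
    @provenance = (provenance || []).map do |event|
      next event if event.is_a?(ProcessingEventSketch)
      ProcessingEventSketch.new(event['component_instance_uuid'], event['started_at'],
                                event['completed_at'], event['context'])
    end
    @properties = properties || {}

    registered = AVAILABLE_DATA_TYPES[@data_type_name][serialization_type.to_s]
    raise ArgumentError, "Data type '#{@data_type_name}' not found" unless registered
    if schema && schema != registered
      raise ArgumentError, 'Passed schema does not equal registered schema'
    end

    @data = DataSketch.new(registered, serialization_type.to_s, serialized_data)
    AVAILABLE_DATA_EXTENSIONS[@data_type_name].each { |mod| @data.extend(mod) }
  end
end

AVAILABLE_DATA_TYPES['Example::Type']['avro'] = '{"type":"string"}'
AVAILABLE_DATA_EXTENSIONS['Example::Type'] << UppercaseExtension

original = MessageSketch.new('Example::Type',
                             [{ 'component_instance_uuid' => 'uuid-1' }],
                             { 'source' => 'sensor' }, 'avro', nil, 'hello')
# Derived message: copy provenance and properties from the original.
derived = MessageSketch.new('Example::Type', original.provenance,
                            original.properties, 'avro', nil, 'world')
```

Because the derived message receives event objects rather than Hashes, they pass through the normalization step unchanged.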
schema (private)

```ruby
# File lib/rflow/message.rb, line 36
def schema
  @schema ||= ::Avro::Schema.parse(File.read(File.join(File.dirname(__FILE__), '..', '..', 'schema', 'message.avsc')))
end
```
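schema, message_reader, and message_writer all memoize with `||=`, so after the first call message.avsc is not read again and the reader and writer share one parsed schema object. The sketch below shows that pattern on a class's singleton; a counter and plain arrays stand in for `::Avro::Schema.parse`, `DatumReader`, and `DatumWriter`, and `SchemaCacheSketch` and `parse_count` are hypothetical names.

```ruby
class SchemaCacheSketch
  class << self
    attr_reader :parse_count

    # Stand-in for ::Avro::Schema.parse(File.read(...)): counts how often
    # the "parse" actually runs.
    def schema
      @schema ||= begin
        @parse_count = (@parse_count || 0) + 1
        { 'name' => 'org.rflow.Message' } # pretend parsed schema
      end
    end

    # Stand-in for DatumReader.new(schema, schema): writer's and reader's schema.
    def message_reader
      @message_reader ||= [:reader, schema, schema]
    end

    # Stand-in for DatumWriter.new(schema).
    def message_writer
      @message_writer ||= [:writer, schema]
    end
  end
end

3.times { SchemaCacheSketch.message_reader; SchemaCacheSketch.message_writer }
SchemaCacheSketch.parse_count # → 1
```

`||=` is not synchronized, so two threads making the very first call at the same moment could both parse; every later call returns the cached object.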
Public Instance Methods
to_avro

Serializes the current message object to Avro using the org.rflow.Message Avro schema. Note that the encoding has to be set manually for Ruby 1.9; otherwise the StringIO would default to UTF-8, which does not work because a serialized Avro string is BINARY, not UTF-8. @return [String]

```ruby
# File lib/rflow/message.rb, line 123
def to_avro
  # stringify all the properties
  string_properties = Hash[properties.map { |k,v| [k.to_s, v.to_s] }]

  Message.encode('data_type_name' => data_type_name.to_s,
                 'provenance' => provenance.map(&:to_hash),
                 'properties' => string_properties.to_hash,
                 'data_serialization_type' => data.serialization_type.to_s,
                 'data_schema' => data.schema_string,
                 'data' => data.to_avro)
end
```
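Two details of to_avro can be checked in isolation with only the standard library: properties are converted to String keys and values before encoding (presumably because the message schema declares them as a string map; the schema file is not reproduced here, so treat that as an assumption), and the encoded bytes belong in a BINARY (ASCII-8BIT) buffer rather than a UTF-8 one. `stringify_properties` and `binary_buffer` are hypothetical helper names.

```ruby
require 'stringio'

# Same transformation as the first line of to_avro: keys and values to String.
def stringify_properties(properties)
  Hash[properties.map { |k, v| [k.to_s, v.to_s] }]
end

# A write buffer whose contents are tagged BINARY (ASCII-8BIT), so arbitrary
# bytes such as 0xFF are not treated as (invalid) UTF-8.
def binary_buffer
  StringIO.new(String.new(encoding: Encoding::BINARY))
end

props = stringify_properties({ retries: 3, 'source' => :sensor })

io = binary_buffer
io.write("\xFF\x00".b) # bytes that are not valid UTF-8
bytes = io.string
```

The same two bytes reinterpreted as UTF-8 form an invalid string, which is the failure the Ruby 1.9 note above guards against.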