class RFlow::Message

A message to be sent around in the RFlow framework.

Attributes

data[R]

The message's data, held in a Data object built from the registered schema (see new) rather than a bare string. @return [Data]

data_type_name[R]

The data type name of the message. @return [String]

properties[RW]

The message's properties information. @return [Hash]

provenance[RW]

The message's provenance information. @return [Array<ProcessingEvent>]

Public Class Methods

encode(message)

@!visibility private

# File lib/rflow/message.rb, line 42
def encode(message); RFlow::Avro.encode(message_writer, message); end

from_avro(bytes)

Deserialize an Avro-encoded message and return a new Message object. Assumes the bytes were written with the org.rflow.Message Avro schema. @!visibility private

# File lib/rflow/message.rb, line 47
def from_avro(bytes)
  message = RFlow::Avro.decode(message_reader, bytes)
  Message.new(message['data_type_name'], message['provenance'], message['properties'],
              message['data_serialization_type'], message['data_schema'],
              message['data'])
end

message_reader()

@!visibility private

# File lib/rflow/message.rb, line 38
def message_reader; @message_reader ||= ::Avro::IO::DatumReader.new(schema, schema); end

message_writer()

@!visibility private

# File lib/rflow/message.rb, line 40
def message_writer; @message_writer ||= ::Avro::IO::DatumWriter.new(schema); end

new(data_type_name, provenance = [], properties = {}, serialization_type = 'avro', schema = nil, serialized_data = nil)

When creating a new message as a transformation of an existing message, copy the provenance and properties of the original message into the new one. Downstream components can then use these fields.

# File lib/rflow/message.rb, line 75
def initialize(data_type_name, provenance = [], properties = {}, serialization_type = 'avro', schema = nil, serialized_data = nil)
  @data_type_name = data_type_name.to_s

  # Turn the provenance array of Hashes into an array of
  # ProcessingEvents for easier access and time validation.
  # TODO: do this lazily so as not to create/destroy objects that are
  # never used
  @provenance = (provenance || []).map do |event|
    if event.is_a? ProcessingEvent
      event
    else
      ProcessingEvent.new(event['component_instance_uuid'],
                          event['started_at'], event['completed_at'],
                          event['context'])
    end
  end

  @properties = properties || {}

  # TODO: Make this better.  This check is technically
  # unnecessary, as we are able to completely deserialize the
  # message without needing to resort to the registered schema.
  registered_schema = RFlow::Configuration.available_data_types[@data_type_name][serialization_type.to_s]
  unless registered_schema
    raise ArgumentError, "Data type '#{@data_type_name}' with serialization_type '#{serialization_type}' not found"
  end

  # TODO: think about registering the schemas automatically if not
  # found in Configuration
  if schema && (registered_schema != schema)
    raise ArgumentError, "Passed schema ('#{schema}') does not equal registered schema ('#{registered_schema}') for data type '#{@data_type_name}' with serialization_type '#{serialization_type}'"
  end

  @data = Data.new(registered_schema, serialization_type.to_s, serialized_data)

  # Get the extensions and apply them to the data object to add capability
  RFlow::Configuration.available_data_extensions[@data_type_name].each do |e|
    RFlow.logger.debug "Extending '#{data_type_name}' with extension '#{e}'"
    @data.extend e
  end
end
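
The advice to carry provenance and properties forward can be sketched as follows. This is a minimal illustration, not RFlow code: SketchMessage is a hypothetical stand-in that mirrors only the first three constructor arguments, so it runs without the rflow gem or a registered data type. The helper derive_message and the type names are likewise assumptions made for the example.

```ruby
# Hypothetical stand-in for RFlow::Message. It mirrors only the first three
# constructor arguments (data_type_name, provenance, properties).
SketchMessage = Struct.new(:data_type_name, :provenance, :properties)

# Build a message of a new type that carries forward the original's
# provenance and properties. The containers are duplicated so that later
# changes to the derived message do not alter the original's.
def derive_message(original, new_type_name)
  SketchMessage.new(new_type_name, original.provenance.dup, original.properties.dup)
end

original = SketchMessage.new('Hypothetical::Input',
                             [{ 'component_instance_uuid' => 'uuid-1' }],
                             { 'source' => 'sensor-7' })

derived = derive_message(original, 'Hypothetical::Summary')

# Adding a property to the derived message leaves the original untouched.
derived.properties['stage'] = 'summarized'
```

With the real class, the equivalent call would pass original.provenance and original.properties as the second and third arguments to new. Duplicating the containers first is a design choice of this sketch, not something the source prescribes.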

schema()

@!visibility private

# File lib/rflow/message.rb, line 36
def schema; @schema ||= ::Avro::Schema.parse(File.read(File.join(File.dirname(__FILE__), '..', '..', 'schema', 'message.avsc'))); end

Public Instance Methods

to_avro()

Serialize the current message object to Avro using the org.rflow.Message Avro schema. Note that the encoding has to be set manually for Ruby 1.9; otherwise the StringIO would use UTF-8 by default, which would not work correctly because a serialized Avro string is BINARY, not UTF-8. @return [String]

# File lib/rflow/message.rb, line 123
def to_avro
  # stringify all the properties
  string_properties = Hash[properties.map { |k,v| [k.to_s, v.to_s] }]

  Message.encode('data_type_name' => data_type_name.to_s,
                 'provenance' => provenance.map(&:to_hash),
                 'properties' => string_properties.to_hash,
                 'data_serialization_type' => data.serialization_type.to_s,
                 'data_schema' => data.schema_string,
                 'data' => data.to_avro)
end
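
Two details of to_avro can be tried in isolation: the stringification of properties and the need for a BINARY buffer. The sketch below reuses the Hash[...] expression from the method body. The StringIO setup is an assumption about how such a buffer might be prepared and is not taken from RFlow::Avro.encode, whose source is not shown here; the sample property values are made up.

```ruby
require 'stringio'

# Properties may hold symbol keys and non-string values.
properties = { retries: 3, 'source' => :sensor, traced: true }

# Same expression as in to_avro: every key and every value becomes a String.
string_properties = Hash[properties.map { |k, v| [k.to_s, v.to_s] }]

# Assumed buffer setup: tag an empty string as BINARY before wrapping it in
# a StringIO, so that raw bytes are stored without being treated as UTF-8.
io = StringIO.new(''.dup.force_encoding('BINARY'))

# Two raw bytes, one of which (0xFF) is not valid UTF-8 on its own.
io.write([1, 255].pack('C*'))

encoded = io.string
```

After this runs, string_properties contains only String keys and values, and encoded is a two-byte string whose encoding is Encoding::BINARY.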