class OML4R::Channel

Measurement Channel

Attributes

url[R]

Public Class Methods

[](name = :default, domain = :default) click to toggle source
# File lib/oml4r.rb, line 577
# Look up the channel registered under +name+ and +domain+.
#
# When no channel exists for a non-default domain but one is registered for
# the same name in the :default domain, a new channel for the requested
# domain is created with the default channel's URL and returned.
#
# @param name [Symbol, String] channel name
# @param domain [Symbol, String] domain name
# @return [OML4R::Channel] the existing or newly created channel
# @raise [OML4RException] if no matching channel can be found or derived
def self.[](name = :default, domain = :default)
  key = "#{name}:#{domain}"
  return @@channels[key] if @@channels.key?(key)

  # Fall back to cloning the URL of the :default-domain channel, if any.
  if domain != :default
    default_channel = @@channels["#{name}:default"]
    return _create(key, domain, default_channel.url) if default_channel
  end

  raise OML4RException.new("OML4R: Unknown channel '#{name}'")
end
_connect(fquri) click to toggle source

Parse the given fully-qualified collection URI, and return a suitably connected object

Supported URIs are

tcp:host:port
file:/P/A/T/H

@param fquri [String] a fully qualified collection URI @return [IO] an object suitably connected to the required URL

@raise [OML4RException] in case of an unknown scheme

# File lib/oml4r.rb, line 563
# Parse a fully-qualified collection URI and open the matching output stream.
#
# Supported forms are +tcp:host:port+ and +file:path+; the special path +-+
# selects $stdout.
#
# @param fquri [String] a fully qualified collection URI
# @return [IO] the opened socket, the opened file, or $stdout
# @raise [OML4RException] if the scheme is neither 'tcp' nor 'file'
def self._connect(fquri)
  # Split off the scheme only, so a file path that itself contains ':' is
  # not truncated at its first colon.
  scheme, target = fquri.split(':', 2)
  case scheme
  when 'tcp'
    host, port = target.to_s.split(':')
    TCPSocket.new(host, port)
  when 'file'
    target == '-' ? $stdout : File.open(target, "w+")
  else
    raise OML4RException.new("OML4R: Unknown scheme '#{scheme}'")
  end
end
_create(key, domain, url) click to toggle source
# File lib/oml4r.rb, line 548
# Instantiate a channel for +url+ and +domain+ and register it under +key+.
#
# @param key [String] registry key ("name:domain")
# @param domain [Symbol, String] domain passed to the new channel
# @param url [String] collection URL passed to the new channel
# @return [OML4R::Channel] the newly created channel
def self._create(key, domain, url)
  channel = new(url, domain)
  @@channels[key] = channel
  channel
end
close_all() click to toggle source
# File lib/oml4r.rb, line 611
# Close every registered channel, empty the registry, and unfreeze MPBase.
#
# @return the result of MPBase.__unfreeze__
def self.close_all
  # Iterate over a snapshot of the registered channels.
  open_channels = @@channels.values
  open_channels.each(&:close)
  @@channels = {}
  MPBase.__unfreeze__
end
create(name, url, domain = :default) click to toggle source
# File lib/oml4r.rb, line 537
# Return the channel registered as +name+ in +domain+, creating it if needed.
#
# @param name [Symbol, String] channel name
# @param url [String] collection URL of the channel
# @param domain [Symbol, String] domain name
# @return [OML4R::Channel] the existing or newly created channel
# @raise [OML4RException] if the channel already exists with a different URL
def self.create(name, url, domain = :default)
  key = "#{name}:#{domain}"
  existing = @@channels[key]
  return _create(key, domain, url) unless existing

  if url != existing.url
    raise OML4RException.new("OML4R: Channel '#{name}' already defined with different url")
  end
  existing
end
init_all(domain, nodeID, appName, startTime, protocol) click to toggle source
# File lib/oml4r.rb, line 591
# Record the run parameters at class level, freeze MPBase, initialise every
# registered channel, print the metadata of each measurement point, and
# finally switch MPBase to OML mode.
#
# @param domain default domain used by channels created for :default
# @param nodeID sender identifier
# @param appName application name
# @param startTime start time of the run
# @param protocol protocol version
# @return the result of MPBase.__useOML__
def self.init_all(domain, nodeID, appName, startTime, protocol)
  @@default_domain = domain
  @@nodeID, @@appName = nodeID, appName
  @@startTime, @@protocol = startTime, protocol

  MPBase.__freeze__(appName, startTime)

  # Hand the run parameters to each channel (each one connects to its URL).
  @@channels.values.each do |channel|
    channel.init(nodeID, appName, startTime, protocol)
  end

  # Have every measurement point class print its metadata.
  MPBase.each_mp { |mp_class, _defs| mp_class.__print_meta__(appName) }

  MPBase.__useOML__
end
new(url, domain) click to toggle source
# File lib/oml4r.rb, line 658
# Set up the channel state and start the background sender thread.
#
# @param url [String] collection URL this channel will connect to
# @param domain [Symbol, String] domain of this channel
def initialize(url, domain)
  @url, @domain = url, domain
  @schemas = []
  @index = -1 # incremented before use, so the first schema gets index 0
  @header_sent = false
  @queue = Queue.new
  start_runner
end

Public Instance Methods

build_schema(mp_name, add_prefix, pdefs) click to toggle source
# File lib/oml4r.rb, line 627
# Build the schema line for a measurement point and remember it.
#
# The line has the form "<index> <name> <field>:<type> ...". The name is
# prefixed with "<appName>_" when +add_prefix+ is set and an application
# name is known.
#
# @param mp_name [String] measurement point name
# @param add_prefix [Boolean] whether to prefix the name with the app name
# @param pdefs [Array<Hash>] field definitions with :name and :type keys
# @return [Array] the schema index and the schema line
def build_schema(mp_name, add_prefix, pdefs)
  @index += 1
  schema_name =
    if !@@appName.nil? && add_prefix
      "#{@@appName}_#{mp_name}"
    else
      mp_name
    end
  fields = pdefs.map { |pdef| "#{pdef[:name]}:#{pdef[:type]}" }
  msg = [@index, schema_name, *fields].join(' ')
  @schemas << msg
  [@index, msg]
end
close() click to toggle source
# File lib/oml4r.rb, line 652
# Ask the sender thread to finish and wait until it has terminated.
#
# @return [Thread] the joined runner thread
def close
  # A nil entry on the queue is the end-of-work marker for the runner.
  @queue << nil
  @runner.join
end
init(nodeID, appName, startTime, protocol) click to toggle source
# File lib/oml4r.rb, line 647
# Store the run parameters on this channel and connect to its URL.
#
# @param nodeID sender identifier
# @param appName application name
# @param startTime start time of the run
# @param protocol protocol version
# @return the stream returned by _connect
def init(nodeID, appName, startTime, protocol)
  @nodeID = nodeID
  @appName = appName
  @startTime = startTime
  @protocol = protocol
  @out = _connect(@url)
end
send(msg) click to toggle source
# File lib/oml4r.rb, line 638
# Queue +msg+ for delivery by the background sender thread.
#
# @param msg [String] message to send
# @return [Queue] the channel's message queue
def send(msg)
  @queue << msg
end
send_schema_update(msg) click to toggle source
# File lib/oml4r.rb, line 642
# Queue a schema update message and mark the protocol header as sent.
#
# @param msg [String] schema update message
# @return [Queue] the channel's message queue
def send_schema_update(msg)
  # Set the flag before queueing so it is in place when the message is sent.
  @header_sent = true
  @queue << msg
end
url=(url) click to toggle source
# File lib/oml4r.rb, line 619
# Change the channel's URL; a no-op when the URL is unchanged.
#
# @param url [String] the new collection URL
# @raise [RuntimeError] if the URL differs and the channel is already connected
def url=(url)
  unless @url == url
    raise "Can't change channel's URL when it is already connected" if @out
    @url = url
  end
end

Protected Instance Methods

_connect(url) click to toggle source
# File lib/oml4r.rb, line 668
# Open the output stream described by +url+ and store it in @out.
#
# Supported forms are +file:path+ (with +-+ meaning $stdout) and
# +tcp:host[:port]+; the port falls back to DEF_SERVER_PORT when omitted.
#
# @param url [String] collection URL
# @return [IO] the opened stream
# @raise [OML4RException] if the URL starts with neither 'file:' nor 'tcp:'
def _connect(url)
  if url.start_with? 'file:'
    # Split only on the first ':' so file names containing ':' stay intact.
    fname = url.split(':', 2).last
    out = (fname == '-' ? $stdout : File.open(fname, "w+"))
  elsif url.start_with? 'tcp:'
    # e.g. tcp:norbit.npc.nicta.com.au:3003
    _scheme, host, port = url.split(':')
    port ||= DEF_SERVER_PORT
    out = TCPSocket.new(host, port)
  else
    raise OML4RException.new "OML4R: Unknown transport in server url '#{url}'"
  end
  @out = out
end
_send(msg) click to toggle source
# File lib/oml4r.rb, line 737
# Write +msg+ to the output stream, sending the protocol header first if it
# has not been sent yet.
#
# On a broken pipe the stale stream is closed and the channel tries to
# reconnect every 5 seconds; once reconnected, the header is re-sent and the
# message is delivered on the new stream. Refused connections are logged and
# retried; any other error raised while reconnecting propagates.
#
# @param msg [String] message (or newline-joined batch) to write
# @return the result of flushing the stream
def _send(msg)
  unless @header_sent
    _send_protocol_header(@out)
    @header_sent = true
  end
  @out.puts msg
  @out.flush
rescue Errno::EPIPE
  OML4R.logger.info "Trying to reconnect to '#{@url}'"

  # Release the broken stream before replacing it; otherwise every reconnect
  # leaks a descriptor. Best effort: errors while closing are ignored.
  stale = @out
  begin
    stale.close unless stale.nil? || stale == $stdout
  rescue IOError, SystemCallError
    # nothing more can be done with the broken stream
  end

  loop do
    sleep 5
    begin
      @out = _connect(@url)
      @header_sent = false
      OML4R.logger.info "Reconnected to '#{@url}'"
      return _send(msg)
    rescue Errno::ECONNREFUSED => ex
      OML4R.logger.warn "Exception while reconnect '#{@url}' (#{ex.class})"
    end
  end
end
_send_protocol_header(stream) click to toggle source
# File lib/oml4r.rb, line 684
# Write the protocol header (including all schemas built so far) to +stream+.
#
# The effective protocol is @protocol, or OML4R::DEF_PROTOCOL when @protocol
# is unset; the same value is used for the "protocol:" line, for selecting
# the header layout, and in the error message.
#
# @param stream [IO] stream to write the header to
# @return [nil]
# @raise [MissingArgumentException] if no domain name is available
# @raise [OML4RException] if the protocol version is not supported
def _send_protocol_header(stream)
  protocol = @protocol || OML4R::DEF_PROTOCOL
  header = []
  header << "protocol: #{protocol}"
  header << "content: text"
  # Channels of the :default domain use the domain given to init_all.
  d = (@domain == :default) ? @@default_domain : @domain
  raise MissingArgumentException.new "Missing domain name" unless d
  case protocol
  when 4
    header << "domain: #{d}"
    header << "start-time: #{@startTime.tv_sec}"
    header << "sender-id: #{@nodeID}"
    header << "app-name: #{@appName}"
    @schemas.each do |s|
      header << "schema: #{s}"
    end
    header << "" # blank line terminates the header
  else
    raise OML4RException.new "Unsupported protocol #{protocol}"
  end
  stream.puts header
end
start_runner() click to toggle source
# File lib/oml4r.rb, line 706
# Start the background thread that drains the queue and sends messages.
#
# The thread blocks for the next queued item, then collects everything else
# already waiting and sends it as one newline-joined batch. A nil item is the
# end-of-work marker: items queued before it are still sent, items after it
# in the same batch are discarded, and the thread closes the output stream
# (unless it is $stdout or was never opened) and terminates.
#
# Errors raised while sending are logged and end the thread.
#
# @return [Thread] the runner thread
def start_runner
  @runner = Thread.new do
    begin
      active = true
      while active
        batch = [@queue.pop]
        batch << @queue.pop until @queue.empty?

        # Honour the end marker wherever it sits in the batch, so a message
        # queued after it cannot re-activate the loop.
        stop_at = batch.index(nil)
        if stop_at
          active = false
          batch = batch.first(stop_at)
        end
        next if batch.empty?

        _send(batch.size == 1 ? batch.first : batch.join("\n"))
      end
      # @out is nil when the channel was closed before it ever connected.
      @out.close unless @out.nil? || @out == $stdout
      @out = nil
    rescue StandardError => ex
      OML4R.logger.warn "Exception while sending message to channel '#{@url}' (#{ex})"
    end
    OML4R.logger.info "Channel #{url} closed"
  end
end