class SensorStream::Stream

The simple stream class is the base class for complex streams

Attributes

description[R]

The stream's attributes are immutable: the sensor stream API does not support changing a stream's attributes after it has been created.

device[R]

The stream's attributes are immutable: the sensor stream API does not support changing a stream's attributes after it has been created.

elements[R]

The stream's attributes are immutable: the sensor stream API does not support changing a stream's attributes after it has been created.

name[R]

The stream's attributes are immutable: the sensor stream API does not support changing a stream's attributes after it has been created.

streamid[R]

The stream's attributes are immutable: the sensor stream API does not support changing a stream's attributes after it has been created.

Public Class Methods

new(newDevice, newStreamid, newName, newDescription, newElements) click to toggle source

Initialize the fields of the stream object

# File lib/SensorStream.rb, line 244
# Build a stream object from its already-known attributes.
#
# newDevice      - object stored as the owning device (other methods call
#                  +key+ and +device_name+ on it)
# newStreamid    - identifier of the stream
# newName        - name of the stream
# newDescription - description of the stream
# newElements    - collection of element descriptions (to_s reads the
#                  "name", "type" and "units" keys of each entry)
def initialize(newDevice, newStreamid, newName, newDescription, newElements)
  @device      = newDevice
  @streamid    = newStreamid
  @name        = newName
  @elements    = newElements
  @description = newDescription
  # Queue used by publish_event_deferred / publish_deferred_events. It must
  # carry the same name those methods use, otherwise it is never initialised.
  @deferred_events = []
end

Public Instance Methods

get_events(count = 1, start_time = nil, end_time = nil) click to toggle source

Download a list of events from the server from this stream

# File lib/SensorStream.rb, line 326
# Download events for this stream from the server.
#
# count      - number of events to request (sent as the "count" parameter)
# start_time - optional object responding to strftime; sent as "start"
# end_time   - optional object responding to strftime; sent as "end"
#
# Returns the "Data" array of the entry in the response's "Streams" list
# whose "StreamID" equals this stream's id, with every event's "time"
# string replaced by a DateTime. Returns nil (after writing a message to
# STDERR) when the server does not answer "200" or no matching data exists.
def get_events(count = 1, start_time = nil, end_time = nil)
  time_format = "%FT%T.%N%:z"

  # NOTE(review): the timestamps are interpolated without URL-encoding, so a
  # positive UTC offset puts a literal "+" in the query string - confirm the
  # server accepts that.
  path = "/api/GetData/#{@streamid}?count=#{count}"
  path += "&start=#{start_time.strftime(time_format)}" unless start_time.nil?
  path += "&end=#{end_time.strftime(time_format)}" unless end_time.nil?

  resp = SensorStream.make_http_get(path, {"Accept-Encoding" => "gzip"})

  if resp.code != "200"
    STDERR.puts  "Unable to load events from stream: #{resp.body}"
    return nil
  end

  if resp["Content-Encoding"] == "gzip"
    puts "Content was zipped, inflating"
    # MAX_WBITS + 32 lets zlib auto-detect a gzip or a zlib header; the
    # default window bits only accept a zlib header and reject gzip bodies.
    inflater = Zlib::Inflate.new(Zlib::MAX_WBITS + 32)
    begin
      content = inflater.inflate(resp.body)
      inflater.finish
    ensure
      inflater.close
    end
  else
    content = resp.body
  end

  # We're only interested in the values and their timestamps, but the
  # server gives us the device information anyway. Keep only the Data
  # array of the entry that belongs to this stream (last match wins).
  streams = JSON.parse(content)["Streams"] || []
  match = streams.select { |stream| stream["StreamID"] == @streamid }.last
  data = match && match["Data"]

  if data.nil?
    STDERR.puts "Can't find data in get events for this stream"
    return nil
  end

  # Replace the time strings with Ruby time objects
  data.each do |event_dict|
    event_dict["time"] = DateTime.strptime(event_dict["time"], time_format)
  end
  data
end
publish_deferred_events() click to toggle source

Publish all the deferred events (if there are any)

# File lib/SensorStream.rb, line 299
# Publish all the deferred events (if there are any) in a single POST to
# /api/AddData, authenticated with the device key.
#
# Returns the number of events published. Returns 0 when the queue was
# empty, or when the server answered with anything other than "200" (in
# which case the queue is kept for a later attempt).
def publish_deferred_events
  # The queue may never have been created; normalise it before inspecting.
  @deferred_events = [] if @deferred_events.nil?

  if @deferred_events.empty?
    puts "No events to publish"
    puts "Deferred events empty."
    return 0
  end

  resp = SensorStream.make_http_post("/api/AddData", @deferred_events, { "key" => @device.key })

  # If the send failed, keep the messages
  if resp.code != "200"
    STDERR.puts "Unable to publish deferred events, keeping events."
    return 0
  end

  published = @deferred_events.count
  @deferred_events = []
  published
end
publish_event(values, time=nil) click to toggle source

Publish an event immediately. If it doesn't succeed, add to the deferred queue.

# File lib/SensorStream.rb, line 269
# Publish an event immediately. If the server does not answer "200", the
# event is handed to publish_event_deferred so it can be retried later.
#
# values - the event values to send
# time   - optional object responding to strftime; when omitted no "time"
#          field is sent with the immediate request
#
# Returns true when the server answered "200", false otherwise.
def publish_event(values, time=nil)
  dict = {"streamid" => @streamid, "values" => values}
  dict["time"] = time.strftime("%FT%T.%N%:z") unless time.nil?
  # Captured before the request so that a deferred event without an
  # explicit time is stamped with the moment publishing was attempted.
  attempt_time = Time.now

  resp = SensorStream.make_http_post("/api/AddData", [dict], { "key" => @device.key })

  return true if resp.code == "200"

  STDERR.puts "Error publishing SensorStream event! (#{resp.code})\n#{resp.body} -- deferring message"
  # Keep the caller-supplied timestamp when there is one; falling back to
  # the attempt time unconditionally would silently re-date the event.
  publish_event_deferred(values, time || attempt_time)
  false
end
publish_event_deferred(values, time=nil) click to toggle source

Add an event to the deferred message queue

# File lib/SensorStream.rb, line 286
# Queue an event for a later call to publish_deferred_events.
#
# values - the event values to store
# time   - optional object responding to strftime; defaults to Time.now
#
# Returns the deferred-event queue with the new entry appended.
def publish_event_deferred(values, time=nil)
  stamp_source = time || Time.now

  # Create the queue on first use.
  @deferred_events = [] if @deferred_events.nil?

  entry = {}
  entry["streamid"] = @streamid
  entry["values"]   = values
  entry["time"]     = stamp_source.strftime("%FT%T.%N%:z")

  @deferred_events.push(entry)
end
to_s() click to toggle source

Create a useful (to a human) representation of the stream for printing

# File lib/SensorStream.rb, line 254
def to_s
  elementsString = "";
  @elements.each do |element|
    elementsString += "\n\t\t#{element["name"]}" +
                      "\n\t\t\tTypes: #{element["type"]}" +
                      "\n\t\t\tUnits: #{element["units"]}"
  end
  
  "Name: #{@name} \n\tStreamID: #{@streamid} " + 
                 "\n\tElements: #{elementsString} " + 
                 "\n\tDescription: #{@description}" +
                 "\n\tDevice: #{device.device_name}"
end