class Agent

Attributes

thread[R]

Public Class Methods

dispose() click to toggle source
# File lib/wattics-api-client/agent.rb, line 24
def self.dispose
  @@mutex.synchronize do
    unless @@instance.nil?
      @@instance.agent_thread_group.list.each(&:kill)
      @@instance = nil
    end
  end
end
get_instance(maximum_parallel_senders = 0) click to toggle source
# File lib/wattics-api-client/agent.rb, line 18
def self.get_instance(maximum_parallel_senders = 0)
  @@mutex.synchronize do
    @@instance ||= new(maximum_parallel_senders)
  end
end

Private Class Methods

new(maximum_parallel_senders = 0) click to toggle source
# File lib/wattics-api-client/agent.rb, line 6
def initialize(maximum_parallel_senders = 0)
  @agent_thread_group = ThreadGroup.new
  @processor_pool = ProcessorPool.new(self, @agent_thread_group, maximum_parallel_senders)
  @enqueued_measurements_with_config = Hash.new { |h, k| h[k] = [] }
  @sent_measurements_with_context = BlockingQueue.new
  @measurement_sent_handler_list = Concurrent::Array.new
  start_processor_feeder
  start_measurement_sent_handler_dispatcher
  @wait_semaphore = Concurrent::Semaphore.new(0)
end

Public Instance Methods

add_measurement_sent_handler() { || ... } click to toggle source
# File lib/wattics-api-client/agent.rb, line 111
def add_measurement_sent_handler
  @measurement_sent_handler_list << yield
end
report_sent_measurement(measurement, response) click to toggle source
# File lib/wattics-api-client/agent.rb, line 105
def report_sent_measurement(measurement, response)
  #Dont need to send to the measurement handler, <=400 erros are reported on the processor
  @sent_measurements_with_context << [measurement, response] if response.code <= 400
  @wait_semaphore.acquire
end
send(measurement, config) click to toggle source
# File lib/wattics-api-client/agent.rb, line 80
def send(measurement, config)
  if measurement.is_a?(Array)
    @wait_semaphore.release(measurement.size)
    measurement_groups = measurement.group_by(&:id)
    measurement_groups.each do |channel_id, measurements_for_channel_id|
      measurements_with_config = measurements_for_channel_id.map { |measurement| MeasurementWithConfig.new(measurement, config) }
      @processor_already_bound_to_channel_id = @processor_pool.get_processor(channel_id)
      if @processor_already_bound_to_channel_id.nil?
        @enqueued_measurements_with_config[channel_id] += measurements_with_config
      else
        @processor_already_bound_to_channel_id.process(measurements_with_config)
      end
    end
  else
    @wait_semaphore.release
    measurement_with_config = MeasurementWithConfig.new(measurement, config)
    @processor_already_bound_to_channel_id = @processor_pool.get_processor(measurement.id)
    if @processor_already_bound_to_channel_id.nil?
      @enqueued_measurements_with_config[measurement.id] << measurement_with_config
    else
      @processor_already_bound_to_channel_id.process(measurement_with_config)
    end
  end
end
sleep_fix() click to toggle source
# File lib/wattics-api-client/agent.rb, line 76
def sleep_fix
  sleep 0.1
end
start_measurement_sent_handler_dispatcher() click to toggle source
# File lib/wattics-api-client/agent.rb, line 61
def start_measurement_sent_handler_dispatcher
  @agent_thread_group.add(Thread.new do
    begin
      loop do
        array = @sent_measurements_with_context.pop
        next if array.nil?
        measurement = array[0]
        response = array[1]
        @measurement_sent_handler_list.each { |handler| handler.call(measurement, response) }
      end
    rescue ThreadError
    end
  end)
end
start_processor_feeder() click to toggle source
# File lib/wattics-api-client/agent.rb, line 39
def start_processor_feeder
  @agent_thread_group.add(Thread.new do
    begin
      loop do
        key, values = @enqueued_measurements_with_config.first
        if @enqueued_measurements_with_config.empty?
          sleep_fix
          next
        end
        processor = @processor_pool.get_processor(key)
        if processor.nil?
          sleep_fix
          next
        end
        @enqueued_measurements_with_config.delete(key)
        processor.process(values)
      end
    rescue ThreadError
    end
  end)
end
wait_until_last() click to toggle source
# File lib/wattics-api-client/agent.rb, line 33
def wait_until_last
  Thread.new do
    sleep 0.01 while @wait_semaphore.available_permits != 0
  end.join
end