class Processor
Public Class Methods
new(agent)
click to toggle source
# File lib/wattics-api-client/processor.rb, line 5 def initialize(agent) @agent = agent @measurements_with_config = PriorityBlockingQueue.new @semaphore = Concurrent::Semaphore.new(0) @is_sending = false @mutex = Mutex.new @logger = Logger.new(STDOUT) @logger.level = Logger::WARN end
Public Instance Methods
is_idle?()
click to toggle source
# File lib/wattics-api-client/processor.rb, line 24 def is_idle? @mutex.synchronize do @measurements_with_config.is_empty? && !@is_sending end end
process(measurement_with_config)
click to toggle source
# File lib/wattics-api-client/processor.rb, line 15 def process(measurement_with_config) @measurements_with_config << measurement_with_config if measurement_with_config.is_a?(Array) @semaphore.release(measurement_with_config.size) else @semaphore.release end end
run()
click to toggle source
# File lib/wattics-api-client/processor.rb, line 30 def run client = ClientFactory.get_instance.create_client loop do @semaphore.acquire @mutex.synchronize do @measurement_with_config = @measurements_with_config.pop @is_sending = true end @measurement = @measurement_with_config.measurement @config = @measurement_with_config.config loop do begin @response = client.send(@measurement, @config) if !@agent.nil? && @response.code < 400 @agent.report_sent_measurement(@measurement, @response) end if !@agent.nil? && @response.code >= 400 @agent.report_sent_measurement(@measurement, @response) if defined?(Rails).nil? @logger.error("Could not send #{@measurement}, Server Response: #{@response.body}") else Rails.logger.error("Could not send #{@measurement}, Server Response: #{@response.body}") puts "Could not send #{@measurement}, Server Response: #{@response.body}" end end break rescue StandardError => e if defined?(Rails).nil? @logger.error("Could not send #{@measurement}, Error: #{e}") else Rails.logger.error("Could not send #{@measurement}, Error: #{e}") puts "Could not send #{@measurement}, Error: #{e}" end sleep 60 end end @mutex.synchronize do @is_sending = false end end rescue StandardError => e if defined?(Rails).nil? @logger.error("Thread stopped unexpectedly: #{e.message}") else Rails.logger.error("Thread stopped unexpectedly: #{e.message}") puts "Thread stopped unexpectedly: #{e.message}" end end