class ProcessorPool

Public Class Methods

new(agent, agent_thread_group, maximum_parallel_senders = 0) click to toggle source
# File lib/wattics-api-client/processor_pool.rb, line 2
def initialize(agent, agent_thread_group, maximum_parallel_senders = 0)
  maximum_parallel_senders > 0 ? @max_processors = maximum_parallel_senders.freeze : @max_processors = (2 * Concurrent.processor_count).freeze
  @agent = agent
  @processors = {}
  @processor_thread_group = agent_thread_group
  @mutex = Mutex.new
end

Public Instance Methods

get_processor(channel_id) click to toggle source
# File lib/wattics-api-client/processor_pool.rb, line 10
def get_processor(channel_id)
  @mutex.synchronize do
    processor = @processors[channel_id]
    return processor unless processor.nil?
    if @processors.size < @max_processors
      @processors[channel_id] = spawn_new_processor
      @processors[channel_id]
    else
      rebind_processor_to_channel_id(channel_id)
    end
  end
end

Private Instance Methods

rebind_processor_to_channel_id(new_channel_id) click to toggle source
# File lib/wattics-api-client/processor_pool.rb, line 31
def rebind_processor_to_channel_id(new_channel_id)
  idle_processor_entry = @processors.select { |_key, value| value.is_idle? }.first
  return nil if idle_processor_entry.nil?
  old_channel_id = idle_processor_entry[0]
  idle_processor = idle_processor_entry[1]
  @processors.delete(old_channel_id)
  @processors[new_channel_id] = idle_processor
  idle_processor
end
spawn_new_processor() click to toggle source
# File lib/wattics-api-client/processor_pool.rb, line 25
def spawn_new_processor
  processor = Processor.new(@agent)
  @processor_thread_group.add(Thread.new { processor.run })
  processor
end