class OneApm::Support::ForkedProcessChannel::Listener

Attributes

pipes[RW]
select_timeout[RW]
thread[R]
timeout[RW]

Public Class Methods

new() click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 101
# Sets up an empty listener: no registered pipes, a mutex guarding the pipe
# table, and the default timeouts.
#
# @timeout is the allowance added to a pipe's last_read in clean_up_pipes
# before that pipe is closed; @select_timeout is the timeout handed to
# IO.select by the listener loop in start.
def initialize
  @pipes_lock = Mutex.new
  @pipes = {}

  @timeout = 360
  @select_timeout = 60
end

Public Instance Methods

close_all_pipes() click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 166
# Closes every registered pipe and replaces the pipe table with an empty
# one, all while holding the pipes lock. Entries whose value is nil/false
# are skipped.
def close_all_pipes
  @pipes_lock.synchronize do
    @pipes.each_value { |registered| registered.close if registered }
    @pipes = {}
  end
end
register_pipe(id) click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 113
# Creates a fresh Pipe, stores it under +id+ (replacing any pipe already
# registered for that id) while holding the pipes lock, then calls wakeup.
#
# @param id key under which the new pipe is stored in the pipe table
# @return whatever wakeup returns
def register_pipe(id)
  @pipes_lock.synchronize { @pipes.store(id, Pipe.new) }
  wakeup
end
start() click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 121
# Starts the background listener thread unless the listener is already
# flagged as started.
#
# The thread is created through AgentThread.create with the label
# 'Pipe Channel Manager'. Each pass of its loop runs clean_up_pipes, waits in
# IO.select (for at most @select_timeout) on the read ends of all registered
# pipes plus the wake pipe, passes every ready data pipe to
# merge_data_from_pipe, reads from the wake pipe if it was ready, and leaves
# the loop once should_keep_listening? returns a falsy value.
def start
  return if @started == true
  @started = true
  @thread = OneApm::Agent::Threading::AgentThread.create('Pipe Channel Manager') do
    # Time at which IO.select last reported ready handles; nil until then.
    now = nil
    loop do
      clean_up_pipes

      # Collect the handles while holding the lock; the select itself runs
      # without it.
      pipes_to_listen_to = @pipes_lock.synchronize do
        @pipes.values.map{|pipe| pipe.out} + [wake.out]
      end

      # Record the seconds elapsed since the last time select reported ready
      # handles (skipped until that has happened once).
      OneApm::Manager.record_metric('Supportability/Listeners', (Time.now - now).to_f) if now

      # IO.select returns nil on timeout, in which case nothing is read.
      if ready = IO.select(pipes_to_listen_to, [], [], @select_timeout)
        now = Time.now

        ready_pipes = ready[0]
        ready_pipes.each do |pipe|
          # The wake pipe carries no payload; it is handled separately below.
          merge_data_from_pipe(pipe) unless pipe == wake.out
        end

        # wakeup writes a single '.' per call; read(1) takes one of them back
        # off the wake pipe.
        wake.out.read(1) if ready_pipes.include?(wake.out)
      end

      break unless should_keep_listening?
    end
  end
  # NOTE(review): presumably a brief pause so the new thread gets a chance to
  # run before start returns — confirm.
  sleep 0.001
end
started?() click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 179
# Reports whether the listener is currently flagged as started.
#
# Uses the same `@started == true` test as start and stop, so the predicate
# always answers with a real boolean: before start has ever been called
# (when @started is still unset) it returns false rather than nil.
#
# @return [Boolean] true after start, false before start or after
#   stop_listener_thread has cleared the flag
def started?
  @started == true
end
stop() click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 158
# Shuts the listener down. Does nothing unless the listener is flagged as
# started; otherwise stops the listener thread, closes every registered
# pipe, and finally closes and discards the wake pipe.
#
# @return [nil]
def stop
  if @started == true
    stop_listener_thread
    close_all_pipes
    @wake.close
    @wake = nil
  end
end
stop_listener_thread() click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 152
# Clears the started flag, wakes the listener loop so it notices the change,
# and waits for the listener thread to finish.
#
# The join is skipped when no thread exists (for example when this is called
# before start, or when thread creation failed after the started flag had
# already been set), instead of raising NoMethodError on nil.
#
# @return the joined thread, or nil when there was no thread to join
def stop_listener_thread
  @started = false
  wakeup
  @thread.join if @thread
end
wake() click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 175
# Returns the wake pipe, creating it on first use (or again after it has
# been reset to nil, as stop does).
def wake
  @wake = Pipe.new unless @wake
  @wake
end
wakeup() click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 109
# Writes a single '.' to the write end of the wake pipe. The listener loop
# in start selects on the wake pipe's read end, so this interrupts its wait.
def wakeup
  write_end = wake.in
  write_end << '.'
end

Protected Instance Methods

clean_up_pipes() click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 217
# Housekeeping for the pipe table, performed under the pipes lock:
# 1. closes every pipe whose last_read plus @timeout lies in the past and
#    that does not already report closed?;
# 2. drops every entry whose read end (out) is closed.
#
# @return the result of Hash#reject! (nil when no entry was removed)
def clean_up_pipes
  @pipes_lock.synchronize do
    @pipes.each_value do |candidate|
      expired = candidate.last_read.to_f + @timeout < Time.now.to_f
      candidate.close if expired && !candidate.closed?
    end
    @pipes.reject! { |_id, candidate| candidate.out.closed? }
  end
end
find_pipe_for_handle(out_handle) click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 228
# Looks up, under the pipes lock, the first registered pipe whose read end
# (out) equals +out_handle+.
#
# @param out_handle the handle to match against each pipe's out
# @return the matching pipe, or nil when none is registered for the handle
def find_pipe_for_handle(out_handle)
  @pipes_lock.synchronize do
    _id, match = @pipes.find { |_key, candidate| candidate.out == out_handle }
    match
  end
end
merge_data_from_pipe(pipe_handle) click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 185
# Reads whatever is available on the pipe that owns +pipe_handle+ and acts
# on it:
# * a payload equal to Pipe::READY_MARKER triggers pipe.after_fork_in_parent;
# * any other non-empty payload is unmarshalled into an (endpoint, items)
#   pair and handed to the agent via merge_data_for_endpoint;
# * nil/empty payloads, and payloads that fail to unmarshal, are ignored.
# Afterwards the pipe is closed if it reports eof?.
#
# If no registered pipe matches the handle (find_pipe_for_handle returned
# nil, e.g. because the pipe was removed from the table in the meantime) the
# call is a no-op instead of raising NoMethodError inside the listener thread.
#
# @param pipe_handle the read handle that IO.select reported as ready
def merge_data_from_pipe(pipe_handle)
  pipe = find_pipe_for_handle(pipe_handle)
  return unless pipe

  raw_payload = pipe.read
  if raw_payload && !raw_payload.empty?
    if raw_payload == Pipe::READY_MARKER
      pipe.after_fork_in_parent
    elsif (payload = unmarshal(raw_payload))
      endpoint, items = payload
      OneApm::Manager.agent.merge_data_for_endpoint(endpoint, items)
    end
  end

  pipe.close if pipe.eof?
end
should_keep_listening?() click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 213
# Decides whether the listener loop should run another pass: yes while the
# listener is flagged as started, and otherwise for as long as at least one
# registered pipe still has an open write end (in).
#
# @return @started when it is truthy; otherwise the first pipe whose write
#   end is not closed, or nil when there is none
def should_keep_listening?
  return @started if @started

  @pipes_lock.synchronize do
    @pipes.each_value.find { |pipe| !pipe.in.closed? }
  end
end
unmarshal(data) click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 203
# Deserializes a marshalled payload inside LanguageSupport.with_cautious_gc.
#
# Any StandardError raised while loading is logged at error level, the raw
# data is logged Base64-encoded at debug level, and nil is returned.
#
# NOTE(review): Marshal.load must only ever see trusted input; presumably the
# data here is written solely by this process's own forked children — confirm.
#
# @param data [String] raw bytes read from a pipe
# @return the unmarshalled object, or nil when loading failed
def unmarshal(data)
  OneApm::LanguageSupport.with_cautious_gc { Marshal.load(data) }
rescue StandardError => error
  OneApm::Manager.logger.error("Failure unmarshalling message from Resque child process", error)
  OneApm::Manager.logger.debug(Base64.encode64(data))
  nil
end