class OneApm::Support::ForkedProcessChannel::Listener
Attributes
pipes[RW]
select_timeout[RW]
thread[R]
timeout[RW]
Public Class Methods
new()
click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 101
#
# Sets up an empty pipe registry guarded by a mutex, plus the two timeouts
# used by the listener:
#
# * @timeout (360)       - compared against a pipe's last read time, in
#                          seconds, when pruning idle pipes.
# * @select_timeout (60) - passed to IO.select as its timeout while waiting
#                          for readable pipes.
def initialize
  @pipes_lock = Mutex.new
  @pipes = {}
  @timeout = 360
  @select_timeout = 60
end
Public Instance Methods
close_all_pipes()
click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 166
#
# Closes every registered pipe (nil entries are skipped) and replaces the
# registry with an empty hash, all while holding the pipes lock.
def close_all_pipes
  @pipes_lock.synchronize do
    @pipes.each_value { |pipe| pipe.close if pipe }
    @pipes = {}
  end
end
register_pipe(id)
click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 113
#
# Creates a new Pipe, stores it in the registry under +id+ (replacing any
# existing entry for that id), then calls wakeup so a blocked listener
# re-reads the registry. Returns whatever wakeup returns.
def register_pipe(id)
  @pipes_lock.synchronize { @pipes[id] = Pipe.new }
  wakeup
end
start()
click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 121
#
# Starts the listener thread unless it is already started.
#
# Each loop iteration of the thread:
# 1. prunes timed-out / closed pipes,
# 2. snapshots the read ends of all registered pipes plus the wake pipe,
# 3. records 'Supportability/Listeners' with the seconds elapsed since the
#    last time select reported readable handles (skipped until that has
#    happened once),
# 4. waits in IO.select for up to @select_timeout,
# 5. merges data from every readable pipe except the wake pipe, and drains
#    one byte from the wake pipe if it was signalled,
# 6. exits once should_keep_listening? is falsy.
def start
  return if @started == true
  @started = true

  @thread = OneApm::Agent::Threading::AgentThread.create('Pipe Channel Manager') do
    last_ready_at = nil

    loop do
      clean_up_pipes

      read_handles = @pipes_lock.synchronize do
        @pipes.values.map { |pipe| pipe.out } + [wake.out]
      end

      if last_ready_at
        OneApm::Manager.record_metric('Supportability/Listeners', (Time.now - last_ready_at).to_f)
      end

      ready = IO.select(read_handles, [], [], @select_timeout)
      if ready
        last_ready_at = Time.now
        readable = ready[0]
        readable.each do |handle|
          merge_data_from_pipe(handle) unless handle == wake.out
        end
        # Consume the single wake-up byte so the wake pipe stops selecting
        # as readable.
        wake.out.read(1) if readable.include?(wake.out)
      end

      break unless should_keep_listening?
    end
  end

  # NOTE(review): presumably yields briefly so the new thread can begin
  # running before the caller continues - confirm.
  sleep 0.001
end
started?()
click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 179
#
# Returns the raw @started flag: true after start, false after
# stop_listener_thread, and nil if the listener was never started.
def started?
  @started
end
stop()
click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 158
#
# Shuts the listener down: stops and joins the listener thread, closes all
# registered pipes, then closes and discards the wake pipe. Does nothing
# unless the listener is currently started. Always returns nil.
def stop
  return unless @started == true

  stop_listener_thread
  close_all_pipes

  @wake.close
  @wake = nil
end
stop_listener_thread()
click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 152
#
# Clears the started flag, signals the wake pipe so a listener blocked in
# select re-evaluates its exit condition, and then waits for the listener
# thread to finish. Returns the result of Thread#join.
def stop_listener_thread
  @started = false
  wakeup
  @thread.join
end
wake()
click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 175
#
# Returns the wake pipe, creating it on first use. The listener thread
# selects on its read end; wakeup writes to its write end.
def wake
  @wake = Pipe.new unless @wake
  @wake
end
wakeup()
click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 109
#
# Writes a single '.' to the write end of the wake pipe, which makes the
# pipe's read end readable for a listener waiting in select.
def wakeup
  wake_pipe = wake
  wake_pipe.in << '.'
end
Protected Instance Methods
clean_up_pipes()
click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 217
#
# Prunes the registry while holding the pipes lock:
# * any pipe whose last read is more than @timeout seconds in the past is
#   closed (unless it already reports closed),
# * every entry whose read end is closed is then removed from the registry.
#
# Returns the result of Hash#reject! (nil when nothing was removed).
def clean_up_pipes
  @pipes_lock.synchronize do
    @pipes.values.each do |pipe|
      timed_out = pipe.last_read.to_f + @timeout < Time.now.to_f
      pipe.close if timed_out && !pipe.closed?
    end

    @pipes.reject! { |_id, pipe| pipe.out.closed? }
  end
end
find_pipe_for_handle(out_handle)
click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 228
#
# Looks up, under the pipes lock, the first registered pipe whose read end
# equals +out_handle+. Returns nil when no registered pipe matches.
def find_pipe_for_handle(out_handle)
  @pipes_lock.synchronize do
    @pipes.values.detect { |candidate| candidate.out == out_handle }
  end
end
merge_data_from_pipe(pipe_handle)
click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 185
#
# Reads one message from the registered pipe whose read end is
# +pipe_handle+ and acts on it:
# * Pipe::READY_MARKER - calls after_fork_in_parent on the pipe,
# * any other non-empty payload - unmarshals it into [endpoint, items] and
#   hands it to the agent's merge_data_for_endpoint (skipped when
#   unmarshalling fails and returns nil),
# * empty / nil payload - ignored.
# Finally the pipe is closed if it has reached EOF.
#
# Returns nil without doing anything when the handle is not registered.
def merge_data_from_pipe(pipe_handle)
  pipe = find_pipe_for_handle(pipe_handle)
  # The lookup returns nil when the handle is no longer in the registry
  # (close_all_pipes swaps the registry out under the lock), so bail out
  # instead of raising NoMethodError inside the listener thread.
  return unless pipe

  raw_payload = pipe.read
  if raw_payload && !raw_payload.empty?
    if raw_payload == Pipe::READY_MARKER
      pipe.after_fork_in_parent
    else
      payload = unmarshal(raw_payload)
      if payload
        endpoint, items = payload
        OneApm::Manager.agent.merge_data_for_endpoint(endpoint, items)
      end
    end
  end

  pipe.close if pipe.eof?
end
should_keep_listening?()
click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 213
#
# Truthy while the listener loop should continue: either the listener is
# still flagged as started, or at least one registered pipe still has an
# open write end. Returns @started when it is truthy, otherwise the first
# such pipe, or nil when there is none.
def should_keep_listening?
  return @started if @started

  @pipes_lock.synchronize do
    @pipes.values.detect { |pipe| !pipe.in.closed? }
  end
end
unmarshal(data)
click to toggle source
# File lib/one_apm/support/forked_process_channel.rb, line 203
#
# Deserializes +data+ with Marshal.load inside
# OneApm::LanguageSupport.with_cautious_gc and returns the loaded object.
#
# Any StandardError raised along the way is logged at error level, the
# offending payload is logged Base64-encoded at debug level, and nil is
# returned instead of propagating the exception.
def unmarshal(data)
  begin
    OneApm::LanguageSupport.with_cautious_gc { Marshal.load(data) }
  rescue StandardError => error
    log = OneApm::Manager.logger
    log.error "Failure unmarshalling message from Resque child process", error
    log = OneApm::Manager.logger
    log.debug Base64.encode64(data)
    nil
  end
end