class Quartz::GoProcess
Constants
- MAX_MESSAGE_SIZE
- READ_EXCEPTION
Attributes
seed[R]
socket_path[R]
temp_file_path[R]
Public Class Methods
cleanup()
click to toggle source
# File lib/quartz/go_process.rb, line 163
# Calls +cleanup+ on every registered process and then resets the registry
# to an empty array. Returns nil without doing anything when the registry
# has never been initialised.
def self.cleanup
  registered = @processes
  return unless registered

  registered.each { |go_process| go_process.cleanup }
  @processes = []
end
clear_processes()
click to toggle source
# File lib/quartz/go_process.rb, line 11
# Replaces the class-level registry with a fresh empty array. The
# previously registered processes are dropped without being cleaned up.
def self.clear_processes
  @processes = []
end
new(opts)
click to toggle source
# File lib/quartz/go_process.rb, line 25
# Builds a handle to a Go RPC server and waits until its socket exists.
#
# Recognised keys in +opts+:
#   :socket_dir  - directory for the generated socket file (default '/tmp')
#   :socket_path - explicit socket path; when it is the only source given,
#                  the Go process is treated as externally managed
#   :file_path   - Go source file to compile and run
#   :bin_path    - prebuilt binary to run
#
# Raises Quartz::ConfigError when none of :file_path, :bin_path or
# :socket_path is supplied.
def initialize(opts)
  @seed = SecureRandom.hex

  socket_dir = opts.fetch(:socket_dir, '/tmp')
  @socket_path = opts[:socket_path] || File.join(socket_dir, "quartz_#{seed}.sock")
  # Exported through the environment before any child process is launched.
  ENV['QUARTZ_SOCKET'] = @socket_path

  source_path = opts[:file_path]
  binary_path = opts[:bin_path]

  if source_path
    Quartz::Validations.check_for_go
    compile_and_run(source_path)
  elsif binary_path
    @go_process = IO.popen(binary_path)
  elsif opts[:socket_path]
    @external_socket = true
  else
    raise Quartz::ConfigError, 'Missing go binary'
  end

  block_until_server_starts
  self.class.processes << self
end
processes()
click to toggle source
# File lib/quartz/go_process.rb, line 7
# Returns the class-level registry of processes, creating an empty array
# the first time it is requested.
def self.processes
  @processes = [] unless @processes
  @processes
end
Public Instance Methods
block_until_server_starts()
click to toggle source
# File lib/quartz/go_process.rb, line 68
# Blocks until a file exists at @socket_path, polling with exponential
# backoff.
#
# The first wait is 1ms and doubles on every attempt. With 21 sleeps
# (retries 0 through 20) the worst case adds up to about 2097 seconds
# (~35 minutes) before giving up.
#
# Returns nil once the socket file is present.
# Raises Quartz::GoServerError when the file never appears.
def block_until_server_starts
  max_retries = 20
  retries = 0
  delay = 0.001 # seconds

  loop do
    # File.exists? was removed in Ruby 3.2; File.exist? works on every version.
    return if File.exist?(@socket_path)
    raise Quartz::GoServerError, 'RPC server not starting' if retries > max_retries

    sleep(delay * 2**retries)
    retries += 1
  end
end
call(struct_name, method, args)
click to toggle source
# File lib/quartz/go_process.rb, line 100
# Sends an RPC request for "<struct_name>.<method>" with +args+ as the
# single parameter, then returns the decoded response from +read+.
#
# If the first send fails with Errno::EPIPE, a new socket is opened and
# the send is attempted exactly once more; a second failure propagates.
def call(struct_name, method, args)
  request = MultiJson.dump(
    'method' => "#{struct_name}.#{method}",
    'params' => [args],
    'id' => 1
  )

  begin
    socket.send(request, 0)
  rescue Errno::EPIPE
    # Retry send with a new socket. We might trigger this if Go
    # process restarted. There's a good chance that this raises the
    # exact same error.
    new_socket!
    socket.send(request, 0)
  end

  read
end
cleanup()
click to toggle source
# File lib/quartz/go_process.rb, line 144
# Stops the Go process (SIGTERM followed by a wait) unless it has already
# been stopped, then deletes the compiled temporary binary if one exists.
#
# Does nothing in forked mode or when the Go process is managed externally.
def cleanup
  # If we've forked, there's no need to cleanup since the parent
  # process will.
  return if @forked

  # If the Go process is managed externally, there's nothing to do.
  return if @external_socket

  unless @killed_go_process
    Process.kill('SIGTERM', pid)
    Process.wait(pid)
    @killed_go_process = true
  end

  # File.exists? was removed in Ruby 3.2; File.exist? works on every version.
  File.delete(@temp_file_path) if @temp_file_path && File.exist?(@temp_file_path)
end
compile_and_run(path)
click to toggle source
# File lib/quartz/go_process.rb, line 46
# Compiles the Go source at +path+ with `go build` into a temporary binary
# under /tmp named after this instance's seed, then launches that binary
# via IO.popen.
#
# Raises Quartz::ConfigError when the build command does not succeed.
def compile_and_run(path)
  @temp_file_path = "/tmp/quartz_runner_#{seed}"

  built = system('go', 'build', '-o', @temp_file_path, path)
  raise Quartz::ConfigError, 'Go compilation failed' unless built

  @go_process = IO.popen(@temp_file_path)
end
forked_mode!()
click to toggle source
# File lib/quartz/go_process.rb, line 15
# Toggles forked mode: an unset or false flag becomes true, a set flag
# becomes false. When the flag ends up true, a new socket is opened for
# the current thread.
def forked_mode!
  # Negating nil yields true, so the first call switches forked mode on.
  @forked = !@forked
  new_socket! if @forked
end
get_metadata()
click to toggle source
# File lib/quartz/go_process.rb, line 81
# Sends a 'Quartz.GetMetadata' request and returns the 'result' field of
# the response.
#
# Raises Quartz::GoResponseError when the response has a truthy 'error'.
def get_metadata
  request = {
    'method' => 'Quartz.GetMetadata',
    'params' => [],
    # This parameter isn't needed because we use a different
    # connection for each thread.
    'id' => 1
  }
  socket.send(MultiJson.dump(request), 0)

  reply = read
  error = reply['error']
  raise Quartz::GoResponseError, "Metadata error: #{error}" if error

  reply['result']
end
new_socket!()
click to toggle source
# File lib/quartz/go_process.rb, line 60
# Opens a new UNIXSocket to @socket_path and stores it on Thread.current
# under a key derived from this instance's seed, replacing any socket
# already stored there. Returns the new socket.
def new_socket!
  slot = :"quartz_socket_#{seed}"
  Thread.current[slot] = UNIXSocket.new(@socket_path)
end
pid()
click to toggle source
# File lib/quartz/go_process.rb, line 56
# Returns the pid reported by the object held in @go_process, which the
# constructor paths in this class set from IO.popen.
def pid
  @go_process.pid
end
read()
click to toggle source
# File lib/quartz/go_process.rb, line 128
# Reads one newline-terminated message from the current socket and returns
# it decoded by MultiJson.
#
# Data is pulled with non-blocking receives of up to MAX_MESSAGE_SIZE
# bytes; when no data is available yet, the method waits in IO.select
# until the socket becomes readable.
#
# Raises Quartz::GoServerError when the peer closes the connection before
# a complete message arrives.
def read
  value = ''
  loop do
    begin
      chunk = socket.recv_nonblock(MAX_MESSAGE_SIZE)
      # On a stream socket an empty read (nil on Ruby 3.3+) means the peer
      # closed the connection. Without this check the loop would spin
      # forever on "" or fail with a TypeError on nil.
      if chunk.nil? || chunk.empty?
        raise Quartz::GoServerError, 'Connection closed by Go process'
      end

      value << chunk
      break if value.end_with?("\n")
    rescue READ_EXCEPTION
      IO.select([socket], [], [])
    rescue Errno::EPIPE
      # NOTE(review): the request is not re-sent on the replacement socket,
      # so this path presumably waits for data that never arrives - confirm.
      new_socket!
    end
  end
  MultiJson.load(value)
end
socket()
click to toggle source
# File lib/quartz/go_process.rb, line 64
# Returns the socket stored on Thread.current for this instance, opening a
# UNIXSocket to @socket_path and storing it first when none is present.
def socket
  slot = :"quartz_socket_#{seed}"
  existing = Thread.current[slot]
  return existing if existing

  Thread.current[slot] = UNIXSocket.new(@socket_path)
end