class Quartz::GoProcess

Constants

MAX_MESSAGE_SIZE
READ_EXCEPTION

Attributes

seed[R]
socket_path[R]
temp_file_path[R]

Public Class Methods

cleanup() click to toggle source
# File lib/quartz/go_process.rb, line 163
# Calls the instance-level +cleanup+ on every registered process and then
# resets the registry to an empty array. Returns nil without doing anything
# when the registry has never been created.
def self.cleanup
  if @processes
    @processes.each { |go_process| go_process.cleanup }
    @processes = []
  end
end
clear_processes() click to toggle source
# File lib/quartz/go_process.rb, line 11
# Forgets every registered process by replacing the registry with a new,
# empty array. The processes themselves are not cleaned up here.
def self.clear_processes
  @processes = Array.new
end
new(opts) click to toggle source
# File lib/quartz/go_process.rb, line 25
# Starts (or attaches to) a Go RPC server, waits for its socket to appear
# and registers the new instance in the class-level process list.
#
# Keys read from +opts+:
#   :socket_dir  - directory used for the generated socket path
#                  (defaults to '/tmp')
#   :socket_path - explicit socket path; when it is the only launch option
#                  given, the instance is marked as using an external socket
#   :file_path   - Go source that is compiled and run
#   :bin_path    - command launched with IO.popen
#
# :file_path takes precedence over :bin_path, which takes precedence over
# a bare :socket_path.
#
# Raises Quartz::ConfigError when none of :file_path, :bin_path or
# :socket_path is given. Any error raised while waiting for the socket is
# re-raised after a best-effort cleanup of whatever was started.
def initialize(opts)
  @seed = SecureRandom.hex
  socket_dir = opts.fetch(:socket_dir) { '/tmp' }
  @socket_path = opts[:socket_path] || File.join(socket_dir, "quartz_#{seed}.sock")
  # Set before launching anything so a spawned child inherits it.
  ENV['QUARTZ_SOCKET'] = @socket_path

  if opts[:file_path]
    Quartz::Validations.check_for_go
    compile_and_run(opts[:file_path])
  elsif opts[:bin_path]
    @go_process = IO.popen(opts[:bin_path])
  elsif opts[:socket_path]
    @external_socket = true
  else
    raise Quartz::ConfigError, 'Missing go binary'
  end

  begin
    block_until_server_starts
  rescue StandardError => e
    # The instance has not been registered yet, so nothing else would ever
    # stop the child process or remove the compiled binary. Clean up here,
    # but never let a cleanup failure hide the startup error.
    begin
      cleanup
    rescue StandardError
      nil
    end
    raise e
  end

  self.class.processes << self
end
processes() click to toggle source
# File lib/quartz/go_process.rb, line 7
# Returns the class-level registry of processes, creating an empty array
# the first time it is asked for.
def self.processes
  @processes = [] unless @processes
  @processes
end

Public Instance Methods

block_until_server_starts() click to toggle source
# File lib/quartz/go_process.rb, line 68
# Blocks until a file exists at @socket_path.
#
# Polls with exponential back-off: the sleep before attempt k is
# 0.001 * 2**k seconds (1ms, 2ms, 4ms, ...). With max_retries = 20 the
# method sleeps up to 21 times, roughly 35 minutes in total, before
# giving up.
#
# Returns nil as soon as the path exists.
# Raises Quartz::GoServerError when the path never appears.
def block_until_server_starts
  max_retries = 20
  retries = 0
  delay = 0.001 # seconds

  loop do
    # File.exists? was removed in Ruby 3.2; File.exist? is the supported
    # spelling on every Ruby version.
    return if File.exist?(@socket_path)
    raise Quartz::GoServerError, 'RPC server not starting' if retries > max_retries

    sleep(delay * 2**retries)
    retries += 1
  end
end
call(struct_name, method, args) click to toggle source
# File lib/quartz/go_process.rb, line 100
# Sends a request for "<struct_name>.<method>" with +args+ as the single
# parameter over the current thread's socket and returns whatever +read+
# produces for the reply.
#
# If the first send fails with Errno::EPIPE, a new socket is opened and the
# same request is sent once more; an error from that second send is not
# rescued.
def call(struct_name, method, args)
  payload = {
    'method' => "#{struct_name}.#{method}",
    'params' => [args],
    'id' => 1
  }
  request = MultiJson.dump(payload)

  begin
    socket.send(request, 0)
  rescue Errno::EPIPE
    # The Go process may have restarted, leaving us with a dead socket.
    # Reconnect and try again; this can well fail with the same error.
    new_socket!
    socket.send(request, 0)
  end

  read
end
cleanup() click to toggle source
# File lib/quartz/go_process.rb, line 144
# Stops the Go process started by this instance and removes the compiled
# temporary binary, if there is one.
#
# Does nothing when the instance is in forked mode or when it uses an
# external socket. The process is signalled and waited for at most once
# per instance; later calls only re-check the temporary file.
def cleanup
  # If we've forked, there's no need to cleanup since the parent
  # process will.
  return if @forked

  # If the Go process is managed externally, there's nothing to do.
  return if @external_socket

  unless @killed_go_process
    Process.kill('SIGTERM', pid)
    Process.wait(pid)
    @killed_go_process = true
  end

  # File.exists? was removed in Ruby 3.2; File.exist? is the supported
  # spelling on every Ruby version.
  File.delete(@temp_file_path) if @temp_file_path && File.exist?(@temp_file_path)
end
compile_and_run(path) click to toggle source
# File lib/quartz/go_process.rb, line 46
# Builds the Go source at +path+ with `go build`, writing the binary to
# /tmp/quartz_runner_<seed>, then launches that binary with IO.popen and
# keeps the resulting IO in @go_process.
#
# Raises Quartz::ConfigError when the build command does not succeed.
def compile_and_run(path)
  @temp_file_path = "/tmp/quartz_runner_#{seed}"

  built = system('go', 'build', '-o', @temp_file_path, path)
  raise Quartz::ConfigError, 'Go compilation failed' unless built

  @go_process = IO.popen(@temp_file_path)
end
forked_mode!() click to toggle source
# File lib/quartz/go_process.rb, line 15
# Toggles the @forked flag (an unset flag counts as off, so the first call
# turns it on) and opens a fresh socket for the current thread whenever the
# flag ends up on. Returns that socket, or nil when the flag was turned off.
def forked_mode!
  # !nil is true, so the never-set case needs no special handling.
  @forked = !@forked
  return unless @forked

  new_socket!
end
get_metadata() click to toggle source
# File lib/quartz/go_process.rb, line 81
# Sends a 'Quartz.GetMetadata' request with no parameters and returns the
# 'result' entry of the reply.
#
# Raises Quartz::GoResponseError when the reply carries a truthy 'error'
# entry.
def get_metadata
  request = MultiJson.dump(
    {
      'method' => 'Quartz.GetMetadata',
      'params' => [],
      # The id is not needed for matching replies because every thread
      # talks over its own connection.
      'id' => 1
    }
  )

  socket.send(request, 0)
  reply = read

  failure = reply['error']
  raise Quartz::GoResponseError, "Metadata error: #{failure}" if failure

  reply['result']
end
new_socket!() click to toggle source
# File lib/quartz/go_process.rb, line 60
# Opens a new UNIXSocket to @socket_path and stores it in the current
# thread's local slot for this instance (keyed by the seed), replacing
# whatever was stored there. Returns the new socket.
def new_socket!
  slot = :"quartz_socket_#{seed}"
  connection = UNIXSocket.new(@socket_path)
  Thread.current[slot] = connection
end
pid() click to toggle source
# File lib/quartz/go_process.rb, line 56
# Returns the process id reported by the object held in @go_process.
def pid
  go_io = @go_process
  go_io.pid
end
read() click to toggle source
# File lib/quartz/go_process.rb, line 128
# Reads one newline-terminated reply from the current thread's socket and
# returns it parsed by MultiJson.
#
# Data is collected with non-blocking receives of up to MAX_MESSAGE_SIZE
# bytes; when nothing is available yet (READ_EXCEPTION) the method waits in
# IO.select until the socket becomes readable. On Errno::EPIPE a new socket
# is opened and reading continues on it.
#
# Raises Quartz::GoServerError when the peer closes the connection before a
# complete reply has arrived.
def read
  value = ''
  loop do
    begin
      chunk = socket.recv_nonblock(MAX_MESSAGE_SIZE)

      # End of stream is reported as "" before Ruby 3.3 and as nil from
      # 3.3 on. Without this check the loop would spin forever (or fail
      # with a TypeError on the append below).
      if chunk.nil? || chunk.empty?
        raise Quartz::GoServerError, 'RPC server closed the connection'
      end

      value << chunk
      break if value.end_with?("\n")
    rescue READ_EXCEPTION
      IO.select([socket], [], [])
    rescue Errno::EPIPE
      new_socket!
    end
  end

  MultiJson.load(value)
end
socket() click to toggle source
# File lib/quartz/go_process.rb, line 64
# Returns the current thread's socket for this instance, opening a
# UNIXSocket to @socket_path and remembering it in the thread-local slot
# (keyed by the seed) when the slot is empty.
def socket
  slot = :"quartz_socket_#{seed}"
  Thread.current[slot] || (Thread.current[slot] = UNIXSocket.new(@socket_path))
end