class Ractor::Wrapper::Server

This is the backend implementation of a wrapper. A Server runs within a Ractor, and manages a shared object. It handles communication with clients, translating those messages into method calls on the object. It runs worker threads internally to handle actual method calls.

See the {#run} method for an overview of the Server implementation and lifecycle.

@private

Public Instance Methods

run()

Handle the server lifecycle, running through the following phases:

  • init: Setup and spawning of worker threads.

  • running: Normal operation, until a stop request is received.

  • stopping: Waiting for worker threads to terminate.

  • cleanup: Clearing out of any lingering messages.

The server returns the wrapped object, allowing one client Ractor to take it.

# File lib/ractor/wrapper.rb, line 460
def run
  init_phase
  running_phase
  stopping_phase
  cleanup_phase
  @object
rescue ::StandardError => e
  maybe_log("Unexpected error: #{e.inspect}")
  @object
end

Private Instance Methods

cleanup_phase()

In the **cleanup phase**, the Server closes its inbox and drains it one final time, responding to any remaining `call` requests with a refusal. It then makes another pass through the pending requests; any that are left probably mean a worker thread died without responding properly, so the Server sends back an error message for each.

# File lib/ractor/wrapper.rb, line 586
def cleanup_phase
  ::Ractor.current.close_incoming
  maybe_log("Checking message queue for cleanup")
  loop do
    message = ::Ractor.receive
    refuse_method(message) if message.is_a?(Message) && message.type == :call
  end
  maybe_log("Checking current calls for cleanup")
  @current_calls.each_value do |request|
    refuse_method(request)
  end
rescue ::Ractor::ClosedError
  maybe_log("Message queue is empty")
end
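The second pass in `cleanup_phase` is only reached if the `loop` above it exits quietly once the closed inbox is empty. The following is a minimal sketch of that mechanism, assuming `::Ractor::ClosedError` descends from `::StopIteration` (as the Ruby 3.x documentation describes). `InboxClosed` and `drain_then_sweep` are hypothetical names used only for illustration and are not part of the wrapper.

```ruby
# Hypothetical stand-in for an "inbox closed" error. The only property that
# matters here is that it subclasses StopIteration.
class InboxClosed < ::StopIteration; end

# Drains a list of messages, raising InboxClosed once it is empty, then runs
# a second pass. Returns the steps in the order they happened.
def drain_then_sweep(messages)
  steps = []
  loop do
    raise InboxClosed, "no more messages" if messages.empty?
    steps << "refused #{messages.shift}"
  end
  # Reached because Kernel#loop rescues StopIteration and its subclasses.
  steps << "swept pending calls"
  steps
end

puts drain_then_sweep(%w[a b]).inspect
```

Under that assumption, the trailing `rescue ::Ractor::ClosedError` acts as a safety net for a closed-inbox error raised outside the `loop`.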
handle_method(worker_num, request)

This is called within a worker thread to handle a method call request. It calls the method on the wrapped object, and then sends back a response to the caller. If an exception was raised, it sends back an error response. It tries very hard always to send a response of some kind; if an error occurs while constructing or sending a response, it will catch the exception and try to send a simpler response.

# File lib/ractor/wrapper.rb, line 609
def handle_method(worker_num, request)
  method_name, args, kwargs = request.data
  transaction = request.transaction
  sender = request.sender
  maybe_worker_log(worker_num, "Running method #{method_name} (transaction=#{transaction})")
  begin
    result = @object.send(method_name, *args, **kwargs)
    maybe_worker_log(worker_num, "Sending result (transaction=#{transaction})")
    sender.send(Message.new(:result, data: result, transaction: transaction),
                move: (@method_settings[method_name] || @method_settings[nil]).move_return?)
  rescue ::Exception => e # rubocop:disable Lint/RescueException
    maybe_worker_log(worker_num, "Sending exception (transaction=#{transaction})")
    begin
      sender.send(Message.new(:error, data: e, transaction: transaction))
    rescue ::StandardError
      safe_error = begin
        ::StandardError.new(e.inspect)
      rescue ::StandardError
        ::StandardError.new("Unknown error")
      end
      sender.send(Message.new(:error, data: safe_error, transaction: transaction))
    end
  end
end
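The fallback in `handle_method` can be illustrated without Ractors. The sketch below assumes a delivery step that rejects payloads it cannot serialize, using `Marshal.dump` as a rough stand-in for sending across a Ractor boundary. `CallbackError`, `deliver`, and `deliver_error` are hypothetical names, not part of the wrapper.

```ruby
# Hypothetical error that carries a Proc, which Marshal cannot serialize.
class CallbackError < ::StandardError
  def initialize(msg)
    super
    @callback = proc {}
  end
end

# Stand-in for sending across a boundary: Marshal.dump raises TypeError for
# payloads it cannot serialize, so nothing is delivered in that case.
def deliver(outbox, error)
  ::Marshal.dump(error)
  outbox << error
end

# Try the original exception first. If delivery fails, fall back to a plain
# StandardError that carries only the inspect string of the original.
def deliver_error(outbox, error)
  deliver(outbox, error)
rescue ::StandardError
  safe_error = begin
    ::StandardError.new(error.inspect)
  rescue ::StandardError
    ::StandardError.new("Unknown error")
  end
  deliver(outbox, safe_error)
end

outbox = []
deliver_error(outbox, CallbackError.new("boom"))
puts outbox.last.class
```

The caller still learns which error occurred, at the cost of losing the original exception class and any extra state it carried.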
init_phase()

In the **init phase**, the Server:

  • Receives an initial message providing the object to wrap, and server configuration such as thread count and communications settings.

  • Initializes the job queue and the pending request list.

  • Spawns worker threads.

# File lib/ractor/wrapper.rb, line 482
def init_phase
  opts = ::Ractor.receive
  @object = opts[:object]
  @logging = opts[:logging]
  @name = opts[:name]
  @method_settings = opts[:method_settings]
  @thread_count = opts[:threads]
  @queue = ::Queue.new
  @mutex = ::Mutex.new
  @current_calls = {}
  maybe_log("Spawning #{@thread_count} threads")
  (1..@thread_count).map do |worker_num|
    ::Thread.new { worker_thread(worker_num) }
  end
  maybe_log("Server initialized")
end
maybe_log(str)
# File lib/ractor/wrapper.rb, line 657
def maybe_log(str)
  return unless @logging
  time = ::Time.now.utc.strftime("%Y-%m-%dT%H:%M:%S.%L")
  $stderr.puts("[#{time} Ractor::Wrapper/#{@name} Server]: #{str}")
  $stderr.flush
end
maybe_worker_log(worker_num, str)
# File lib/ractor/wrapper.rb, line 664
def maybe_worker_log(worker_num, str)
  return unless @logging
  time = ::Time.now.utc.strftime("%Y-%m-%dT%H:%M:%S.%L")
  $stderr.puts("[#{time} Ractor::Wrapper/#{@name} Worker/#{worker_num}]: #{str}")
  $stderr.flush
end
refuse_method(request)

This is called from the main Ractor thread to report to a caller that the wrapper cannot handle a requested method call, likely because the wrapper is shutting down.

# File lib/ractor/wrapper.rb, line 639
def refuse_method(request)
  maybe_log("Refusing method call (transaction=#{request.transaction})")
  error = ::Ractor::ClosedError.new
  request.sender.send(Message.new(:error, data: error, transaction: request.transaction))
end
register_call(request)
# File lib/ractor/wrapper.rb, line 645
def register_call(request)
  @mutex.synchronize do
    @current_calls[request.transaction] = request
  end
end
running_phase()

In the **running phase**, the Server listens on the Ractor's inbox and handles messages for normal operation:

  • If it receives a `call` request, it adds it to the job queue from which a worker thread will pick it up. It also adds the request to a list of pending requests.

  • If it receives a `stop` request, we proceed to the stopping phase.

  • If it receives a `thread_stopped` message, that indicates one of the worker threads has unexpectedly stopped. We don't expect this to happen until the stopping phase, so if we do see it here, we conclude that something has gone wrong, and we proceed to the stopping phase.

# File lib/ractor/wrapper.rb, line 535
def running_phase
  loop do
    maybe_log("Waiting for message")
    request = ::Ractor.receive
    next unless request.is_a?(Message)
    case request.type
    when :call
      @queue.enq(request)
      register_call(request)
      maybe_log("Queued method #{request.data.first} (transaction=#{request.transaction})")
    when :thread_stopped
      maybe_log("Thread unexpectedly stopped: #{request.data}")
      @thread_count -= 1
      break
    when :stop
      maybe_log("Received stop")
      break
    end
  end
end
stopping_phase()

In the **stopping phase**, we close the job queue, which signals to all worker threads that they should finish their current task and then terminate. We then wait for acknowledgement messages from all workers before proceeding to the next phase. Any `call` requests received during stopping are refused (i.e. we send back an error response). Any further `stop` requests are ignored.

# File lib/ractor/wrapper.rb, line 564
def stopping_phase
  @queue.close
  while @thread_count.positive?
    maybe_log("Waiting for message while stopping")
    message = ::Ractor.receive
    next unless message.is_a?(Message)
    case message.type
    when :call
      refuse_method(message)
    when :thread_stopped
      @thread_count -= 1
    end
  end
end
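The shutdown handshake described for the stopping phase can be sketched with plain threads. This is a simplified illustration, not the wrapper's implementation: a second `::Queue` stands in for the Ractor's inbox, jobs are integers that get doubled, and `run_and_stop` is a hypothetical name.

```ruby
# Runs `worker_count` workers over `jobs`, closes the job queue, and waits
# for one acknowledgement per worker. Returns the acknowledgements and the
# job results, both sorted.
def run_and_stop(worker_count, jobs)
  queue = ::Queue.new   # job queue shared by the workers
  inbox = ::Queue.new   # stand-in for the Ractor's inbox
  results = ::Queue.new
  workers = (1..worker_count).map do |num|
    ::Thread.new do
      # deq returns nil once the queue is closed and empty.
      while (job = queue.deq)
        results << job * 2
      end
    ensure
      inbox << [:thread_stopped, num]
    end
  end
  jobs.each { |job| queue << job }
  queue.close
  remaining = worker_count
  acks = []
  while remaining.positive?
    type, num = inbox.deq
    next unless type == :thread_stopped
    acks << num
    remaining -= 1
  end
  workers.each(&:join)
  { acks: acks.sort, results: Array.new(results.size) { results.deq }.sort }
end

puts run_and_stop(3, [1, 2, 3, 4]).inspect
```

Closing the queue does not discard jobs that are already queued; workers drain them first, which is why every job still yields a result before the acknowledgements arrive.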
unregister_call(transaction)
# File lib/ractor/wrapper.rb, line 651
def unregister_call(transaction)
  @mutex.synchronize do
    @current_calls.delete(transaction)
  end
end
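Together, `register_call` and `unregister_call` maintain the pending request list that the cleanup phase inspects. The sketch below shows the same idea in isolation: a mutex-guarded hash keyed by transaction, where anything still present at the end was never answered. `PendingCalls` and `leftover_demo` are hypothetical names, and the string values stand in for request objects.

```ruby
# A mutex-guarded registry of in-flight requests, keyed by transaction ID.
class PendingCalls
  def initialize
    @mutex = ::Mutex.new
    @calls = {}
  end

  # Called when a request is queued.
  def register(transaction, request)
    @mutex.synchronize { @calls[transaction] = request }
  end

  # Called by a worker after it has responded.
  def unregister(transaction)
    @mutex.synchronize { @calls.delete(transaction) }
  end

  # Requests that were registered but never unregistered.
  def leftovers
    @mutex.synchronize { @calls.values }
  end
end

# Registers two requests and answers only the first from another thread.
def leftover_demo
  pending = PendingCalls.new
  pending.register("t1", "call foo")
  pending.register("t2", "call bar")
  ::Thread.new { pending.unregister("t1") }.join
  pending.leftovers
end

puts leftover_demo.inspect
```

The mutex matters because registration happens on the main thread while removal happens on worker threads.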
worker_thread(worker_num)

A worker thread repeatedly pulls a method call request off the job queue, handles it, and sends back a response. It also removes the request from the pending request list to signal that it has responded. If no job is available, the thread blocks while waiting. If the queue is closed, the worker will send an acknowledgement message and then terminate.

# File lib/ractor/wrapper.rb, line 507
def worker_thread(worker_num)
  maybe_worker_log(worker_num, "Starting")
  loop do
    maybe_worker_log(worker_num, "Waiting for job")
    request = @queue.deq
    break if request.nil?
    handle_method(worker_num, request)
    unregister_call(request.transaction)
  end
ensure
  maybe_worker_log(worker_num, "Stopping")
  ::Ractor.current.send(Message.new(:thread_stopped, data: worker_num), move: true)
end
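Because the acknowledgement sits in an `ensure` clause, it is sent even when a worker terminates abnormally, which is how a `thread_stopped` message can arrive outside the stopping phase. A minimal sketch of that behaviour with a plain thread, using a `::Queue` as a stand-in for the Ractor's inbox and a hypothetical `crash_and_ack` helper:

```ruby
# Starts a worker that fails immediately and returns whatever it managed to
# post to the inbox on its way out.
def crash_and_ack
  inbox = ::Queue.new
  worker = ::Thread.new do
    # Suppress the default stderr report so the demo output stays quiet.
    ::Thread.current.report_on_exception = false
    raise "simulated worker crash"
  ensure
    inbox << :thread_stopped
  end
  begin
    worker.join
  rescue ::RuntimeError
    # Thread#join re-raises the worker's exception; it is expected here.
  end
  inbox.deq
end

puts crash_and_ack.inspect
```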