class Ractor::Wrapper::Server
This is the backend implementation of a wrapper. A Server
runs within a Ractor
, and manages a shared object. It handles communication with clients, translating those messages into method calls on the object. It runs worker threads internally to handle actual method calls.
See the {#run} method for an overview of the Server
implementation and lifecycle.
@private
Public Instance Methods
Handle the server lifecycle, running through the following phases:
-
init: Setup and spawning of worker threads.
-
running: Normal operation, until a stop request is received.
-
stopping: Waiting for worker threads to terminate.
-
cleanup: Clearing out of any lingering meessages.
The server returns the wrapped object, allowing one client Ractor
to take it.
# File lib/ractor/wrapper.rb, line 460 def run init_phase running_phase stopping_phase cleanup_phase @object rescue ::StandardError => e maybe_log("Unexpected error: #{e.inspect}") @object end
Private Instance Methods
In the **cleanup phase**, The Server
closes its inbox, and iterates through one final time to ensure it has responded to all remaining requests with a refusal. It also makes another pass through the pending requests; if there are any left, it probably means a worker thread died without responding to it preoprly, so we send back an error message.
# File lib/ractor/wrapper.rb, line 586 def cleanup_phase ::Ractor.current.close_incoming maybe_log("Checking message queue for cleanup") loop do message = ::Ractor.receive refuse_method(message) if message.is_a?(Message) && message.type == :call end maybe_log("Checking current calls for cleanup") @current_calls.each_value do |request| refuse_method(request) end rescue ::Ractor::ClosedError maybe_log("Message queue is empty") end
This is called within a worker thread to handle a method call request. It calls the method on the wrapped object, and then sends back a response to the caller. If an exception was raised, it sends back an error response. It tries very hard always to send a response of some kind; if an error occurs while constructing or sending a response, it will catch the exception and try to send a simpler response.
# File lib/ractor/wrapper.rb, line 609 def handle_method(worker_num, request) method_name, args, kwargs = request.data transaction = request.transaction sender = request.sender maybe_worker_log(worker_num, "Running method #{method_name} (transaction=#{transaction})") begin result = @object.send(method_name, *args, **kwargs) maybe_worker_log(worker_num, "Sending result (transaction=#{transaction})") sender.send(Message.new(:result, data: result, transaction: transaction), move: (@method_settings[method_name] || @method_settings[nil]).move_return?) rescue ::Exception => e # rubocop:disable Lint/RescueException maybe_worker_log(worker_num, "Sending exception (transaction=#{transaction})") begin sender.send(Message.new(:error, data: e, transaction: transaction)) rescue ::StandardError safe_error = begin ::StandardError.new(e.inspect) rescue ::StandardError ::StandardError.new("Unknown error") end sender.send(Message.new(:error, data: safe_error, transaction: transaction)) end end end
In the **init phase**, the Server:
-
Receives an initial message providing the object to wrap, and server configuration such as thread count and communications settings.
-
Initializes the job queue and the pending request list.
-
Spawns worker threads.
# File lib/ractor/wrapper.rb, line 482 def init_phase opts = ::Ractor.receive @object = opts[:object] @logging = opts[:logging] @name = opts[:name] @method_settings = opts[:method_settings] @thread_count = opts[:threads] @queue = ::Queue.new @mutex = ::Mutex.new @current_calls = {} maybe_log("Spawning #{@thread_count} threads") (1..@thread_count).map do |worker_num| ::Thread.new { worker_thread(worker_num) } end maybe_log("Server initialized") end
# File lib/ractor/wrapper.rb, line 657 def maybe_log(str) return unless @logging time = ::Time.now.utc.strftime("%Y-%m-%dT%H:%M:%S.%L") $stderr.puts("[#{time} Ractor::Wrapper/#{@name} Server]: #{str}") $stderr.flush end
# File lib/ractor/wrapper.rb, line 664 def maybe_worker_log(worker_num, str) return unless @logging time = ::Time.now.utc.strftime("%Y-%m-%dT%H:%M:%S.%L") $stderr.puts("[#{time} Ractor::Wrapper/#{@name} Worker/#{worker_num}]: #{str}") $stderr.flush end
This is called from the main Ractor
thread to report to a caller that the wrapper cannot handle a requested method call, likely because the wrapper is shutting down.
# File lib/ractor/wrapper.rb, line 639 def refuse_method(request) maybe_log("Refusing method call (transaction=#{message.transaction})") error = ::Ractor::ClosedError.new request.sender.send(Message.new(:error, data: error, transaction: message.transaction)) end
# File lib/ractor/wrapper.rb, line 645 def register_call(request) @mutex.synchronize do @current_calls[request.transaction] = request end end
In the **running phase**, the Server
listens on the Ractor's inbox and handles messages for normal operation:
-
If it receives a `call` request, it adds it to the job queue from which a worker thread will pick it up. It also adds the request to a list of pending requests.
-
If it receives a `stop` request, we proceed to the stopping phase.
-
If it receives a `thread_stopped` message, that indicates one of the worker threads has unexpectedly stopped. We don't expect this to happen until the stopping phase, so if we do see it here, we conclude that something has gone wrong, and we proceed to the stopping phase.
# File lib/ractor/wrapper.rb, line 535 def running_phase loop do maybe_log("Waiting for message") request = ::Ractor.receive next unless request.is_a?(Message) case request.type when :call @queue.enq(request) register_call(request) maybe_log("Queued method #{request.data.first} (transaction=#{request.transaction})") when :thread_stopped maybe_log("Thread unexpectedly stopped: #{request.data}") @thread_count -= 1 break when :stop maybe_log("Received stop") break end end end
In the **stopping phase**, we close the job queue, which signals to all worker threads that they should finish their current task and then terminate. We then wait for acknowledgement messages from all workers before proceeding to the next phase. Any `call` requests received during stopping are refused (i.e. we send back an error response.) Any further `stop` requests are ignored.
# File lib/ractor/wrapper.rb, line 564 def stopping_phase @queue.close while @thread_count.positive? maybe_log("Waiting for message while stopping") message = ::Ractor.receive next unless request.is_a?(Message) case message.type when :call refuse_method(message) when :thread_stopped @thread_count -= 1 end end end
# File lib/ractor/wrapper.rb, line 651 def unregister_call(transaction) @mutex.synchronize do @current_calls.delete(transaction) end end
A worker thread repeatedly pulls a method call requests off the job queue, handles it, and sends back a response. It also removes the request from the pending request list to signal that it has responded. If no job is available, the thread blocks while waiting. If the queue is closed, the worker will send an acknowledgement message and then terminate.
# File lib/ractor/wrapper.rb, line 507 def worker_thread(worker_num) maybe_worker_log(worker_num, "Starting") loop do maybe_worker_log(worker_num, "Waiting for job") request = @queue.deq break if request.nil? handle_method(worker_num, request) unregister_call(request.transaction) end ensure maybe_worker_log(worker_num, "Stopping") ::Ractor.current.send(Message.new(:thread_stopped, data: worker_num), move: true) end