class Arachni::Reactor
Reactor
scheduler and and resource factory.
You're probably interested in:
* Getting access to a shared and {.global globally accessible Reactor} -- that's probably what you want. * Rest of the class methods can be used to manage it. * Creating resources like: * Cross-thread, non-blocking {#create_queue Queues}. * Asynchronous, concurrent {#create_iterator Iterators}. * Network connections to: * {#connect Connect} to a server. * {#listen Listen} for clients. * Tasks to be scheduled: * {#schedule As soon as possible}. * {#on_tick On every loop iteration}. * {#delay After a configured delay}. * {#at_interval Every few seconds}. * {#on_shutdown During shutdown}.
@author Tasos “Zapotek” Laskos <tasos.laskos@gmail.com>
Constants
- DEFAULT_OPTIONS
- VERSION
Attributes
@return [Array<Connection>]
{#attach Attached} connections.
@return [Integer,nil]
Amount of time to wait for a connection.
@return [Integer]
Amount of ticks.
Public Class Methods
@return [Reactor]
Lazy-loaded, globally accessible Reactor.
# File lib/arachni/reactor.rb, line 91 def global @reactor ||= Global.instance end
# File lib/arachni/reactor.rb, line 118 def jruby? RUBY_PLATFORM == 'java' end
@param [Hash] options @option options [Integer,nil] :max_tick_interval (0.02)
How long to wait for each tick when no connections are available for processing.
@option options [Integer] :select_timeout (0.02)
How long to wait for connection activity before continuing to the next tick.
# File lib/arachni/reactor.rb, line 130 def initialize( options = {} ) options = DEFAULT_OPTIONS.merge( options ) @max_tick_interval = options[:max_tick_interval] @select_timeout = options[:select_timeout] # Socket => Connection @connections = {} @stop = false @ticks = 0 @thread = nil @tasks = Tasks.new @error_handlers = Tasks.new @shutdown_tasks = Tasks.new @done_signal = ::Queue.new end
Stops the {.global global Reactor} instance and destroys it. The next call to {.global} will return a new instance.
# File lib/arachni/reactor.rb, line 97 def stop return if !@reactor global.stop rescue Error::NotRunning # Admittedly not the cleanest solution, but that's the only way to # force a Singleton to re-initialize -- and we want the Singleton to # cleanly implement the pattern in a Thread-safe way. global.class.instance_variable_set(:@singleton__instance__, nil) @reactor = nil end
# File lib/arachni/reactor.rb, line 110 def supports_unix_sockets? return false if jruby? !!UNIXSocket rescue NameError false end
Public Instance Methods
@note Time accuracy cannot be guaranteed.
@param [Float] interval
Time in seconds.
@param [Block] block
Schedules a {Tasks::Periodic task} to be run at every `interval` seconds.
@raise (see fail_if_not_running
)
# File lib/arachni/reactor.rb, line 454 def at_interval( interval, &block ) fail_if_not_running @tasks << Tasks::Periodic.new( interval, &block ) nil end
@note Will call {Connection::Callbacks#on_attach}.
{Connection#attach Attaches} a connection to the {Reactor} loop.
@param [Connection] connection
@raise (see fail_if_not_running
)
# File lib/arachni/reactor.rb, line 497 def attach( connection ) return if attached? connection schedule do connection.reactor = self @connections[connection.to_io] = connection connection.on_attach end end
@return [Bool]
`true` if the connection is attached, `false` otherwise.
# File lib/arachni/reactor.rb, line 526 def attached?( connection ) @connections.include? connection.to_io end
@note {Connection::Error Connection
errors} will be passed to the `handler`'s
{Connection::Callbacks#on_close} method as a `reason` argument.
Connects to a peer.
@overload connect( host, port, handler = Connection
, *handler_options )
@param [String] host @param [Integer] port @param [Connection] handler Connection handler, should be a subclass of {Connection}. @param [Hash] handler_options Options to pass to the `#initialize` method of the `handler`.
@overload connect( unix_socket, handler = Connection
, *handler_options )
@param [String] unix_socket Path to the UNIX socket to connect. @param [Connection] handler Connection handler, should be a subclass of {Connection}. @param [Hash] handler_options Options to pass to the `#initialize` method of the `handler`.
@return [Connection]
Connected instance of `handler`.
@raise (see fail_if_not_running
) @raise (see fail_if_non_unix
)
# File lib/arachni/reactor.rb, line 190 def connect( *args, &block ) fail_if_not_running options = determine_connection_options( *args ) connection = options[:handler].new( *options[:handler_options] ) connection.reactor = self block.call connection if block_given? begin Connection::Error.translate do socket = options[:unix_socket] ? connect_unix( options[:unix_socket] ) : connect_tcp connection.configure options.merge( socket: socket, role: :client ) attach connection end rescue Connection::Error => e connection.close e end connection end
@return [Reactor::Iterator]
New {Reactor::Iterator} with `self` as the scheduler.
@param [#to_a] list
List to iterate.
@param [Integer] concurrency
Parallel workers to spawn.
# File lib/arachni/reactor.rb, line 154 def create_iterator( list, concurrency = 1 ) Reactor::Iterator.new( self, list, concurrency ) end
@return [Reactor::Queue]
New {Reactor::Queue} with `self` as the scheduler.
# File lib/arachni/reactor.rb, line 160 def create_queue Reactor::Queue.new self end
@note Time accuracy cannot be guaranteed.
@param [Float] time
Time in seconds.
@param [Block] block
Schedules a {Tasks::Delayed task} to be run in `time` seconds.
@raise (see fail_if_not_running
)
# File lib/arachni/reactor.rb, line 468 def delay( time, &block ) fail_if_not_running @tasks << Tasks::Delayed.new( time, &block ) nil end
@note Will call {Connection::Callbacks#on_detach}.
{Connection#detach Detaches} a connection from the {Reactor} loop.
@param [Connection] connection
@raise (see fail_if_not_running
)
# File lib/arachni/reactor.rb, line 514 def detach( connection ) return if !attached?( connection ) schedule do connection.on_detach @connections.delete connection.to_io connection.reactor = nil end end
@return [Bool]
`true` if the caller is in the same {#thread} as the {#run reactor loop}, `false` otherwise.
@raise (see fail_if_not_running
)
# File lib/arachni/reactor.rb, line 485 def in_same_thread? fail_if_not_running Thread.current == thread end
@note {Connection::Error Connection
errors} will be passed to the `handler`'s
{Connection::Callbacks#on_close} method as a `reason` argument.
Listens for incoming connections.
@overload listen( host, port, handler = Connection
, *handler_options )
@param [String] host @param [Integer] port @param [Connection] handler Connection handler, should be a subclass of {Connection}. @param [Hash] handler_options Options to pass to the `#initialize` method of the `handler`. @raise [Connection::Error::HostNotFound] If the `host` is invalid. @raise [Connection::Error::Permission] If the `port` could not be opened due to a permission error.
@overload listen( unix_socket, handler = Connection
, *handler_options )
@param [String] unix_socket Path to the UNIX socket to create. @param [Connection] handler Connection handler, should be a subclass of {Connection}. @param [Hash] handler_options Options to pass to the `#initialize` method of the `handler`. @raise [Connection::Error::Permission] If the `unix_socket` file could not be created due to a permission error.
@return [Connection]
Listening instance of `handler`.
@raise (see fail_if_not_running
) @raise (see fail_if_non_unix
)
# File lib/arachni/reactor.rb, line 248 def listen( *args, &block ) fail_if_not_running options = determine_connection_options( *args ) server_handler = proc do c = options[:handler].new( *options[:handler_options] ) c.reactor = self block.call c if block_given? c end server = server_handler.call begin Connection::Error.translate do socket = options[:unix_socket] ? listen_unix( options[:unix_socket] ) : listen_tcp( options[:host], options[:port] ) server.configure options.merge( socket: socket, role: :server, server_handler: server_handler ) attach server end rescue Connection::Error => e server.close e end server end
@param [Block] block
Schedules a {Tasks::OneOff task} to be run at the next tick.
@raise (see fail_if_not_running
)
# File lib/arachni/reactor.rb, line 440 def next_tick( &block ) fail_if_not_running @tasks << Tasks::OneOff.new( &block ) nil end
@param [Block] block
Passes exceptions raised in the Reactor {#thread} to a {Tasks::Persistent task}.
@raise (see fail_if_not_running
)
# File lib/arachni/reactor.rb, line 392 def on_error( &block ) fail_if_not_running @error_handlers << Tasks::Persistent.new( &block ) nil end
@param [Block] block
Schedules a {Tasks::OneOff task} to be run at {#stop shutdown}.
@raise (see fail_if_not_running
)
# File lib/arachni/reactor.rb, line 430 def on_shutdown( &block ) fail_if_not_running @shutdown_tasks << Tasks::OneOff.new( &block ) nil end
@param [Block] block
Schedules a {Tasks::Persistent task} to be run at each tick.
@raise (see fail_if_not_running
)
# File lib/arachni/reactor.rb, line 402 def on_tick( &block ) fail_if_not_running @tasks << Tasks::Persistent.new( &block ) nil end
Starts the {Reactor} loop and blocks the current {#thread} until {#stop} is called.
@param [Block] block
Block to call right before initializing the loop.
@raise (see fail_if_running
)
# File lib/arachni/reactor.rb, line 298 def run( &block ) fail_if_running @done_signal.clear @thread = Thread.current block.call if block_given? loop do begin @tasks.call rescue => e @error_handlers.call( e ) end break if @stop begin process_connections rescue => e @error_handlers.call( e ) end break if @stop @ticks += 1 end @tasks.clear close_connections @shutdown_tasks.call @ticks = 0 @thread = nil @done_signal << nil end
Starts the {#run Reactor
loop}, blocks the current {#thread} while the given `block` executes and then {#stop}s it.
@param [Block] block
Block to call.
@raise (see fail_if_running
)
# File lib/arachni/reactor.rb, line 377 def run_block( &block ) fail ArgumentError, 'Missing block.' if !block_given? fail_if_running run do block.call next_tick { stop } end end
{#run Runs} the Reactor
in a thread and blocks until it is {#running?}.
@param (see run
)
@return [Thread]
{Reactor#thread}
@raise (see fail_if_running
)
# File lib/arachni/reactor.rb, line 344 def run_in_thread( &block ) fail_if_running Thread.new do begin run(&block) rescue => e @error_handlers.call( e ) end end sleep 0.1 while !running? thread end
@return [Bool]
`true` if the {Reactor} is {#run running}, `false` otherwise.
# File lib/arachni/reactor.rb, line 280 def running? thread && thread.alive? end
@param [Block] block
Schedules a task to be run as soon as possible, either immediately if the caller is {#in_same_thread? in the same thread}, or at the {#next_tick} otherwise.
@raise (see fail_if_not_running
)
# File lib/arachni/reactor.rb, line 414 def schedule( &block ) fail_if_not_running if running? && in_same_thread? block.call else next_tick(&block) end nil end
Stops the {Reactor} {#run loop} {#schedule as soon as possible}.
@raise (see fail_if_not_running
)
# File lib/arachni/reactor.rb, line 287 def stop schedule { @stop = true } end
@return [Thread, nil]
Thread of the {#run loop}, `nil` if not running.
# File lib/arachni/reactor.rb, line 476 def thread @thread end
Waits for the Reactor
to stop {#running?}.
@raise (see fail_if_not_running
)
# File lib/arachni/reactor.rb, line 363 def wait fail_if_not_running @done_signal.pop true end
Private Instance Methods
# File lib/arachni/reactor.rb, line 715 def all_sockets @connections.values.map(&:socket) end
Closes all client connections, both ingress and egress.
# File lib/arachni/reactor.rb, line 633 def close_connections @connections.values.each(&:close) end
@return [Socket]
Connected socket.
# File lib/arachni/reactor.rb, line 608 def connect_tcp socket = Socket.new( Socket::Constants::AF_INET, Socket::Constants::SOCK_STREAM, Socket::Constants::IPPROTO_IP ) socket.do_not_reverse_lookup = true socket end
@return [UNIXSocket]
Connected socket.
# File lib/arachni/reactor.rb, line 600 def connect_unix( unix_socket ) fail_if_non_unix UNIXSocket.new( unix_socket ) end
# File lib/arachni/reactor.rb, line 723 def connection_from_socket( socket ) @connections[socket.to_io] end
# File lib/arachni/reactor.rb, line 719 def connections_from_sockets( sockets ) sockets.map { |s| connection_from_socket( s ) } end
# File lib/arachni/reactor.rb, line 577 def determine_connection_options( *args ) options = {} host = port = unix_socket = nil if args[1].is_a? Integer options[:host], options[:port], options[:handler], *handler_options = *args else options[:unix_socket], options[:handler], *handler_options = *args end if !options[:unix_socket].is_a?( String ) && (!options[:host].is_a?( String ) || !options[:port].is_a?( Integer )) fail ArgumentError, 'Either a UNIX socket path or a host and port combination are required.' end options[:handler] ||= Connection options[:handler_options] = handler_options options end
@raise [Error::UNIXSocketsNotSupported]
If trying to use UNIX-domain sockets on a host OS that does not support them.
# File lib/arachni/reactor.rb, line 547 def fail_if_non_unix return if self.class.supports_unix_sockets? fail Error::UNIXSocketsNotSupported, 'The host OS does not support UNIX-domain sockets.' end
@raise [Error::NotRunning]
If the Reactor is not {#running?}.
# File lib/arachni/reactor.rb, line 534 def fail_if_not_running fail Error::NotRunning, 'Reactor is not running.' if !running? end
@raise [Error::NotRunning]
If the Reactor is already {#running?}.
# File lib/arachni/reactor.rb, line 540 def fail_if_running fail Error::AlreadyRunning, 'Reactor is already running.' if running? end
@return [TCPServer]
Listening server socket.
# File lib/arachni/reactor.rb, line 620 def listen_tcp( host, port ) server = TCPServer.new( host, port ) server.do_not_reverse_lookup = true server end
@return [UNIXServer]
Listening server socket.
# File lib/arachni/reactor.rb, line 628 def listen_unix( unix_socket ) UNIXServer.new( unix_socket ) end
# File lib/arachni/reactor.rb, line 554 def process_connections if @connections.empty? sleep @max_tick_interval return end # Required for OSX as it connects immediately and then #select returns # nothing as there's no activity, given that, OpenSSL doesn't get a chance # to do its handshake so explicitly connect pending sockets, bypassing #select. @connections.each do |_, connection| connection._connect if !connection.connected? end # Get connections with available events - :read, :write, :error. selected = select_connections # Close connections that have errors. [selected.delete(:error)].flatten.compact.each(&:close) # Call the corresponding event on the connections. selected.each { |event, connections| connections.each(&"_#{event}".to_sym) } end
@return [Array<Socket>]
Sockets of all connections, we want to be ready to read at any time.
# File lib/arachni/reactor.rb, line 694 def read_sockets s = [] @connections.each do |_, connection| next if !connection.listener? && !connection.connected? s << connection.socket end s end
@return [Hash]
Connections grouped by their available events: * `:read` -- Ready for reading (i.e. with data in their incoming buffer). * `:write` -- Ready for writing (i.e. with data in their {Connection#has_outgoing_data? outgoing buffer). * `:error`
# File lib/arachni/reactor.rb, line 645 def select_connections readables = read_sockets selected_sockets = begin Connection::Error.translate do select( readables, write_sockets, all_sockets, @select_timeout ) end rescue Connection::Error => e nil end selected_sockets ||= [[],[],[]] # SSL sockets maintain their own buffer whose state can't be checked by # Kernel.select, leading to cases where the SSL buffer isn't empty, # even though Kernel.select says that there's nothing to read. # # So force a read for SSL sockets to cover all our bases. # # This is apparent especially on JRuby. if readables.size != selected_sockets[0].size (readables - selected_sockets[0]).each do |socket| next if !socket.is_a?( OpenSSL::SSL::SSLSocket ) selected_sockets[0] << socket end end if selected_sockets[0].empty? && selected_sockets[1].empty? && selected_sockets[2].empty? return {} end { # Since these will be processed in order, it's better have the write # ones first to flush the buffers ASAP. write: connections_from_sockets( selected_sockets[1] ), read: connections_from_sockets( selected_sockets[0] ), error: connections_from_sockets( selected_sockets[2] ) } end
@return [Array<Socket>]
Sockets of connections with {Connection#has_outgoing_data? outgoing data}.
# File lib/arachni/reactor.rb, line 706 def write_sockets s = [] @connections.each do |_, connection| next if connection.connected? && !connection.has_outgoing_data? s << connection.socket end s end