class Arborist::Monitor::ConnectionBatching::BatchRunner
An object that manages batching of connections and gathering results.
Attributes
batch_size: The maximum number of connections to have running at any time.
connection_hashes: An index of the current batch's connection hashes by connection.
current_batch: The batch of connection hashes that are currently being selected, ordered from oldest to newest.
enum: The Enumerator that yields connection hashes.
results: The results hash.
start: The Time the batch runner started.
timeout: The connection timeout from the monitor, in seconds.
Public Class Methods
Create a new BatchRunner for the specified enum (an Enumerator).
# File lib/arborist/monitor/connection_batching.rb, line 31
def initialize( enum, batch_size, timeout )
  @enum = enum
  @results = {}
  @current_batch = []
  @connection_hashes = {}
  @start = nil
  @batch_size = batch_size || DEFAULT_BATCH_SIZE
  @timeout = timeout || DEFAULT_TIMEOUT
end
Public Instance Methods
Add a new conn_hash to the current batch. If the conn_hash's connection is an exception, don't add it to the batch; instead, record an error status for it built from the exception.
# File lib/arborist/monitor/connection_batching.rb, line 107
def add_connection( conn_hash )
  if conn_hash[:conn].is_a?( ::Exception )
    self.log.debug "Adding an error result for %{identifier}." % conn_hash
    self.results[ conn_hash[:identifier] ] = { error: conn_hash[:conn].message }
  else
    self.log.debug "Added connection for %{identifier} to the batch." % conn_hash
    self.current_batch.push( conn_hash )
    self.connection_hashes[ conn_hash[:conn] ] = conn_hash
  end
end
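The routing described above can be tried outside the library. The following is a minimal sketch, assuming plain local variables as stand-ins for the runner's `results`, `current_batch`, and `connection_hashes`; the lambda, the identifiers, and the placeholder socket object are hypothetical and only illustrate the branch on `::Exception`.

```ruby
# Hypothetical stand-ins for the runner's state (not the library's objects).
results = {}
current_batch = []
connection_hashes = {}

# Same branching as add_connection, minus logging.
add_connection = lambda do |conn_hash|
  if conn_hash[:conn].is_a?( ::Exception )
    # A failed connection attempt goes straight to the results
    results[ conn_hash[:identifier] ] = { error: conn_hash[:conn].message }
  else
    # A live connection joins the batch and is indexed by its socket
    current_batch.push( conn_hash )
    connection_hashes[ conn_hash[:conn] ] = conn_hash
  end
end

socket = Object.new  # placeholder for a real socket
add_connection.call({ identifier: 'web1', conn: socket })
add_connection.call({ identifier: 'web2', conn: Errno::ECONNREFUSED.new })
```

After these two calls, only `web1` is in the batch, while `web2` already has an error result and will never be selected on.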
Returns true if the current batch is at capacity.
# File lib/arborist/monitor/connection_batching.rb, line 84
def batch_full?
  return self.current_batch.length >= self.batch_size
end
Fill the current_batch if it's not yet at capacity and there are more connections to be made.
# File lib/arborist/monitor/connection_batching.rb, line 138
def fill_batch
  # If the enum is not nil and the array isn't full, fetch a new connection
  while self.enum && !self.batch_full?
    self.log.debug "Adding connections to the queue."
    conn_hash = self.next_connection or break
    self.add_connection( conn_hash )
  end
end
Returns true if the runner has been run and all connections have been handled.
# File lib/arborist/monitor/connection_batching.rb, line 78
def finished?
  return self.start && self.enum.nil? && self.current_batch.empty?
end
Fetch the next connection from the Enumerator, unsetting the enumerator and returning nil when it reaches the end.
# File lib/arborist/monitor/connection_batching.rb, line 91
def next_connection
  conn_hash = self.enum.next
  conn_hash[:start] = Time.now
  conn_hash[:timeout_at] = conn_hash[:start] + self.timeout

  return conn_hash
rescue StopIteration
  self.log.debug "Reached the end of the connections enum."
  self.enum = nil
  return nil
end
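The method relies on external enumeration: `Enumerator#next` raises `StopIteration` once the source is exhausted. A minimal sketch of that pattern follows, assuming a two-element Array as the source and a hypothetical 2-second timeout; the lambda and local names are illustrative and not part of the library.

```ruby
# Hypothetical source of connection hashes and timeout for this sketch.
enum = [ { identifier: 'a' }, { identifier: 'b' } ].each
timeout = 2.0

next_connection = lambda do
  begin
    conn_hash = enum.next
    # Stamp each hash with its start time and deadline as it is pulled
    conn_hash[:start] = Time.now
    conn_hash[:timeout_at] = conn_hash[:start] + timeout
    conn_hash
  rescue StopIteration
    # Exhausted: unset the enumerator so callers stop asking for more
    enum = nil
    nil
  end
end

first  = next_connection.call
second = next_connection.call
third  = next_connection.call   # the enumerator is exhausted here
```

Unsetting the enumerator gives the rest of the runner a cheap "no more work" flag that it can test without rescuing the exception again.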
Remove the specified conn_hash from the current batch.
# File lib/arborist/monitor/connection_batching.rb, line 120
def remove_connection( conn_hash )
  self.current_batch.delete( conn_hash )
  self.connection_hashes.delete( conn_hash[:conn] )
end
Remove the connection hash for the specified socket from the current batch and return it (if it was in the batch).
# File lib/arborist/monitor/connection_batching.rb, line 128
def remove_socket( socket )
  conn_hash = self.connection_hashes.delete( socket )
  self.current_batch.delete( conn_hash )
  return conn_hash
end
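The lookup works because `Hash#delete` returns the removed value, or nil when the key is absent. A small sketch of the same two-step removal, assuming bare objects as placeholder sockets and local variables in place of the runner's attributes (all names here are hypothetical):

```ruby
sock_a = Object.new  # placeholder for a socket that is in the batch
sock_b = Object.new  # placeholder for a socket that is not

hash_a = { identifier: 'a', conn: sock_a }
current_batch = [ hash_a ]
connection_hashes = { sock_a => hash_a }

remove_socket = lambda do |socket|
  # Drop the socket from the index, then its hash from the ordered batch
  conn_hash = connection_hashes.delete( socket )
  current_batch.delete( conn_hash )
  conn_hash
end

removed = remove_socket.call( sock_a )
missing = remove_socket.call( sock_b )
```

For an unknown socket the index lookup yields nil, `Array#delete( nil )` finds nothing to remove, and nil is returned, which is what lets a caller detect the "not in the batch" case.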
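Shift any connections which have timed out off of the current batch, recording a timeout error result for each, and return the number of seconds to wait before the next check. The wait is derived from the timeout of the oldest connection in the batch, or is 1 if the batch is empty.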
# File lib/arborist/monitor/connection_batching.rb, line 150
def remove_timedout_connections
  expired = self.current_batch.take_while do |conn_hash|
    conn_hash[ :timeout_at ].past?
  end

  wait_seconds = if self.current_batch.empty?
      1
    else
      self.current_batch.first[:timeout_at] - Time.now
    end

  expired.each do |conn_hash|
    self.remove_connection( conn_hash )
    self.log.debug "Discarding timed-out socket for %{identifier}." % conn_hash

    elapsed = conn_hash[:timeout_at] - conn_hash[:start]
    self.results[ conn_hash[:identifier] ] = {
      error: "Timeout after %0.3fs" % [ elapsed ]
    }
  end

  return wait_seconds.abs
end
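Because the batch is ordered from oldest to newest, `take_while` is enough to collect every expired entry from the front. The sketch below illustrates this with hand-built timestamps. It is a simplification under stated assumptions: `past?` is not core Ruby, so a plain comparison against a fixed `now` is substituted, and the wait is computed after the expired entries are discarded, which differs from the order used in the listing above.

```ruby
now = Time.now

# Hypothetical batch, ordered oldest to newest as the runner keeps it.
current_batch = [
  { identifier: 'a', start: now - 3.0, timeout_at: now - 1.0 },  # expired
  { identifier: 'b', start: now - 2.5, timeout_at: now - 0.5 },  # expired
  { identifier: 'c', start: now - 1.0, timeout_at: now + 1.0 },  # still pending
]
results = {}

# Stops at the first entry whose deadline has not passed yet
expired = current_batch.take_while { |conn_hash| conn_hash[:timeout_at] <= now }

expired.each do |conn_hash|
  current_batch.delete( conn_hash )
  elapsed = conn_hash[:timeout_at] - conn_hash[:start]
  results[ conn_hash[:identifier] ] = { error: "Timeout after %0.3fs" % [ elapsed ] }
end

# Seconds until the oldest remaining deadline, or 1 when nothing is left
wait_seconds = current_batch.empty? ? 1 : current_batch.first[:timeout_at] - now
```

Note that the reported elapsed time is the configured timeout (deadline minus start), not the wall-clock time at which the expiry was noticed.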
Run the batch runner, yielding to the specified block as each connection becomes ready.
# File lib/arborist/monitor/connection_batching.rb, line 192
def run( &block )
  self.start = Time.now

  until self.finished?
    self.log.debug "Getting the status of %d connections." %
      [ self.current_batch.length ]

    self.fill_batch
    wait_seconds = self.remove_timedout_connections
    ready = self.wait_for_ready_connections( wait_seconds )

    # If the select returns ready sockets
    # Build successful status for each ready socket
    now = Time.now
    ready.each do |sock|
      conn_hash = self.remove_socket( sock ) or
        raise "Ready socket %p was not in the current batch!" % [ sock ]

      identifier, start = conn_hash.values_at( :identifier, :start )
      duration = now - start

      results[ identifier ] = block.call( conn_hash, duration )
    end if ready
  end

  return Time.now - self.start
end
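The overall shape of the loop (top up the batch, select, harvest ready connections, repeat until both the enumerator and the batch are empty) can be sketched with the standard library alone. This is an illustration under assumptions, not the library's implementation: pipe write ends stand in for sockets, the batch size of 2 is hypothetical, timeout handling is left out, and the result hashes are invented for the example.

```ruby
BATCH_SIZE = 2  # hypothetical batch size for this sketch

# Each "connection" is the write end of a pipe, standing in for a socket.
pipes = 5.times.map { IO.pipe }
enum = pipes.each_with_index.map do |(_, writer), i|
  { identifier: "node#{i}", conn: writer }
end.each

batch = {}          # connection => connection hash
results = {}
largest_batch = 0

until enum.nil? && batch.empty?
  # Top up the batch until it is full or the enumerator is exhausted
  while enum && batch.length < BATCH_SIZE
    begin
      conn_hash = enum.next
      batch[ conn_hash[:conn] ] = conn_hash
    rescue StopIteration
      enum = nil
    end
  end
  largest_batch = [ largest_batch, batch.length ].max
  next if batch.empty?

  # Wait for writability, then record a result for each ready connection
  _, ready, _ = IO.select( nil, batch.keys, nil, 1 )
  ( ready || [] ).each do |sock|
    conn_hash = batch.delete( sock )
    results[ conn_hash[:identifier] ] = { connected: true }
  end
end

pipes.flatten.each( &:close )
```

All five identifiers end up in `results`, yet no more than `BATCH_SIZE` connections are ever waited on at once, which is the point of batching.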
Wait at most wait_seconds for one of the sockets in the current batch to become ready. If any are ready before wait_seconds has elapsed, returns them as an Array. If wait_seconds goes by without any sockets becoming ready, or if there were no sockets to wait on, returns nil.
# File lib/arborist/monitor/connection_batching.rb, line 179
def wait_for_ready_connections( wait_seconds )
  sockets = self.connection_hashes.keys
  ready = nil

  self.log.debug "Selecting on %d sockets." % [ sockets.length ]
  _, ready, _ = IO.select( nil, sockets, nil, wait_seconds ) unless sockets.empty?

  return ready
end
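The sockets are passed as the second argument to `IO.select`, the "writable" set; for a non-blocking connect, becoming writable is the usual sign that the attempt has completed. Both return shapes of `IO.select` can be seen with a pipe, used here as a stand-in for a socket (an assumption of this sketch, as are the timeout values):

```ruby
reader, writer = IO.pipe

# The write end of an empty pipe is writable right away, so select returns
# a three-element Array whose second element lists the ready writers.
_, ready, _ = IO.select( nil, [writer], nil, 0.5 )

# Nothing has been written, so the read end never becomes readable and
# select returns nil once the timeout passes.
timed_out = IO.select( [reader], nil, nil, 0.05 )

# Destructuring nil leaves every target nil, which is why the method
# above returns nil on timeout.
_, none_ready, _ = timed_out

reader.close
writer.close
```

The `unless sockets.empty?` guard in the method matters for the same reason: with no sockets to wait on, `ready` keeps its initial nil and the caller's `if ready` check skips the harvest step.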