class Rex::IO::RingBuffer
Attributes
Public Class Methods
Create a new ring buffer
# File lib/ssl_scan/io/ring_buffer.rb, line 31
def initialize(socket, opts={})
  self.size  = opts[:size] || (1024 * 4)
  self.fd    = socket
  self.seq   = 0
  self.beg   = 0
  self.cur   = 0
  self.queue = Array.new( self.size )
  self.mutex = Mutex.new
end
Public Instance Methods
The base_sequence
method returns the earliest sequence number in the queue. This is zero while the queue is empty; once data has been stored it is the sequence number of the oldest block still held, which advances as the ring rotates.
# File lib/ssl_scan/io/ring_buffer.rb, line 179
def base_sequence
  self.mutex.synchronize do
    return 0 if not self.queue[self.beg]
    return self.queue[self.beg][0]
  end
end
The clear_data
method wipes the ring buffer
# File lib/ssl_scan/io/ring_buffer.rb, line 88
def clear_data
  self.mutex.synchronize do
    self.seq   = 0
    self.beg   = 0
    self.cur   = 0
    self.queue = Array.new( self.size )
  end
end
The create_stream method returns a new Stream, an IO::Socket-compatible object backed by the ring buffer
# File lib/ssl_scan/io/ring_buffer.rb, line 197
def create_stream
  Stream.new(self)
end
# File lib/ssl_scan/io/ring_buffer.rb, line 41
def inspect
  "#<Rex::IO::RingBuffer @size=#{size} @fd=#{fd} @seq=#{seq} @beg=#{beg} @cur=#{cur}>"
end
The last_sequence
method returns the “next” sequence number where new data will be available.
# File lib/ssl_scan/io/ring_buffer.rb, line 190
def last_sequence
  self.seq
end
The built-in monitor thread (normally unused with Metasploit)
# File lib/ssl_scan/io/ring_buffer.rb, line 63
def monitor_thread
  Thread.new do
    begin
      while self.fd
        buff = self.fd.get_once(-1, 1.0)
        next if not buff
        store_data(buff)
      end
    rescue ::Exception => e
      self.monitor_thread_error = e
    end
  end
end
Push data back into the associated stream socket. Logging must occur elsewhere; this function is simply a passthrough.
# File lib/ssl_scan/io/ring_buffer.rb, line 81
def put(data, opts={})
  # Forward the caller's options; writing `opts={}` in the call would
  # reset them to an empty hash before they reach the socket.
  self.fd.put(data, opts)
end
The read_data
method returns a two-element array with the new reader cursor (a sequence number) and the returned data buffer (if any). A result of nil/nil indicates that no data is available.
# File lib/ssl_scan/io/ring_buffer.rb, line 126
def read_data(ptr=nil)
  self.mutex.synchronize do

  # Verify that there is data in the queue
  return [nil,nil] if not self.queue[self.beg]

  # Configure the beginning read pointer (sequence number, not index)
  ptr ||= self.queue[self.beg][0]
  return [nil,nil] if not ptr

  # If the pointer is below our baseline, we lost some data, so jump forward
  if ptr < self.queue[self.beg][0]
    ptr = self.queue[self.beg][0]
  end

  # Calculate how many blocks exist between the current sequence number
  # and the requested pointer, this becomes the number of blocks we will
  # need to read to satisfy the result. Due to the mutex block, we do
  # not need to scan to find the sequence of the starting block or
  # check the sequence of the ending block.
  dis = self.seq - ptr

  # If the requested sequence number is beyond the newest stored block, it
  # means that no new data is available and we should return empty.
  return [nil,nil] if dis < 0

  # Calculate the beginning block index and number of blocks to read
  off = ptr - self.queue[self.beg][0]
  set = (self.beg + off) % self.size

  # Build the buffer by reading forward by the number of blocks needed
  # and return the last read sequence number, plus one, as the new read
  # pointer.
  buff = ""
  cnt  = 0
  lst  = ptr
  ptr.upto(self.seq) do |i|
    block = self.queue[ (set + cnt) % self.size ]
    lst,data = block[0],block[1]
    buff += data
    cnt += 1
  end

  return [lst + 1, buff]

  end
end
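The cursor contract of read_data can be tried out without a socket. The sketch below is a simplified stand-in, not the documented class: MiniRing, store and read are hypothetical names, the mutex is omitted, and the read logic is condensed from the listing above.

```ruby
# Hypothetical MiniRing: a stripped-down stand-in for illustration only.
class MiniRing
  def initialize(size)
    @size  = size
    @seq   = 0
    @beg   = 0
    @cur   = 0
    @queue = Array.new(size)
  end

  # Same slot arithmetic as store_data, without locking.
  def store(data)
    loc = 0
    if @seq > 0
      loc  = (@cur + 1) % @size
      @beg = (@beg + 1) % @size if loc <= @beg
    end
    @queue[loc] = [@seq += 1, data]
    @cur = loc
  end

  # Returns [next_cursor, data], or [nil, nil] when nothing new is stored.
  def read(ptr = nil)
    return [nil, nil] unless @queue[@beg]
    base = @queue[@beg][0]
    ptr  = base if ptr.nil? || ptr < base   # lagging readers jump forward
    return [nil, nil] if ptr > @seq
    start = (@beg + (ptr - base)) % @size
    buff  = String.new
    (0..(@seq - ptr)).each { |i| buff << @queue[(start + i) % @size][1] }
    [@seq + 1, buff]
  end
end

ring = MiniRing.new(3)
ring.store("a")
ring.store("b")
cursor, data = ring.read            # cursor is 3, data is "ab"
%w[c d e f].each { |chunk| ring.store(chunk) }
cursor, data = ring.read(cursor)    # block 3 was overwritten: cursor is 7, data is "def"
```

A reader keeps only the returned cursor between calls. When it falls behind by more than the ring size, it silently loses the overwritten blocks and resumes from the oldest one still held.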
The select method returns when there is a chance of new data. XXX: This is mostly useless and requires a rewrite to use a real select or notify mechanism.
# File lib/ssl_scan/io/ring_buffer.rb, line 206
def select
  ::IO.select([ self.fd ], nil, [ self.fd ], 0.10)
end
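For readers unfamiliar with the underlying call, here is a small sketch of how IO.select behaves with a 0.10 second timeout. It uses an IO.pipe in place of the socket, and poll_readable is a hypothetical helper name, not part of the class.

```ruby
# Hypothetical helper: same IO.select call shape as the select method above.
def poll_readable(io, timeout = 0.10)
  ::IO.select([io], nil, [io], timeout)
end

reader, writer = IO.pipe

# Nothing has been written yet, so the call gives up after the timeout
# and returns nil.
idle = poll_readable(reader)

# With data pending, the first element of the result lists the readable IOs.
writer.write("ping")
ready = poll_readable(reader)

puts "idle: #{idle.inspect}, readable: #{ready[0].include?(reader)}"
```

Because the call returns after at most the timeout whether or not data arrived, a caller has to re-check the buffer itself, which is why the method is described as only signalling a chance of new data.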
Start the built-in monitor. This is not called when the buffer is used in a larger framework.
# File lib/ssl_scan/io/ring_buffer.rb, line 48
def start_monitor
  self.monitor = monitor_thread if not self.monitor
end
Stop the built-in monitor
# File lib/ssl_scan/io/ring_buffer.rb, line 55
def stop_monitor
  self.monitor.kill if self.monitor
  self.monitor = nil
end
The store_data
method is used to insert data into the ring buffer.
# File lib/ssl_scan/io/ring_buffer.rb, line 100
def store_data(data)
  self.mutex.synchronize do

  # self.cur points to the array index of queue containing the last item
  # adding data will result in cur + 1 being used to store said data
  # if cur is larger than size - 1, it will wrap back around. If cur
  # is *smaller* than beg, beg is incremented to cur + 1 (and wrapped if
  # necessary)

  loc = 0
  if self.seq > 0
    loc = ( self.cur + 1 ) % self.size

    if loc <= self.beg
      self.beg = (self.beg + 1) % self.size
    end
  end

  self.queue[loc] = [self.seq += 1, data]
  self.cur        = loc

  end
end
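The wrap-around rule is easier to follow as a trace. The sketch below isolates the index arithmetic for every store after the first one; advance is a hypothetical helper written for this illustration and assumes a 3-slot ring.

```ruby
# Hypothetical helper: given the last written slot and the oldest slot,
# return the slot to write next and the updated oldest slot.
def advance(cur, beg, size)
  loc = (cur + 1) % size
  # The write position has caught up with the oldest block, so that block
  # is about to be overwritten and the oldest slot moves forward.
  beg = (beg + 1) % size if loc <= beg
  [loc, beg]
end

cur = 0
beg = 0   # state after the very first store, which always lands in slot 0
5.times do
  cur, beg = advance(cur, beg, 3)
  puts "wrote slot #{cur}, oldest slot is now #{beg}"
end
# wrote slot 1, oldest slot is now 0
# wrote slot 2, oldest slot is now 0
# wrote slot 0, oldest slot is now 1
# wrote slot 1, oldest slot is now 2
# wrote slot 2, oldest slot is now 0
```

Until the ring fills, the oldest slot stays at 0. After that, every store evicts exactly one block and the two indexes move in lockstep.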
The wait method blocks until new data is available
# File lib/ssl_scan/io/ring_buffer.rb, line 213
def wait(seq)
  nseq = nil
  while not nseq
    nseq,data = read_data(seq)
    select
  end
end
The wait_for
method blocks until new data is available or the timeout is reached
# File lib/ssl_scan/io/ring_buffer.rb, line 224
def wait_for(seq,timeout=1)
  begin
    ::Timeout.timeout(timeout) do
      wait(seq)
    end
  rescue ::Timeout::Error
  end
end
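The timeout wrapper used by wait_for is a general Ruby pattern. The sketch below shows it in isolation; with_deadline is a hypothetical helper, and unlike wait_for it passes the block's value through so the two outcomes can be told apart.

```ruby
require "timeout"

# Hypothetical helper: run a blocking block, give up after `seconds`,
# and return nil instead of raising when the deadline passes.
def with_deadline(seconds)
  ::Timeout.timeout(seconds) { yield }
rescue ::Timeout::Error
  nil
end

slow = with_deadline(0.2) { sleep 1; :never_reached }   # nil, the deadline fired first
fast = with_deadline(0.2) { :ready }                    # :ready
puts "slow=#{slow.inspect} fast=#{fast.inspect}"
```

In the listing above, the while loop in wait evaluates to nil, so wait_for returns nil whether data arrived or the timeout fired. A caller that needs to know which one happened would have to call read_data again afterwards.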