module ShadowsocksRuby::Connections::Connection

Mixin that adds a fiber-enabled asynchronous receive function and a pressure-controlled send_data to EventMachine::Connection.

User code should define process_hook, which would typically implement a state machine.

Note: By design, user code should not override post_init or receive_data.

@example

class DummyConnection < EventMachine::Connection
  include ShadowsocksRuby::Connections::Connection
  def process_hook
    @i ||= 0
    @i += 1
    puts "I'm now in a fiber enabled context: #{@fiber}"
    Fiber.yield if @i >= 3
  end
end

Constants

PressureLevel

512K. Used to pause plexer when get_outbound_data_size (as checked in pressure_control) is at or above this value.

Attributes

logger[W]

You can set the logger in test code.

plexer[RW]

Where the peer's traffic is relayed to. For a server connection, plexer is the backend connection. For a backend connection, plexer is the server connection. @return [Connection]

Public Instance Methods

async_recv(n)

Asynchronously receive n bytes from @buffer. @param [Integer] n Number of bytes to receive; if n = -1, returns all data in @buffer @return [String] The n bytes of data received

# File lib/shadowsocks_ruby/connections/connection.rb, line 95
def async_recv n
  # wait n bytes
  if n == -1 && @buffer.size == 0 || @buffer.size < n
    @wait_length = n
    Fiber.yield
  end
  # read n bytes from buffer
  if n == -1
    s, @buffer = @buffer, String.new('', encoding: Encoding::ASCII_8BIT)
    return s
  else
    return @buffer.slice!(0, n)
  end
end
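
The wait-and-resume handshake between async_recv and receive_data can be tried without EventMachine. The following is only a minimal sketch: SketchReceiver, feed, and frames are hypothetical names invented for illustration, feed stands in for receive_data, and the 2-byte length-prefixed framing is an assumption chosen to show two consecutive waits.

```ruby
# Hypothetical stand-in for a connection: no EventMachine, only the
# buffer + fiber handshake that async_recv relies on.
class SketchReceiver
  attr_reader :frames

  def initialize
    @buffer = String.new('', encoding: Encoding::ASCII_8BIT)
    @wait_length = 0
    @frames = []
    @fiber = Fiber.new do
      loop do
        header = async_recv(2)                  # parks until 2 bytes are buffered
        body = async_recv(header.unpack1('n'))  # parks until the whole body arrived
        @frames << body
      end
    end
    @fiber.resume # run up to the first Fiber.yield
  end

  # Same shape as the documented async_recv.
  def async_recv(n)
    if (n == -1 && @buffer.empty?) || @buffer.size < n
      @wait_length = n
      Fiber.yield
    end
    if n == -1
      s, @buffer = @buffer, String.new('', encoding: Encoding::ASCII_8BIT)
      s
    else
      @buffer.slice!(0, n)
    end
  end

  # Plays the role of receive_data: buffer the bytes, then wake the
  # fiber only once enough data is available.
  def feed(data)
    @buffer << data
    @fiber.resume if @wait_length == -1 || @buffer.size >= @wait_length
  end
end

receiver = SketchReceiver.new
receiver.feed("\x00")     # 1 byte: header still incomplete
receiver.feed("\x05he")   # header says 5 bytes, only 2 of the body so far
receiver.feed("llo")      # body complete, one frame is recorded
```

The fiber never busy-waits: each feed either leaves it parked or resumes it exactly where async_recv yielded.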
async_recv_until(str)

Asynchronously receive data until str (e.g. "\r\n\r\n") appears. @param [String] str Desired ending string @raise BufferOversizeError if str cannot be found in the first 65536 bytes (64K) of @buffer, which is enough for an HTTP request head.

@return [String] Returned data, with str at the end

# File lib/shadowsocks_ruby/connections/connection.rb, line 115
def async_recv_until str
  # wait for str
  pos = @buffer =~ Regexp.new(str)
  while pos == nil
    @wait_length = -1
    Fiber.yield
    pos = @buffer =~ Regexp.new(str)
    raise BufferOversizeError, "oversized async_recv_until read" if @buffer.size > 65536
  end
  # read until str from buffer
  return @buffer.slice!(0, pos + str.length)
end
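
A delimiter-based read can be sketched in the same standalone style. This is an illustration under stated assumptions, not the library's code: SketchLineReader, SketchOversizeError, feed, and heads are hypothetical names, and the sketch searches with String#index (a literal match) where the listing above builds a Regexp from str. It also checks the size limit before yielding rather than after.

```ruby
# Hypothetical stand-in for BufferOversizeError.
class SketchOversizeError < StandardError; end

class SketchLineReader
  LIMIT = 65_536 # same 64K ceiling as the documented method

  attr_reader :heads

  def initialize
    @buffer = String.new('', encoding: Encoding::ASCII_8BIT)
    @heads = []
    @fiber = Fiber.new do
      loop { @heads << recv_until("\r\n\r\n") }
    end
    @fiber.resume # run up to the first Fiber.yield
  end

  # Park the fiber until str shows up in the buffer, then return
  # everything up to and including str.
  def recv_until(str)
    until (pos = @buffer.index(str))
      raise SketchOversizeError, "delimiter not found within #{LIMIT} bytes" if @buffer.size > LIMIT
      Fiber.yield
    end
    @buffer.slice!(0, pos + str.length)
  end

  # Plays the role of receive_data: every chunk wakes the fiber so it
  # can search the buffer again.
  def feed(data)
    @buffer << data
    @fiber.resume
  end
end

reader = SketchLineReader.new
reader.feed("GET / HTTP/1.1\r\nHost: a\r\n") # no blank line yet
reader.feed("\r\nBODY")                      # head complete, "BODY" stays buffered
```

An exception raised inside the fiber propagates out of the resume call, so in this sketch the caller of feed sees SketchOversizeError.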
logger()

Get the logger object. The default logger is App.instance.logger.

# File lib/shadowsocks_ruby/connections/connection.rb, line 52
def logger
  @logger ||= App.instance.logger
end
peer()
# File lib/shadowsocks_ruby/connections/connection.rb, line 84
def peer
  @peer ||=
  begin
    port, ip = Socket.unpack_sockaddr_in(get_peername)
    "#{ip}:#{port}"
  end
end
post_init()

Initialize a fiber context and enter the process loop. By design, a child class should normally not override post_init. @private

# File lib/shadowsocks_ruby/connections/connection.rb, line 73
def post_init
  @buffer = String.new('', encoding: Encoding::ASCII_8BIT)
  @fiber = Fiber.new do
    # poor man's state machine
    while true
      process
    end
  end
  @fiber.resume
end
pressure_control()

If the peer is receiving data too slowly, pause the plexer from sending data to me, to prevent memory usage from growing too high. @private

# File lib/shadowsocks_ruby/connections/connection.rb, line 159
def pressure_control
  @plexer ||= nil
  if @plexer != nil
    if get_outbound_data_size >= PressureLevel
      @plexer.pause unless @plexer.paused?
      EventMachine.next_tick self.method(:pressure_control)
    else
      @plexer.resume if @plexer.paused?
    end
  end
end
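
The pause/resume decision can be exercised in isolation. The sketch below is a simplified model, not the library's implementation: SketchPlexer, SketchSender, outbound_size, and PRESSURE_LEVEL are hypothetical names, 512K is assumed to mean 512 * 1024 bytes, and the EventMachine.next_tick re-check is replaced by a boolean return value telling the caller whether to check again.

```ruby
PRESSURE_LEVEL = 512 * 1024 # assumption: "512K" read as 524288 bytes

# Hypothetical peer that only tracks whether it is paused.
class SketchPlexer
  def initialize
    @paused = false
  end

  def pause
    @paused = true
  end

  def resume
    @paused = false
  end

  def paused?
    @paused
  end
end

class SketchSender
  # Stands in for get_outbound_data_size on a real connection.
  attr_accessor :outbound_size

  def initialize(plexer)
    @plexer = plexer
    @outbound_size = 0
  end

  # Returns true when the caller should schedule another check
  # (the documented method does this with EventMachine.next_tick).
  def pressure_control
    return false if @plexer.nil?

    if @outbound_size >= PRESSURE_LEVEL
      @plexer.pause unless @plexer.paused?
      true
    else
      @plexer.resume if @plexer.paused?
      false
    end
  end
end

plexer = SketchPlexer.new
sender = SketchSender.new(plexer)
sender.outbound_size = PRESSURE_LEVEL
sender.pressure_control # pauses the plexer and asks for a re-check
sender.outbound_size = 0
sender.pressure_control # backlog drained, the plexer is resumed
```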
process()

Call process_hook, which should be defined in user code. @private

# File lib/shadowsocks_ruby/connections/connection.rb, line 66
def process
  process_hook
end
receive_data(data)

Provide fiber-enabled data receiving. Should always be called in a fiber context.

Normally, client classes should not call receive_data directly; they should call async_recv or async_recv_until instead.

@param [String] data @raise OutOfFiberContextError

@private

# File lib/shadowsocks_ruby/connections/connection.rb, line 139
def receive_data data
  if @fiber.alive?
    @buffer << data
    if @wait_length == -1 || @buffer.size >= @wait_length
      @fiber.resume
    end
  else
    raise OutOfFiberContextError, "should not go here"
  end
rescue MyErrorModule => e
  logger.info {e.message}
  close_connection
rescue Exception => e
  logger.info {e.class.to_s + " " + e.message + e.backtrace.join("\n")}
  close_connection
end
send_data(data)

send_data with pressure control. @param data Data to send asynchronously

Calls superclass method
# File lib/shadowsocks_ruby/connections/connection.rb, line 59
def send_data data
  pressure_control
  super data
end
tcp_receive_from_client(n)
Alias for: async_recv
tcp_receive_from_destination(n)
Alias for: async_recv
tcp_receive_from_localbackend(n)
Alias for: async_recv
tcp_receive_from_remoteserver(n)
Alias for: async_recv
tcp_send_to_client(data)
Alias for: send_data
tcp_send_to_destination(data)
Alias for: send_data
tcp_send_to_localbackend(data)
Alias for: send_data
tcp_send_to_remoteserver(data)
Alias for: send_data
udp_receive_from_client(n)
Alias for: async_recv
udp_receive_from_destination(n)
Alias for: async_recv
udp_receive_from_localbackend(n)
Alias for: async_recv
udp_receive_from_remoteserver(n)
Alias for: async_recv
udp_send_to_client(data)
Alias for: send_data
udp_send_to_destination(data)
Alias for: send_data
udp_send_to_localbackend(data)
Alias for: send_data
udp_send_to_remoteserver(data)
Alias for: send_data
unbind()

Close the plexer first, if it exists.

# File lib/shadowsocks_ruby/connections/connection.rb, line 172
def unbind
  @plexer ||= nil
  @plexer.close_connection_after_writing if @plexer != nil
end