class P2p2::P2Worker

Public Class Methods

new( paird_host, paird_port, title, shadow_host, shadow_port ) click to toggle source

initialize

# File lib/p2p2/p2_worker.rb, line 7
def initialize( paird_host, paird_port, title, shadow_host, shadow_port )
  @paird_host = paird_host
  @paird_port = paird_port
  @title = title
  @shadow_addr = Socket.sockaddr_in( shadow_port, shadow_host )
  @reads = []
  @writes = []
  @roles = {} # sock => :dotr / :shadow / :ctl / :tun / :src
  @src_infos = ConcurrentHash.new
  @tun_infos = ConcurrentHash.new

  new_a_pipe
  new_a_shadow
end

Public Instance Methods

looping() click to toggle source

looping

# File lib/p2p2/p2_worker.rb, line 25
def looping
  puts "#{ Time.new } looping"
  loop_check_state

  loop do
    rs, ws = IO.select( @reads, @writes )

    rs.each do | sock |
      role = @roles[ sock ]

      case role
      when :dotr then
        read_dotr( sock )
      when :shadow then
        read_shadow( sock )
      when :src then
        read_src( sock )
      when :ctl then
        read_ctl( sock )
      when :tun then
        read_tun( sock )
      else
        puts "#{ Time.new } read unknown role #{ role }"
        close_sock( sock )
      end
    end

    ws.each do | sock |
      role = @roles[ sock ]

      case role
      when :tun then
        write_tun( sock )
      when :src then
        write_src( sock )
      else
        puts "#{ Time.new } write unknown role #{ role }"
        close_sock( sock )
      end
    end
  end
rescue Interrupt => e
  puts e.class
  quit!
end
quit!() click to toggle source

quit!

# File lib/p2p2/p2_worker.rb, line 74
def quit!
  # puts "debug exit"
  exit
end

Private Instance Methods

add_read( sock, role = nil ) click to toggle source

add read

# File lib/p2p2/p2_worker.rb, line 84
def add_read( sock, role = nil )
  return if sock.nil? || sock.closed? || @reads.include?( sock )
  @reads << sock

  if role then
    @roles[ sock ] = role
  end
end
add_src_rbuff( src, data ) click to toggle source

add src rbuff

# File lib/p2p2/p2_worker.rb, line 96
def add_src_rbuff( src, data )
  return if src.nil? || src.closed?
  src_info = @src_infos[ src ]
  src_info[ :rbuff ] << data

  if src_info[ :rbuff ].bytesize >= WBUFF_LIMIT then
    puts "#{ Time.new } src.rbuff full"
    close_src( src )
  end
end
add_src_wbuff( src, data ) click to toggle source

add src wbuff

# File lib/p2p2/p2_worker.rb, line 110
def add_src_wbuff( src, data )
  return if src.nil? || src.closed?
  src_info = @src_infos[ src ]
  src_info[ :wbuff ] << data
  src_info[ :last_recv_at ] = Time.new
  add_write( src )

  if src_info[ :wbuff ].bytesize >= WBUFF_LIMIT then
    tun = src_info[ :tun ]

    if tun && !tun.closed? then
      puts "#{ Time.new } pause tun"
      @reads.delete( tun )
      tun_info = @tun_infos[ tun ]
      tun_info[ :paused ] = true
    end
  end
end
add_tun_wbuff( tun, data ) click to toggle source

add tun wbuff

# File lib/p2p2/p2_worker.rb, line 132
def add_tun_wbuff( tun, data )
  return if tun.nil? || tun.closed?
  tun_info = @tun_infos[ tun ]
  tun_info[ :wbuff ] << data
  add_write( tun )

  if tun_info[ :wbuff ].bytesize >= WBUFF_LIMIT then
    src = tun_info[ :src ]

    if src && !src.closed? then
      puts "#{ Time.new } pause src"
      @reads.delete( src )
      src_info = @src_infos[ src ]
      src_info[ :paused ] = true
    end
  end
end
add_write( sock ) click to toggle source

add write

# File lib/p2p2/p2_worker.rb, line 153
def add_write( sock )
  return if sock.nil? || sock.closed? || @writes.include?( sock )
  @writes << sock
end
close_ctl() click to toggle source

close ctl

# File lib/p2p2/p2_worker.rb, line 161
def close_ctl
  return if @ctl.nil? || @ctl.closed?
  close_sock( @ctl )
end
close_read_src( src ) click to toggle source

close read src

# File lib/p2p2/p2_worker.rb, line 169
def close_read_src( src )
  return if src.nil? || src.closed?
  # puts "debug close read src"
  src.close_read
  @reads.delete( src )

  if src.closed? then
    # puts "debug src closed"
    @writes.delete( src )
    @roles.delete( src )
    @src_infos.delete( src )
  end
end
close_read_tun( tun ) click to toggle source

close read tun

# File lib/p2p2/p2_worker.rb, line 186
def close_read_tun( tun )
  return if tun.nil? || tun.closed?
  # puts "debug close read tun"
  tun.close_read
  @reads.delete( tun )

  if tun.closed? then
    # puts "debug tun closed"
    @writes.delete( tun )
    @roles.delete( tun )
    @tun_infos.delete( tun )
  end
end
close_sock( sock ) click to toggle source

close sock

# File lib/p2p2/p2_worker.rb, line 203
def close_sock( sock )
  return if sock.nil? || sock.closed?
  sock.close
  @reads.delete( sock )
  @writes.delete( sock )
  @roles.delete( sock )
end
close_src( src ) click to toggle source

close src

# File lib/p2p2/p2_worker.rb, line 214
def close_src( src )
  return if src.nil? || src.closed?
  puts "#{ Time.new } close src"
  close_sock( src )
  @src_infos.delete( src )
end
close_tun( tun ) click to toggle source

close tun

# File lib/p2p2/p2_worker.rb, line 224
def close_tun( tun )
  return if tun.nil? || tun.closed?
  puts "#{ Time.new } close tun"
  close_sock( tun )
  @tun_infos.delete( tun )
end
close_write_src( src ) click to toggle source

close write src

# File lib/p2p2/p2_worker.rb, line 234
def close_write_src( src )
  return if src.nil? || src.closed?
  # puts "debug close write src"
  src.close_write
  @writes.delete( src )

  if src.closed? then
    # puts "debug src closed"
    @reads.delete( src )
    @roles.delete( src )
    @src_infos.delete( src )
  end
end
close_write_tun( tun ) click to toggle source

close write tun

# File lib/p2p2/p2_worker.rb, line 251
def close_write_tun( tun )
  return if tun.nil? || tun.closed?
  # puts "debug close write tun"
  tun.close_write
  @writes.delete( tun )

  if tun.closed? then
    # puts "debug tun closed"
    @reads.delete( tun )
    @roles.delete( tun )
    @tun_infos.delete( tun )
  end
end
loop_check_state() click to toggle source

loop check state

# File lib/p2p2/p2_worker.rb, line 268
def loop_check_state
  Thread.new do
    loop do
      sleep CHECK_STATE_INTERVAL
      now = Time.new

      @src_infos.select{ | src, _ | !src.closed? }.each do | src, src_info |
        last_recv_at = src_info[ :last_recv_at ] || src_info[ :created_at ]
        last_sent_at = src_info[ :last_sent_at ] || src_info[ :created_at ]
        is_expire = ( now - last_recv_at >= EXPIRE_AFTER ) && ( now - last_sent_at >= EXPIRE_AFTER )

        if is_expire then
          puts "#{ Time.new } expire src"
          src_info[ :closing ] = true
          next_tick
        elsif src_info[ :paused ] then
          tun = src_info[ :tun ]

          if tun && !tun.closed? then
            tun_info = @tun_infos[ tun ]

            if tun_info[ :wbuff ].bytesize < RESUME_BELOW then
              puts "#{ Time.new } resume src"
              add_read( src )
              src_info[ :paused ] = false
              next_tick
            end
          end
        end
      end

      @tun_infos.select{ | tun, info | !tun.closed? && info[ :paused ] }.each do | tun, tun_info |
        src = tun_info[ :src ]

        if src && !src.closed? then
          src_info = @src_infos[ src ]

          if src_info[ :wbuff ].bytesize < RESUME_BELOW then
            puts "#{ Time.new } resume tun"
            add_read( tun )
            tun_info[ :paused ] = false
            next_tick
          end
        end
      end
    end
  end
end
new_a_ctl( src ) click to toggle source

new a ctl

# File lib/p2p2/p2_worker.rb, line 320
def new_a_ctl( src )
  ctl = Socket.new( Socket::AF_INET, Socket::SOCK_DGRAM, 0 )
  ctl.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1 )

  if RUBY_PLATFORM.include?( 'linux' ) then
    ctl.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEPORT, 1 )
  end

  paird_port = @paird_port + 10.times.to_a.sample
  paird_addr = Socket.sockaddr_in( paird_port, @paird_host )

  @ctl = ctl
  @ctl_info = {
    paird_addr: paird_addr,
    peer_addr: nil,
    src: src
  }

  add_read( ctl, :ctl )

  puts "#{ Time.new } find #{ @title.inspect } #{ Addrinfo.new( @ctl_info[ :paird_addr ] ).inspect }"
  send_title
end
new_a_pipe() click to toggle source

new a pipe

# File lib/p2p2/p2_worker.rb, line 347
def new_a_pipe
  dotr, dotw = IO.pipe
  @dotw = dotw
  add_read( dotr, :dotr )
end
new_a_shadow() click to toggle source

new a shadow

# File lib/p2p2/p2_worker.rb, line 356
def new_a_shadow
  shadow = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 )
  shadow.setsockopt( Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 )
  shadow.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1 )

  if RUBY_PLATFORM.include?( 'linux' )
    shadow.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEPORT, 1 )
  end

  shadow.bind( @shadow_addr )
  shadow.listen( 127 )
  puts "#{ Time.new } shadow listen on #{ shadow.local_address.ip_port }"
  add_read( shadow, :shadow )
end
new_a_tun() click to toggle source

new a tun

# File lib/p2p2/p2_worker.rb, line 374
def new_a_tun
  return if @ctl.nil? || @ctl.closed? || @ctl_info[ :peer_addr ].nil?
  src = @ctl_info[ :src ]
  return if src.nil? || src.closed?
  tun = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 )
  tun.setsockopt( Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 )
  tun.bind( @ctl.local_address )

  begin
    tun.connect_nonblock( @ctl_info[ :peer_addr ] )
  rescue IO::WaitWritable
  rescue Exception => e
    puts "#{ Time.new } connect peer addr #{ e.class }"
    tun.close
    close_ctl
    return nil
  end

  @tun_infos[ tun ] = {
    connected: false,
    wbuff: '',
    closing_write: false,
    paused: false,
    src: src
  }

  add_read( tun, :tun )
  add_write( tun )
  src_info = @src_infos[ src ]
  src_info[ :tun ] = tun
  src_info[ :punch_times ] += 1
  puts "#{ Time.new } #{ tun.local_address.inspect } connect #{ Addrinfo.new( @ctl_info[ :peer_addr ] ).inspect } tun infos #{ @tun_infos.size }"
  tun
end
next_tick() click to toggle source

next tick

# File lib/p2p2/p2_worker.rb, line 412
def next_tick
  @dotw.write( '.' )
end
read_ctl( ctl ) click to toggle source

read ctl

# File lib/p2p2/p2_worker.rb, line 527
def read_ctl( ctl )
  if ctl.closed? then
    puts "#{ Time.new } read ctl but ctl closed?"
    return
  end

  data, addrinfo, rflags, *controls = ctl.recvmsg

  if @ctl_info[ :peer_addr ] then
    puts "#{ Time.new } peer addr already exist"
    return
  end

  if addrinfo.to_sockaddr != @ctl_info[ :paird_addr ] then
    puts "#{ Time.new } paird addr not match #{ addrinfo.inspect } #{ Addrinfo.new( @ctl_info[ :paird_addr ] ).inspect }"
    return
  end

  puts "#{ Time.new } read ctl #{ data.inspect }"
  @ctl_info[ :peer_addr ] = data
  new_a_tun
end
read_dotr( dotr ) click to toggle source

read dotr

# File lib/p2p2/p2_worker.rb, line 453
def read_dotr( dotr )
  dotr.read_nonblock( READ_SIZE )

  @src_infos.select{ | _, info | info[ :closing ] }.keys.each do | src |
    src_info = close_src( src )

    if src_info then
      close_tun( src_info[ :tun ] )
    end
  end
end
read_shadow( shadow ) click to toggle source

read shadow

# File lib/p2p2/p2_worker.rb, line 468
def read_shadow( shadow )
  begin
    src, addrinfo = shadow.accept_nonblock
  rescue IO::WaitReadable, Errno::EINTR => e
    puts "accept #{ e.class }"
    return
  end

  @src_infos[ src ] = {
    rbuff: '',
    wbuff: '',
    closing_write: false,
    closing: false,
    paused: false,
    created_at: Time.new,
    last_recv_at: nil,
    last_sent_at: nil,
    tun: nil,
    punch_times: 0
  }

  puts "#{ Time.new } accept a src #{ addrinfo.inspect } src infos #{ @src_infos.size }"
  add_read( src, :src )
  close_ctl
  new_a_ctl( src )
end
read_src( src ) click to toggle source

read src

# File lib/p2p2/p2_worker.rb, line 498
def read_src( src )
  if src.closed? then
    puts "#{ Time.new } read src but src closed?"
    return
  end

  src_info = @src_infos[ src ]
  tun = src_info[ :tun ]

  begin
    data = src.read_nonblock( READ_SIZE )
  rescue Exception => e
    puts "#{ Time.new } read src #{ e.class }"
    close_read_src( src )
    set_tun_closing_write( tun )
    return
  end

  if tun && !tun.closed? && @tun_infos[ tun ][ :connected ] then
    add_tun_wbuff( tun, data )
  else
    puts "#{ Time.new } tun not connected, save data to src.rbuff #{ data.inspect }"
    add_src_rbuff( src, data )
  end
end
read_tun( tun ) click to toggle source

read tun

# File lib/p2p2/p2_worker.rb, line 553
def read_tun( tun )
  if tun.closed? then
    puts "#{ Time.new } read tun but tun closed?"
    return
  end

  tun_info = @tun_infos[ tun ]
  src = tun_info[ :src ]

  begin
    data = tun.read_nonblock( READ_SIZE )
  rescue Errno::ECONNREFUSED => e
    src_info = @src_infos[ src ]

    if src_info[ :punch_times ] >= PUNCH_LIMIT then
      puts "#{ Time.new } out of limit"
      close_tun( tun )
      close_src( src )
      return
    end

    puts "#{ Time.new } read tun #{ e.class } #{ src_info[ :punch_times ] }"
    close_tun( tun )

    unless new_a_tun then
      close_src( src )
    end

    return
  rescue Exception => e
    puts "#{ Time.new } read tun #{ e.class }"
    close_read_tun( tun )
    set_src_closing_write( src )
    return
  end

  add_src_wbuff( src, data )
end
send_title() click to toggle source

send title

# File lib/p2p2/p2_worker.rb, line 419
def send_title
  begin
    @ctl.sendmsg( "#{ TO }#{ @title }", 0, @ctl_info[ :paird_addr ] )
  rescue Exception => e
    puts "#{ Time.new } ctl sendmsg #{ e.class }"
    close_ctl
  end
end
set_src_closing_write( src ) click to toggle source

set src closing write

# File lib/p2p2/p2_worker.rb, line 431
def set_src_closing_write( src )
  return if src.nil? || src.closed?
  src_info = @src_infos[ src ]
  return if src_info[ :closing_write ]
  src_info[ :closing_write ] = true
  add_write( src )
end
set_tun_closing_write( tun ) click to toggle source

set tun closing write

# File lib/p2p2/p2_worker.rb, line 442
def set_tun_closing_write( tun )
  return if tun.nil? || tun.closed?
  tun_info = @tun_infos[ tun ]
  return if tun_info[ :closing_write ]
  tun_info[ :closing_write ] = true
  add_write( tun )
end
write_src( src ) click to toggle source

write src

# File lib/p2p2/p2_worker.rb, line 648
def write_src( src )
  if src.closed? then
    puts "#{ Time.new } write src but src closed?"
    return
  end

  src_info = @src_infos[ src ]
  data = src_info[ :wbuff ]

  # 写前为空,处理关闭写
  if data.empty? then
    if src_info[ :closing_write ] then
      close_write_src( src )
    else
      @writes.delete( src )
    end

    return
  end

  # 写入
  begin
    written = src.write_nonblock( data )
  rescue Exception => e
    puts "#{ Time.new } write src #{ e.class }"
    close_write_src( src )
    close_read_tun( src_info[ :tun ] )
    return
  end

  data = data[ written..-1 ]
  src_info[ :wbuff ] = data
end
write_tun( tun ) click to toggle source

write tun

# File lib/p2p2/p2_worker.rb, line 595
def write_tun( tun )
  if tun.closed? then
    puts "#{ Time.new } write tun but tun closed?"
    return
  end

  tun_info = @tun_infos[ tun ]
  src = tun_info[ :src ]
  src_info = @src_infos[ src ]

  unless tun_info[ :connected ] then
    puts "#{ Time.new } connected"
    tun_info[ :connected ] = true

    if src && !src.closed? then
      tun_info[ :wbuff ] << src_info[ :rbuff ]
    end
  end

  data = tun_info[ :wbuff ]

  # 写前为空,处理关闭写
  if data.empty? then
    if tun_info[ :closing_write ] then
      close_write_tun( tun )
    else
      @writes.delete( tun )
    end

    return
  end

  # 写入
  begin
    written = tun.write_nonblock( data )
  rescue Exception => e
    puts "#{ Time.new } write tun #{ e.class }"
    close_write_tun( tun )
    close_read_src( src )
    return
  end

  data = data[ written..-1 ]
  tun_info[ :wbuff ] = data

  if src && !src.closed? then
    src_info[ :last_sent_at ] = Time.new
  end
end