class P2p2::P1Worker

Public Class Methods

new( paird_host, paird_port, title, appd_host, appd_port ) click to toggle source

initialize

# File lib/p2p2/p1_worker.rb, line 7
# Stores the connection settings, prepares the empty bookkeeping tables and
# opens the wake-up pipe plus the first ctl socket.
#
# paird_host, paird_port - base address used by new_a_ctl to reach paird
# title                  - payload sent to paird by send_title
# appd_host, appd_port   - address every dst socket connects to
def initialize( paird_host, paird_port, title, appd_host, appd_port )
  @paird_host, @paird_port, @title = paird_host, paird_port, title
  @appd_addr = Socket.sockaddr_in( appd_port, appd_host )
  @reads, @writes = [], []
  @roles = {} # sock => :dotr / :ctl / :tun / :dst
  @dst_infos, @tun_infos = ConcurrentHash.new, ConcurrentHash.new

  new_a_pipe
  new_a_ctl
end

Public Instance Methods

looping() click to toggle source

looping

# File lib/p2p2/p1_worker.rb, line 25
# Runs the event loop: starts the two background threads, then repeatedly
# waits on IO.select and dispatches every ready socket to the handler that
# matches its registered role. A socket without a known role is logged and
# closed. An Interrupt ends the loop through quit!.
def looping
  puts "#{ Time.new } looping"
  loop_renew_ctl
  loop_check_state

  loop do
    readables, writables = IO.select( @reads, @writes )

    readables.each do | sock |
      role = @roles[ sock ]

      case role
      when :dotr then read_dotr( sock )
      when :ctl  then read_ctl( sock )
      when :tun  then read_tun( sock )
      when :dst  then read_dst( sock )
      else
        puts "#{ Time.new } read unknown role #{ role }"
        close_sock( sock )
      end
    end

    writables.each do | sock |
      role = @roles[ sock ]

      case role
      when :tun then write_tun( sock )
      when :dst then write_dst( sock )
      else
        puts "#{ Time.new } write unknown role #{ role }"
        close_sock( sock )
      end
    end
  end
rescue Interrupt => e
  puts e.class
  quit!
end
quit!() click to toggle source

quit!

# File lib/p2p2/p1_worker.rb, line 73
# Ends the process by raising SystemExit (called by looping on Interrupt).
def quit!
  Kernel.exit
end

Private Instance Methods

add_dst_rbuff( dst, data ) click to toggle source

add dst rbuff

# File lib/p2p2/p1_worker.rb, line 83
# Appends data to dst's :rbuff. Once the buffer reaches WBUFF_LIMIT bytes
# the dst is closed. Does nothing for a nil or closed dst.
def add_dst_rbuff( dst, data )
  return if dst.nil? || dst.closed?

  rbuff = @dst_infos[ dst ][ :rbuff ]
  rbuff << data
  return if rbuff.bytesize < WBUFF_LIMIT

  puts "#{ Time.new } dst.rbuff full"
  close_dst( dst )
end
add_dst_wbuff( dst, data ) click to toggle source

add dst wbuff

# File lib/p2p2/p1_worker.rb, line 97
# Queues data on dst's :wbuff, stamps :last_recv_at and registers dst for
# writing. When the buffer reaches WBUFF_LIMIT bytes, the paired tun (if it
# is still open) is removed from @reads and flagged :paused.
def add_dst_wbuff( dst, data )
  return if dst.nil? || dst.closed?

  info = @dst_infos[ dst ]
  info[ :wbuff ] << data
  info[ :last_recv_at ] = Time.new
  add_write( dst )
  return if info[ :wbuff ].bytesize < WBUFF_LIMIT

  tun = info[ :tun ]
  return unless tun && !tun.closed?

  puts "#{ Time.new } pause tun"
  @reads.delete( tun )
  @tun_infos[ tun ][ :paused ] = true
end
add_read( sock, role = nil ) click to toggle source

add read

# File lib/p2p2/p1_worker.rb, line 119
# Registers sock in @reads and, when a role is given, records it in @roles.
# Nil, closed and already-registered sockets are left untouched.
def add_read( sock, role = nil )
  return if sock.nil? || sock.closed? || @reads.include?( sock )

  @reads.push( sock )
  @roles[ sock ] = role if role
end
add_tun_wbuff( tun, data ) click to toggle source

add tun wbuff

# File lib/p2p2/p1_worker.rb, line 131
# Queues data on tun's :wbuff and registers tun for writing. When the buffer
# reaches WBUFF_LIMIT bytes, the paired dst (if it is still open) is removed
# from @reads and flagged :paused.
def add_tun_wbuff( tun, data )
  return if tun.nil? || tun.closed?

  info = @tun_infos[ tun ]
  info[ :wbuff ] << data
  add_write( tun )
  return if info[ :wbuff ].bytesize < WBUFF_LIMIT

  dst = info[ :dst ]
  return unless dst && !dst.closed?

  puts "#{ Time.new } pause dst"
  @reads.delete( dst )
  @dst_infos[ dst ][ :paused ] = true
end
add_write( sock ) click to toggle source

add write

# File lib/p2p2/p1_worker.rb, line 152
# Registers sock in @writes unless it is nil, closed or already present.
def add_write( sock )
  skip = sock.nil? || sock.closed? || @writes.include?( sock )
  @writes.push( sock ) unless skip
end
close_ctl() click to toggle source

close ctl

# File lib/p2p2/p1_worker.rb, line 160
# Closes the current ctl socket through close_sock, if there is an open one.
def close_ctl
  close_sock( @ctl ) unless @ctl.nil? || @ctl.closed?
end
close_dst( dst ) click to toggle source

close dst

# File lib/p2p2/p1_worker.rb, line 168
# Fully closes dst and forgets its info entry.
#
# Returns whatever @dst_infos.delete returns for dst (read_dotr uses it to
# reach the paired tun), or nil when dst is nil or already closed.
def close_dst( dst )
  already_gone = dst.nil? || dst.closed?
  return if already_gone

  puts "#{ Time.new } close dst"
  close_sock( dst )
  @dst_infos.delete( dst )
end
close_read_dst( dst ) click to toggle source

close read dst

# File lib/p2p2/p1_worker.rb, line 178
# Shuts down dst's read side and stops watching it for reads. If that leaves
# the socket fully closed, every remaining reference to it is dropped.
def close_read_dst( dst )
  return if dst.nil? || dst.closed?

  dst.close_read
  @reads.delete( dst )
  return unless dst.closed?

  @writes.delete( dst )
  @roles.delete( dst )
  @dst_infos.delete( dst )
end
close_read_tun( tun ) click to toggle source

close read tun

# File lib/p2p2/p1_worker.rb, line 195
# Shuts down tun's read side and stops watching it for reads. If that leaves
# the socket fully closed, every remaining reference to it is dropped.
def close_read_tun( tun )
  return if tun.nil? || tun.closed?

  tun.close_read
  @reads.delete( tun )
  return unless tun.closed?

  @writes.delete( tun )
  @roles.delete( tun )
  @tun_infos.delete( tun )
end
close_sock( sock ) click to toggle source

close sock

# File lib/p2p2/p1_worker.rb, line 212
# Closes sock and removes it from the select lists and the role table.
# Nil and already-closed sockets are ignored.
def close_sock( sock )
  return if sock.nil? || sock.closed?

  sock.close
  [ @reads, @writes ].each { | list | list.delete( sock ) }
  @roles.delete( sock )
end
close_tun( tun ) click to toggle source

close tun

# File lib/p2p2/p1_worker.rb, line 223
# Fully closes tun and forgets its info entry.
#
# Returns whatever @tun_infos.delete returns for tun, or nil when tun is nil
# or already closed.
def close_tun( tun )
  already_gone = tun.nil? || tun.closed?
  return if already_gone

  puts "#{ Time.new } close tun"
  close_sock( tun )
  @tun_infos.delete( tun )
end
close_write_dst( dst ) click to toggle source

close write dst

# File lib/p2p2/p1_worker.rb, line 233
# Shuts down dst's write side and stops watching it for writes. If that
# leaves the socket fully closed, every remaining reference to it is dropped.
def close_write_dst( dst )
  return if dst.nil? || dst.closed?

  dst.close_write
  @writes.delete( dst )
  return unless dst.closed?

  @reads.delete( dst )
  @roles.delete( dst )
  @dst_infos.delete( dst )
end
close_write_tun( tun ) click to toggle source

close write tun

# File lib/p2p2/p1_worker.rb, line 250
# Shuts down tun's write side and stops watching it for writes. If that
# leaves the socket fully closed, every remaining reference to it is dropped.
def close_write_tun( tun )
  return if tun.nil? || tun.closed?

  tun.close_write
  @writes.delete( tun )
  return unless tun.closed?

  @reads.delete( tun )
  @roles.delete( tun )
  @tun_infos.delete( tun )
end
loop_check_state() click to toggle source

loop check state

# File lib/p2p2/p1_worker.rb, line 267
# Starts a background thread that, every CHECK_STATE_INTERVAL seconds:
#   * flags an open dst as :closing once both its last receive and last send
#     (falling back to :created_at) are at least EXPIRE_AFTER seconds old;
#   * re-registers a paused dst for reading once its tun's :wbuff has dropped
#     below RESUME_BELOW bytes;
#   * re-registers a paused tun for reading once its dst's :wbuff has dropped
#     below RESUME_BELOW bytes.
# Each change is followed by next_tick. Returns the Thread.
def loop_check_state
  Thread.new do
    loop do
      sleep CHECK_STATE_INTERVAL
      now = Time.new

      @dst_infos.select{ | dst, _ | !dst.closed? }.each do | dst, dst_info |
        recv_at = dst_info[ :last_recv_at ] || dst_info[ :created_at ]
        sent_at = dst_info[ :last_sent_at ] || dst_info[ :created_at ]
        idle = ( now - recv_at >= EXPIRE_AFTER ) && ( now - sent_at >= EXPIRE_AFTER )

        if idle
          puts "#{ Time.new } expire dst"
          dst_info[ :closing ] = true
          next_tick
          next
        end

        next unless dst_info[ :paused ]

        tun = dst_info[ :tun ]
        next unless tun && !tun.closed?
        next unless @tun_infos[ tun ][ :wbuff ].bytesize < RESUME_BELOW

        puts "#{ Time.new } resume dst"
        add_read( dst )
        dst_info[ :paused ] = false
        next_tick
      end

      @tun_infos.select{ | tun, info | !tun.closed? && info[ :paused ] }.each do | tun, tun_info |
        dst = tun_info[ :dst ]
        next unless dst && !dst.closed?
        next unless @dst_infos[ dst ][ :wbuff ].bytesize < RESUME_BELOW

        puts "#{ Time.new } resume tun"
        add_read( tun )
        tun_info[ :paused ] = false
        next_tick
      end
    end
  end
end
loop_renew_ctl() click to toggle source

loop renew ctl

# File lib/p2p2/p1_worker.rb, line 319
# Starts a background thread that flags the current ctl as closing every
# RENEW_CTL_INTERVAL seconds. Returns the Thread.
def loop_renew_ctl
  Thread.new do
    loop do
      sleep( RENEW_CTL_INTERVAL )
      set_ctl_closing
    end
  end
end
new_a_ctl() click to toggle source

new a ctl

# File lib/p2p2/p1_worker.rb, line 331
# Creates a fresh UDP ctl socket, picks one of ten paird ports
# (@paird_port + 0..9) at random, resets @ctl / @ctl_info, registers the
# socket for reading and sends the title to paird.
def new_a_ctl
  ctl = Socket.new( Socket::AF_INET, Socket::SOCK_DGRAM, 0 )
  ctl.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1 )
  ctl.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEPORT, 1 ) if RUBY_PLATFORM.include?( 'linux' )

  port_offset = ( 0..9 ).to_a.sample
  paird_addr = Socket.sockaddr_in( @paird_port + port_offset, @paird_host )

  @ctl = ctl
  @ctl_info = { paird_addr: paird_addr, peer_addr: nil, dst: nil, closing: false }
  add_read( ctl, :ctl )

  puts "#{ Time.new } #{ @title.inspect } #{ Addrinfo.new( paird_addr ).inspect }"
  send_title
end
new_a_dst() click to toggle source

new a dst

# File lib/p2p2/p1_worker.rb, line 359
# Opens a non-blocking TCP connection to appd and registers it as a :dst.
#
# Returns the new socket, or nil when the connect attempt fails outright; in
# that case the socket is closed and the ctl is renewed.
def new_a_dst
  dst = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 )
  dst.setsockopt( Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 )

  begin
    dst.connect_nonblock( @appd_addr )
  rescue IO::WaitWritable
    # Connection in progress: the normal outcome of a non-blocking connect.
  rescue StandardError => e
    # StandardError rather than Exception, so Interrupt / SystemExit still
    # reach looping instead of being reported as a connect failure.
    puts "#{ Time.new } dst connect appd addr #{ e.class }"
    dst.close
    renew_ctl
    return nil
  end

  @dst_infos[ dst ] = {
    rbuff: '',            # data read from dst before its tun is connected
    wbuff: '',            # data waiting to be written to dst
    closing_write: false, # half-close the write side once wbuff is drained
    closing: false,       # close on the next tick
    paused: false,        # reading is suspended
    created_at: Time.new,
    last_recv_at: nil,
    last_sent_at: nil,
    tun: nil,             # paired tun socket
    punch_times: 0        # tun connect attempts made for this dst
  }

  puts "#{ Time.new } dst infos #{ @dst_infos.size }"
  add_read( dst, :dst )
  dst
end
new_a_pipe() click to toggle source

new a pipe

# File lib/p2p2/p1_worker.rb, line 394
# Creates the wake-up pipe: the write end is kept in @dotw (see next_tick)
# and the read end is registered for reading with the :dotr role.
def new_a_pipe
  dotr, @dotw = IO.pipe
  add_read( dotr, :dotr )
end
new_a_tun() click to toggle source

new a tun

# File lib/p2p2/p1_worker.rb, line 403
# Opens a TCP tun socket bound to the ctl's local address and starts a
# non-blocking connect to the peer address learned by read_ctl.
#
# Returns the new socket. Returns nil when there is no open ctl, no peer
# address, no open dst, or when bind/connect fails outright; on failure the
# socket is closed and the ctl is renewed.
def new_a_tun
  return if @ctl.nil? || @ctl.closed? || @ctl_info[ :peer_addr ].nil?
  dst = @ctl_info[ :dst ]
  return if dst.nil? || dst.closed?
  tun = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 )
  tun.setsockopt( Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 )

  begin
    # bind sits inside the guarded section so a bind failure closes the
    # socket and renews the ctl instead of escaping into the event loop.
    tun.bind( @ctl.local_address )
    tun.connect_nonblock( @ctl_info[ :peer_addr ] )
  rescue IO::WaitWritable
    # Connection in progress: the normal outcome of a non-blocking connect.
  rescue StandardError => e
    # StandardError rather than Exception, so Interrupt / SystemExit still
    # reach looping instead of being reported as a connect failure.
    puts "#{ Time.new } connect peer addr #{ e.class }"
    tun.close
    renew_ctl
    return nil
  end

  @tun_infos[ tun ] = {
    connected: false,     # set by the first write_tun
    wbuff: '',            # data waiting to be written to tun
    closing_write: false, # half-close the write side once wbuff is drained
    paused: false,        # reading is suspended
    dst: dst              # paired dst socket
  }

  add_read( tun, :tun )
  add_write( tun )
  dst_info = @dst_infos[ dst ]
  dst_info[ :tun ] = tun
  dst_info[ :punch_times ] += 1
  puts "#{ Time.new } #{ tun.local_address.inspect } connect #{ Addrinfo.new( @ctl_info[ :peer_addr ] ).inspect } tun infos #{ @tun_infos.size }"
  tun
end
next_tick() click to toggle source

next tick

# File lib/p2p2/p1_worker.rb, line 441
# Wakes the select loop by writing one byte to the wake-up pipe; the loop
# then runs read_dotr.
def next_tick
  tick = '.'
  @dotw.write( tick )
end
read_ctl( ctl ) click to toggle source

read ctl

# File lib/p2p2/p1_worker.rb, line 528
# Handles a datagram on the ctl socket. The first datagram that comes from
# the expected paird address carries the peer address: it is stored, a dst
# is opened and a tun connect is started.
#
# Ignored (with a log line): a closed ctl, a receive error, a datagram after
# the peer address is already known, and a datagram from any other sender.
def read_ctl( ctl )
  if ctl.closed?
    puts "#{ Time.new } read ctl but ctl closed?"
    return
  end

  begin
    data, addrinfo = ctl.recvmsg
  rescue StandardError => e
    # A failing recvmsg (e.g. ECONNREFUSED reported on the socket) must not
    # escape into the event loop; the ctl is left for the periodic renewal.
    puts "#{ Time.new } ctl recvmsg #{ e.class }"
    return
  end

  if @ctl_info[ :peer_addr ]
    puts "#{ Time.new } peer addr already exist"
    return
  end

  if addrinfo.to_sockaddr != @ctl_info[ :paird_addr ]
    puts "#{ Time.new } paird addr not match #{ addrinfo.inspect } #{ Addrinfo.new( @ctl_info[ :paird_addr ] ).inspect }"
    return
  end

  puts "#{ Time.new } read ctl #{ data.inspect }"
  @ctl_info[ :peer_addr ] = data
  @ctl_info[ :dst ] = new_a_dst
  new_a_tun
end
read_dotr( dotr ) click to toggle source

read dotr

# File lib/p2p2/p1_worker.rb, line 509
# Runs the work requested through next_tick: drains the wake-up pipe, renews
# the ctl if it is flagged :closing, and closes every dst flagged :closing
# together with its paired tun.
def read_dotr( dotr )
  dotr.read_nonblock( READ_SIZE )
  renew_ctl if @ctl && !@ctl.closed? && @ctl_info[ :closing ]

  closing_dsts = @dst_infos.select{ | _, info | info[ :closing ] }.keys
  closing_dsts.each do | dst |
    removed = close_dst( dst )
    close_tun( removed[ :tun ] ) if removed
  end
end
read_dst( dst ) click to toggle source

read dst

# File lib/p2p2/p1_worker.rb, line 606
# Reads from dst and forwards the data: to the paired tun's write buffer when
# the tun is connected, otherwise to dst's own :rbuff.
#
# A read that would block is ignored and retried on the next select round.
# Any other StandardError (including EOF) closes dst's read side, asks the
# tun to half-close its write side and renews a paired ctl.
def read_dst( dst )
  if dst.closed?
    puts "#{ Time.new } read dst but dst closed?"
    return
  end

  dst_info = @dst_infos[ dst ]
  tun = dst_info[ :tun ]

  begin
    data = dst.read_nonblock( READ_SIZE )
  rescue IO::WaitReadable
    # Spurious readiness: nothing to read yet, not an error.
    return
  rescue StandardError => e
    # StandardError rather than Exception, so Interrupt / SystemExit still
    # reach looping instead of being handled as a read failure.
    puts "#{ Time.new } read dst #{ e.class }"
    close_read_dst( dst )
    set_tun_closing_write( tun )
    renew_paired_ctl
    return
  end

  if tun && !tun.closed? && @tun_infos[ tun ][ :connected ]
    add_tun_wbuff( tun, data )
  else
    puts "#{ Time.new } tun not connected, save data to dst.rbuff #{ data.inspect }"
    add_dst_rbuff( dst, data )
  end
end
read_tun( tun ) click to toggle source

read tun

# File lib/p2p2/p1_worker.rb, line 555
# Reads from tun and queues the data on the paired dst's write buffer.
#
# - dst missing or closed: the tun is closed.
# - Read would block: ignored, retried on the next select round.
# - ECONNREFUSED: the tun is closed and a new one is tried, until the dst's
#   :punch_times reaches PUNCH_LIMIT; when the limit is hit or no new tun can
#   be made, the dst is closed and the ctl renewed.
# - Any other StandardError (including EOF): tun's read side is closed, the
#   dst is asked to half-close its write side and a paired ctl is renewed.
def read_tun( tun )
  if tun.closed?
    puts "#{ Time.new } read tun but tun closed?"
    return
  end

  dst = @tun_infos[ tun ][ :dst ]

  if dst.nil? || dst.closed?
    puts "#{ Time.new } read tun but dst already closed"
    close_tun( tun )
    return
  end

  begin
    data = tun.read_nonblock( READ_SIZE )
  rescue IO::WaitReadable
    # Spurious readiness: nothing to read yet, not an error.
    return
  rescue Errno::ECONNREFUSED => e
    dst_info = @dst_infos[ dst ]

    if dst_info[ :punch_times ] >= PUNCH_LIMIT
      puts "#{ Time.new } out of limit"
      close_tun( tun )
      close_dst( dst )
      renew_ctl
      return
    end

    puts "#{ Time.new } read tun #{ e.class } #{ dst_info[ :punch_times ] }"
    close_tun( tun )

    unless new_a_tun
      close_dst( dst )
      renew_ctl
    end

    return
  rescue StandardError => e
    # StandardError rather than Exception, so Interrupt / SystemExit still
    # reach looping instead of being handled as a read failure.
    puts "#{ Time.new } read tun #{ e.class }"
    close_read_tun( tun )
    set_dst_closing_write( dst )
    renew_paired_ctl
    return
  end

  add_dst_wbuff( dst, data )
end
renew_ctl() click to toggle source

renew ctl

# File lib/p2p2/p1_worker.rb, line 448
# Replaces the ctl: closes the current one (if any is open), then creates a
# new one. Returns the result of new_a_ctl.
def renew_ctl
  close_ctl
  new_a_ctl
end
renew_paired_ctl() click to toggle source

renew paired ctl

# File lib/p2p2/p1_worker.rb, line 456
# Renews the ctl only when the current one is open and already holds a peer
# address; otherwise does nothing.
def renew_paired_ctl
  return if @ctl.nil? || @ctl.closed? || !@ctl_info[ :peer_addr ]

  puts "#{ Time.new } renew paired ctl"
  renew_ctl
end
send_title() click to toggle source

send title

# File lib/p2p2/p1_worker.rb, line 466
# Sends the title to paird over the ctl socket. If the send fails, the ctl
# is flagged as closing so that it gets renewed on the next tick.
def send_title
  @ctl.sendmsg( @title, 0, @ctl_info[ :paird_addr ] )
rescue StandardError => e
  # StandardError rather than Exception, so Interrupt / SystemExit still
  # reach looping instead of being handled as a send failure.
  puts "#{ Time.new } ctl sendmsg #{ e.class }"
  set_ctl_closing
end
set_ctl_closing() click to toggle source

set ctl closing

# File lib/p2p2/p1_worker.rb, line 478
# Flags the open ctl as :closing and wakes the select loop. Does nothing if
# there is no open ctl or the flag is already set.
def set_ctl_closing
  ctl_open = !@ctl.nil? && !@ctl.closed?
  return unless ctl_open && !@ctl_info[ :closing ]

  @ctl_info[ :closing ] = true
  next_tick
end
set_dst_closing_write( dst ) click to toggle source

set dst closing write

# File lib/p2p2/p1_worker.rb, line 487
# Flags dst so that write_dst half-closes its write side once :wbuff is
# drained, and registers dst for writing. Does nothing for a nil or closed
# dst, or when the flag is already set.
def set_dst_closing_write( dst )
  return if dst.nil? || dst.closed?

  info = @dst_infos[ dst ]

  unless info[ :closing_write ]
    info[ :closing_write ] = true
    add_write( dst )
  end
end
set_tun_closing_write( tun ) click to toggle source

set tun closing write

# File lib/p2p2/p1_worker.rb, line 498
# Flags tun so that write_tun half-closes its write side once :wbuff is
# drained, and registers tun for writing. Does nothing for a nil or closed
# tun, or when the flag is already set.
def set_tun_closing_write( tun )
  return if tun.nil? || tun.closed?

  info = @tun_infos[ tun ]

  unless info[ :closing_write ]
    info[ :closing_write ] = true
    add_write( tun )
  end
end
write_dst( dst ) click to toggle source

write dst

# File lib/p2p2/p1_worker.rb, line 692
# Flushes as much of dst's :wbuff as the socket accepts.
#
# - Empty buffer: half-closes the write side when :closing_write is set,
#   otherwise just stops watching dst for writes.
# - Write would block: nothing is lost; dst stays registered for writing.
# - Any other StandardError: dst's write side and the paired tun's read side
#   are closed and a paired ctl is renewed.
def write_dst( dst )
  if dst.closed?
    puts "#{ Time.new } write dst but dst closed?"
    return
  end

  dst_info = @dst_infos[ dst ]
  data = dst_info[ :wbuff ]

  # Nothing left to write: handle a pending half-close.
  if data.empty?
    if dst_info[ :closing_write ]
      close_write_dst( dst )
    else
      @writes.delete( dst )
    end

    return
  end

  # Write.
  begin
    written = dst.write_nonblock( data )
  rescue IO::WaitWritable
    # Socket buffer is full: retry on the next select round.
    return
  rescue StandardError => e
    # StandardError rather than Exception, so Interrupt / SystemExit still
    # reach looping instead of being handled as a write failure.
    puts "#{ Time.new } write dst #{ e.class }"
    close_write_dst( dst )
    close_read_tun( dst_info[ :tun ] )
    renew_paired_ctl
    return
  end

  # write_nonblock reports bytes, so drop the written prefix by bytes, not
  # by characters.
  dst_info[ :wbuff ] = data.byteslice( written..-1 )
end
write_tun( tun ) click to toggle source

write tun

# File lib/p2p2/p1_worker.rb, line 636
# Flushes as much of tun's :wbuff as the socket accepts.
#
# The first call for a tun marks it connected, moves whatever the paired dst
# buffered in :rbuff onto tun's :wbuff and renews the ctl.
#
# - Empty buffer: half-closes the write side when :closing_write is set,
#   otherwise just stops watching tun for writes.
# - Write would block: nothing is lost; tun stays registered for writing.
# - Any other StandardError: tun's write side and the paired dst's read side
#   are closed and a paired ctl is renewed.
# After a successful write the dst's :last_sent_at is refreshed.
def write_tun( tun )
  if tun.closed?
    puts "#{ Time.new } write tun but tun closed?"
    return
  end

  tun_info = @tun_infos[ tun ]
  dst = tun_info[ :dst ]
  dst_info = @dst_infos[ dst ]

  unless tun_info[ :connected ]
    puts "#{ Time.new } connected"
    tun_info[ :connected ] = true

    if dst && !dst.closed?
      tun_info[ :wbuff ] << dst_info[ :rbuff ]
    end

    renew_ctl
  end

  data = tun_info[ :wbuff ]

  # Nothing left to write: handle a pending half-close.
  if data.empty?
    if tun_info[ :closing_write ]
      close_write_tun( tun )
    else
      @writes.delete( tun )
    end

    return
  end

  # Write.
  begin
    written = tun.write_nonblock( data )
  rescue IO::WaitWritable
    # Socket buffer is full: retry on the next select round.
    return
  rescue StandardError => e
    # StandardError rather than Exception, so Interrupt / SystemExit still
    # reach looping instead of being handled as a write failure.
    puts "#{ Time.new } write tun #{ e.class }"
    close_write_tun( tun )
    close_read_dst( dst )
    renew_paired_ctl
    return
  end

  # write_nonblock reports bytes, so drop the written prefix by bytes, not
  # by characters.
  tun_info[ :wbuff ] = data.byteslice( written..-1 )

  if dst && !dst.closed?
    dst_info[ :last_sent_at ] = Time.new
  end
end