module RTSP::SocatStreaming

Constants

BLOCK_SIZE
BSD_OPTIONS
H264_FMTP
H264_RTP_MAP
MP4_FMTP
MP4_RTP_MAP
RTCP_SOURCE
SOCAT_OPTIONS

Attributes

fmtp[RW]

@return [Array<String>] Media format attributes.

interface_ip[RW]

@return [String] IP address of the interface of the RTSP streamer.

pids[R]

@return [Hash] Hash of session IDs and pids.

rtcp_source_identifier[RW]

@return [String] RTCP source identifier.

rtcp_threads[R]

@return [Hash] Hash of session IDs and RTCP threads.

rtp_map[RW]

@return [Array<String>] Media type attributes.

rtp_sequence[RW]

@return [Fixnum] RTP sequence number of the source stream.

rtp_timestamp[RW]

@return [Fixnum] RTP timestamp of the source stream.

sessions[RW]

@return [Hash] Hash of session IDs and SOCAT commands.

source_ip[RW]

@return [Array<String>] IP address of the source camera.

source_port[RW]

@return [Array<Fixnum>] Port where the source camera is streaming.

Public Instance Methods

description(multicast=false, stream_index=1) click to toggle source

Returns the default stream description.

@param multicast True if the description is for a multicast stream. @param [Fixnum] stream_index Index of the stream type.

# File lib/rtsp/socat_streaming.rb, line 144
    # Builds the default SDP description for a stream.
    #
    # @param multicast Truthy when the description is for a multicast stream.
    # @param [Fixnum] stream_index 1-based index of the stream type.
    # @return [String] The session description; every line ends in CRLF.
    def description(multicast=false, stream_index=1)
      slot = stream_index - 1
      rtp_map = @rtp_map[slot] || H264_RTP_MAP
      fmtp = @fmtp[slot] || H264_FMTP
      suffix = multicast ? 'm' : ''
      video_port = multicast ? @source_port[slot] : 0

      # multicast_ip may lazily assign @interface_ip, so it is deliberately
      # left inside the heredoc, where it runs after the o= line is rendered.
      <<EOF
v=0\r
o=- 1345481255966282 1 IN IP4 #{@interface_ip}\r
s=Session streamed by "Streaming Server"\r
i=stream1#{suffix}\r
t=0 0\r
a=tool:LIVE555 Streaming Media v2007.07.09\r
a=type:broadcast\r
a=control:*\r
a=range:npt=0-\r
a=x-qt-text-nam:Session streamed by "Streaming Server"\r
a=x-qt-text-inf:stream#{stream_index}#{suffix}\r
m=video #{video_port} RTP/AVP 96\r
c=IN IP4 #{multicast ? "#{multicast_ip(stream_index)}/10" : "0.0.0.0"}\r
a=rtpmap:#{rtp_map}\r
a=fmtp:#{fmtp}\r
a=control:track1\r
EOF
    end
disconnect(sid) click to toggle source

Disconnects the stream matching the session ID.

@param [String] sid Session ID.

# File lib/rtsp/socat_streaming.rb, line 171
# Forgets the session and force-kills its streamer process.
#
# The pid and session entries are removed first; signal 9 is then sent only
# when the recorded pid is above 1000. A process that is already gone is
# logged rather than raised.
#
# @param [String] sid Session ID.
def disconnect(sid)
  pid = @pids[sid].to_i
  [@pids, @sessions].each { |table| table.delete(sid) }
  return unless pid > 1000

  Process.kill(9, pid)
rescue Errno::ESRCH
  log "Tried to kill dead process: #{pid}"
end
generate_rtcp_source_id(friendly_name) click to toggle source

Generates a RTCP source ID based on the friendly name. This ID is used in the RTCP communication with the client. The default RTCP_SOURCE will be used if one is not provided.

@param [String] friendly_name Name to be used in the RTCP source ID. @return [String] rtcp_source_id RTCP Source ID.

# File lib/rtsp/socat_streaming.rb, line 60
# Builds a binary RTCP source ID that embeds the friendly name.
#
# The name's bytes are hex-encoded, placed between a fixed hex prefix and
# four zero bytes, and the whole hex string is packed into binary.
#
# @param [String] friendly_name Name to embed in the RTCP source ID.
# @return [String] Binary RTCP source ID.
def generate_rtcp_source_id(friendly_name)
  name_hex = friendly_name.unpack("H*").first
  hex = "80c80006072dee6ad42c300f76c3b928377e99e5006c461ba92d8a3081ca0006072dee6a010e#{name_hex}00000000"

  [hex].pack("H*")
end
parse_sequence_number(src_ip, src_port) click to toggle source

Parses the headers from an RTP stream.

@param [String] src_ip Multicast IP address of RTP stream. @param [Fixnum] src_port Port of RTP stream. @return [Array<Fixnum>] Sequence number and timestamp

# File lib/rtsp/socat_streaming.rb, line 185
# Joins the multicast group of an RTP stream, reads a single datagram and
# returns the sequence number and timestamp from its RTP header.
#
# @param [String] src_ip Multicast IP address of the RTP stream.
# @param [Fixnum] src_port Port of the RTP stream.
# @return [Array<Fixnum>] Sequence number and timestamp.
def parse_sequence_number(src_ip, src_port)
  sock = UDPSocket.new

  begin
    # Membership request: group address followed by the any-interface address.
    membership = IPAddr.new(src_ip).hton + IPAddr.new("0.0.0.0").hton
    sock.setsockopt(Socket::IPPROTO_IP, Socket::IP_ADD_MEMBERSHIP, membership)
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1)
    sock.bind(Socket::INADDR_ANY, src_port)

    begin
      data = sock.recv_nonblock(1500)
    rescue IO::WaitReadable, Errno::EAGAIN
      # Sleep until the socket is readable instead of spinning on retry.
      IO.select([sock])
      retry
    end
  ensure
    # Always release the socket, even when setup or the receive raises.
    sock.close
  end

  packet = RTP::Packet.read(data)

  [packet["sequence_number"], packet["timestamp"]]
end
setup_streamer(sid, request, index=1, multicast=false) click to toggle source

Creates a RTP streamer using socat.

@param [String] sid Session ID. @param [RTSP::Request] request RTSP request. @param [Fixnum] index Stream index. @param [Boolean] multicast True to set up a multicast stream. @return [Fixnum] The port the streamer will stream on.

# File lib/rtsp/socat_streaming.rb, line 71
# Prepares a socat-based RTP streamer for a session.
#
# For unicast, a free even local port is chosen, a thread is started that
# answers every datagram received on the next port with the RTCP source
# identifier, and the socat command for the client is stored. For multicast,
# the source port is reused and a socat command is stored only when the
# request's transport carries a client_port other than 64000; otherwise the
# session is marked :multicast.
#
# @param [String] sid Session ID.
# @param [RTSP::Request] request RTSP request.
# @param [Fixnum] index Stream index.
# @param [Boolean] multicast True to set up a multicast stream.
# @return [Fixnum] The port the streamer will stream on.
# @raise [RuntimeError] When a multicast transport mentions client_port but
#   no port range can be parsed from it.
def setup_streamer(sid, request, index=1, multicast=false)
  transport_url = request.transport_url
  dest_ip, dest_port = transport_url.split ":" unless multicast

  @rtcp_source_identifier ||= RTCP_SOURCE.pack("H*")
  local_port = multicast ? @source_port[index - 1] : free_port(true)

  unless multicast
    @rtcp_threads[sid] = Thread.start do
      s = UDPSocket.new
      s.bind(@interface_ip, local_port+1)

      loop do
        _, sender = s.recvfrom(36)
        s.send(@rtcp_source_identifier, 0, sender[3], sender[1])
      end
    end
  end

  @cleaner ||= Thread.start { cleanup_defunct }
  @processes ||= Sys::ProcTable.ps.map { |p| p.cmdline }

  if multicast
    # The guard must look for the same token the regex below parses;
    # it was previously misspelled, which made this branch unreachable.
    if request.transport.include?('client_port')
      port_match = /client_port=(?<p1>\d*)-(?<p2>\d*)/.match(request.transport)
      dest_port = port_match && port_match[:p1]

      if dest_port == "64000"
        @sessions[sid] = :multicast
      else
        if dest_port.nil?
          message = "Request malformed client-port not found #{request.transport}"
          puts message
          raise message
        end

        @sessions[sid] = build_socat(@source_ip[index - 1], dest_port.to_i, local_port, index)
      end
    else
      @sessions[sid] = :multicast
    end
  else
    @sessions[sid] = build_socat(dest_ip, dest_port, local_port, index)
  end

  local_port
end
start_streaming(sid) click to toggle source

Start streaming for the requested session.

@param [String] sid Session ID.

# File lib/rtsp/socat_streaming.rb, line 123
# Starts streaming for the requested session.
#
# Sessions marked :multicast have no socat command of their own, so nothing
# is spawned for them.
#
# @param [String] sid Session ID.
def start_streaming(sid)
  command = @sessions[sid]
  return if command == :multicast

  spawn_socat(sid, command)
end
stop_streaming(sid) click to toggle source

Stop streaming for the requested session.

@param [String] sid Session ID.

# File lib/rtsp/socat_streaming.rb, line 130
# Stops streaming for the requested session.
#
# A nil session ID disconnects every stream. Otherwise the session is
# disconnected and its RTCP thread, if one exists, is killed and forgotten.
#
# @param [String] sid Session ID, or nil to stop all streams.
def stop_streaming(sid)
  return disconnect_all_streams if sid.nil?

  disconnect sid
  rtcp_thread = @rtcp_threads[sid]
  rtcp_thread.kill unless rtcp_thread.nil?
  @rtcp_threads.delete sid
end

Private Instance Methods

build_socat(target_ip, target_port, server_port, index=1) click to toggle source

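Builds the socat command line that relays a source stream to a target. @param [String] target_ip IP address to stream to. @param target_port Port to stream to. @param server_port Local source port used for non-multicast targets. @param [Fixnum] index Stream index. @return [String] The socat command.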

# File lib/rtsp/socat_streaming.rb, line 290
# Assembles the socat command line that relays a source stream to a target.
#
# Targets whose address starts with "239" get MULTICAST_OPTIONS appended on
# both sides and no sourceport option; every other target is sent from
# server_port. BSD_OPTIONS are added to the receiving side on macOS.
#
# @param [String] target_ip IP address to stream to.
# @param target_port Port to stream to.
# @param server_port Local port used as sourceport for non-"239" targets.
# @param [Fixnum] index Stream index.
# @return [String] The socat command.
def build_socat(target_ip, target_port, server_port, index=1)
  bsd_part = (OS.mac? && BSD_OPTIONS) || ""
  to_group = target_ip.start_with?("239")
  group_part = (to_group && MULTICAST_OPTIONS) || ""
  source_part = to_group ? "" : "sourceport=#{server_port},"

  receiver = "socat -b #{BLOCK_SIZE} UDP-RECV:#{@source_port[index-1]},reuseaddr," \
    "#{bsd_part}#{SOCAT_OPTIONS},ip-add-membership=#{@source_ip[index-1]}:" \
    "#{@interface_ip}#{group_part}"
  sender = "UDP:#{target_ip}:#{target_port},#{source_part}#{SOCAT_OPTIONS}#{group_part}"

  "#{receiver} #{sender}"
end
cleanup_defunct() click to toggle source

Cleans up defunct child processes

# File lib/rtsp/socat_streaming.rb, line 219
# Reaps finished child processes forever.
#
# Waits on child processes in a loop; when there is no child to wait for,
# pauses ten seconds before trying again. This method never returns.
def cleanup_defunct
  loop do
    begin
      Process.wait(0)
    rescue Errno::ECHILD
      # Nothing to reap right now; back off before the next attempt.
      sleep(10)
    end
  end
end
disconnect_all_streams() click to toggle source

Disconnects all streams that are currently streaming.

# File lib/rtsp/socat_streaming.rb, line 242
# Force-kills every recorded streamer process and forgets all sessions.
#
# Signal 9 is sent only to pids above 1000. A process that no longer exists
# is logged, as in #disconnect; any other system error from the kill is
# logged as a warning so the remaining streams are still cleaned up.
#
# @return [Hash] The now-empty pid table.
def disconnect_all_streams
  @pids.values.each do |raw_pid|
    pid = raw_pid.to_i
    next unless pid > 1000

    begin
      Process.kill(9, pid)
    rescue Errno::ESRCH
      log "Tried to kill dead process: #{pid}"
    rescue SystemCallError => e
      log("Could not kill process #{pid}: #{e.message}", :warn)
    end
  end

  @sessions.clear
  @pids.clear
end
find_best_interface_ipaddr(device_ip) click to toggle source

Determine the interface address that best matches an IP address. This is most useful when talking to a remote computer and needing to determine the interface that is being used for the connection.

@param [String] device_ip IP address of the remote device you want to talk to.

@return [String] IP of the interface that would be used to talk to.

# File lib/rtsp/socat_streaming.rb, line 237
# Finds the local address used to reach a remote device.
#
# Connects a short-lived UDP socket to port 1 of the device and returns the
# last element of that socket's local address information.
#
# @param [String] device_ip IP address of the remote device.
# @return [String] IP of the local interface chosen for the connection.
def find_best_interface_ipaddr(device_ip)
  UDPSocket.open do |probe|
    probe.connect(device_ip, 1)
    probe.addr.last
  end
end
free_port(even=false) click to toggle source

Attempts to find a random bindable port in the range 50001-65500.

@param [Boolean] even Return a free even port number if true. @return [Number] A random bindable port in the range 50001-65500. @raise [RuntimeError] When unable to locate a port after 1000 attempts.

# File lib/rtsp/socat_streaming.rb, line 309
# Attempts to find a random bindable UDP port in the range 50001-65500.
#
# Each attempt picks a random port, rounds an odd port up by one when an
# even port is requested, and tries to bind a throwaway UDP socket to it.
#
# @param [Boolean] even Return a free even port number if true.
# @return [Number] A port that could be bound at the time of the check.
# @raise [RuntimeError] When unable to locate a port after 1000 attempts.
def free_port(even=false)
  1000.times do
    port = rand(15500) + 50001
    port += 1 if even && port.odd?
    socket = nil

    begin
      socket = UDPSocket.new
      socket.bind('', port)
      return port
    rescue
      # Do nothing if socket creation or bind fails; try another port.
    ensure
      # The socket is nil when UDPSocket.new itself failed.
      socket.close if socket
    end
  end

  raise "Unable to locate free port after 1000 attempts."
end
get_pid(cmd) click to toggle source

Gets the pid for a SOCAT command.

@param [String] cmd SOCAT command @return [Fixnum] PID of the process.

# File lib/rtsp/socat_streaming.rb, line 332
# Looks up the pid of the first process whose command line contains cmd.
#
# NOTE: when no process matches, the value of the `each` call (the process
# list itself) is returned rather than nil, so callers must type-check the
# result.
#
# @param [String] cmd SOCAT command.
# @return [Fixnum] PID of the matching process.
def get_pid(cmd)
  Sys::ProcTable.ps.each do |process|
    next unless process.cmdline.include?(cmd)

    return process.pid.to_i
  end
end
multicast_ip(index=1) click to toggle source

Returns the multicast IP on which the streamer will stream.

@param [Fixnum] index Stream index. @return [String] Multicast IP.

# File lib/rtsp/socat_streaming.rb, line 210
# Returns the multicast IP on which the streamer will stream.
#
# The address is the interface IP with its first octet replaced by "239".
# When no interface IP is set yet, it is looked up from the source IP of
# the given stream and remembered.
#
# @param [Fixnum] index Stream index.
# @return [String] Multicast IP.
def multicast_ip(index=1)
  @interface_ip ||= find_best_interface_ipaddr(@source_ip[index-1])
  _first, *remaining = @interface_ip.split(".")

  ["239", *remaining].join(".")
end
spawn_socat(sid, command) click to toggle source

Spawns an instance of Socat.

@param [String] sid The session ID of the stream. @param [String] command The SOCAT command to be spawned.

# File lib/rtsp/socat_streaming.rb, line 255
# Spawns an instance of socat for a session.
#
# A nil command is logged as a warning and ignored. A command already in the
# known process list is not spawned again; its pid is looked up and logged.
# Otherwise the command is recorded for the session and spawned from a new
# thread, which stores the pid and schedules another call of this method
# 20 seconds later.
#
# @param [String] sid The session ID of the stream.
# @param [String] command The SOCAT command to be spawned.
def spawn_socat(sid, command)
  @processes ||= Sys::ProcTable.ps.map { |p| p.cmdline }

  if command.nil?
    log("SOCAT command for #{sid} was nil", :warn)
    return
  end

  if @processes.include?(command)
    pid = get_pid(command)
    # Integer rather than Fixnum: Fixnum no longer exists on current Rubies,
    # and every Fixnum is an Integer on older ones.
    log "Streamer already running with pid #{pid}" if pid.is_a? Integer
  else
    @sessions[sid] = command

    Thread.start do
      log "Running stream spawner: #{command}"
      @processes << command
      pid = spawn command
      @pids[sid] = pid
      Thread.start { sleep 20; spawn_socat(sid, command) }
    end
  end
end