Source code for x2go.rforward

# Copyright (C) 2010-2023 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de>
#
# Python X2Go is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# Python X2Go is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program; if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.

"""\
X2Go reverse SSH/Paramiko tunneling provides X2Go sound, X2Go printing and
X2Go sshfs for folder sharing and mounting remote devices in X2Go terminal
server sessions.

"""
__NAME__ = 'x2gorevtunnel-pylib'

__package__ = 'x2go'
__name__    = 'x2go.rforward'

# modules
import copy
import threading
import gevent
import paramiko

# gevent/greenlet
from gevent import select, socket, Timeout

# Python X2Go modules
from . import log


[docs] def x2go_transport_tcp_handler(chan, origin, server): """\ An X2Go customized TCP handler for the Paramiko/SSH ``Transport()`` class. Incoming channels will be put into Paramiko's default accept queue. This corresponds to the default behaviour of Paramiko's ``Transport`` class. However, additionally this handler function checks the server port of the incoming channel and detects if there are Paramiko/SSH reverse forwarding tunnels waiting for the incoming channels. The Paramiko/SSH reverse forwarding tunnels are initiated by an :class:`x2go.session.X2GoSession` instance (currently supported: reverse tunneling auf audio data, reverse tunneling of SSH requests). If the server port of an incoming Paramiko/SSH channel matches the configured port of an :class:`x2go.rforward.X2GoRevFwTunnel` instance, this instance gets notified of the incoming channel and a new :class:`x2go.rforward.X2GoRevFwChannelThread` is started. This :class:`x2go.rforward.X2GoRevFwChannelThread` then takes care of the new channel's incoming data stream. :param chan: a Paramiko channel object :type chan: ``paramiko.Channel`` object :param origin: host/port tuple where a connection originates from :type origin: ``tuple`` :param server: host/port tuple where to connect to :type server: ``tuple`` """ (origin_addr, origin_port) = origin (server_addr, server_port) = server transport = chan.get_transport() transport._queue_incoming_channel(chan) rev_tuns = transport.reverse_tunnels for session_name in list(rev_tuns.keys()): if int(server_port) in [ int(tunnel[0]) for tunnel in list(rev_tuns[session_name].values()) ]: if rev_tuns[session_name]['snd'] is not None and int(server_port) == int(rev_tuns[session_name]['snd'][0]): rev_tuns[session_name]['snd'][1].notify() elif rev_tuns[session_name]['sshfs'] is not None and int(server_port) == int(rev_tuns[session_name]['sshfs'][0]): rev_tuns[session_name]['sshfs'][1].notify()
[docs] class X2GoRevFwTunnel(threading.Thread): """\ :class:`x2go.rforward.X2GoRevFwTunnel` class objects are used to reversely tunnel X2Go audio, X2Go printing and X2Go folder sharing / device mounting through Paramiko/SSH. """ def __init__(self, server_port, remote_host, remote_port, ssh_transport, session_instance=None, logger=None, loglevel=log.loglevel_DEFAULT): """\ Setup a reverse tunnel through Paramiko/SSH. After the reverse tunnel has been setup up with :func:`X2GoRevFwTunnel.start() <x2go.rforward.X2GoRevFwTunnel.start()>` it waits for notification from :func:`X2GoRevFwTunnel.notify() <x2go.rforward.X2GoRevFwTunnel.notify()>` to accept incoming channels. This notification (:func:`X2GoRevFwTunnel.notify() <x2go.rforward.X2GoRevFwTunnel.notify()>` gets called from within the transport's TCP handler function :func:`x2go_transport_tcp_handler()` of the :class:`x2go.session.X2GoSession` instance. :param server_port: the TCP/IP port on the X2Go server (starting point of the tunnel), normally some number above 30000 :type server_port: int :param remote_host: the target address for reversely tunneled traffic. With X2Go this should always be set to the localhost (IPv4) address. :type remote_host: str :param remote_port: the TCP/IP port on the X2Go client (end point of the tunnel), normally an application's standard port (22 for SSH, 4713 for pulse audio, etc.) :type remote_port: int :param ssh_transport: the :class:`x2go.session.X2GoSession`'s Paramiko/SSH transport instance :type ssh_transport: ``paramiko.Transport`` instance :param logger: you can pass an :class:`x2go.log.X2GoLogger` object to the :class:`x2go.rforward.X2GoRevFwTunnel` constructor :type logger: :class:`x2go.log.X2GoLogger` instance :param loglevel: if no :class:`x2go.log.X2GoLogger` object has been supplied a new one will be constructed with the given loglevel :type loglevel: int """ if logger is None: self.logger = log.X2GoLogger(loglevel=loglevel) else: self.logger = copy.deepcopy(logger) self.logger.tag = __NAME__ self.server_port = server_port self.remote_host = remote_host self.remote_port = remote_port self.ssh_transport = ssh_transport self.session_instance = session_instance self.open_channels = {} self.incoming_channel = threading.Condition() threading.Thread.__init__(self) self.daemon = True self._accept_channels = True def __del__(self): """\ Class destructor. """ self.stop_thread() self.cancel_port_forward('', self.server_port)
[docs] def cancel_port_forward(self, address, port): """\ Cancel a port forwarding request. This cancellation request is sent to the server and on the server the port forwarding should be unregistered. :param address: remote server address :type address: ``str`` :param port: remote port :type port: ``int`` """ timeout = Timeout(10) timeout.start() try: self.ssh_transport.global_request('cancel-tcpip-forward', (address, port), wait=True) except: pass finally: timeout.cancel()
[docs] def pause(self): """\ Prevent acceptance of new incoming connections through the Paramiko/SSH reverse forwarding tunnel. Also, any active connection on this :class:`x2go.rforward.X2GoRevFwTunnel` instance will be closed immediately, if this method is called. """ if self._accept_channels == True: self.cancel_port_forward('', self.server_port) self._accept_channels = False self.logger('paused thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)
[docs] def resume(self): """\ Resume operation of the Paramiko/SSH reverse forwarding tunnel and continue accepting new incoming connections. """ if self._accept_channels == False: self._accept_channels = True self._requested_port = self.ssh_transport.request_port_forward('127.0.0.1', self.server_port, handler=x2go_transport_tcp_handler) self.logger('resumed thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)
[docs] def notify(self): """\ Notify an :class:`x2go.rforward.X2GoRevFwTunnel` instance of an incoming Paramiko/SSH channel. If an incoming reverse tunnel channel appropriate for this instance has been detected, this method gets called from the :class:`x2go.session.X2GoSession`'s transport TCP handler. The sent notification will trigger a ``thread.Condition()`` waiting for notification in :func:`X2GoRevFwTunnel.run() <x2go.rforward.X2GoRevFwTunnel.run()>`. """ self.incoming_channel.acquire() self.logger('notifying thread of incoming channel: %s' % repr(self), loglevel=log.loglevel_DEBUG) self.incoming_channel.notify() self.incoming_channel.release()
[docs] def stop_thread(self): """\ Stops this :class:`x2go.rforward.X2GoRevFwTunnel` thread completely. """ self.pause() self._keepalive = False self.logger('stopping thread: %s' % repr(self), loglevel=log.loglevel_DEBUG) self.notify()
def _request_port_forwarding(self): try: self._requested_port = self.ssh_transport.request_port_forward('127.0.0.1', self.server_port, handler=x2go_transport_tcp_handler) except paramiko.SSHException: # if port forward request fails, we try to tell the server to cancel all foregoing port forward requests on # self.server_port self.cancel_port_forward('', self.server_port) gevent.sleep(1) try: self._requested_port = self.ssh_transport.request_port_forward('127.0.0.1', self.server_port, handler=x2go_transport_tcp_handler) except paramiko.SSHException as e: if self.session_instance: self.session_instance.HOOK_rforward_request_denied(server_port=self.server_port) else: self.logger('Encountered SSHException: %s (for reverse TCP port forward with local destination port %s' % (str(e), self.server_port), loglevel=log.loglevel_WARN)
[docs] def run(self): """\ This method gets run once an :class:`x2go.rforward.X2GoRevFwTunnel` has been started with its :func:`start()` method. Use :class:`x2go.rforward.X2GoRevFwTunnel`.stop_thread() to stop the reverse forwarding tunnel again. You can also temporarily lock the tunnel down with :func:`X2GoRevFwTunnel.pause() <x2go.rforward.X2GoRevFwTunnel.pause()>` and :func:`X2GoRevFwTunnel.resume() <x2go.rforward.X2GoRevFwTunnel.resume()>`). :func:`X2GoRevFwTunnel.run() <x2go.rforward.X2GoRevFwTunnel.run()>` waits for notifications of an appropriate incoming Paramiko/SSH channel (issued by :func:`X2GoRevFwTunnel.notify() <x2go.rforward.X2GoRevFwTunnel.notify()>`). Appropriate in this context means, that its start point on the X2Go server matches the class's property ``server_port``. Once a new incoming channel gets announced by the :func:`notify()` method, a new :class:`x2go.rforward.X2GoRevFwChannelThread` instance will be initialized. As a data stream handler, the function :func:`x2go_rev_forward_channel_handler()` will be used. The channel will last till the connection gets dropped on the X2Go server side or until the tunnel gets paused by an :func:`X2GoRevFwTunnel.pause() <x2go.rforward.X2GoRevFwTunnel.pause()>` call or stopped via the :func:`X2GoRevFwTunnel.stop_thread() <x2go.rforward.X2GoRevFwTunnel.stop_thread()>` method. """ self._request_port_forwarding() self._keepalive = True while self._keepalive: self.incoming_channel.acquire() self.logger('waiting for incoming data channel on X2Go server port: [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG) self.incoming_channel.wait() if self._keepalive: self.logger('detected incoming data channel on X2Go server port: [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG) _chan = self.ssh_transport.accept() self.logger('data channel %s for server port [127.0.0.1]:%s is up' % (_chan, self.server_port), loglevel=log.loglevel_DEBUG) else: self.logger('closing down rev forwarding tunnel on remote end [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG) self.incoming_channel.release() if self._accept_channels and self._keepalive: _new_chan_thread = X2GoRevFwChannelThread(_chan, (self.remote_host, self.remote_port), target=x2go_rev_forward_channel_handler, kwargs={ 'chan': _chan, 'addr': self.remote_host, 'port': self.remote_port, 'parent_thread': self, 'logger': self.logger, } ) _new_chan_thread.start() self.open_channels['[%s]:%s' % _chan.origin_addr] = _new_chan_thread
[docs] def x2go_rev_forward_channel_handler(chan=None, addr='', port=0, parent_thread=None, logger=None, ): """\ Handle the data stream of a requested channel that got set up by a :class:`x2go.rforward.X2GoRevFwTunnel` (Paramiko/SSH reverse forwarding tunnel). The channel (and the corresponding connections) close either ... - ... if the connecting application closes the connection and thus, drops the channel, or - ... if the :class:`x2go.rforward.X2GoRevFwTunnel` parent thread gets paused. The call of :func:`X2GoRevFwTunnel.pause() <x2go.rforward.X2GoRevFwTunnel.pause()>` on the instance can be used to shut down all incoming tunneled SSH connections associated to this :class:`x2go.rforward.X2GoRevFwTunnel` instance from within a Python X2Go application. :param chan: channel (Default value = None) :type chan: ``class`` :param addr: bind address (Default value = '') :type addr: ``str`` :param port: bind port (Default value = 0) :type port: ``int`` :param parent_thread: the calling :class:`x2go.rforward.X2GoRevFwTunnel` instance (Default value = None) :type parent_thread: :class:`x2go.rforward.X2GoRevFwTunnel` instance :param logger: you can pass an :class:`x2go.log.X2GoLogger` object to the :class:`x2go.rforward.X2GoRevFwTunnel` constructor (Default value = None) :type logger: :class:`x2go.log.X2GoLogger` instance """ fw_socket = socket.socket() fw_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) if logger is None: def _dummy_logger(msg, l): pass logger = _dummy_logger try: fw_socket.connect((addr, port)) except Exception as e: logger('Reverse forwarding request to %s:%d failed: %r' % (addr, port, e), loglevel=log.loglevel_INFO) return logger('Connected! Reverse tunnel open %r -> %r -> %r' % (chan.origin_addr, chan.getpeername(), (addr, port)), loglevel=log.loglevel_INFO) n = 0 while parent_thread._accept_channels: r, w, x = select.select([fw_socket, chan], [], []) try: if fw_socket in r: data = fw_socket.recv(1024) if len(data) == 0: break chan.send(data) if chan in r: data = chan.recv(1024) if len(data) == 0: break fw_socket.send(data) except socket.error as e: logger('Reverse tunnel %s encoutered socket error: %s' % (chan, str(e)), loglevel=log.loglevel_WARN) n += 1 if n >= 1024: break chan.close() fw_socket.close() logger('Reverse tunnel %s closed from %r' % (chan, chan.origin_addr,), loglevel=log.loglevel_INFO)
[docs] class X2GoRevFwChannelThread(threading.Thread): """\ Starts a thread for each incoming Paramiko/SSH data channel trough the reverse forwarding tunnel. """ def __init__(self, channel, remote=None, **kwargs): """\ Initializes a reverse forwarding channel thread. :param channel: incoming Paramiko/SSH channel from the :class:`x2go.session.X2GoSession`'s transport accept queue :type channel: class :param remote: tuple (addr, port) that specifies the data endpoint of the channel :type remote: ``tuple(str, int)`` """ self.channel = channel if remote is not None: self.remote_host = remote[0] self.remote_port = remote[1] threading.Thread.__init__(self, **kwargs) self.daemon = True