Package coprs :: Package views :: Module misc
[hide private]
[frames] | no frames]

Source Code for Module coprs.views.misc

  1  import os 
  2  import base64 
  3  import datetime 
  4  import functools 
  5  from functools import wraps, partial 
  6   
  7  from netaddr import IPAddress, IPNetwork 
  8  import re 
  9  import flask 
 10  from flask import send_file 
 11   
 12  from urllib.parse import urlparse 
 13  from openid_teams.teams import TeamsRequest 
 14   
 15  from copr_common.enums import RoleEnum 
 16  from coprs import app 
 17  from coprs import db 
 18  from coprs import helpers 
 19  from coprs import models 
 20  from coprs import oid 
 21  from coprs.logic.complex_logic import ComplexLogic 
 22  from coprs.logic.users_logic import UsersLogic 
 23  from coprs.logic.coprs_logic import CoprsLogic 
def create_user_wrapper(username, email, timezone=None):
    """
    Build a new ``models.User`` with freshly generated API credentials.

    The object is only constructed here; it is not added to the database
    session.

    :param username: value stored as the user's ``username``
    :param email: value stored as the user's ``mail``
    :param timezone: optional timezone stored on the user
    :return: the new ``models.User`` instance
    """
    # The token expires API_TOKEN_EXPIRATION days from today.
    today = datetime.date.today()
    validity = datetime.timedelta(
        days=flask.current_app.config["API_TOKEN_EXPIRATION"])

    # Every api_login starts with base64("copr") followed by "##"; the random
    # part is shortened by the prefix length.
    login_prefix = base64.b64encode(b"copr") + b"##"
    token_length = app.config["API_TOKEN_LENGTH"]
    api_login = login_prefix.decode("utf-8") + helpers.generate_api_token(
        token_length - len(login_prefix))
    api_token = helpers.generate_api_token(token_length)

    return models.User(
        username=username,
        mail=email,
        timezone=timezone,
        api_login=api_login,
        api_token=api_token,
        api_token_expiration=today + validity,
    )
40
def fed_raw_name(oidname):
    """
    Reduce an OpenID identity URL to the bare account name.

    If ``oidname`` has no network location when parsed as a URL, it is
    returned unchanged. Otherwise the host part is returned with
    ``.<host of OPENID_PROVIDER_URL>`` removed from it.

    :param oidname: OpenID identity URL or an already bare name
    :return: the derived account name
    """
    identity = urlparse(oidname)
    if identity.netloc:
        provider_host = urlparse(app.config["OPENID_PROVIDER_URL"]).netloc
        provider_suffix = ".{0}".format(provider_host)
        return identity.netloc.replace(provider_suffix, "")
    # Not a URL with a host part: treat the input as the name itself.
    return oidname
48
def krb_straighten_username(krb_remote_user):
    """
    Turn a Kerberos principal into a username usable by this application.

    :param krb_remote_user: principal, expected as 'USERNAME@REALM.TLD'
    :return: the normalized username, or None when the result contains
        anything other than word characters, dots and dashes
    """
    # Strip the realm: everything from the first '@' on.
    principal = re.sub(r'@.*', '', krb_remote_user)

    # The USERNAME part may itself look like USER/DOMAIN.TLD.
    # TODO: Do we need more clever thing here?
    principal = principal.replace('/', '_')

    # Same restriction as for project names: letters, digits, underscores,
    # dashes and dots.
    # TODO: Store this pattern on one place.
    if re.match(r"^[\w.-]+$", principal):
        return principal
    return None
62
@app.before_request
def set_empty_user():
    """Reset ``flask.g.user`` to None at the start of every request."""
    setattr(flask.g, "user", None)
67
@app.before_request
def lookup_current_user():
    """
    Populate ``flask.g.user`` from the login stored in the session.

    An "openid" session entry takes precedence over a "krb5_login" one.
    ``flask.g.user`` stays None when the session carries no login or no
    user row matches the name.
    """
    flask.g.user = None

    session = flask.session
    if "openid" in session:
        login_name = fed_raw_name(session["openid"])
    elif "krb5_login" in session:
        login_name = session["krb5_login"]
    else:
        login_name = None

    if not login_name:
        return

    matching_users = models.User.query.filter(
        models.User.username == login_name)
    flask.g.user = matching_users.first()
80
@app.errorhandler(404)
def page_not_found(message):
    """Render the 404 page with ``message`` and return it with status 404."""
    body = flask.render_template("404.html", message=message)
    return body, 404
85
@app.errorhandler(403)
def access_restricted(message):
    """Render the 403 page with ``message`` and return it with status 403."""
    body = flask.render_template("403.html", message=message)
    return body, 403
90
def generic_error(message, code=500, title=None):
    """
    Render the generic error page.

    :param message: text passed to the template as ``message``
    :param code: HTTP status code, also passed to the template as
        ``error_code``
    :param title: optional heading passed to the template as ``error_title``
    :return: ``(rendered page, code)`` tuple
    """
    context = {
        "message": message,
        "error_code": code,
        "error_title": title,
    }
    return flask.render_template("_error.html", **context), code
# Pre-configured variants of generic_error, registered as the application's
# handlers for HTTP 500 and HTTP 400.
server_error_handler = partial(generic_error, code=500, title="Internal Server Error")
bad_request_handler = partial(generic_error, code=400, title="Bad Request")

app.errorhandler(500)(server_error_handler)
app.errorhandler(400)(bad_request_handler)

misc = flask.Blueprint("misc", __name__)


@misc.route(app.config['KRB5_LOGIN_BASEURI'] + "<name>/", methods=["GET"])
def krb5_login(name):
    """
    Handle the Kerberos authentication.

    Note that if we are able to get here, either the user is authenticated
    correctly, or apache is mis-configured and it does not perform KRB
    authentication at all.  Note also, even if that can be considered ugly, we
    are reusing oid's get_next_url feature with kerberos login.

    :param name: URI fragment selecting one entry of the KRB5_LOGIN config
    :return: a redirect on success, or a rendered 404/403 page with its code
    """

    # Already logged in?
    if flask.g.user is not None:
        return flask.redirect(oid.get_next_url())

    krb_config = app.config['KRB5_LOGIN']

    # First configured entry whose URI equals the requested name.
    config_name = next(
        (candidate for candidate in krb_config
         if krb_config[candidate]['URI'] == name),
        None)
    if not config_name:
        # no KRB5_LOGIN.<name> configured in copr.conf
        return flask.render_template("404.html"), 404

    environ = flask.request.environ
    if app.config["DEBUG"] and 'TEST_REMOTE_USER' in os.environ:
        # For local testing (without krb5 keytab and other configuration)
        environ['REMOTE_USER'] = os.environ['TEST_REMOTE_USER']

    if 'REMOTE_USER' not in environ:
        nocred = "Kerberos authentication failed (no credentials provided)"
        return flask.render_template("403.html", message=nocred), 403

    krb_username = environ['REMOTE_USER']
    app.logger.debug("krb5 login attempt: " + krb_username)
    username = krb_straighten_username(krb_username)
    if not username:
        message = "invalid krb5 username: " + krb_username
        return flask.render_template("403.html", message=message), 403

    known_login = (
        models.Krb5Login.query
        .filter(models.Krb5Login.config_name == config_name)
        .filter(models.Krb5Login.primary == username)
        .first()
    )
    if known_login:
        user = known_login.user
    else:
        # We need to create row in 'krb5_login' table
        user = models.User.query.filter(
            models.User.username == username).first()
        if not user:
            # Even the item in 'user' table does not exist, create _now_
            email = username + "@" + krb_config[config_name]['email_domain']
            user = create_user_wrapper(username, email)
            db.session.add(user)

        new_login = models.Krb5Login(user=user, primary=username,
                                     config_name=config_name)
        db.session.add(new_login)
        db.session.commit()

    flask.g.user = user
    flask.session['krb5_login'] = user.name
    flask.flash(u"Welcome, {0}".format(user.name), "success")
    return flask.redirect(oid.get_next_url())
182
@misc.route("/login/", methods=["GET"])
@oid.loginhandler
def login():
    """
    Start a login using whichever authentication method is configured.

    With FAS_LOGIN enabled, an already logged-in user is redirected onward
    and anybody else is sent through OpenID.  Without it, the request goes to
    the Kerberos login when KRB5_LOGIN is configured; otherwise an error is
    flashed and the user is redirected to the project list.
    """
    config = app.config

    if config['FAS_LOGIN']:
        if flask.g.user is not None:
            return flask.redirect(oid.get_next_url())

        # a bit of magic
        groups_request = TeamsRequest(["_FAS_ALL_GROUPS_"])
        return oid.try_login(config["OPENID_PROVIDER_URL"],
                             ask_for=["email", "timezone"],
                             extensions=[groups_request])

    if config['KRB5_LOGIN']:
        return krb5_login_redirect(next=oid.get_next_url())

    flask.flash("No auth method available", "error")
    return flask.redirect(flask.url_for("coprs_ns.coprs_show"))
201
@oid.after_login
def create_or_login(resp):
    """
    Finish an OpenID login: create or refresh the user and log them in.

    The identity URL is stored in the session only after the account passed
    the allow-list check.  Storing it earlier would leave a refused identity
    in the session, where the per-request user lookup would pick it up.

    :param resp: OpenID response; ``identity_url``, ``email``, ``timezone``
        and ``extensions`` are read from it
    :return: a redirect response
    """
    fasusername = fed_raw_name(resp.identity_url)

    # Everybody is allowed unless USE_ALLOWED_USERS restricts logins to the
    # names listed in ALLOWED_USERS.
    allowed = bool(fasusername) and (
        not app.config["USE_ALLOWED_USERS"]
        or fasusername in app.config["ALLOWED_USERS"]
    )
    if not allowed:
        # Make sure no identity survives in the session after a refusal.
        flask.session.pop("openid", None)
        flask.flash("User '{0}' is not allowed".format(fasusername))
        return flask.redirect(oid.get_next_url())

    flask.session["openid"] = resp.identity_url

    user = models.User.query.filter(
        models.User.username == fasusername).first()
    if not user:  # create if not created already
        user = create_user_wrapper(fasusername, resp.email, resp.timezone)
    else:
        user.mail = resp.email
        user.timezone = resp.timezone
    if "lp" in resp.extensions:
        team_resp = resp.extensions['lp']  # name space for the teams extension
        user.openid_groups = {"fas_groups": team_resp.teams}

    db.session.add(user)
    db.session.commit()
    flask.flash(u"Welcome, {0}".format(user.name), "success")
    flask.g.user = user

    # When the next URL is just the site root, show the user's own projects.
    if flask.request.url_root == oid.get_next_url():
        return flask.redirect(flask.url_for("coprs_ns.coprs_by_user",
                                            username=user.name))
    return flask.redirect(oid.get_next_url())
239
@misc.route("/logout/")
def logout():
    """Drop both kinds of login from the session and redirect onward."""
    for session_key in ("openid", "krb5_login"):
        flask.session.pop(session_key, None)
    flask.flash(u"You were signed out")
    return flask.redirect(oid.get_next_url())
247
def api_login_required(f):
    """
    Decorator: require a valid API login/token pair for the wrapped view.

    Credentials are read from the ``Authorization`` header: its second
    whitespace-separated part is base64-decoded and split into
    ``login:token``.  On success ``flask.g.user`` is set and the view is
    called; otherwise a JSON error with status 401 is returned.  A malformed
    header is treated as missing credentials instead of raising.

    :param f: view function to protect
    :return: the wrapped view function
    """
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        token = None
        api_login = None
        if "Authorization" in flask.request.headers:
            try:
                encoded = flask.request.headers["Authorization"].split()[1].strip()
                decoded = base64.b64decode(encoded).decode("utf-8")
                # Split on the first colon only, so a colon inside the token
                # cannot break the unpacking.
                api_login, token = decoded.split(":", 1)
            except (IndexError, ValueError):
                # Header without a second part, invalid base64, non-UTF-8
                # payload or no colon at all: fall through to the 401 below.
                api_login = token = None

        token_auth = False
        if token and api_login:
            user = UsersLogic.get_by_api_login(api_login).first()
            if (user and user.api_token == token and
                    user.api_token_expiration >= datetime.date.today()):

                # A proxy user may act on behalf of the user named in the
                # submitted form.
                # NOTE(review): if that user does not exist, flask.g.user
                # becomes None while the request is still authenticated --
                # confirm that views cope with it.
                if user.proxy and "username" in flask.request.form:
                    user = UsersLogic.get(flask.request.form["username"]).first()

                token_auth = True
                flask.g.user = user

        if not token_auth:
            url = 'https://' + app.config["PUBLIC_COPR_HOSTNAME"]
            url = helpers.fix_protocol_for_frontend(url)

            output = {
                "output": "notok",
                "error": "Login invalid/expired. Please visit {0}/api to get or renew your API token.".format(url),
            }
            jsonout = flask.jsonify(output)
            jsonout.status_code = 401
            return jsonout
        return f(*args, **kwargs)
    return decorated_function
def krb5_login_redirect(next=None):
    """
    Redirect to the Kerberos login page of the first configured realm.

    :param next: URL passed along as the ``next`` query argument
    :return: redirect to the krb5 login view, or to the project list (with
        an error flashed) when KRB5_LOGIN has no entries
    """
    krbc = app.config['KRB5_LOGIN']
    config_names = list(krbc)
    if config_names:
        # Pick the first one for now.
        chosen = krbc[config_names[0]]
        target = flask.url_for("misc.krb5_login",
                               name=chosen['URI'],
                               next=next)
        return flask.redirect(target)

    flask.flash("Unable to pick krb5 login page", "error")
    return flask.redirect(flask.url_for("coprs_ns.coprs_show"))
294
def login_required(role=RoleEnum("user")):
    """
    Decorator: require a logged-in user, optionally with the admin role.

    Anonymous requests are redirected to the login page with the current URL
    as ``next``.  When ``role`` equals ``RoleEnum("admin")`` and the user is
    not an admin, a message is flashed and the user is redirected to the
    project list.

    Usable both as ``@login_required`` and ``@login_required(...)``.
    """
    def view_wrapper(f):
        @functools.wraps(f)
        def decorated_function(*args, **kwargs):
            current_user = flask.g.user
            if current_user is None:
                login_url = flask.url_for("misc.login",
                                          next=flask.request.url)
                return flask.redirect(login_url)

            admin_only = role == RoleEnum("admin")
            if admin_only and not current_user.admin:
                flask.flash("You are not allowed to access admin section.")
                return flask.redirect(flask.url_for("coprs_ns.coprs_show"))

            return f(*args, **kwargs)
        return decorated_function

    # hack: if login_required is used without params, the "role" parameter
    # is in fact the decorated function, so we need to return
    # the wrapped function, not the wrapper
    # proper solution would be to use login_required() with parentheses
    # everywhere, even if they're empty - TODO
    return view_wrapper(role) if callable(role) else view_wrapper
# backend authentication
def backend_authenticated(f):
    """
    Decorator: let the request through only with the backend password.

    The password from the request's authorization data is compared with the
    BACKEND_PASSWORD config value; anything else gets a plain-text 401.

    :param f: view function to protect
    :return: the wrapped view function
    """
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        credentials = flask.request.authorization
        if credentials and credentials.password == app.config["BACKEND_PASSWORD"]:
            return f(*args, **kwargs)
        return "You have to provide the correct password\n", 401

    return decorated_function
def intranet_required(f):
    """
    Decorator: allow the request only from configured intranet addresses.

    The client address must fall into one of the addresses or networks
    listed in the INTRANET_IPS config value; 127.0.0.1 is always accepted.
    Other clients get a plain-text 403.

    :param f: view function to protect
    :return: the wrapped view function
    """
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        client_ip = IPAddress(flask.request.remote_addr)
        accept_ranges = set(app.config.get("INTRANET_IPS", []))
        accept_ranges.add("127.0.0.1")  # always accept from localhost

        for addr_or_net in accept_ranges:
            if client_ip in IPNetwork(addr_or_net):
                return f(*args, **kwargs)

        refusal = ("Stats can be update only from intranet hosts, "
                   "not {}, check config\n".format(flask.request.remote_addr))
        return refusal, 403

    return decorated_function
def req_with_copr(f):
    """
    Decorator: resolve the project named by the route arguments.

    ``coprname`` plus either ``group_name`` or ``username`` are removed from
    the keyword arguments and replaced by the matching project object, which
    is passed to the view as its first positional argument.

    :param f: view function expecting the project as first argument
    :return: the wrapped view function (accepts keyword arguments only)
    """
    @wraps(f)
    def wrapper(**kwargs):
        coprname = kwargs.pop("coprname")
        if "group_name" in kwargs:
            owner = kwargs.pop("group_name")
            lookup = ComplexLogic.get_group_copr_safe
        else:
            owner = kwargs.pop("username")
            lookup = ComplexLogic.get_copr_safe
        copr = lookup(owner, coprname, with_mock_chroots=True)
        return f(copr, **kwargs)

    return wrapper
@misc.route("/migration-report/")
@misc.route("/migration-report/<username>")
def coprs_migration_report(username=None):
    """
    Show the migration report for a user's own and group projects.

    Without ``username`` the report is made for the logged-in user.

    :param username: optional name of the user to report on
    :return: the rendered report, or an error page when nobody is logged in
        or the requested user does not exist
    """
    if not username and not flask.g.user:
        return generic_error("You are not logged in")
    elif not username:
        username = flask.g.user.name

    user = UsersLogic.get(username).first()
    if user is None:
        # An unknown name would otherwise fail on user.user_teams below.
        return generic_error("User '{0}' does not exist".format(username),
                             code=404, title="Not Found")

    coprs = CoprsLogic.filter_without_group_projects(
        CoprsLogic.get_multiple_owned_by_username(username)).all()
    for group in UsersLogic.get_groups_by_fas_names_list(user.user_teams).all():
        coprs.extend(CoprsLogic.get_multiple_by_group_id(group.id).all())

    return render_migration_report(coprs, user=user)
375
@misc.route("/migration-report/g/<group_name>")
def group_coprs_migration_report(group_name=None):
    """
    Show the migration report for the projects of one group.

    :param group_name: name of the group taken from the URL
    :return: the rendered report
    """
    group = ComplexLogic.get_group_by_name_safe(group_name)
    group_coprs = CoprsLogic.get_multiple_by_group_id(group.id)
    return render_migration_report(group_coprs, group=group)
382
def render_migration_report(coprs, user=None, group=None):
    """
    Render the migration report template.

    :param coprs: projects to list in the report
    :param user: optional user the report belongs to
    :param group: optional group the report belongs to
    :return: the rendered page
    """
    context = dict(user=user, group=group, coprs=coprs)
    return flask.render_template("migration-report.html", **context)
389
def send_build_icon(build):
    """
    Send the status badge image matching the state of ``build``.

    :param build: build object with a ``state`` attribute, or a falsy value
        when there is no build
    :return: file response with the PNG icon; the "in progress" icon is sent
        with caching disabled
    """
    icon_path = "static/status_images/unknown.png"
    about_to_change = False

    if build:
        state = build.state
        if state in ("importing", "pending", "starting", "running"):
            icon_path = "static/status_images/in_progress.png"
            about_to_change = True
        elif state in ("succeeded", "skipped"):
            icon_path = "static/status_images/succeeded.png"
        elif state == "failed":
            icon_path = "static/status_images/failed.png"

    response = send_file(icon_path, mimetype='image/png')
    if about_to_change:
        # The icon is about to change very soon, disable caches:
        # https://help.github.com/articles/about-anonymized-image-urls/
        response.headers['Cache-Control'] = 'no-cache'
    return response
414