Package coprs :: Package logic :: Module coprs_logic
[hide private]
[frames | no frames]

Source Code for Module coprs.logic.coprs_logic

  1  import os 
  2  import time 
  3   
  4  from sqlalchemy import and_ 
  5  from sqlalchemy.sql import func 
  6  from sqlalchemy import asc 
  7  from sqlalchemy.event import listen 
  8  from sqlalchemy.orm.attributes import NEVER_SET 
  9  from sqlalchemy.orm.exc import NoResultFound 
 10  from sqlalchemy.orm.attributes import get_history 
 11   
 12  from copr_common.enums import ActionTypeEnum, BackendResultEnum 
 13  from coprs import db 
 14  from coprs import exceptions 
 15  from coprs import helpers 
 16  from coprs import models 
 17  from coprs.exceptions import MalformedArgumentException 
 18  from coprs.logic import users_logic 
 19  from coprs.whoosheers import CoprWhoosheer 
 20  from coprs.helpers import fix_protocol_for_backend 
 21   
 22  from coprs.logic.actions_logic import ActionsLogic 
 23  from coprs.logic.users_logic import UsersLogic 
class CoprsLogic(object):
    """
    Used for manipulating Coprs.

    Query helpers return un-executed query objects, so callers can refine
    them further.  Methods that act on behalf of somebody take the user
    object as their first argument.  Nothing in this class commits the
    database session.
    """

    @classmethod
    def _attach_requested(cls, query, options):
        """
        Apply the optional ``with_builds`` / ``with_mock_chroots`` switches
        (both default to False) found in the ``options`` dict to ``query``.
        """
        if options.get("with_builds", False):
            query = cls.attach_build(query)

        if options.get("with_mock_chroots", False):
            query = cls.attach_mock_chroots(query)

        return query

    @classmethod
    def get_all(cls):
        """ Return all coprs without those which are deleted. """
        # "== False" is a SQL expression here, not a Python comparison
        return (db.session.query(models.Copr)
                .join(models.Copr.user)
                .options(db.contains_eager(models.Copr.user))
                .filter(models.Copr.deleted == False))  # noqa: E712

    @classmethod
    def get_by_id(cls, copr_id):
        """ Return a query for the non-deleted copr with the given id. """
        return cls.get_all().filter(models.Copr.id == copr_id)

    @classmethod
    def attach_build(cls, query):
        """ Alias of join_builds(), kept for existing callers. """
        return cls.join_builds(query)

    @classmethod
    def attach_mock_chroots(cls, query):
        """ Alias of join_mock_chroots(), kept for existing callers. """
        return cls.join_mock_chroots(query)

    @classmethod
    def get_multiple_by_username(cls, username, **kwargs):
        """
        Return a query for non-deleted coprs whose user has the given
        username.

        Recognized keyword arguments: ``with_builds`` and
        ``with_mock_chroots`` (booleans, default False) additionally join
        and eager-load the builds / mock chroots.
        """
        query = cls.get_all().filter(models.User.username == username)
        return cls._attach_requested(query, kwargs)

    @classmethod
    def get_multiple_by_group_id(cls, group_id, **kwargs):
        """
        Return a query for non-deleted coprs belonging to the given group.

        Accepts the same ``with_builds`` / ``with_mock_chroots`` keyword
        arguments as get_multiple_by_username().
        """
        query = cls.get_all().filter(models.Copr.group_id == group_id)
        return cls._attach_requested(query, kwargs)

    @classmethod
    def get(cls, username, coprname, **kwargs):
        """ Narrow get_multiple_by_username() down to one project name. """
        query = cls.get_multiple_by_username(username, **kwargs)
        return query.filter(models.Copr.name == coprname)

    @classmethod
    def get_by_group_id(cls, group_id, coprname, **kwargs):
        """ Narrow get_multiple_by_group_id() down to one project name. """
        query = cls.get_multiple_by_group_id(group_id, **kwargs)
        return query.filter(models.Copr.name == coprname)

    @classmethod
    def get_multiple(cls, include_deleted=False, include_unlisted_on_hp=True):
        """
        Return a query for coprs joined with User and outer-joined with
        Group.

        :param include_deleted: keep deleted projects in the result
        :param include_unlisted_on_hp: keep projects flagged
            ``unlisted_on_hp`` in the result
        """
        query = (
            db.session.query(models.Copr)
            .join(models.Copr.user)
            .outerjoin(models.Group)
            .options(db.contains_eager(models.Copr.user))
        )

        if not include_deleted:
            query = query.filter(models.Copr.deleted.is_(False))

        if not include_unlisted_on_hp:
            query = query.filter(models.Copr.unlisted_on_hp.is_(False))

        return query

    @classmethod
    def set_query_order(cls, query, desc=False):
        """ Order the query by Copr.id, descending when ``desc`` is true. """
        ordering = models.Copr.id.desc() if desc else models.Copr.id.asc()
        return query.order_by(ordering)

    @classmethod
    def get_multiple_owned_by_username(cls, username):
        """ Return get_multiple() restricted to the given username. """
        return cls.get_multiple().filter(models.User.username == username)

    @classmethod
    def filter_by_name(cls, query, name):
        return query.filter(models.Copr.name == name)

    @classmethod
    def filter_by_user_name(cls, query, username):
        # should be already joined with the User table
        return query.filter(models.User.username == username)

    @classmethod
    def filter_by_group_name(cls, query, group_name):
        # should be already joined with the Group table
        return query.filter(models.Group.name == group_name)

    @classmethod
    def filter_without_group_projects(cls, query):
        """ Keep only projects that have no group assigned. """
        return query.filter(models.Copr.group_id.is_(None))

    @classmethod
    def join_builds(cls, query):
        """
        Outer-join builds, eager-load them from the join and order the
        result by build submission time, newest first.
        """
        return (query.outerjoin(models.Copr.builds)
                .options(db.contains_eager(models.Copr.builds))
                .order_by(models.Build.submitted_on.desc()))

    @classmethod
    def join_mock_chroots(cls, query):
        """
        Outer-join mock chroots, eager-load them from the join and order
        the result by os_release, os_version and arch (all ascending).
        """
        return (query.outerjoin(*models.Copr.mock_chroots.attr)
                .options(db.contains_eager(*models.Copr.mock_chroots.attr))
                .order_by(models.MockChroot.os_release.asc())
                .order_by(models.MockChroot.os_version.asc())
                .order_by(models.MockChroot.arch.asc()))

    @classmethod
    def get_playground(cls):
        """ Return a query for non-deleted coprs flagged as playground. """
        return cls.get_all().filter(models.Copr.playground == True)  # noqa: E712

    @classmethod
    def set_playground(cls, user, copr):
        """
        Stage ``copr`` in the session on behalf of an admin.

        This method does not touch ``copr.playground`` itself; the caller
        is expected to have set it already.

        :raises exceptions.InsufficientRightsException: user is not admin
        """
        if not user.admin:
            raise exceptions.InsufficientRightsException(
                "User is not a system admin")

        db.session.add(copr)

    @classmethod
    def get_multiple_fulltext(cls, search_string):
        """
        Search non-deleted projects.

        ``@group/project`` and ``user/project`` are matched with
        case-insensitive substring search on both parts, shortest combined
        name first; anything without a slash goes to the whooshee
        fulltext index.
        """
        query = (models.Copr.query.join(models.User)
                 .filter(models.Copr.deleted == False))  # noqa: E712

        if "/" not in search_string:  # fulltext search
            return query.whooshee_search(
                search_string, whoosheer=CoprWhoosheer, order_by_relevance=100)

        # copr search by its full name; only the first two "/"-separated
        # parts are taken into account
        owner_part, project_part = search_string.split("/")[:2]
        project = "%{}%".format(project_part)

        if search_string[0] == '@':  # searching for @group/project
            group_name = "%{}%".format(owner_part[1:])
            query = query.filter(and_(models.Group.name.ilike(group_name),
                                      models.Copr.name.ilike(project),
                                      models.Group.id == models.Copr.group_id))
            return query.order_by(
                asc(func.length(models.Group.name) + func.length(models.Copr.name)))

        # searching for user/project
        user_name = "%{}%".format(owner_part)
        query = query.filter(and_(models.User.username.ilike(user_name),
                                  models.Copr.name.ilike(project),
                                  models.User.id == models.Copr.user_id))
        return query.order_by(
            asc(func.length(models.User.username) + func.length(models.Copr.name)))

    @classmethod
    def add(cls, user, name, selected_chroots, repos=None, description=None,
            instructions=None, check_for_duplicates=False, group=None, persistent=False,
            auto_prune=True, use_bootstrap_container=False, follow_fedora_branching=False, **kwargs):
        """
        Create a new project with its main CoprDir and the selected
        chroots, flush the session and request a GPG key for it.

        Extra keyword arguments are passed to the models.Copr constructor.

        :raises exceptions.NonAdminCannotCreatePersistentProject:
        :raises exceptions.NonAdminCannotDisableAutoPrunning:
        :raises exceptions.DuplicateException: only when
            ``check_for_duplicates`` is set
        :return: the new models.Copr (flushed, not committed)
        """
        if not user.admin and persistent:
            raise exceptions.NonAdminCannotCreatePersistentProject()

        if not user.admin and not auto_prune:
            raise exceptions.NonAdminCannotDisableAutoPrunning()

        # form validation checks for duplicates
        cls.new(user, name, group, check_for_duplicates=check_for_duplicates)

        # Verify membership before any model object is built, so a refused
        # request leaves no half-built Copr behind (presumably it could
        # otherwise get cascaded into the session through `user` --
        # TODO confirm against the model relationships).
        if group is not None:
            UsersLogic.raise_if_not_in_group(user, group)

        copr = models.Copr(name=name,
                           repos=repos or u"",
                           user=user,
                           description=description or u"",
                           instructions=instructions or u"",
                           created_on=int(time.time()),
                           persistent=persistent,
                           auto_prune=auto_prune,
                           use_bootstrap_container=use_bootstrap_container,
                           follow_fedora_branching=follow_fedora_branching,
                           **kwargs)

        if group is not None:
            copr.group = group

        copr_dir = models.CoprDir(
            main=True,
            name=name,
            copr=copr)

        db.session.add(copr_dir)
        db.session.add(copr)

        CoprChrootsLogic.new_from_names(
            copr, selected_chroots)

        # flush before the action is created, as the original code does
        db.session.flush()
        ActionsLogic.send_create_gpg_key(copr)

        return copr

    @classmethod
    def new(cls, user, copr_name, group=None, check_for_duplicates=True):
        """
        Raise exceptions.DuplicateException when a non-deleted project of
        that name already exists for the user (no group given) or for the
        group.  Does nothing when ``check_for_duplicates`` is false.
        """
        if not check_for_duplicates:
            return

        if group is None:
            if cls.exists_for_user(user, copr_name).first() is not None:
                raise exceptions.DuplicateException(
                    "Copr: '{0}/{1}' already exists".format(user.name, copr_name))
        elif group:
            if cls.exists_for_group(group, copr_name).first() is not None:
                # NOTE(review): only the group branch rolls the session
                # back before raising; kept exactly as it was.
                db.session.rollback()
                raise exceptions.DuplicateException(
                    "Copr: '@{0}/{1}' already exists".format(group.name, copr_name))

    @classmethod
    def update(cls, user, copr):
        """
        Validate a modified project and stage it in the session.

        :raises MalformedArgumentException: the project name was changed
        :raises exceptions.NonAdminCannotDisableAutoPrunning:
        """
        # we should call get_history before other requests, otherwise
        # the changes would be forgotten
        if get_history(copr, "name").has_changes():
            raise MalformedArgumentException("Change name of the project is forbidden")

        users_logic.UsersLogic.raise_if_cant_update_copr(
            user, copr, "Only owners and admins may update their projects.")

        if not user.admin and not copr.auto_prune:
            raise exceptions.NonAdminCannotDisableAutoPrunning()

        db.session.add(copr)

    @classmethod
    def delete_unsafe(cls, user, copr):
        """
        Deletes copr without termination of ongoing builds.

        Sends the delete action, deletes all dirs of the project and
        marks the project as deleted.

        :raises exceptions.InsufficientRightsException:
        :raises exceptions.ActionInProgressException:
        """
        cls.raise_if_cant_delete(user, copr)
        # TODO: do we want to dump the information somewhere, so that we can
        # search it in future?
        cls.raise_if_unfinished_blocking_action(
            copr, "Can't delete this project,"
                  " another operation is in progress: {action}")

        ActionsLogic.send_delete_copr(copr)
        CoprDirsLogic.delete_all_by_copr(copr)

        copr.deleted = True
        return copr

    @classmethod
    def exists_for_user(cls, user, coprname, incl_deleted=False):
        """
        Return a query for group-less projects of the given name owned by
        ``user``; deleted ones are included only with ``incl_deleted``.
        """
        existing = (models.Copr.query
                    .filter(models.Copr.name == coprname)
                    .filter(models.Copr.user_id == user.id))

        if not incl_deleted:
            existing = existing.filter(models.Copr.deleted == False)  # noqa: E712

        return cls.filter_without_group_projects(existing)

    @classmethod
    def exists_for_group(cls, group, coprname, incl_deleted=False):
        """
        Return a query for projects of the given name in ``group``;
        deleted ones are included only with ``incl_deleted``.
        """
        existing = (models.Copr.query
                    .filter(models.Copr.name == coprname)
                    .filter(models.Copr.group_id == group.id))

        if not incl_deleted:
            existing = existing.filter(models.Copr.deleted == False)  # noqa: E712

        return existing

    @classmethod
    def unfinished_blocking_actions_for(cls, copr):
        """
        Return a query for "delete" actions on this copr whose result is
        still "waiting".
        """
        blocking_actions = [ActionTypeEnum("delete")]

        return (models.Action.query
                .filter(models.Action.object_type == "copr")
                .filter(models.Action.object_id == copr.id)
                .filter(models.Action.result ==
                        BackendResultEnum("waiting"))
                .filter(models.Action.action_type.in_(blocking_actions)))

    @classmethod
    def get_yum_repos(cls, copr, empty=False):
        """
        Return a dict mapping ``<os_release>-<os_version>-<arch>`` of each
        active chroot to its repository URL.

        The dict is empty when the project has no build yet, unless
        ``empty`` is true.
        """
        repos = {}
        release_tmpl = "{chroot.os_release}-{chroot.os_version}-{chroot.arch}"
        build = models.Build.query.filter(models.Build.copr_id == copr.id).first()
        if not (build or empty):
            return repos

        for chroot in copr.active_chroots:
            release = release_tmpl.format(chroot=chroot)
            repos[release] = fix_protocol_for_backend(
                os.path.join(copr.repo_url, release + '/'))
        return repos

    @classmethod
    def raise_if_unfinished_blocking_action(cls, copr, message):
        """
        Raise ActionInProgressException if given copr has an unfinished
        action. Return None otherwise.

        The exception receives ``message`` and one of the blocking actions.
        """
        # one row is enough, no need to load every blocking action
        blocking_action = cls.unfinished_blocking_actions_for(copr).first()
        if blocking_action is not None:
            raise exceptions.ActionInProgressException(
                message, blocking_action)

    @classmethod
    def raise_if_cant_delete(cls, user, copr):
        """
        Raise InsufficientRightsException if given copr cant be deleted
        by given user. Return None otherwise.

        Admins and the user the project belongs to may delete it.
        """
        if not user.admin and user != copr.user:
            raise exceptions.InsufficientRightsException(
                "Only owners may delete their projects.")
class CoprPermissionsLogic(object):
    """
    Helpers for models.CoprPermission rows (per-project builder/admin
    permissions of a user).  Nothing here commits the session.
    """

    @classmethod
    def get(cls, copr, searched_user):
        """ Return a query for the permission of ``searched_user`` on ``copr``. """
        for_copr = models.CoprPermission.query.filter(
            models.CoprPermission.copr == copr)
        return for_copr.filter(models.CoprPermission.user == searched_user)

    @classmethod
    def get_for_copr(cls, copr):
        """ Return a query for all permission rows of ``copr``. """
        return models.CoprPermission.query.filter(
            models.CoprPermission.copr == copr)

    @classmethod
    def new(cls, copr_permission):
        """ Stage a new permission object in the session. """
        db.session.add(copr_permission)

    @classmethod
    def update_permissions(cls, user, copr, copr_permission,
                           new_builder, new_admin):
        """
        Overwrite builder/admin values of the row identified by ``copr``
        and ``copr_permission.user_id`` with a bulk UPDATE.

        ``user`` is the acting user; UsersLogic.raise_if_cant_update_copr
        decides whether they may do this.
        """
        users_logic.UsersLogic.raise_if_cant_update_copr(
            user, copr,
            "Only owners and admins may update their projects permissions.")

        new_values = {"copr_builder": new_builder,
                      "copr_admin": new_admin}
        target_rows = (
            models.CoprPermission.query
            .filter(models.CoprPermission.copr_id == copr.id)
            .filter(models.CoprPermission.user_id == copr_permission.user_id)
        )
        target_rows.update(new_values)

    @classmethod
    def update_permissions_by_applier(cls, user, copr, copr_permission, new_builder, new_admin):
        """
        Record what the user applies for.

        Without an existing ``copr_permission`` a new row is staged.
        Otherwise each value is overwritten, except that an already
        "approved" value is kept when the new value is truthy.
        """
        if not copr_permission:
            cls.new(models.CoprPermission(
                user=user,
                copr=copr,
                copr_builder=new_builder,
                copr_admin=new_admin))
            return

        # preserve approved permissions if set
        if (not new_builder or
                copr_permission.copr_builder != helpers.PermissionEnum("approved")):
            copr_permission.copr_builder = new_builder

        if (not new_admin or
                copr_permission.copr_admin != helpers.PermissionEnum("approved")):
            copr_permission.copr_admin = new_admin

    @classmethod
    def delete(cls, copr_permission):
        """ Mark the permission object for deletion in the session. """
        db.session.delete(copr_permission)
class CoprDirsLogic(object):
    """ Helpers for models.CoprDir rows.  Nothing here commits the session. """

    @classmethod
    def get_or_create(cls, copr, dirname, main=False):
        """
        Return the CoprDir called ``dirname`` in ``copr``.  When there is
        none, a new one (with the given ``main`` flag) is staged in the
        session and returned.
        """
        found = cls.get_by_copr(copr, dirname).first()
        if not found:
            found = models.CoprDir(
                name=dirname, copr=copr, main=main)
            db.session.add(found)
        return found

    @classmethod
    def get_by_copr(cls, copr, dirname):
        """ Return a query for dirs named ``dirname`` within ``copr``. """
        dirs_with_copr = db.session.query(models.CoprDir).join(models.Copr)
        return (dirs_with_copr
                .filter(models.Copr.id == copr.id)
                .filter(models.CoprDir.name == dirname))

    @classmethod
    def get_by_ownername(cls, ownername, dirname):
        """ Return a query for dirs matching both ``dirname`` and ``ownername``. """
        all_dirs = db.session.query(models.CoprDir)
        return (all_dirs
                .filter(models.CoprDir.name == dirname)
                .filter(models.CoprDir.ownername == ownername))

    @classmethod
    def delete(cls, copr_dir):
        """ Mark one dir for deletion in the session. """
        db.session.delete(copr_dir)

    @classmethod
    def delete_all_by_copr(cls, copr):
        """ Mark every dir listed in ``copr.dirs`` for deletion. """
        session = db.session
        for doomed_dir in copr.dirs:
            session.delete(doomed_dir)
def on_auto_createrepo_change(target_copr, value_acr, old_value_acr, initiator):
    """
    Attribute "set" listener for Copr.auto_createrepo.

    Emits a createrepo action when the flag goes from a false value to a
    true one, i.e. when auto_createrepo is re-enabled.
    """
    never_assigned = old_value_acr == NEVER_SET
    if never_assigned:
        # a freshly created copr, nothing to react to
        return

    was_disabled = not old_value_acr
    if was_disabled and value_acr:
        # re-enabled
        ActionsLogic.send_createrepo(target_copr)


# active_history=True makes the listener receive the previous value;
# retval=False means the listener's return value is ignored.
listen(
    models.Copr.auto_createrepo,
    'set',
    on_auto_createrepo_change,
    active_history=True,
    retval=False,
)
class BranchesLogic(object):
    """ Helpers for models.DistGitBranch rows. """

    @classmethod
    def get_or_create(cls, name, session=db.session):
        """
        Return the DistGitBranch called ``name``.  When there is none, a
        new one is staged in ``session`` (not committed) and returned.
        """
        branch = session.query(models.DistGitBranch).filter_by(name=name).first()
        if not branch:
            branch = models.DistGitBranch()
            branch.name = name
            session.add(branch)
        return branch
496
497 498 -class CoprChrootsLogic(object):
@classmethod
def mock_chroots_from_names(cls, names):
    """
    Return the list of MockChroot objects whose ``name`` is contained in
    ``names``.

    All mock chroots are loaded and the matching is done in Python; names
    that match no chroot are silently ignored.
    """
    return [chroot
            for chroot in models.MockChroot.query.all()
            if chroot.name in names]
@classmethod
def get_by_name(cls, copr, chroot_name):
    """
    Return a query for the CoprChroot of ``copr`` that belongs to the
    active mock chroot called ``chroot_name``.

    The mock chroot itself is looked up with ``.one()``, so an unknown or
    inactive name makes this method raise instead of returning an empty
    query.
    """
    mock_chroot = MockChrootsLogic.get_from_name(
        chroot_name, active_only=True).one()

    chroots = models.CoprChroot.query.join(models.MockChroot)
    chroots = chroots.filter(models.CoprChroot.copr_id == copr.id)
    return chroots.filter(models.MockChroot.id == mock_chroot.id)
@classmethod
def get_by_name_safe(cls, copr, chroot_name):
    """
    Like get_by_name(), but return the single CoprChroot itself, or None
    when nothing is found.  Other errors (e.g. several matching rows)
    still propagate.

    :rtype: models.CoprChroot
    """
    try:
        copr_chroot = cls.get_by_name(copr, chroot_name).one()
    except NoResultFound:
        copr_chroot = None
    return copr_chroot
@classmethod
def new(cls, mock_chroot):
    """ Stage the given object in the current database session. """
    session = db.session
    session.add(mock_chroot)
533 534 @classmethod
535 - def new_from_names(cls, copr, names):
@classmethod
def create_chroot(cls, user, copr, mock_chroot,
                  buildroot_pkgs=None, repos=None, comps=None, comps_name=None, module_md=None, module_md_name=None, with_opts="", without_opts=""):
    """
    Create a CoprChroot binding ``mock_chroot`` to ``copr`` and fill in
    its settings; the new object is staged in the session and returned.

    ``buildroot_pkgs`` and ``repos`` default to an empty string.

    :type user: models.User
    :type mock_chroot: models.MockChroot
    """
    buildroot_pkgs = "" if buildroot_pkgs is None else buildroot_pkgs
    repos = "" if repos is None else repos

    UsersLogic.raise_if_cant_update_copr(
        user, copr,
        "Only owners and admins may update their projects.")

    new_chroot = models.CoprChroot(copr=copr, mock_chroot=mock_chroot)
    cls._update_chroot(
        buildroot_pkgs, repos, comps, comps_name, module_md,
        module_md_name, new_chroot, with_opts, without_opts)
    return new_chroot
@classmethod
def update_chroot(cls, user, copr_chroot,
                  buildroot_pkgs=None, repos=None, comps=None, comps_name=None, module_md=None, module_md_name=None, with_opts="", without_opts=""):
    """
    Change settings of an existing chroot on behalf of ``user`` and
    return the chroot.  Arguments left as None are not touched.

    :type user: models.User
    :type copr_chroot: models.CoprChroot
    """
    project = copr_chroot.copr
    UsersLogic.raise_if_cant_update_copr(
        user, project,
        "Only owners and admins may update their projects.")

    cls._update_chroot(
        buildroot_pkgs, repos, comps, comps_name, module_md,
        module_md_name, copr_chroot, with_opts, without_opts)
    return copr_chroot
@classmethod
def _update_chroot(cls, buildroot_pkgs, repos, comps, comps_name, module_md, module_md_name, copr_chroot, with_opts, without_opts):
    """
    Copy every value that is not None onto ``copr_chroot`` and stage the
    chroot in the session.

    Newlines in ``repos`` become spaces.  Comps and module_md are only
    updated when the corresponding ``*_name`` is given; an update action
    is sent for each of them.
    """
    if buildroot_pkgs is not None:
        copr_chroot.buildroot_pkgs = buildroot_pkgs

    if repos is not None:
        # one repository per line on input, space separated in the DB
        copr_chroot.repos = " ".join(repos.split("\n"))

    if with_opts is not None:
        copr_chroot.with_opts = with_opts

    if without_opts is not None:
        copr_chroot.without_opts = without_opts

    has_comps = comps_name is not None
    if has_comps:
        copr_chroot.update_comps(comps)
        copr_chroot.comps_name = comps_name
        ActionsLogic.send_update_comps(copr_chroot)

    has_module_md = module_md_name is not None
    if has_module_md:
        copr_chroot.update_module_md(module_md)
        copr_chroot.module_md_name = module_md_name
        ActionsLogic.send_update_module_md(copr_chroot)

    db.session.add(copr_chroot)
@classmethod
def update_from_names(cls, user, copr, names):
    """
    Make the chroots of ``copr`` match ``names``: chroots that are
    missing get a new CoprChroot staged, chroots no longer listed are
    removed from ``copr.mock_chroots``.
    """
    UsersLogic.raise_if_cant_update_copr(
        user, copr,
        "Only owners and admins may update their projects.")

    present = copr.mock_chroots
    wanted = cls.mock_chroots_from_names(names)

    # add non-existing
    for candidate in wanted:
        if candidate in present:
            continue
        db.session.add(
            models.CoprChroot(copr=copr, mock_chroot=candidate))

    # delete no more present; collect first, because removing while
    # iterating would change the collection and break the iteration
    obsolete = [chroot for chroot in present if chroot not in wanted]
    for chroot in obsolete:
        copr.mock_chroots.remove(chroot)
@classmethod
def remove_comps(cls, user, copr_chroot):
    """
    Clear the comps name and data of the chroot, send the update-comps
    action and stage the chroot in the session.
    """
    project = copr_chroot.copr
    UsersLogic.raise_if_cant_update_copr(
        user, project,
        "Only owners and admins may update their projects.")

    copr_chroot.comps_name = None
    copr_chroot.comps_zlib = None
    ActionsLogic.send_update_comps(copr_chroot)

    db.session.add(copr_chroot)
@classmethod
def remove_module_md(cls, user, copr_chroot):
    """
    Clear the module_md name and data of the chroot, send the
    update-module_md action and stage the chroot in the session.
    """
    project = copr_chroot.copr
    UsersLogic.raise_if_cant_update_copr(
        user, project,
        "Only owners and admins may update their projects.")

    copr_chroot.module_md_name = None
    copr_chroot.module_md_zlib = None
    ActionsLogic.send_update_module_md(copr_chroot)

    db.session.add(copr_chroot)
@classmethod
def remove_copr_chroot(cls, user, copr_chroot):
    """
    Mark the chroot for deletion in the session, provided ``user`` may
    update the project it belongs to.

    :param models.CoprChroot copr_chroot:
    """
    project = copr_chroot.copr
    UsersLogic.raise_if_cant_update_copr(
        user, project,
        "Only owners and admins may update their projects.")

    db.session.delete(copr_chroot)
655
656 657 -class MockChrootsLogic(object):
658 @classmethod
659 - def get(cls, os_release, os_version, arch, active_only=False, noarch=False):
@classmethod
def get_from_name(cls, chroot_name, active_only=False, noarch=False):
    """
    chroot_name should be os-version-architecture, e.g. fedora-rawhide-x86_64
    the architecture could be optional with noarch=True

    Parse the textual name and pass its parts on to get(); the result of
    get() is returned unchanged (callers in this module treat it as a
    query and call .one() / .first() on it).
    """
    os_release, os_version, arch = cls.tuple_from_name(
        chroot_name, noarch=noarch)
    return cls.get(os_release, os_version, arch,
                   active_only=active_only, noarch=noarch)
@classmethod
def get_multiple(cls, active_only=False):
    """
    Return a query for mock chroots, restricted to the active ones when
    ``active_only`` is true.
    """
    all_chroots = models.MockChroot.query
    if not active_only:
        return all_chroots
    # "== True" is a SQL expression here, not a Python comparison
    return all_chroots.filter(models.MockChroot.is_active == True)  # noqa: E712
@classmethod
def add(cls, name):
    """
    Create a MockChroot from an ``os-version-arch`` name, stage it in the
    session and return it.

    :raises exceptions.DuplicateException: such a chroot already exists
    """
    name_parts = cls.tuple_from_name(name)

    already_there = cls.get(*name_parts).first()
    if already_there:
        raise exceptions.DuplicateException(
            "Mock chroot with this name already exists.")

    os_release, os_version, arch = name_parts
    created = models.MockChroot(os_release=os_release,
                                os_version=os_version,
                                arch=arch)
    cls.new(created)
    return created
@classmethod
def new(cls, mock_chroot):
    """ Stage a new mock chroot in the current database session. """
    session = db.session
    session.add(mock_chroot)
@classmethod
def edit_by_name(cls, name, is_active):
    """
    Set the ``is_active`` flag of the mock chroot called ``name`` and
    return the chroot.

    :raises exceptions.NotFoundException: no such mock chroot
    """
    name_parts = cls.tuple_from_name(name)
    target = cls.get(*name_parts).first()
    if target:
        target.is_active = is_active
        cls.update(target)
        return target

    raise exceptions.NotFoundException(
        "Mock chroot with this name doesn't exist.")
@classmethod
def update(cls, mock_chroot):
    """ Stage the modified mock chroot in the current database session. """
    session = db.session
    session.add(mock_chroot)
@classmethod
def delete_by_name(cls, name):
    """
    Mark the mock chroot called ``name`` for deletion.

    :raises exceptions.NotFoundException: no such mock chroot
    """
    name_parts = cls.tuple_from_name(name)
    target = cls.get(*name_parts).first()
    if target:
        cls.delete(target)
        return

    raise exceptions.NotFoundException(
        "Mock chroot with this name doesn't exist.")
@classmethod
def delete(cls, mock_chroot):
    """ Mark the mock chroot for deletion in the current database session. """
    session = db.session
    session.delete(mock_chroot)
@classmethod
def tuple_from_name(cls, name, noarch=False):
    """
    input should be os-version-architecture, e.g. fedora-rawhide-x86_64

    With noarch=True the name is split at its last dash only, so it is
    read as os-version and the architecture slot is filled with None.

    returns ("os", "version", "arch") or ("os", "version", None)

    Raises MalformedArgumentException when the name has too few
    dash-separated parts.
    """
    # split from the right: dashes inside the os part are preserved
    pieces = name.rsplit("-", 1 if noarch else 2)

    accepted_lengths = (2, 3) if noarch else (3,)
    if len(pieces) not in accepted_lengths:
        raise MalformedArgumentException(
            "Chroot name is not valid")

    # only reachable with noarch=True: pad the missing architecture
    if len(pieces) == 2:
        pieces.append(None)

    return tuple(pieces)
761