Skip to content

Role manager

RoleManager manages a collection of Role objects, allowing creation and permission management for multiple roles in the PostgreSQL database.

Version Added

1.3.0

Source code in pum/role_manager.py
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
class RoleManager:
    """
    RoleManager manages a collection of Role objects,
    allowing creation and permission management
    for multiple roles in the PostgreSQL database.

    Version Added:
        1.3.0
    """

    def __init__(self, roles=list[Role] | list[dict]) -> None:
        """Initialize the RoleManager class.:
        Args:
            roles: List of roles or dictionaries defining roles.
            Each role can be a dictionary with keys 'name', 'permissions', and optional 'description' and 'inherit'.
        """
        if isinstance(roles, list) and all(isinstance(role, dict) for role in roles):
            self.roles = {}
            for role in roles:
                _inherit = role.get("inherit")
                if _inherit is not None:
                    if _inherit not in self.roles:
                        raise ValueError(
                            f"Inherited role {_inherit} does not exist in the already defined roles. Pay attention to the order of the roles in the list."
                        )
                    role["inherit"] = self.roles[_inherit]
                self.roles[role["name"]] = Role(**role)
        elif isinstance(roles, list) and all(isinstance(role, Role) for role in roles):
            _roles = copy.deepcopy(roles)
            self.roles = {role.name: role for role in _roles}
        else:
            raise TypeError("Roles must be a list of dictionaries or Role instances.")

        for role in self.roles.values():
            if role.inherit is not None and role.inherit not in self.roles.values():
                raise ValueError(
                    f"Inherited role {role.inherit.name} does not exist in the defined roles."
                )

    def create_roles(
        self,
        connection: psycopg.Connection,
        *,
        suffix: str | None = None,
        grant: bool = False,
        commit: bool = False,
        feedback: Optional["Feedback"] = None,
    ) -> None:
        """Create roles in the database.

        When *suffix* is provided, DB-specific roles are created by appending
        the suffix to each configured role name (e.g. ``tww_user_lausanne``
        for suffix ``lausanne``). The generic (base) roles are also created
        and granted membership of the specific roles, so that the generic role
        inherits the specific one's permissions.

        When *suffix* is ``None`` (default), only the generic roles defined in
        the configuration are created.

        Args:
            connection: The database connection to execute the SQL statements.
            suffix: Optional suffix to append to role names for DB-specific
                roles. When provided, both the suffixed and generic roles are
                created, and inheritance is granted.
            grant: Whether to grant permissions to the roles. Defaults to False.
            commit: Whether to commit the transaction. Defaults to False.
            feedback: Optional feedback object for progress reporting.

        Version Changed:
            1.5.0: Added *suffix* parameter for DB-specific roles.
        """
        roles_list = list(self.roles.values())

        if suffix:
            for role in roles_list:
                if feedback and feedback.is_cancelled():
                    raise PumException("Role creation cancelled by user")

                specific_name = f"{role.name}_{suffix}"

                # Build a specific role with the suffixed name and same permissions
                specific_role = Role(
                    name=specific_name,
                    permissions=[
                        Permission(type=p.type, schemas=p.schemas) for p in role.permissions()
                    ],
                    description=(
                        f"{role.description} (specific to {suffix})" if role.description else None
                    ),
                )

                if feedback:
                    feedback.increment_step()
                    feedback.report_progress(f"Creating specific role: {specific_name}")

                specific_role.create(
                    connection=connection, commit=False, grant=grant, feedback=feedback
                )

                if feedback:
                    feedback.increment_step()
                    feedback.report_progress(f"Creating generic role: {role.name}")
                role.create(connection=connection, commit=False, grant=False, feedback=feedback)

                logger.debug(f"Granting specific role {specific_name} to generic role {role.name}.")
                SqlContent("GRANT {specific} TO {generic}").execute(
                    connection=connection,
                    commit=False,
                    parameters={
                        "specific": psycopg.sql.Identifier(specific_name),
                        "generic": psycopg.sql.Identifier(role.name),
                    },
                )
        else:
            for role in roles_list:
                if feedback and feedback.is_cancelled():
                    raise PumException("Role creation cancelled by user")
                if feedback:
                    feedback.increment_step()
                    feedback.report_progress(f"Creating role: {role.name}")
                role.create(connection=connection, commit=False, grant=grant, feedback=feedback)

        if commit:
            if feedback:
                feedback.lock_cancellation()
            connection.commit()

    def grant_permissions(
        self,
        connection: psycopg.Connection,
        commit: bool = False,
        feedback: Optional["Feedback"] = None,
    ) -> None:
        """Grant permissions to the roles in the database.
        Args:
            connection: The database connection to execute the SQL statements.
            commit: Whether to commit the transaction. Defaults to False.
            feedback: Optional feedback object for progress reporting.
        """
        roles_list = list(self.roles.values())
        for role in roles_list:
            if feedback and feedback.is_cancelled():
                from .exceptions import PumException

                raise PumException("Permission grant cancelled by user")
            if feedback:
                feedback.increment_step()
                feedback.report_progress(f"Granting permissions to role: {role.name}")
            for permission in role.permissions():
                permission.grant(
                    role=role.name, connection=connection, commit=False, feedback=feedback
                )
        role_names = ", ".join(r.name for r in roles_list)
        logger.info(f"Permissions granted to roles: {role_names}.")
        if commit:
            if feedback:
                feedback.lock_cancellation()
            connection.commit()

    def _resolve_roles(
        self,
        roles: list[str] | None = None,
    ) -> list[Role]:
        """Return the list of Role objects to act on.

        When *roles* is ``None`` all configured roles are returned.
        Otherwise only the roles whose names appear in the list are
        returned (in configuration order).  Raises ``PumException``
        if any requested name is not in the configuration.

        Args:
            roles: Optional role names to filter on.

        Returns:
            Filtered list of ``Role`` objects.
        """
        if roles is None:
            return list(self.roles.values())
        unknown = set(roles) - set(self.roles)
        if unknown:
            raise PumException(
                f"Unknown role(s): {', '.join(sorted(unknown))}. "
                f"Configured roles: {', '.join(self.roles)}"
            )
        return [self.roles[name] for name in roles]

    def revoke_permissions(
        self,
        connection: psycopg.Connection,
        *,
        roles: list[str] | None = None,
        suffix: str | None = None,
        commit: bool = False,
        feedback: Optional["Feedback"] = None,
    ) -> None:
        """Revoke previously granted permissions from roles.

        When *suffix* is provided, permissions are revoked from the
        DB-specific (suffixed) roles only.  Otherwise they are revoked
        from the generic roles.

        When *roles* is provided only those configured roles are acted
        on; otherwise all configured roles are affected.

        Args:
            connection: The database connection to execute the SQL statements.
            roles: Optional list of configured role names to revoke.
                When ``None`` (default), all configured roles are revoked.
            suffix: Optional suffix identifying DB-specific roles.
            commit: Whether to commit the transaction. Defaults to False.
            feedback: Optional feedback object for progress reporting.

        Version Added:
            1.5.0
        """
        target_roles = self._resolve_roles(roles)
        for role in target_roles:
            if feedback and feedback.is_cancelled():
                raise PumException("Permission revoke cancelled by user")

            role_name = f"{role.name}_{suffix}" if suffix else role.name

            if feedback:
                feedback.increment_step()
                feedback.report_progress(f"Revoking permissions from role: {role_name}")

            for perm in role.permissions():
                for schema in perm.schemas or []:
                    # Check if schema exists before revoking
                    cursor = SqlContent(
                        "SELECT 1 FROM pg_namespace WHERE nspname = {schema}"
                    ).execute(
                        connection=connection,
                        commit=False,
                        parameters={"schema": psycopg.sql.Literal(schema)},
                    )
                    if not cursor._pum_results or not cursor._pum_results[0]:
                        logger.warning(
                            f"Schema {schema} does not exist; skipping revoke for role {role_name}."
                        )
                        continue

                    logger.debug(
                        f"Revoking {perm.type.value} permission on schema {schema} from role {role_name}."
                    )
                    if perm.type == PermissionType.READ:
                        SqlContent("""
                            ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE SELECT, REFERENCES, TRIGGER ON TABLES FROM {role};
                            ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE SELECT ON SEQUENCES FROM {role};
                            ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE EXECUTE ON FUNCTIONS FROM {role};
                            ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE EXECUTE ON ROUTINES FROM {role};
                            ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE USAGE ON TYPES FROM {role};
                            REVOKE EXECUTE ON ALL ROUTINES IN SCHEMA {schema} FROM {role};
                            REVOKE EXECUTE ON ALL FUNCTIONS IN SCHEMA {schema} FROM {role};
                            REVOKE USAGE, SELECT ON ALL SEQUENCES IN SCHEMA {schema} FROM {role};
                            REVOKE SELECT, REFERENCES, TRIGGER ON ALL TABLES IN SCHEMA {schema} FROM {role};
                            REVOKE USAGE ON SCHEMA {schema} FROM {role};
                        """).execute(
                            connection=connection,
                            commit=False,
                            parameters={
                                "schema": psycopg.sql.Identifier(schema),
                                "role": psycopg.sql.Identifier(role_name),
                            },
                        )
                    elif perm.type == PermissionType.WRITE:
                        SqlContent("""
                            ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE ALL ON TABLES FROM {role};
                            ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE ALL ON SEQUENCES FROM {role};
                            ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE ALL ON FUNCTIONS FROM {role};
                            ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE ALL ON ROUTINES FROM {role};
                            ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE ALL ON TYPES FROM {role};
                            REVOKE ALL ON ALL ROUTINES IN SCHEMA {schema} FROM {role};
                            REVOKE ALL ON ALL FUNCTIONS IN SCHEMA {schema} FROM {role};
                            REVOKE ALL ON ALL SEQUENCES IN SCHEMA {schema} FROM {role};
                            REVOKE ALL ON ALL TABLES IN SCHEMA {schema} FROM {role};
                            REVOKE ALL ON SCHEMA {schema} FROM {role};
                        """).execute(
                            connection=connection,
                            commit=False,
                            parameters={
                                "schema": psycopg.sql.Identifier(schema),
                                "role": psycopg.sql.Identifier(role_name),
                            },
                        )

        # Revoke role memberships so inherited privileges are removed too.
        # E.g. if generic is a member of specific (GRANT specific TO generic),
        # we need to REVOKE specific FROM generic for the revoke to be effective.
        for role in target_roles:
            role_name = f"{role.name}_{suffix}" if suffix else role.name

            for parent_role in RoleManager.memberships_of(
                connection=connection, user_name=role_name
            ):
                logger.debug(f"Revoking membership: REVOKE {parent_role} FROM {role_name}")
                SqlContent("REVOKE {parent} FROM {member}").execute(
                    connection=connection,
                    commit=False,
                    parameters={
                        "parent": psycopg.sql.Identifier(parent_role),
                        "member": psycopg.sql.Identifier(role_name),
                    },
                )

        role_names = ", ".join(f"{r.name}_{suffix}" if suffix else r.name for r in target_roles)
        logger.info(f"Permissions revoked from roles: {role_names}.")
        if commit:
            if feedback:
                feedback.lock_cancellation()
            connection.commit()

    def drop_roles(
        self,
        connection: psycopg.Connection,
        *,
        roles: list[str] | None = None,
        suffix: str | None = None,
        commit: bool = False,
        feedback: Optional["Feedback"] = None,
    ) -> None:
        """Drop roles from the database.

        Permissions are revoked first (via ``revoke_permissions``), then roles
        are dropped.  When *suffix* is provided only the DB-specific roles are
        dropped; the generic roles are left untouched.  Without *suffix* only
        the generic roles are dropped.

        When *roles* is provided only those configured roles are acted
        on; otherwise all configured roles are affected.

        Args:
            connection: The database connection to execute the SQL statements.
            roles: Optional list of configured role names to drop.
                When ``None`` (default), all configured roles are dropped.
            suffix: Optional suffix identifying DB-specific roles.
            commit: Whether to commit the transaction. Defaults to False.
            feedback: Optional feedback object for progress reporting.

        Version Added:
            1.5.0

        Version Changed:
            1.5.0: Added *roles* parameter for per-role operations.
        """
        target_roles = self._resolve_roles(roles)

        # Revoke permissions first so the roles own nothing
        self.revoke_permissions(
            connection=connection, roles=roles, suffix=suffix, commit=False, feedback=feedback
        )

        for role in target_roles:
            if feedback and feedback.is_cancelled():
                raise PumException("Role drop cancelled by user")

            role_name = f"{role.name}_{suffix}" if suffix else role.name

            if feedback:
                feedback.increment_step()
                feedback.report_progress(f"Dropping role: {role_name}")

            logger.debug(f"Dropping role {role_name}.")
            SqlContent("DROP ROLE IF EXISTS {name}").execute(
                connection=connection,
                commit=False,
                parameters={"name": psycopg.sql.Identifier(role_name)},
            )

        role_names = ", ".join(f"{r.name}_{suffix}" if suffix else r.name for r in target_roles)
        logger.info(f"Roles dropped: {role_names}.")
        if commit:
            if feedback:
                feedback.lock_cancellation()
            connection.commit()

    def grant_to(
        self,
        connection: psycopg.Connection,
        *,
        to: str,
        roles: list[str] | None = None,
        suffix: str | None = None,
        commit: bool = False,
        feedback: Optional["Feedback"] = None,
    ) -> None:
        """Grant configured roles to a database user.

        Executes ``GRANT <role> TO <to>`` for each selected role, making
        *to* a member of those roles so it inherits their permissions.

        When *suffix* is provided the suffixed role names are used
        (e.g. ``tww_viewer_lausanne``).  Otherwise the generic names are
        used.

        Args:
            connection: The database connection to execute the SQL statements.
            to: The target database role that will receive membership.
            roles: Optional list of configured role names to grant.
                When ``None`` (default), all configured roles are granted.
            suffix: Optional suffix identifying DB-specific roles.
            commit: Whether to commit the transaction. Defaults to False.
            feedback: Optional feedback object for progress reporting.

        Version Added:
            1.5.0
        """
        target_roles = self._resolve_roles(roles)

        for role in target_roles:
            if feedback and feedback.is_cancelled():
                raise PumException("Grant-to cancelled by user")

            role_name = f"{role.name}_{suffix}" if suffix else role.name

            if feedback:
                feedback.increment_step()
                feedback.report_progress(f"Granting {role_name} to {to}")

            logger.debug(f"Granting role {role_name} to {to}.")
            SqlContent("GRANT {role} TO {target}").execute(
                connection=connection,
                commit=False,
                parameters={
                    "role": psycopg.sql.Identifier(role_name),
                    "target": psycopg.sql.Identifier(to),
                },
            )

        role_names = ", ".join(f"{r.name}_{suffix}" if suffix else r.name for r in target_roles)
        logger.info(f"Roles granted to {to}: {role_names}.")
        if commit:
            if feedback:
                feedback.lock_cancellation()
            connection.commit()

    def revoke_from(
        self,
        connection: psycopg.Connection,
        *,
        from_role: str,
        roles: list[str] | None = None,
        suffix: str | None = None,
        commit: bool = False,
        feedback: Optional["Feedback"] = None,
    ) -> None:
        """Revoke configured roles from a database user.

        Executes ``REVOKE <role> FROM <from_role>`` for each selected
        role, removing *from_role*'s membership.

        When *suffix* is provided the suffixed role names are used.
        Otherwise the generic names are used.

        Args:
            connection: The database connection to execute the SQL statements.
            from_role: The target database role to revoke membership from.
            roles: Optional list of configured role names to revoke.
                When ``None`` (default), all configured roles are revoked.
            suffix: Optional suffix identifying DB-specific roles.
            commit: Whether to commit the transaction. Defaults to False.
            feedback: Optional feedback object for progress reporting.

        Version Added:
            1.5.0
        """
        target_roles = self._resolve_roles(roles)

        for role in target_roles:
            if feedback and feedback.is_cancelled():
                raise PumException("Revoke-from cancelled by user")

            role_name = f"{role.name}_{suffix}" if suffix else role.name

            if feedback:
                feedback.increment_step()
                feedback.report_progress(f"Revoking {role_name} from {from_role}")

            logger.debug(f"Revoking role {role_name} from {from_role}.")
            SqlContent("REVOKE {role} FROM {target}").execute(
                connection=connection,
                commit=False,
                parameters={
                    "role": psycopg.sql.Identifier(role_name),
                    "target": psycopg.sql.Identifier(from_role),
                },
            )

        role_names = ", ".join(f"{r.name}_{suffix}" if suffix else r.name for r in target_roles)
        logger.info(f"Roles revoked from {from_role}: {role_names}.")
        if commit:
            if feedback:
                feedback.lock_cancellation()
            connection.commit()

    @staticmethod
    def create_login_role(
        connection: psycopg.Connection,
        name: str,
        *,
        password: str | None = None,
        commit: bool = False,
    ) -> None:
        """Create a PostgreSQL role with the LOGIN attribute.

        Args:
            connection: The database connection.
            name: The name of the role to create.
            password: Optional password for the role.
            commit: Whether to commit the transaction. Defaults to False.

        Version Added:
            1.5.0
        """
        if password:
            SqlContent("CREATE ROLE {role} LOGIN PASSWORD {pwd}").execute(
                connection=connection,
                commit=commit,
                parameters={
                    "role": psycopg.sql.Identifier(name),
                    "pwd": psycopg.sql.Literal(password),
                },
            )
        else:
            SqlContent("CREATE ROLE {role} LOGIN").execute(
                connection=connection,
                commit=commit,
                parameters={"role": psycopg.sql.Identifier(name)},
            )
        logger.info(f"Login role '{name}' created.")

    @staticmethod
    def drop_login_role(
        connection: psycopg.Connection,
        name: str,
        *,
        commit: bool = False,
    ) -> None:
        """Drop a PostgreSQL login role.

        Args:
            connection: The database connection.
            name: The name of the role to drop.
            commit: Whether to commit the transaction. Defaults to False.

        Version Added:
            1.5.0
        """
        SqlContent("DROP ROLE IF EXISTS {role}").execute(
            connection=connection,
            commit=commit,
            parameters={"role": psycopg.sql.Identifier(name)},
        )
        logger.info(f"Login role '{name}' dropped.")

    @staticmethod
    def login_roles(connection: psycopg.Connection) -> list[str]:
        """Return the names of all login roles that are not superusers.

        This is useful for listing roles that can be granted membership
        in module roles.  System roles (``pg_*``) are excluded.

        Args:
            connection: The database connection.

        Returns:
            Sorted list of login role names.

        Version Added:
            1.5.0
        """
        with connection.transaction():
            cursor = connection.cursor()
            cursor.execute(
                "SELECT rolname FROM pg_roles "
                "WHERE rolcanlogin AND NOT rolsuper AND rolname NOT LIKE 'pg\\_%' "
                "ORDER BY rolname"
            )
            return [row[0] for row in cursor.fetchall()]

    @staticmethod
    def members_of(connection: psycopg.Connection, role_name: str) -> list[str]:
        """Return the login role names that are members of *role_name*.

        Args:
            connection: The database connection.
            role_name: The group role whose members to look up.

        Returns:
            Sorted list of member login role names.

        Version Added:
            1.5.4
        """
        with connection.transaction():
            cursor = connection.cursor()
            cursor.execute(
                "SELECT m.rolname "
                "FROM pg_auth_members am "
                "JOIN pg_roles r ON r.oid = am.roleid "
                "JOIN pg_roles m ON m.oid = am.member "
                "WHERE r.rolname = %s AND m.rolcanlogin "
                "ORDER BY m.rolname",
                (role_name,),
            )
            return [row[0] for row in cursor.fetchall()]

    @staticmethod
    def memberships_of(connection: psycopg.Connection, user_name: str) -> list[str]:
        """Return the role names that *user_name* is a member of.

        This is the inverse of :meth:`members_of`: instead of asking
        "who belongs to this role?", it asks "which roles does this user
        belong to?".

        Args:
            connection: The database connection.
            user_name: The login role whose memberships to look up.

        Returns:
            Sorted list of role names the user is a member of.

        Version Added:
            1.5.0
        """
        with connection.transaction():
            cursor = connection.cursor()
            cursor.execute(
                "SELECT r.rolname "
                "FROM pg_auth_members am "
                "JOIN pg_roles r ON r.oid = am.roleid "
                "JOIN pg_roles m ON m.oid = am.member "
                "WHERE m.rolname = %s "
                "ORDER BY r.rolname",
                (user_name,),
            )
            return [row[0] for row in cursor.fetchall()]

    def roles_inventory(
        self,
        connection: psycopg.Connection,
        *,
        include_superusers: bool = False,
    ) -> "RoleInventory":
        """List all database roles related to the module's configured schemas.

        Returns the module's generic roles, any DB-specific (suffixed)
        variants discovered via naming convention, and any other
        database roles that have access to the configured schemas.
        For every role the method reports which schemas it can read or
        write, whether it is a superuser, and whether it can log in.

        Args:
            connection: The database connection to use.
            include_superusers: When ``True``, superusers are included in
                the results.  Defaults to ``False`` because superusers
                implicitly have access to everything.

        Returns:
            A ``RoleInventory`` containing the discovered roles.

        Version Added:
            1.5.0
        """
        configured_schemas = set()
        for role in self.roles.values():
            for perm in role.permissions():
                if perm.schemas:
                    configured_schemas.update(perm.schemas)

        with connection.transaction():
            cursor = connection.cursor()

            # Discover all roles matching each configured name or <name>_*
            role_statuses: list[RoleStatus] = []
            known_names: set[str] = set()
            for role in self.roles.values():
                # Find the generic role and any suffixed variants
                cursor.execute(
                    "SELECT rolname FROM pg_roles WHERE rolname = %s OR rolname LIKE %s ORDER BY rolname",
                    (role.name, f"{role.name}\\_%"),
                )
                found_names = [row[0] for row in cursor.fetchall()]

                for name in found_names:
                    role_statuses.append(
                        _build_role_status(connection, name, role, configured_schemas)
                    )
                    known_names.add(name)

            # Discover unknown roles with privileges on the configured schemas
            unknown_roles = _find_unknown_roles(
                connection,
                configured_schemas,
                known_names=known_names,
                include_superusers=include_superusers,
            )

            # Discover login roles that have no access to the configured schemas
            all_known = known_names | {r.name for r in unknown_roles}
            other_login = _find_other_login_roles(connection, configured_schemas, all_known)

            return RoleInventory(
                roles=role_statuses + unknown_roles,
                expected_roles=list(self.roles.keys()),
                other_login_roles=other_login,
            )

__init__

__init__(roles=list[Role] | list[dict]) -> None

Initialize the RoleManager class.: Args: roles: List of roles or dictionaries defining roles. Each role can be a dictionary with keys 'name', 'permissions', and optional 'description' and 'inherit'.

Source code in pum/role_manager.py
def __init__(self, roles=list[Role] | list[dict]) -> None:
    """Initialize the RoleManager class.:
    Args:
        roles: List of roles or dictionaries defining roles.
        Each role can be a dictionary with keys 'name', 'permissions', and optional 'description' and 'inherit'.
    """
    if isinstance(roles, list) and all(isinstance(role, dict) for role in roles):
        self.roles = {}
        for role in roles:
            _inherit = role.get("inherit")
            if _inherit is not None:
                if _inherit not in self.roles:
                    raise ValueError(
                        f"Inherited role {_inherit} does not exist in the already defined roles. Pay attention to the order of the roles in the list."
                    )
                role["inherit"] = self.roles[_inherit]
            self.roles[role["name"]] = Role(**role)
    elif isinstance(roles, list) and all(isinstance(role, Role) for role in roles):
        _roles = copy.deepcopy(roles)
        self.roles = {role.name: role for role in _roles}
    else:
        raise TypeError("Roles must be a list of dictionaries or Role instances.")

    for role in self.roles.values():
        if role.inherit is not None and role.inherit not in self.roles.values():
            raise ValueError(
                f"Inherited role {role.inherit.name} does not exist in the defined roles."
            )

create_login_role staticmethod

create_login_role(connection: Connection, name: str, *, password: str | None = None, commit: bool = False) -> None

Create a PostgreSQL role with the LOGIN attribute.

Parameters:

Name Type Description Default
connection Connection

The database connection.

required
name str

The name of the role to create.

required
password str | None

Optional password for the role.

None
commit bool

Whether to commit the transaction. Defaults to False.

False
Version Added

1.5.0

Source code in pum/role_manager.py
@staticmethod
def create_login_role(
    connection: psycopg.Connection,
    name: str,
    *,
    password: str | None = None,
    commit: bool = False,
) -> None:
    """Create a PostgreSQL role with the LOGIN attribute.

    Args:
        connection: The database connection.
        name: The name of the role to create.
        password: Optional password for the role.
        commit: Whether to commit the transaction. Defaults to False.

    Version Added:
        1.5.0
    """
    if password:
        SqlContent("CREATE ROLE {role} LOGIN PASSWORD {pwd}").execute(
            connection=connection,
            commit=commit,
            parameters={
                "role": psycopg.sql.Identifier(name),
                "pwd": psycopg.sql.Literal(password),
            },
        )
    else:
        SqlContent("CREATE ROLE {role} LOGIN").execute(
            connection=connection,
            commit=commit,
            parameters={"role": psycopg.sql.Identifier(name)},
        )
    logger.info(f"Login role '{name}' created.")

create_roles

create_roles(connection: Connection, *, suffix: str | None = None, grant: bool = False, commit: bool = False, feedback: Optional[Feedback] = None) -> None

Create roles in the database.

When suffix is provided, DB-specific roles are created by appending the suffix to each configured role name (e.g. tww_user_lausanne for suffix lausanne). The generic (base) roles are also created and granted membership of the specific roles, so that the generic role inherits the specific one's permissions.

When suffix is None (default), only the generic roles defined in the configuration are created.

Parameters:

Name Type Description Default
connection Connection

The database connection to execute the SQL statements.

required
suffix str | None

Optional suffix to append to role names for DB-specific roles. When provided, both the suffixed and generic roles are created, and inheritance is granted.

None
grant bool

Whether to grant permissions to the roles. Defaults to False.

False
commit bool

Whether to commit the transaction. Defaults to False.

False
feedback Optional[Feedback]

Optional feedback object for progress reporting.

None
Version Changed

1.5.0: Added suffix parameter for DB-specific roles.

Source code in pum/role_manager.py
def create_roles(
    self,
    connection: psycopg.Connection,
    *,
    suffix: str | None = None,
    grant: bool = False,
    commit: bool = False,
    feedback: Optional["Feedback"] = None,
) -> None:
    """Create roles in the database.

    When *suffix* is provided, DB-specific roles are created by appending
    the suffix to each configured role name (e.g. ``tww_user_lausanne``
    for suffix ``lausanne``). The generic (base) roles are also created
    and granted membership of the specific roles, so that the generic role
    inherits the specific one's permissions.

    When *suffix* is ``None`` (default), only the generic roles defined in
    the configuration are created.

    Args:
        connection: The database connection to execute the SQL statements.
        suffix: Optional suffix to append to role names for DB-specific
            roles. When provided, both the suffixed and generic roles are
            created, and inheritance is granted.
        grant: Whether to grant permissions to the roles. Defaults to False.
        commit: Whether to commit the transaction. Defaults to False.
        feedback: Optional feedback object for progress reporting.

    Version Changed:
        1.5.0: Added *suffix* parameter for DB-specific roles.
    """
    roles_list = list(self.roles.values())

    if suffix:
        for role in roles_list:
            if feedback and feedback.is_cancelled():
                raise PumException("Role creation cancelled by user")

            specific_name = f"{role.name}_{suffix}"

            # Build a specific role with the suffixed name and same permissions
            specific_role = Role(
                name=specific_name,
                permissions=[
                    Permission(type=p.type, schemas=p.schemas) for p in role.permissions()
                ],
                description=(
                    f"{role.description} (specific to {suffix})" if role.description else None
                ),
            )

            if feedback:
                feedback.increment_step()
                feedback.report_progress(f"Creating specific role: {specific_name}")

            specific_role.create(
                connection=connection, commit=False, grant=grant, feedback=feedback
            )

            if feedback:
                feedback.increment_step()
                feedback.report_progress(f"Creating generic role: {role.name}")
            role.create(connection=connection, commit=False, grant=False, feedback=feedback)

            logger.debug(f"Granting specific role {specific_name} to generic role {role.name}.")
            SqlContent("GRANT {specific} TO {generic}").execute(
                connection=connection,
                commit=False,
                parameters={
                    "specific": psycopg.sql.Identifier(specific_name),
                    "generic": psycopg.sql.Identifier(role.name),
                },
            )
    else:
        for role in roles_list:
            if feedback and feedback.is_cancelled():
                raise PumException("Role creation cancelled by user")
            if feedback:
                feedback.increment_step()
                feedback.report_progress(f"Creating role: {role.name}")
            role.create(connection=connection, commit=False, grant=grant, feedback=feedback)

    if commit:
        if feedback:
            feedback.lock_cancellation()
        connection.commit()

drop_login_role staticmethod

drop_login_role(connection: Connection, name: str, *, commit: bool = False) -> None

Drop a PostgreSQL login role.

Parameters:

Name Type Description Default
connection Connection

The database connection.

required
name str

The name of the role to drop.

required
commit bool

Whether to commit the transaction. Defaults to False.

False
Version Added

1.5.0

Source code in pum/role_manager.py
@staticmethod
def drop_login_role(
    connection: psycopg.Connection,
    name: str,
    *,
    commit: bool = False,
) -> None:
    """Drop a PostgreSQL login role.

    Args:
        connection: The database connection.
        name: The name of the role to drop.
        commit: Whether to commit the transaction. Defaults to False.

    Version Added:
        1.5.0
    """
    SqlContent("DROP ROLE IF EXISTS {role}").execute(
        connection=connection,
        commit=commit,
        parameters={"role": psycopg.sql.Identifier(name)},
    )
    logger.info(f"Login role '{name}' dropped.")

drop_roles

drop_roles(connection: Connection, *, roles: list[str] | None = None, suffix: str | None = None, commit: bool = False, feedback: Optional[Feedback] = None) -> None

Drop roles from the database.

Permissions are revoked first (via revoke_permissions), then roles are dropped. When suffix is provided only the DB-specific roles are dropped; the generic roles are left untouched. Without suffix only the generic roles are dropped.

When roles is provided only those configured roles are acted on; otherwise all configured roles are affected.

Parameters:

Name Type Description Default
connection Connection

The database connection to execute the SQL statements.

required
roles list[str] | None

Optional list of configured role names to drop. When None (default), all configured roles are dropped.

None
suffix str | None

Optional suffix identifying DB-specific roles.

None
commit bool

Whether to commit the transaction. Defaults to False.

False
feedback Optional[Feedback]

Optional feedback object for progress reporting.

None
Version Added

1.5.0

Version Changed

1.5.0: Added roles parameter for per-role operations.

Source code in pum/role_manager.py
def drop_roles(
    self,
    connection: psycopg.Connection,
    *,
    roles: list[str] | None = None,
    suffix: str | None = None,
    commit: bool = False,
    feedback: Optional["Feedback"] = None,
) -> None:
    """Drop roles from the database.

    Permissions are revoked first (via ``revoke_permissions``), then roles
    are dropped.  When *suffix* is provided only the DB-specific roles are
    dropped; the generic roles are left untouched.  Without *suffix* only
    the generic roles are dropped.

    When *roles* is provided only those configured roles are acted
    on; otherwise all configured roles are affected.

    Args:
        connection: The database connection to execute the SQL statements.
        roles: Optional list of configured role names to drop.
            When ``None`` (default), all configured roles are dropped.
        suffix: Optional suffix identifying DB-specific roles.
        commit: Whether to commit the transaction. Defaults to False.
        feedback: Optional feedback object for progress reporting.

    Version Added:
        1.5.0

    Version Changed:
        1.5.0: Added *roles* parameter for per-role operations.
    """
    target_roles = self._resolve_roles(roles)

    # Revoke permissions first so the roles own nothing
    self.revoke_permissions(
        connection=connection, roles=roles, suffix=suffix, commit=False, feedback=feedback
    )

    for role in target_roles:
        if feedback and feedback.is_cancelled():
            raise PumException("Role drop cancelled by user")

        role_name = f"{role.name}_{suffix}" if suffix else role.name

        if feedback:
            feedback.increment_step()
            feedback.report_progress(f"Dropping role: {role_name}")

        logger.debug(f"Dropping role {role_name}.")
        SqlContent("DROP ROLE IF EXISTS {name}").execute(
            connection=connection,
            commit=False,
            parameters={"name": psycopg.sql.Identifier(role_name)},
        )

    role_names = ", ".join(f"{r.name}_{suffix}" if suffix else r.name for r in target_roles)
    logger.info(f"Roles dropped: {role_names}.")
    if commit:
        if feedback:
            feedback.lock_cancellation()
        connection.commit()

grant_permissions

grant_permissions(connection: Connection, commit: bool = False, feedback: Optional[Feedback] = None) -> None

Grant permissions to the roles in the database. Args: connection: The database connection to execute the SQL statements. commit: Whether to commit the transaction. Defaults to False. feedback: Optional feedback object for progress reporting.

Source code in pum/role_manager.py
def grant_permissions(
    self,
    connection: psycopg.Connection,
    commit: bool = False,
    feedback: Optional["Feedback"] = None,
) -> None:
    """Grant permissions to the roles in the database.
    Args:
        connection: The database connection to execute the SQL statements.
        commit: Whether to commit the transaction. Defaults to False.
        feedback: Optional feedback object for progress reporting.
    """
    roles_list = list(self.roles.values())
    for role in roles_list:
        if feedback and feedback.is_cancelled():
            from .exceptions import PumException

            raise PumException("Permission grant cancelled by user")
        if feedback:
            feedback.increment_step()
            feedback.report_progress(f"Granting permissions to role: {role.name}")
        for permission in role.permissions():
            permission.grant(
                role=role.name, connection=connection, commit=False, feedback=feedback
            )
    role_names = ", ".join(r.name for r in roles_list)
    logger.info(f"Permissions granted to roles: {role_names}.")
    if commit:
        if feedback:
            feedback.lock_cancellation()
        connection.commit()

grant_to

grant_to(connection: Connection, *, to: str, roles: list[str] | None = None, suffix: str | None = None, commit: bool = False, feedback: Optional[Feedback] = None) -> None

Grant configured roles to a database user.

Executes GRANT <role> TO <to> for each selected role, making to a member of those roles so it inherits their permissions.

When suffix is provided the suffixed role names are used (e.g. tww_viewer_lausanne). Otherwise the generic names are used.

Parameters:

Name Type Description Default
connection Connection

The database connection to execute the SQL statements.

required
to str

The target database role that will receive membership.

required
roles list[str] | None

Optional list of configured role names to grant. When None (default), all configured roles are granted.

None
suffix str | None

Optional suffix identifying DB-specific roles.

None
commit bool

Whether to commit the transaction. Defaults to False.

False
feedback Optional[Feedback]

Optional feedback object for progress reporting.

None
Version Added

1.5.0

Source code in pum/role_manager.py
def grant_to(
    self,
    connection: psycopg.Connection,
    *,
    to: str,
    roles: list[str] | None = None,
    suffix: str | None = None,
    commit: bool = False,
    feedback: Optional["Feedback"] = None,
) -> None:
    """Grant configured roles to a database user.

    Executes ``GRANT <role> TO <to>`` for each selected role, making
    *to* a member of those roles so it inherits their permissions.

    When *suffix* is provided the suffixed role names are used
    (e.g. ``tww_viewer_lausanne``).  Otherwise the generic names are
    used.

    Args:
        connection: The database connection to execute the SQL statements.
        to: The target database role that will receive membership.
        roles: Optional list of configured role names to grant.
            When ``None`` (default), all configured roles are granted.
        suffix: Optional suffix identifying DB-specific roles.
        commit: Whether to commit the transaction. Defaults to False.
        feedback: Optional feedback object for progress reporting.

    Version Added:
        1.5.0
    """
    target_roles = self._resolve_roles(roles)

    for role in target_roles:
        if feedback and feedback.is_cancelled():
            raise PumException("Grant-to cancelled by user")

        role_name = f"{role.name}_{suffix}" if suffix else role.name

        if feedback:
            feedback.increment_step()
            feedback.report_progress(f"Granting {role_name} to {to}")

        logger.debug(f"Granting role {role_name} to {to}.")
        SqlContent("GRANT {role} TO {target}").execute(
            connection=connection,
            commit=False,
            parameters={
                "role": psycopg.sql.Identifier(role_name),
                "target": psycopg.sql.Identifier(to),
            },
        )

    role_names = ", ".join(f"{r.name}_{suffix}" if suffix else r.name for r in target_roles)
    logger.info(f"Roles granted to {to}: {role_names}.")
    if commit:
        if feedback:
            feedback.lock_cancellation()
        connection.commit()

login_roles staticmethod

login_roles(connection: Connection) -> list[str]

Return the names of all login roles that are not superusers.

This is useful for listing roles that can be granted membership in module roles. System roles (pg_*) are excluded.

Parameters:

Name Type Description Default
connection Connection

The database connection.

required

Returns:

Type Description
list[str]

Sorted list of login role names.

Version Added

1.5.0

Source code in pum/role_manager.py
@staticmethod
def login_roles(connection: psycopg.Connection) -> list[str]:
    """Return the names of all login roles that are not superusers.

    This is useful for listing roles that can be granted membership
    in module roles.  System roles (``pg_*``) are excluded.

    Args:
        connection: The database connection.

    Returns:
        Sorted list of login role names.

    Version Added:
        1.5.0
    """
    with connection.transaction():
        cursor = connection.cursor()
        cursor.execute(
            "SELECT rolname FROM pg_roles "
            "WHERE rolcanlogin AND NOT rolsuper AND rolname NOT LIKE 'pg\\_%' "
            "ORDER BY rolname"
        )
        return [row[0] for row in cursor.fetchall()]

members_of staticmethod

members_of(connection: Connection, role_name: str) -> list[str]

Return the login role names that are members of role_name.

Parameters:

Name Type Description Default
connection Connection

The database connection.

required
role_name str

The group role whose members to look up.

required

Returns:

Type Description
list[str]

Sorted list of member login role names.

Version Added

1.5.4

Source code in pum/role_manager.py
@staticmethod
def members_of(connection: psycopg.Connection, role_name: str) -> list[str]:
    """Return the login role names that are members of *role_name*.

    Args:
        connection: The database connection.
        role_name: The group role whose members to look up.

    Returns:
        Sorted list of member login role names.

    Version Added:
        1.5.4
    """
    with connection.transaction():
        cursor = connection.cursor()
        cursor.execute(
            "SELECT m.rolname "
            "FROM pg_auth_members am "
            "JOIN pg_roles r ON r.oid = am.roleid "
            "JOIN pg_roles m ON m.oid = am.member "
            "WHERE r.rolname = %s AND m.rolcanlogin "
            "ORDER BY m.rolname",
            (role_name,),
        )
        return [row[0] for row in cursor.fetchall()]

memberships_of staticmethod

memberships_of(connection: Connection, user_name: str) -> list[str]

Return the role names that user_name is a member of.

This is the inverse of :meth:members_of: instead of asking "who belongs to this role?", it asks "which roles does this user belong to?".

Parameters:

Name Type Description Default
connection Connection

The database connection.

required
user_name str

The login role whose memberships to look up.

required

Returns:

Type Description
list[str]

Sorted list of role names the user is a member of.

Version Added

1.5.0

Source code in pum/role_manager.py
@staticmethod
def memberships_of(connection: psycopg.Connection, user_name: str) -> list[str]:
    """Return the role names that *user_name* is a member of.

    This is the inverse of :meth:`members_of`: instead of asking
    "who belongs to this role?", it asks "which roles does this user
    belong to?".

    Args:
        connection: The database connection.
        user_name: The login role whose memberships to look up.

    Returns:
        Sorted list of role names the user is a member of.

    Version Added:
        1.5.0
    """
    with connection.transaction():
        cursor = connection.cursor()
        cursor.execute(
            "SELECT r.rolname "
            "FROM pg_auth_members am "
            "JOIN pg_roles r ON r.oid = am.roleid "
            "JOIN pg_roles m ON m.oid = am.member "
            "WHERE m.rolname = %s "
            "ORDER BY r.rolname",
            (user_name,),
        )
        return [row[0] for row in cursor.fetchall()]

revoke_from

revoke_from(connection: Connection, *, from_role: str, roles: list[str] | None = None, suffix: str | None = None, commit: bool = False, feedback: Optional[Feedback] = None) -> None

Revoke configured roles from a database user.

Executes REVOKE <role> FROM <from_role> for each selected role, removing from_role's membership.

When suffix is provided the suffixed role names are used. Otherwise the generic names are used.

Parameters:

Name Type Description Default
connection Connection

The database connection to execute the SQL statements.

required
from_role str

The target database role to revoke membership from.

required
roles list[str] | None

Optional list of configured role names to revoke. When None (default), all configured roles are revoked.

None
suffix str | None

Optional suffix identifying DB-specific roles.

None
commit bool

Whether to commit the transaction. Defaults to False.

False
feedback Optional[Feedback]

Optional feedback object for progress reporting.

None
Version Added

1.5.0

Source code in pum/role_manager.py
def revoke_from(
    self,
    connection: psycopg.Connection,
    *,
    from_role: str,
    roles: list[str] | None = None,
    suffix: str | None = None,
    commit: bool = False,
    feedback: Optional["Feedback"] = None,
) -> None:
    """Revoke configured roles from a database user.

    Executes ``REVOKE <role> FROM <from_role>`` for each selected
    role, removing *from_role*'s membership.

    When *suffix* is provided the suffixed role names are used.
    Otherwise the generic names are used.

    Args:
        connection: The database connection to execute the SQL statements.
        from_role: The target database role to revoke membership from.
        roles: Optional list of configured role names to revoke.
            When ``None`` (default), all configured roles are revoked.
        suffix: Optional suffix identifying DB-specific roles.
        commit: Whether to commit the transaction. Defaults to False.
        feedback: Optional feedback object for progress reporting.

    Version Added:
        1.5.0
    """
    target_roles = self._resolve_roles(roles)

    for role in target_roles:
        if feedback and feedback.is_cancelled():
            raise PumException("Revoke-from cancelled by user")

        role_name = f"{role.name}_{suffix}" if suffix else role.name

        if feedback:
            feedback.increment_step()
            feedback.report_progress(f"Revoking {role_name} from {from_role}")

        logger.debug(f"Revoking role {role_name} from {from_role}.")
        SqlContent("REVOKE {role} FROM {target}").execute(
            connection=connection,
            commit=False,
            parameters={
                "role": psycopg.sql.Identifier(role_name),
                "target": psycopg.sql.Identifier(from_role),
            },
        )

    role_names = ", ".join(f"{r.name}_{suffix}" if suffix else r.name for r in target_roles)
    logger.info(f"Roles revoked from {from_role}: {role_names}.")
    if commit:
        if feedback:
            feedback.lock_cancellation()
        connection.commit()

revoke_permissions

revoke_permissions(connection: Connection, *, roles: list[str] | None = None, suffix: str | None = None, commit: bool = False, feedback: Optional[Feedback] = None) -> None

Revoke previously granted permissions from roles.

When suffix is provided, permissions are revoked from the DB-specific (suffixed) roles only. Otherwise they are revoked from the generic roles.

When roles is provided only those configured roles are acted on; otherwise all configured roles are affected.

Parameters:

Name Type Description Default
connection Connection

The database connection to execute the SQL statements.

required
roles list[str] | None

Optional list of configured role names to revoke. When None (default), all configured roles are revoked.

None
suffix str | None

Optional suffix identifying DB-specific roles.

None
commit bool

Whether to commit the transaction. Defaults to False.

False
feedback Optional[Feedback]

Optional feedback object for progress reporting.

None
Version Added

1.5.0

Source code in pum/role_manager.py
def revoke_permissions(
    self,
    connection: psycopg.Connection,
    *,
    roles: list[str] | None = None,
    suffix: str | None = None,
    commit: bool = False,
    feedback: Optional["Feedback"] = None,
) -> None:
    """Revoke previously granted permissions from roles.

    When *suffix* is provided, permissions are revoked from the
    DB-specific (suffixed) roles only.  Otherwise they are revoked
    from the generic roles.

    When *roles* is provided only those configured roles are acted
    on; otherwise all configured roles are affected.

    Args:
        connection: The database connection to execute the SQL statements.
        roles: Optional list of configured role names to revoke.
            When ``None`` (default), all configured roles are revoked.
        suffix: Optional suffix identifying DB-specific roles.
        commit: Whether to commit the transaction. Defaults to False.
        feedback: Optional feedback object for progress reporting.

    Version Added:
        1.5.0
    """
    target_roles = self._resolve_roles(roles)
    for role in target_roles:
        if feedback and feedback.is_cancelled():
            raise PumException("Permission revoke cancelled by user")

        role_name = f"{role.name}_{suffix}" if suffix else role.name

        if feedback:
            feedback.increment_step()
            feedback.report_progress(f"Revoking permissions from role: {role_name}")

        for perm in role.permissions():
            for schema in perm.schemas or []:
                # Check if schema exists before revoking
                cursor = SqlContent(
                    "SELECT 1 FROM pg_namespace WHERE nspname = {schema}"
                ).execute(
                    connection=connection,
                    commit=False,
                    parameters={"schema": psycopg.sql.Literal(schema)},
                )
                if not cursor._pum_results or not cursor._pum_results[0]:
                    logger.warning(
                        f"Schema {schema} does not exist; skipping revoke for role {role_name}."
                    )
                    continue

                logger.debug(
                    f"Revoking {perm.type.value} permission on schema {schema} from role {role_name}."
                )
                if perm.type == PermissionType.READ:
                    SqlContent("""
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE SELECT, REFERENCES, TRIGGER ON TABLES FROM {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE SELECT ON SEQUENCES FROM {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE EXECUTE ON FUNCTIONS FROM {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE EXECUTE ON ROUTINES FROM {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE USAGE ON TYPES FROM {role};
                        REVOKE EXECUTE ON ALL ROUTINES IN SCHEMA {schema} FROM {role};
                        REVOKE EXECUTE ON ALL FUNCTIONS IN SCHEMA {schema} FROM {role};
                        REVOKE USAGE, SELECT ON ALL SEQUENCES IN SCHEMA {schema} FROM {role};
                        REVOKE SELECT, REFERENCES, TRIGGER ON ALL TABLES IN SCHEMA {schema} FROM {role};
                        REVOKE USAGE ON SCHEMA {schema} FROM {role};
                    """).execute(
                        connection=connection,
                        commit=False,
                        parameters={
                            "schema": psycopg.sql.Identifier(schema),
                            "role": psycopg.sql.Identifier(role_name),
                        },
                    )
                elif perm.type == PermissionType.WRITE:
                    SqlContent("""
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE ALL ON TABLES FROM {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE ALL ON SEQUENCES FROM {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE ALL ON FUNCTIONS FROM {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE ALL ON ROUTINES FROM {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} REVOKE ALL ON TYPES FROM {role};
                        REVOKE ALL ON ALL ROUTINES IN SCHEMA {schema} FROM {role};
                        REVOKE ALL ON ALL FUNCTIONS IN SCHEMA {schema} FROM {role};
                        REVOKE ALL ON ALL SEQUENCES IN SCHEMA {schema} FROM {role};
                        REVOKE ALL ON ALL TABLES IN SCHEMA {schema} FROM {role};
                        REVOKE ALL ON SCHEMA {schema} FROM {role};
                    """).execute(
                        connection=connection,
                        commit=False,
                        parameters={
                            "schema": psycopg.sql.Identifier(schema),
                            "role": psycopg.sql.Identifier(role_name),
                        },
                    )

    # Revoke role memberships so inherited privileges are removed too.
    # E.g. if generic is a member of specific (GRANT specific TO generic),
    # we need to REVOKE specific FROM generic for the revoke to be effective.
    for role in target_roles:
        role_name = f"{role.name}_{suffix}" if suffix else role.name

        for parent_role in RoleManager.memberships_of(
            connection=connection, user_name=role_name
        ):
            logger.debug(f"Revoking membership: REVOKE {parent_role} FROM {role_name}")
            SqlContent("REVOKE {parent} FROM {member}").execute(
                connection=connection,
                commit=False,
                parameters={
                    "parent": psycopg.sql.Identifier(parent_role),
                    "member": psycopg.sql.Identifier(role_name),
                },
            )

    role_names = ", ".join(f"{r.name}_{suffix}" if suffix else r.name for r in target_roles)
    logger.info(f"Permissions revoked from roles: {role_names}.")
    if commit:
        if feedback:
            feedback.lock_cancellation()
        connection.commit()

roles_inventory

roles_inventory(connection: Connection, *, include_superusers: bool = False) -> RoleInventory

List all database roles related to the module's configured schemas.

Returns the module's generic roles, any DB-specific (suffixed) variants discovered via naming convention, and any other database roles that have access to the configured schemas. For every role the method reports which schemas it can read or write, whether it is a superuser, and whether it can log in.

Parameters:

Name Type Description Default
connection Connection

The database connection to use.

required
include_superusers bool

When True, superusers are included in the results. Defaults to False because superusers implicitly have access to everything.

False

Returns:

Type Description
RoleInventory

A RoleInventory containing the discovered roles.

Version Added

1.5.0

Source code in pum/role_manager.py
def roles_inventory(
    self,
    connection: psycopg.Connection,
    *,
    include_superusers: bool = False,
) -> "RoleInventory":
    """List all database roles related to the module's configured schemas.

    Returns the module's generic roles, any DB-specific (suffixed)
    variants discovered via naming convention, and any other
    database roles that have access to the configured schemas.
    For every role the method reports which schemas it can read or
    write, whether it is a superuser, and whether it can log in.

    Args:
        connection: The database connection to use.
        include_superusers: When ``True``, superusers are included in
            the results.  Defaults to ``False`` because superusers
            implicitly have access to everything.

    Returns:
        A ``RoleInventory`` containing the discovered roles.

    Version Added:
        1.5.0
    """
    configured_schemas = set()
    for role in self.roles.values():
        for perm in role.permissions():
            if perm.schemas:
                configured_schemas.update(perm.schemas)

    with connection.transaction():
        cursor = connection.cursor()

        # Discover all roles matching each configured name or <name>_*
        role_statuses: list[RoleStatus] = []
        known_names: set[str] = set()
        for role in self.roles.values():
            # Find the generic role and any suffixed variants
            cursor.execute(
                "SELECT rolname FROM pg_roles WHERE rolname = %s OR rolname LIKE %s ORDER BY rolname",
                (role.name, f"{role.name}\\_%"),
            )
            found_names = [row[0] for row in cursor.fetchall()]

            for name in found_names:
                role_statuses.append(
                    _build_role_status(connection, name, role, configured_schemas)
                )
                known_names.add(name)

        # Discover unknown roles with privileges on the configured schemas
        unknown_roles = _find_unknown_roles(
            connection,
            configured_schemas,
            known_names=known_names,
            include_superusers=include_superusers,
        )

        # Discover login roles that have no access to the configured schemas
        all_known = known_names | {r.name for r in unknown_roles}
        other_login = _find_other_login_roles(connection, configured_schemas, all_known)

        return RoleInventory(
            roles=role_statuses + unknown_roles,
            expected_roles=list(self.roles.keys()),
            other_login_roles=other_login,
        )