Skip to content

Permission

Class to represent a permission for a database role.

Attributes:

Name Type Description
type

Type of permission (read or write).

schemas

List of schemas this permission applies to.

Source code in pum/role_manager.py
class Permission:
    """Class to represent a permission for a database role.

    Attributes:
        type: Type of permission (read or write).
        schemas: List of schemas this permission applies to.
    """

    def __init__(self, type: PermissionType | str, schemas: list[str] = None) -> None:
        if not isinstance(type, PermissionType):
            type = PermissionType(type)
        self.type = type
        self.schemas = schemas

    def grant(
        self,
        role: str,
        connection: psycopg.Connection,
        commit: bool = False,
        feedback: Optional["Feedback"] = None,
    ) -> None:
        """Grant the permission to the specified role.
        Args:
            role: The name of the role to grant the permission to.
            connection: The database connection to execute the SQL statements.
            commit: Whether to commit the transaction. Defaults to False.
            feedback: Optional feedback object for progress reporting.
        """
        if not isinstance(role, str):
            raise TypeError("Role must be a string.")

        if not self.schemas:
            raise ValueError("Schemas must be defined for the permission.")

        for schema in self.schemas:
            if feedback and feedback.is_cancelled():
                raise PumException("Permission grant cancelled by user")

            # Detect if schema exists; if not, warn and continue
            cursor = SqlContent("SELECT 1 FROM pg_namespace WHERE nspname = {schema}").execute(
                connection=connection,
                commit=False,
                parameters={"schema": psycopg.sql.Literal(schema)},
            )
            if not cursor._pum_results or not cursor._pum_results[0]:
                logger.warning(
                    f"Schema {schema} does not exist; skipping grant of {self.type.value} "
                    f"permission to role {role}."
                )
                continue

            logger.debug(
                f"Granting {self.type.value} permission on schema {schema} to role {role}."
            )
            if self.type == PermissionType.READ:
                SqlContent("""
                        GRANT USAGE ON SCHEMA {schema} TO {role};
                        GRANT SELECT, REFERENCES, TRIGGER ON ALL TABLES IN SCHEMA {schema} TO {role};
                        GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA {schema} TO {role};
                        GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA {schema} TO {role};
                        GRANT EXECUTE ON ALL ROUTINES IN SCHEMA {schema} TO {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT SELECT, REFERENCES, TRIGGER ON TABLES TO {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT SELECT ON SEQUENCES TO {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT EXECUTE ON FUNCTIONS TO {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT EXECUTE ON ROUTINES TO {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT USAGE ON TYPES TO {role};
                           """).execute(
                    connection=connection,
                    commit=False,
                    parameters={
                        "schema": psycopg.sql.Identifier(schema),
                        "role": psycopg.sql.Identifier(role),
                    },
                )
                # Grant permissions on existing types
                self._grant_existing_types(connection, schema, role, "USAGE")
            elif self.type == PermissionType.WRITE:
                SqlContent("""
                        GRANT ALL ON SCHEMA {schema} TO {role};
                        GRANT ALL ON ALL TABLES IN SCHEMA {schema} TO {role};
                        GRANT ALL ON ALL SEQUENCES IN SCHEMA {schema} TO {role};
                        GRANT ALL ON ALL FUNCTIONS IN SCHEMA {schema} TO {role};
                        GRANT ALL ON ALL ROUTINES IN SCHEMA {schema} TO {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON TABLES TO {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON SEQUENCES TO {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON FUNCTIONS TO {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON ROUTINES TO {role};
                        ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON TYPES TO {role};
                           """).execute(
                    connection=connection,
                    commit=False,
                    parameters={
                        "schema": psycopg.sql.Identifier(schema),
                        "role": psycopg.sql.Identifier(role),
                    },
                )
                # Grant permissions on existing types
                self._grant_existing_types(connection, schema, role, "ALL")
            else:
                raise ValueError(f"Unknown permission type: {self.type}")

        if commit:
            if feedback:
                feedback.lock_cancellation()
            connection.commit()

    def _grant_existing_types(
        self, connection: psycopg.Connection, schema: str, role: str, privilege: str
    ) -> None:
        """Grant permissions on all existing types in a schema.

        Args:
            connection: The database connection.
            schema: The schema name.
            role: The role name.
            privilege: The privilege to grant (USAGE or ALL).

        """
        # Query for all types in the schema (excluding array types and internal types)
        cursor = connection.cursor()
        cursor.execute(
            """
            SELECT t.typname
            FROM pg_type t
            JOIN pg_namespace n ON t.typnamespace = n.oid
            WHERE n.nspname = %s
              AND t.typtype IN ('e', 'c', 'd', 'b', 'r')  -- enum, composite, domain, base, range
              AND t.typname NOT LIKE '_%%'  -- exclude array types
            """,
            (schema,),
        )
        types = cursor.fetchall()

        # Grant permissions on each type
        for (type_name,) in types:
            grant_sql = psycopg.sql.SQL(
                "GRANT {privilege} ON TYPE {schema}.{type_name} TO {role}"
            ).format(
                privilege=psycopg.sql.SQL(privilege),
                schema=psycopg.sql.Identifier(schema),
                type_name=psycopg.sql.Identifier(type_name),
                role=psycopg.sql.Identifier(role),
            )
            cursor.execute(grant_sql)

    def __repr__(self) -> str:
        """Return a string representation of the Permission object."""
        return f"<Permission: {self.type.value} on {self.schemas}>"

__repr__

__repr__() -> str

Return a string representation of the Permission object.

Source code in pum/role_manager.py
def __repr__(self) -> str:
    """Return a string representation of the Permission object."""
    return f"<Permission: {self.type.value} on {self.schemas}>"

grant

grant(role: str, connection: Connection, commit: bool = False, feedback: Optional[Feedback] = None) -> None

Grant the permission to the specified role. Args: role: The name of the role to grant the permission to. connection: The database connection to execute the SQL statements. commit: Whether to commit the transaction. Defaults to False. feedback: Optional feedback object for progress reporting.

Source code in pum/role_manager.py
def grant(
    self,
    role: str,
    connection: psycopg.Connection,
    commit: bool = False,
    feedback: Optional["Feedback"] = None,
) -> None:
    """Grant the permission to the specified role.
    Args:
        role: The name of the role to grant the permission to.
        connection: The database connection to execute the SQL statements.
        commit: Whether to commit the transaction. Defaults to False.
        feedback: Optional feedback object for progress reporting.
    """
    if not isinstance(role, str):
        raise TypeError("Role must be a string.")

    if not self.schemas:
        raise ValueError("Schemas must be defined for the permission.")

    for schema in self.schemas:
        if feedback and feedback.is_cancelled():
            raise PumException("Permission grant cancelled by user")

        # Detect if schema exists; if not, warn and continue
        cursor = SqlContent("SELECT 1 FROM pg_namespace WHERE nspname = {schema}").execute(
            connection=connection,
            commit=False,
            parameters={"schema": psycopg.sql.Literal(schema)},
        )
        if not cursor._pum_results or not cursor._pum_results[0]:
            logger.warning(
                f"Schema {schema} does not exist; skipping grant of {self.type.value} "
                f"permission to role {role}."
            )
            continue

        logger.debug(
            f"Granting {self.type.value} permission on schema {schema} to role {role}."
        )
        if self.type == PermissionType.READ:
            SqlContent("""
                    GRANT USAGE ON SCHEMA {schema} TO {role};
                    GRANT SELECT, REFERENCES, TRIGGER ON ALL TABLES IN SCHEMA {schema} TO {role};
                    GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA {schema} TO {role};
                    GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA {schema} TO {role};
                    GRANT EXECUTE ON ALL ROUTINES IN SCHEMA {schema} TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT SELECT, REFERENCES, TRIGGER ON TABLES TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT SELECT ON SEQUENCES TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT EXECUTE ON FUNCTIONS TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT EXECUTE ON ROUTINES TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT USAGE ON TYPES TO {role};
                       """).execute(
                connection=connection,
                commit=False,
                parameters={
                    "schema": psycopg.sql.Identifier(schema),
                    "role": psycopg.sql.Identifier(role),
                },
            )
            # Grant permissions on existing types
            self._grant_existing_types(connection, schema, role, "USAGE")
        elif self.type == PermissionType.WRITE:
            SqlContent("""
                    GRANT ALL ON SCHEMA {schema} TO {role};
                    GRANT ALL ON ALL TABLES IN SCHEMA {schema} TO {role};
                    GRANT ALL ON ALL SEQUENCES IN SCHEMA {schema} TO {role};
                    GRANT ALL ON ALL FUNCTIONS IN SCHEMA {schema} TO {role};
                    GRANT ALL ON ALL ROUTINES IN SCHEMA {schema} TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON TABLES TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON SEQUENCES TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON FUNCTIONS TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON ROUTINES TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON TYPES TO {role};
                       """).execute(
                connection=connection,
                commit=False,
                parameters={
                    "schema": psycopg.sql.Identifier(schema),
                    "role": psycopg.sql.Identifier(role),
                },
            )
            # Grant permissions on existing types
            self._grant_existing_types(connection, schema, role, "ALL")
        else:
            raise ValueError(f"Unknown permission type: {self.type}")

    if commit:
        if feedback:
            feedback.lock_cancellation()
        connection.commit()