class Permission:
    """Class to represent a permission for a database role.

    Attributes:
        type: Type of permission (read or write), stored as a ``PermissionType``.
        schemas: List of schemas this permission applies to, or ``None`` when
            no schema has been configured yet.
    """

    def __init__(self, type: PermissionType | str, schemas: list[str] | None = None) -> None:
        """Create a permission.

        Args:
            type: The permission type, either a ``PermissionType`` member or a
                value accepted by the ``PermissionType`` constructor.
            schemas: The schemas the permission applies to. May be omitted, but
                must be set before calling :meth:`grant`.
        """
        # Normalise plain values to the enum so the rest of the class can rely
        # on ``self.type`` being a PermissionType.
        if not isinstance(type, PermissionType):
            type = PermissionType(type)
        self.type = type
        self.schemas = schemas

    def grant(
        self,
        role: str,
        connection: psycopg.Connection,
        commit: bool = False,
        feedback: Optional["Feedback"] = None,
    ) -> None:
        """Grant the permission to the specified role.

        Schemas that do not exist in the database are skipped with a warning.
        For every existing schema, privileges are granted on the schema itself,
        on its existing tables, sequences, functions, routines and types, and
        default privileges are altered for those object kinds.

        Args:
            role: The name of the role to grant the permission to.
            connection: The database connection to execute the SQL statements.
            commit: Whether to commit the transaction. Defaults to False.
            feedback: Optional feedback object for progress reporting.

        Raises:
            TypeError: If ``role`` is not a string.
            ValueError: If no schemas are defined, or the permission type is
                not handled.
            PumException: If the operation is cancelled through ``feedback``.
        """
        if not isinstance(role, str):
            raise TypeError("Role must be a string.")
        if not self.schemas:
            raise ValueError("Schemas must be defined for the permission.")

        for schema in self.schemas:
            # Cancellation is checked once per schema, before any statement
            # for that schema is executed.
            if feedback and feedback.is_cancelled():
                raise PumException("Permission grant cancelled by user")

            # Detect if schema exists; if not, warn and continue
            cursor = SqlContent("SELECT 1 FROM pg_namespace WHERE nspname = {schema}").execute(
                connection=connection,
                commit=False,
                parameters={"schema": psycopg.sql.Literal(schema)},
            )
            if not cursor._pum_results or not cursor._pum_results[0]:
                logger.warning(
                    f"Schema {schema} does not exist; skipping grant of {self.type.value} "
                    f"permission to role {role}."
                )
                continue

            logger.debug(
                f"Granting {self.type.value} permission on schema {schema} to role {role}."
            )
            if self.type == PermissionType.READ:
                SqlContent("""
                    GRANT USAGE ON SCHEMA {schema} TO {role};
                    GRANT SELECT, REFERENCES, TRIGGER ON ALL TABLES IN SCHEMA {schema} TO {role};
                    GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA {schema} TO {role};
                    GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA {schema} TO {role};
                    GRANT EXECUTE ON ALL ROUTINES IN SCHEMA {schema} TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT SELECT, REFERENCES, TRIGGER ON TABLES TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT SELECT ON SEQUENCES TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT EXECUTE ON FUNCTIONS TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT EXECUTE ON ROUTINES TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT USAGE ON TYPES TO {role};
                    """).execute(
                    connection=connection,
                    commit=False,
                    parameters={
                        "schema": psycopg.sql.Identifier(schema),
                        "role": psycopg.sql.Identifier(role),
                    },
                )
                # The statements above only set default privileges for types;
                # existing types are granted one by one.
                self._grant_existing_types(connection, schema, role, "USAGE")
            elif self.type == PermissionType.WRITE:
                SqlContent("""
                    GRANT ALL ON SCHEMA {schema} TO {role};
                    GRANT ALL ON ALL TABLES IN SCHEMA {schema} TO {role};
                    GRANT ALL ON ALL SEQUENCES IN SCHEMA {schema} TO {role};
                    GRANT ALL ON ALL FUNCTIONS IN SCHEMA {schema} TO {role};
                    GRANT ALL ON ALL ROUTINES IN SCHEMA {schema} TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON TABLES TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON SEQUENCES TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON FUNCTIONS TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON ROUTINES TO {role};
                    ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON TYPES TO {role};
                    """).execute(
                    connection=connection,
                    commit=False,
                    parameters={
                        "schema": psycopg.sql.Identifier(schema),
                        "role": psycopg.sql.Identifier(role),
                    },
                )
                # The statements above only set default privileges for types;
                # existing types are granted one by one.
                self._grant_existing_types(connection, schema, role, "ALL")
            else:
                raise ValueError(f"Unknown permission type: {self.type}")

        if commit:
            # Lock cancellation on the feedback object before committing.
            if feedback:
                feedback.lock_cancellation()
            connection.commit()

    def _grant_existing_types(
        self, connection: psycopg.Connection, schema: str, role: str, privilege: str
    ) -> None:
        """Grant permissions on all existing types in a schema.

        Args:
            connection: The database connection.
            schema: The schema name.
            role: The role name.
            privilege: The privilege to grant (USAGE or ALL).

        Raises:
            ValueError: If ``privilege`` is not a supported type privilege.
        """
        # ``privilege`` is spliced into the statement as raw SQL, so only
        # known keywords are accepted.
        if privilege.upper() not in {"USAGE", "ALL", "ALL PRIVILEGES"}:
            raise ValueError(f"Unsupported type privilege: {privilege!r}")

        # The context manager closes the cursor even if a statement fails.
        with connection.cursor() as cursor:
            # Query for all types in the schema, excluding array types.
            # Array types are identified through pg_type.typarray rather than
            # by name: in a LIKE pattern an unescaped "_" matches any single
            # character, so a "NOT LIKE '_%'" filter rejects every type.
            cursor.execute(
                """
                SELECT t.typname
                FROM pg_type t
                JOIN pg_namespace n ON t.typnamespace = n.oid
                WHERE n.nspname = %s
                AND t.typtype IN ('e', 'c', 'd', 'b', 'r')  -- enum, composite, domain, base, range
                AND NOT EXISTS (  -- exclude array types
                    SELECT 1 FROM pg_type elem WHERE elem.typarray = t.oid
                )
                """,
                (schema,),
            )
            types = cursor.fetchall()

            # Grant permissions on each type
            for (type_name,) in types:
                grant_sql = psycopg.sql.SQL(
                    "GRANT {privilege} ON TYPE {schema}.{type_name} TO {role}"
                ).format(
                    privilege=psycopg.sql.SQL(privilege),
                    schema=psycopg.sql.Identifier(schema),
                    type_name=psycopg.sql.Identifier(type_name),
                    role=psycopg.sql.Identifier(role),
                )
                cursor.execute(grant_sql)

    def __repr__(self) -> str:
        """Return a string representation of the Permission object."""
        return f"<Permission: {self.type.value} on {self.schemas}>"