class Permission:
"""Class to represent a permission for a database role.
Attributes:
type: Type of permission (read or write).
schemas: List of schemas this permission applies to.
"""
def __init__(self, type: PermissionType | str, schemas: list[str] = None) -> None:
if not isinstance(type, PermissionType):
type = PermissionType(type)
self.type = type
self.schemas = schemas
def grant(
self,
role: str,
connection: psycopg.Connection,
commit: bool = False,
) -> None:
"""Grant the permission to the specified role.
Args:
role: The name of the role to grant the permission to.
connection: The database connection to execute the SQL statements.
commit: Whether to commit the transaction. Defaults to False.
"""
if not isinstance(role, str):
raise TypeError("Role must be a string.")
if not self.schemas:
raise ValueError("Schemas must be defined for the permission.")
for schema in self.schemas:
logger.info(f"Granting {self.type.value} permission on schema {schema} to role {role}.")
if self.type == PermissionType.READ:
SqlContent("""
GRANT USAGE ON SCHEMA {schema} TO {role};
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA {schema} TO {role};
GRANT SELECT, REFERENCES, TRIGGER ON ALL TABLES IN SCHEMA {schema} TO {role};
ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT SELECT, REFERENCES, TRIGGER ON TABLES TO {role};
""").execute(
connection=connection,
commit=False,
parameters={
"schema": psycopg.sql.Identifier(schema),
"role": psycopg.sql.Identifier(role),
},
)
elif self.type == PermissionType.WRITE:
SqlContent("""
GRANT ALL ON SCHEMA {schema} TO {role};
GRANT ALL ON ALL TABLES IN SCHEMA {schema} TO {role};
GRANT ALL ON ALL SEQUENCES IN SCHEMA {schema} TO {role};
ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON TABLES TO {role};
""").execute(
connection=connection,
commit=False,
parameters={
"schema": psycopg.sql.Identifier(schema),
"role": psycopg.sql.Identifier(role),
},
)
else:
raise ValueError(f"Unknown permission type: {self.type}")
if commit:
connection.commit()