Skip to content

SqlContent

Class to handle SQL content preparation and execution.

Source code in pum/sql_content.py
class SqlContent:
    """Class to handle SQL content preparation and execution.

    The wrapped SQL may be a raw string, a ``psycopg.sql.SQL`` object or a
    ``Path`` to a SQL file. Named placeholders are filled in with
    ``psycopg.sql`` composables at preparation time.
    """

    def __init__(self, sql: str | psycopg.sql.SQL | Path) -> None:
        """Initialize the SqlContent class.

        Args:
            sql: The SQL statement to execute or a path to a SQL file.

        """
        self.sql = sql

    def validate(self, parameters: dict | None) -> bool:
        """Validate the SQL content.

        The content must be non-empty and must be preparable with the given
        parameters. Nothing is executed.

        Args:
            parameters: The parameters to pass to the SQL files.

        Returns:
            bool: True when the content is valid. False is never returned;
                invalid content raises instead.

        Raises:
            PumSqlError: If the SQL content is empty or cannot be prepared.

        """
        if not self.sql:
            raise PumSqlError("SQL content is empty.")
        self._prepare_sql(parameters)
        return True

    def execute(
        self,
        connection: psycopg.Connection,
        *,
        parameters: dict | None = None,
        commit: bool = False,
    ) -> psycopg.Cursor:
        """Execute the SQL statement(s) with optional parameters.

        Args:
            connection: The database connection to execute the SQL statement.
            parameters: Parameters to bind to the SQL statement. Defaults to None.
            commit: Whether to commit the transaction once every statement
                has been executed. Defaults to False.

        Returns:
            The cursor that was used to execute the statements.

        Raises:
            PumSqlError: If rendering or executing a statement fails with a
                psycopg syntax or programming error.

        """
        cursor = connection.cursor()

        for sql_code in self._prepare_sql(parameters):
            try:
                statement = sql_code.as_string(connection)
            except (psycopg.errors.SyntaxError, psycopg.errors.ProgrammingError) as e:
                # `statement` is not bound (or is stale from the previous
                # iteration) when rendering fails, so report the unrendered
                # composable instead.
                raise PumSqlError(
                    f"SQL preparation failed for the following code: {sql_code!r} {e}"
                ) from e
            try:
                logger.debug("Executing SQL statement: %s", statement)
                cursor.execute(statement)
            except (psycopg.errors.SyntaxError, psycopg.errors.ProgrammingError) as e:
                raise PumSqlError(
                    f"SQL execution failed for the following code: {statement} {e}"
                ) from e
        if commit:
            connection.commit()

        return cursor

    def _prepare_sql(self, parameters: dict | None) -> list[psycopg.sql.Composable]:
        """Prepare SQL for execution.

        Args:
            parameters: Parameters to bind to the SQL statement, or None to
                leave the statements unformatted.

        Returns:
            list: A list of prepared SQL statements.

        Raises:
            PumSqlError: If a parameter has an unsupported type or a
                placeholder has no matching parameter.

        """
        if isinstance(self.sql, Path):
            logger.info(
                "Checking SQL from file: %s with parameters: %s",
                self.sql,
                parameters,
            )
            sql_code = sql_chunks_from_file(self.sql)
        elif isinstance(self.sql, str):
            sql_code = [psycopg.sql.SQL(self.sql)]
        else:
            sql_code = [self.sql]

        allowed_types = (
            psycopg.sql.Literal,
            psycopg.sql.Identifier,
            psycopg.sql.Composed,
        )

        def format_sql(
            statement: psycopg.sql.SQL, parameters: dict | None = None
        ) -> psycopg.sql.Composable:
            for key, value in (parameters or {}).items():
                if not isinstance(value, allowed_types):
                    raise PumSqlError(
                        f"Invalid parameter type for key '{key}': {type(value)}. "
                        "Parameters must be psycopg.sql.Literal, "
                        "psycopg.sql.Identifier or psycopg.sql.Composed."
                    )
            # Without parameters the statement is passed through untouched.
            # An explicit check replaces a blanket `except TypeError`, which
            # could also hide unrelated errors.
            if parameters is None:
                return statement
            try:
                return statement.format(**parameters)
            except KeyError as e:
                raise PumSqlError(
                    f"SQL preparation failed for the following code: missing parameter: {statement} {e}"
                ) from e

        return [format_sql(statement, parameters) for statement in sql_code]

    @staticmethod
    def prepare_parameters(parameters: dict | None) -> dict:
        """
        Prepares a dictionary of parameters for use in SQL queries by converting each value to a psycopg.sql.Literal.

        Args:
            parameters: A dictionary of parameters to be converted, or None.

        Returns:
            dict: A new dictionary with the same keys as `parameters`, where each value is wrapped in psycopg.sql.Literal.
                An empty dictionary is returned when `parameters` is None or empty.
        """
        return {
            key: psycopg.sql.Literal(value) for key, value in (parameters or {}).items()
        }

__init__

__init__(sql: str | SQL | Path) -> None

Initialize the SqlContent class.

Parameters:

Name Type Description Default
sql str | SQL | Path

The SQL statement to execute or a path to a SQL file.

required
Source code in pum/sql_content.py
def __init__(self, sql: str | psycopg.sql.SQL | Path) -> None:
    """Initialize the SqlContent class.

    The value is stored as-is on ``self.sql``; this method does not read,
    parse or validate it.

    Args:
        sql: The SQL statement to execute or a path to a SQL file.

    """
    self.sql = sql

execute

execute(connection: Connection, *, parameters: dict | None = None, commit: bool = False) -> psycopg.Cursor

Execute a SQL statement with optional parameters.

Parameters:

Name Type Description Default
connection Connection

The database connection to execute the SQL statement.

required
parameters dict | None

Parameters to bind to the SQL statement. Defaults to None.

None
commit bool

Whether to commit the transaction. Defaults to False.

False
Source code in pum/sql_content.py
def execute(
    self,
    connection: psycopg.Connection,
    *,
    parameters: dict | None = None,
    commit: bool = False,
) -> psycopg.Cursor:
    """Execute the SQL statement(s) with optional parameters.

    Args:
        connection: The database connection to execute the SQL statement.
        parameters: Parameters to bind to the SQL statement. Defaults to None.
        commit: Whether to commit the transaction once every statement has
            been executed. Defaults to False.

    Returns:
        The cursor that was used to execute the statements.

    Raises:
        PumSqlError: If rendering or executing a statement fails with a
            psycopg syntax or programming error.

    """
    cursor = connection.cursor()

    for sql_code in self._prepare_sql(parameters):
        try:
            statement = sql_code.as_string(connection)
        except (psycopg.errors.SyntaxError, psycopg.errors.ProgrammingError) as e:
            # `statement` is not bound (or is stale from the previous
            # iteration) when rendering fails, so report the unrendered
            # composable instead.
            raise PumSqlError(
                f"SQL preparation failed for the following code: {sql_code!r} {e}"
            ) from e
        try:
            logger.debug("Executing SQL statement: %s", statement)
            cursor.execute(statement)
        except (psycopg.errors.SyntaxError, psycopg.errors.ProgrammingError) as e:
            raise PumSqlError(
                f"SQL execution failed for the following code: {statement} {e}"
            ) from e
    if commit:
        connection.commit()

    return cursor

prepare_parameters staticmethod

prepare_parameters(parameters: dict | None)

Prepares a dictionary of parameters for use in SQL queries by converting each value to a psycopg.sql.Literal.

Parameters:

Name Type Description Default
parameters dict | None

A dictionary of parameters to be converted, or None.

required

Returns:

Name Type Description
dict

A new dictionary with the same keys as parameters, where each value is wrapped in psycopg.sql.Literal.

Source code in pum/sql_content.py
@staticmethod
def prepare_parameters(parameters: dict | None):
    """
    Prepares a dictionary of parameters for use in SQL queries by converting each value to a psycopg.sql.Literal.

    Args:
        parameters: A dictionary of parameters to be converted, or None.

    Returns:
        dict: A new dictionary with the same keys as `parameters`, where each value is wrapped in psycopg.sql.Literal.
    """
    parameters_literals = {}
    if parameters:
        for key, value in parameters.items():
            parameters_literals[key] = psycopg.sql.Literal(value)
    return parameters_literals

validate

validate(parameters: dict | None) -> bool

Validate the SQL content. This is done by checking if the SQL content is not empty.

Parameters:

Name Type Description Default
parameters dict | None

The parameters to pass to the SQL files.

required

Returns:

Name Type Description
bool bool

True if the SQL content is valid. A PumSqlError is raised otherwise; False is never returned.

Source code in pum/sql_content.py
def validate(self, parameters: dict | None) -> bool:
    """Validate the SQL content.
    This is done by checking if the SQL content is not empty.

    Args:
        parameters: The parameters to pass to the SQL files.

    Returns:
        bool: True if valid, False otherwise.

    """
    if not self.sql:
        raise PumSqlError("SQL content is empty.")
    self._prepare_sql(parameters)
    return True