5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / patched_app.py PY
"""
CVE-2026-26198 — Patched Implementation

This module demonstrates the fix: validate column names against the model's
known fields BEFORE passing them to any SQL construction.

The fix applies the same validation that sum()/avg() already had to min()
and max(), ensuring consistency across all aggregate methods.

Two approaches are shown:
1. Whitelist validation (recommended, simple)
2. Parameterized column reference (defense-in-depth)
"""

import re
import sqlite3


class ColumnValidationError(Exception):
    """Raised when a column name fails validation."""


class PatchedQuerySet:
    """
    Fixed QuerySet that validates all column names before query construction.

    The key change: every aggregate method now calls _validate_column()
    before the column name can reach any SQL expression.
    """

    def __init__(self, db_path: str, table_name: str, model_fields: list[str]):
        self.db_path = db_path
        self.table_name = table_name
        self.model_fields = model_fields

    def _execute(self, sql: str) -> any:
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute(sql)
            row = cursor.fetchone()
            return row[0] if row else None
        finally:
            conn.close()

    def _validate_column(self, column: str) -> str:
        """
        Validate that the column name refers to an actual model field.

        This is the core of the fix. The approach is a WHITELIST: we only
        allow column names that exist in the model's field definitions.

        Why whitelist instead of blacklist?
        - Blacklisting SQL keywords is brittle (new keywords, encoding tricks)
        - Whitelisting is simple and definitive: if it's not a known field, reject it
        - This matches what Ormar's sum()/avg() already did with is_numeric

        Additional hardening: we also reject anything that doesn't look like
        a valid identifier, as a defense-in-depth measure.
        """
        # Strip optional table prefix (Ormar uses "tablename__fieldname" syntax)
        parts = column.split("__")
        field_name = parts[-1]

        # Defense-in-depth: column names must be valid SQL identifiers
        # This catches injection attempts even if the whitelist is misconfigured
        if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', field_name):
            raise ColumnValidationError(
                f"Invalid column name: {column!r}. "
                f"Column names must be valid identifiers."
            )

        # Primary defense: whitelist against known model fields
        if field_name not in self.model_fields:
            raise ColumnValidationError(
                f"Unknown column: {column!r}. "
                f"Valid columns are: {', '.join(self.model_fields)}"
            )

        return field_name

    # --- ALL aggregate methods now validate before query construction ---

    def max(self, column: str) -> any:
        """
        PATCHED: Validates column name before building the query.

        The fix ensures that only known field names can reach the SQL layer.
        Injection payloads like "(SELECT password FROM users)" are rejected
        because they don't match any field in the model.
        """
        safe_column = self._validate_column(column)
        sql = f"SELECT max({safe_column}) FROM {self.table_name}"
        return self._execute(sql)

    def min(self, column: str) -> any:
        """PATCHED: Same validation as max()."""
        safe_column = self._validate_column(column)
        sql = f"SELECT min({safe_column}) FROM {self.table_name}"
        return self._execute(sql)

    def sum(self, column: str) -> any:
        """Already had validation, now uses the shared _validate_column()."""
        safe_column = self._validate_column(column)
        sql = f"SELECT sum({safe_column}) FROM {self.table_name}"
        return self._execute(sql)

    def avg(self, column: str) -> any:
        """Already had validation, now uses the shared _validate_column()."""
        safe_column = self._validate_column(column)
        sql = f"SELECT avg({safe_column}) FROM {self.table_name}"
        return self._execute(sql)