README.md
Rendering markdown...
"""
CVE-2026-26198 — Vulnerable Pattern
This module recreates the vulnerable code pattern from Ormar ORM's aggregate
query handling. The actual vulnerability was in ormar's SelectAction class,
which passed user-supplied column names directly to sqlalchemy.text().
We simulate this pattern using raw SQLAlchemy to demonstrate the issue
without requiring a specific vulnerable version of Ormar to be installed.
DO NOT use this pattern in production code.
"""
import sqlite3
from dataclasses import dataclass
# ---------------------------------------------------------------------------
# Simulated ORM layer (mirrors Ormar's vulnerable internals)
# ---------------------------------------------------------------------------
class VulnerableQuerySet:
"""
Simulates Ormar's QuerySet with the vulnerable min()/max() methods.
The critical flaw: min() and max() accept any string as a column name
and embed it directly into the SQL query without validation. This is
exactly what Ormar's SelectAction.get_text_clause() did:
def get_text_clause(self):
return sqlalchemy.text(f"{alias}{self.field_name}")
# ^^^^^^^^^^^^^^^^
# Unsanitized user input!
"""
def __init__(self, db_path: str, table_name: str, model_fields: list[str]):
self.db_path = db_path
self.table_name = table_name
self.model_fields = model_fields # Known valid fields
def _execute(self, sql: str) -> any:
"""Execute a query and return the scalar result."""
conn = sqlite3.connect(self.db_path)
try:
cursor = conn.execute(sql)
row = cursor.fetchone()
return row[0] if row else None
finally:
conn.close()
# --- VULNERABLE: No validation on column parameter ---
def max(self, column: str) -> any:
"""
VULNERABLE: Passes column directly into SQL without any check.
In Ormar, this went through SelectAction.get_text_clause() which
called sqlalchemy.text(field_name) — a raw SQL text expression.
"""
# This is the dangerous line — column is attacker-controlled
sql = f"SELECT max({column}) FROM {self.table_name}"
return self._execute(sql)
def min(self, column: str) -> any:
"""VULNERABLE: Same issue as max()."""
sql = f"SELECT min({column}) FROM {self.table_name}"
return self._execute(sql)
# --- SAFE: sum() and avg() had validation (partial fix in Ormar) ---
def sum(self, column: str) -> any:
"""
SAFE: Has is_numeric type check (like Ormar's sum/avg).
Ormar's sum() and avg() validated that the field existed and was
numeric. This is the inconsistency that made the bug so sneaky —
developers assumed all aggregate methods had the same protection.
"""
if column not in self.model_fields:
raise ValueError(
f"Column '{column}' is not a valid field. "
f"Available fields: {self.model_fields}"
)
sql = f"SELECT sum({column}) FROM {self.table_name}"
return self._execute(sql)
# ---------------------------------------------------------------------------
# Demo app setup
# ---------------------------------------------------------------------------
def create_demo_database(db_path: str) -> None:
"""
Set up a demo database simulating an e-commerce application.
Two tables:
- products: Public data (name, price) — what the API exposes
- users: Sensitive data (email, password_hash) — should be hidden
"""
conn = sqlite3.connect(db_path)
conn.executescript("""
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
price REAL NOT NULL,
stock INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
username TEXT NOT NULL,
email TEXT NOT NULL,
password_hash TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'user'
);
-- Sample product data
INSERT OR IGNORE INTO products (id, name, price, stock) VALUES
(1, 'Wireless Mouse', 29.99, 150),
(2, 'Mechanical Keyboard', 89.99, 75),
(3, 'USB-C Hub', 49.99, 200),
(4, '4K Monitor', 399.99, 30),
(5, 'Webcam HD', 69.99, 100);
-- Sensitive user data (should NEVER be accessible via product queries)
INSERT OR IGNORE INTO users (id, username, email, password_hash, role) VALUES
(1, 'admin', '[email protected]', 'pbkdf2:sha256:fakehash_admin_secret', 'admin'),
(2, 'alice', '[email protected]', 'pbkdf2:sha256:fakehash_alice_pw123', 'user'),
(3, 'bob', '[email protected]', 'pbkdf2:sha256:fakehash_bob_secure', 'user');
""")
conn.commit()
conn.close()
@dataclass
class ProductQuerySet:
"""A product-focused query interface — should only access the products table."""
db_path: str
def get_vulnerable_queryset(self) -> VulnerableQuerySet:
return VulnerableQuerySet(
db_path=self.db_path,
table_name="products",
model_fields=["id", "name", "price", "stock"],
)