# Extracted from README.md (rendered markdown).
"""
CVE-2026-26198 — Test Suite
Tests that verify:
1. The vulnerability exists in the unpatched code
2. The fix blocks all injection attempts
3. Normal queries still work correctly
4. Edge cases are handled (empty fields, special names, prefixed columns)
Run with: python -m pytest test_vulnerability.py -v
"""
import os
import sqlite3
import tempfile
import pytest
from vulnerable_app import VulnerableQuerySet, create_demo_database
from patched_app import PatchedQuerySet, ColumnValidationError
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def db_path():
    """Yield the path of a freshly populated temporary SQLite database.

    A new ``.db`` file is created with ``tempfile.mkstemp`` and filled by
    ``create_demo_database`` for each test, then deleted afterwards.

    Yields:
        str: Filesystem path of the temporary database file.
    """
    fd, path = tempfile.mkstemp(suffix=".db")
    # mkstemp returns an open OS-level descriptor; only the path is needed.
    os.close(fd)
    try:
        create_demo_database(path)
        yield path
    finally:
        # Also runs when create_demo_database raises, so a failed setup does
        # not leave the temporary file behind.
        os.unlink(path)
@pytest.fixture
def vulnerable_qs(db_path):
    """Return a VulnerableQuerySet bound to the ``products`` table of the temp database."""
    product_fields = ["id", "name", "price", "stock"]
    queryset = VulnerableQuerySet(
        db_path=db_path,
        table_name="products",
        model_fields=product_fields,
    )
    return queryset
@pytest.fixture
def patched_qs(db_path):
    """Return a PatchedQuerySet bound to the ``products`` table of the temp database."""
    settings = {
        "db_path": db_path,
        "table_name": "products",
        "model_fields": ["id", "name", "price", "stock"],
    }
    return PatchedQuerySet(**settings)
# ---------------------------------------------------------------------------
# Part 1: Prove the vulnerability exists
# ---------------------------------------------------------------------------
class TestVulnerabilityExists:
    """Demonstrate that unpatched min()/max() allow SQL injection."""

    def test_max_leaks_user_emails(self, vulnerable_qs):
        """Injecting a subquery into max() returns data from the users table."""
        payload = "(SELECT group_concat(email) FROM users)"
        result = vulnerable_qs.max(payload)
        # The attack succeeds: we get user emails from a product query.
        assert result is not None
        # The expected addresses in this test had been replaced by an e-mail
        # obfuscation placeholder and could never match real data, so the
        # check is structural: group_concat joins values with "," by default
        # and every leaked entry must look like an address.
        # TODO(review): pin the exact addresses seeded by create_demo_database;
        # at least two users are assumed to have an email.
        leaked = result.split(",")
        assert len(leaked) >= 2
        assert all("@" in email for email in leaked)

    def test_min_leaks_password_hash(self, vulnerable_qs):
        """Injecting into min() extracts sensitive credential data."""
        payload = "(SELECT password_hash FROM users WHERE role='admin')"
        result = vulnerable_qs.min(payload)
        assert result is not None
        assert "pbkdf2" in result  # The password hash was extracted

    def test_max_leaks_user_roles(self, vulnerable_qs):
        """Full user roster with privilege levels can be extracted."""
        payload = "(SELECT group_concat(username || ':' || role) FROM users)"
        result = vulnerable_qs.max(payload)
        # Guard first so a missing result fails as an assertion, not a TypeError.
        assert result is not None
        assert "admin:admin" in result
        assert "alice:user" in result

    def test_sum_is_already_protected(self, vulnerable_qs):
        """sum() rejects unknown columns — showing the inconsistency."""
        with pytest.raises(ValueError, match="not a valid field"):
            vulnerable_qs.sum("(SELECT 1 FROM users)")
# ---------------------------------------------------------------------------
# Part 2: Prove the fix blocks injection
# ---------------------------------------------------------------------------
class TestFixBlocksInjection:
    """Check that the patched aggregate methods refuse every injection payload."""

    def test_max_rejects_subquery(self, patched_qs):
        """A parenthesised subquery passed to max() is refused."""
        attack = "(SELECT email FROM users)"
        with pytest.raises(ColumnValidationError, match="Invalid column name"):
            patched_qs.max(attack)

    def test_min_rejects_subquery(self, patched_qs):
        """A parenthesised subquery passed to min() is refused."""
        attack = "(SELECT password_hash FROM users)"
        with pytest.raises(ColumnValidationError, match="Invalid column name"):
            patched_qs.min(attack)

    def test_rejects_union_injection(self, patched_qs):
        """A UNION appended after a real column name is refused."""
        attack = "price) UNION SELECT password_hash FROM users--"
        with pytest.raises(ColumnValidationError):
            patched_qs.max(attack)

    def test_rejects_semicolon_injection(self, patched_qs):
        """A second statement chained with a semicolon is refused."""
        attack = "price; DROP TABLE users;--"
        with pytest.raises(ColumnValidationError):
            patched_qs.max(attack)

    def test_rejects_comment_injection(self, patched_qs):
        """SQL block comments used in place of whitespace are refused."""
        attack = "price/**/OR/**/1=1"
        with pytest.raises(ColumnValidationError):
            patched_qs.max(attack)

    def test_rejects_nonexistent_field(self, patched_qs):
        """A well-formed identifier that is not a model field is refused."""
        missing = "nonexistent_field"
        with pytest.raises(ColumnValidationError, match="Unknown column"):
            patched_qs.max(missing)

    def test_rejects_empty_string(self, patched_qs):
        """An empty column name is refused."""
        blank = ""
        with pytest.raises(ColumnValidationError):
            patched_qs.max(blank)

    def test_rejects_spaces_in_column(self, patched_qs):
        """A column name containing spaces is refused."""
        attack = "price OR 1=1"
        with pytest.raises(ColumnValidationError):
            patched_qs.max(attack)

    def test_all_aggregate_methods_protected(self, patched_qs):
        """min, max, sum and avg must all validate their column argument."""
        attack = "(SELECT 1 FROM users)"
        for aggregate_name in ("min", "max", "sum", "avg"):
            # Look the method up outside the raises-block so only the call
            # itself is expected to raise.
            aggregate = getattr(patched_qs, aggregate_name)
            with pytest.raises(ColumnValidationError):
                aggregate(attack)
# ---------------------------------------------------------------------------
# Part 3: Normal operations still work
# ---------------------------------------------------------------------------
class TestNormalOperations:
    """Confirm that legitimate aggregate queries still return the expected values."""

    def test_max_price(self, patched_qs):
        """max('price') returns the highest price (4K Monitor)."""
        assert patched_qs.max("price") == 399.99

    def test_min_price(self, patched_qs):
        """min('price') returns the lowest price (Wireless Mouse)."""
        assert patched_qs.min("price") == 29.99

    def test_sum_stock(self, patched_qs):
        """sum('stock') returns the total stock: 150 + 75 + 200 + 30 + 100."""
        assert patched_qs.sum("stock") == 555

    def test_avg_price(self, patched_qs):
        """avg('price') returns the mean of all prices, compared to two decimals."""
        average = patched_qs.avg("price")
        assert round(average, 2) == 127.99

    def test_max_stock(self, patched_qs):
        """max('stock') returns the largest stock count (USB-C Hub)."""
        assert patched_qs.max("stock") == 200

    def test_min_stock(self, patched_qs):
        """min('stock') returns the smallest stock count (4K Monitor)."""
        assert patched_qs.min("stock") == 30
# ---------------------------------------------------------------------------
# Part 4: Edge cases
# ---------------------------------------------------------------------------
class TestEdgeCases:
    """Exercise boundary conditions and awkward column-name inputs."""

    def test_column_with_ormar_prefix(self, db_path):
        """A ``table__field`` name (Ormar's related-field notation) is accepted."""
        product_fields = ["id", "name", "price", "stock"]
        queryset = PatchedQuerySet(
            db_path=db_path,
            table_name="products",
            model_fields=product_fields,
        )
        # The prefix should be stripped, leaving the valid field 'price'.
        highest = queryset.max("products__price")
        assert highest == 399.99

    def test_sql_keyword_as_injection(self, patched_qs):
        """Bare SQL keywords are not model fields and are reported as unknown columns."""
        for word in ("SELECT", "DROP", "DELETE", "INSERT", "UPDATE"):
            with pytest.raises(ColumnValidationError, match="Unknown column"):
                patched_qs.max(word)

    def test_numeric_string_rejected(self, patched_qs):
        """A purely numeric column name is rejected."""
        digits = "12345"
        with pytest.raises(ColumnValidationError):
            patched_qs.max(digits)

    def test_special_chars_rejected(self, patched_qs):
        """A valid field name followed by one SQL-significant character is rejected."""
        risky_suffixes = ("'", '"', ";", "(", ")", "-", "/", "*", "\\", "\n")
        for suffix in risky_suffixes:
            with pytest.raises(ColumnValidationError):
                patched_qs.max(f"price{suffix}")